Skip to content

Commit c06c592

Browse files
committed
feat(cli): add schedule trigger links
1 parent f71f06a commit c06c592

File tree

3 files changed

+288
-7
lines changed

3 files changed

+288
-7
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package scheduleserver
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"net/http"
7+
"strconv"
8+
9+
"github.com/nitrictech/suga/cli/internal/netx"
10+
)
11+
12+
// Server provides HTTP endpoints to manually trigger schedules
13+
type Server struct {
14+
services map[string]ServiceWithSchedules
15+
mux *http.ServeMux
16+
listener net.Listener
17+
port netx.ReservedPort
18+
server *http.Server
19+
}
20+
21+
// ServiceWithSchedules interface for services that support schedule triggering
22+
type ServiceWithSchedules interface {
23+
GetName() string
24+
TriggerSchedule(index int, async bool) error
25+
}
26+
27+
// NewServer creates a new schedule trigger server
28+
func NewServer(services map[string]ServiceWithSchedules) (*Server, error) {
29+
// Get an available port
30+
port, err := netx.GetNextPort(netx.MinPort(8000), netx.MaxPort(8999))
31+
if err != nil {
32+
return nil, fmt.Errorf("failed to find open port: %w", err)
33+
}
34+
35+
s := &Server{
36+
services: services,
37+
mux: http.NewServeMux(),
38+
port: port,
39+
}
40+
41+
s.setupRoutes()
42+
43+
return s, nil
44+
}
45+
46+
func (s *Server) setupRoutes() {
47+
// Schedule trigger endpoint: GET /schedules/{serviceId}/{scheduleIndex}?async=true
48+
s.mux.HandleFunc("/schedules/{serviceId}/{scheduleIndex}", s.handleTriggerSchedule)
49+
}
50+
51+
func (s *Server) handleTriggerSchedule(w http.ResponseWriter, r *http.Request) {
52+
// Only accept GET requests
53+
if r.Method != http.MethodGet {
54+
w.WriteHeader(http.StatusMethodNotAllowed)
55+
fmt.Fprintf(w, "✗ Method not allowed. Use GET to trigger schedules.\n")
56+
return
57+
}
58+
59+
// Extract path parameters
60+
serviceId := r.PathValue("serviceId")
61+
scheduleIndexStr := r.PathValue("scheduleIndex")
62+
63+
// Parse schedule index
64+
scheduleIndex, err := strconv.Atoi(scheduleIndexStr)
65+
if err != nil {
66+
w.WriteHeader(http.StatusBadRequest)
67+
fmt.Fprintf(w, "✗ Invalid schedule index: %s\n", scheduleIndexStr)
68+
return
69+
}
70+
71+
// Check if schedule index is valid (non-negative)
72+
if scheduleIndex < 0 {
73+
w.WriteHeader(http.StatusBadRequest)
74+
fmt.Fprintf(w, "✗ Schedule index must be non-negative\n")
75+
return
76+
}
77+
78+
// Find the service
79+
svc, ok := s.services[serviceId]
80+
if !ok {
81+
w.WriteHeader(http.StatusNotFound)
82+
fmt.Fprintf(w, "✗ Service '%s' not found\n", serviceId)
83+
return
84+
}
85+
86+
// Parse async query parameter (defaults to false for synchronous execution)
87+
asyncStr := r.URL.Query().Get("async")
88+
async := false
89+
if asyncStr == "true" || asyncStr == "1" {
90+
async = true
91+
}
92+
93+
// Trigger the schedule
94+
err = svc.TriggerSchedule(scheduleIndex, async)
95+
if err != nil {
96+
w.WriteHeader(http.StatusInternalServerError)
97+
fmt.Fprintf(w, "✗ Failed to trigger schedule %d on service '%s': %v\n", scheduleIndex, serviceId, err)
98+
return
99+
}
100+
101+
// Success response
102+
w.WriteHeader(http.StatusOK)
103+
if async {
104+
fmt.Fprintf(w, "✓ Schedule %d on service '%s' triggered asynchronously\n", scheduleIndex, serviceId)
105+
} else {
106+
fmt.Fprintf(w, "✓ Schedule %d on service '%s' executed successfully\n", scheduleIndex, serviceId)
107+
}
108+
}
109+
110+
// Start starts the HTTP server
111+
func (s *Server) Start() error {
112+
addr := fmt.Sprintf("localhost:%d", s.port)
113+
114+
listener, err := net.Listen("tcp", addr)
115+
if err != nil {
116+
return fmt.Errorf("failed to listen on %s: %w", addr, err)
117+
}
118+
119+
s.listener = listener
120+
s.server = &http.Server{
121+
Handler: s.mux,
122+
}
123+
124+
// Start server in goroutine
125+
go func() {
126+
if err := s.server.Serve(listener); err != nil && err != http.ErrServerClosed {
127+
fmt.Printf("Schedule trigger server error: %v\n", err)
128+
}
129+
}()
130+
131+
return nil
132+
}
133+
134+
// Stop stops the HTTP server
135+
func (s *Server) Stop() error {
136+
if s.server != nil {
137+
return s.server.Close()
138+
}
139+
return nil
140+
}
141+
142+
// GetPort returns the port the server is listening on
143+
func (s *Server) GetPort() int {
144+
return int(s.port)
145+
}
146+
147+
// GetURL returns the base URL of the schedule trigger server
148+
func (s *Server) GetURL() string {
149+
return fmt.Sprintf("http://localhost:%d", s.port)
150+
}

cli/internal/simulation/service/service.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *ServiceSimulation) hasExceededFailureLimit() bool {
126126
return len(s.consecutiveFailures) >= s.maxFailures
127127
}
128128

129-
func (s *ServiceSimulation) startSchedules(stdoutWriter, stderrorWriter io.Writer) (*cron.Cron, error) {
129+
func (s *ServiceSimulation) startSchedules(stderrorWriter io.Writer) (*cron.Cron, error) {
130130
cron := cron.New()
131131

132132
for _, schedule := range s.intent.Schedules {
@@ -244,7 +244,7 @@ func (s *ServiceSimulation) Start(autoRestart bool) error {
244244
s.cmd = srvCommand
245245
s.updateStatus(Status_Running)
246246

247-
cron, err := s.startSchedules(stdoutWriter, stderrWriter)
247+
cron, err := s.startSchedules(stderrWriter)
248248
if err != nil {
249249
s.updateStatus(Status_Fatal)
250250
return err
@@ -277,6 +277,61 @@ func (s *ServiceSimulation) Start(autoRestart bool) error {
277277
return nil
278278
}
279279

280+
// TriggerSchedule manually triggers a schedule by index
281+
// If async is true, the schedule runs in a goroutine and returns immediately
282+
// If async is false, waits for the HTTP response
283+
func (s *ServiceSimulation) TriggerSchedule(index int, async bool) error {
284+
// Validate schedule index
285+
if index < 0 || index >= len(s.intent.Schedules) {
286+
return fmt.Errorf("schedule index %d out of range (service has %d schedules)", index, len(s.intent.Schedules))
287+
}
288+
289+
// Check if service is running
290+
if s.currentStatus != Status_Running {
291+
return fmt.Errorf("service is not running (current status: %v)", s.currentStatus)
292+
}
293+
294+
schedule := s.intent.Schedules[index]
295+
296+
// Build the URL
297+
url := url.URL{
298+
Scheme: "http",
299+
Host: fmt.Sprintf("localhost:%d", s.port),
300+
Path: schedule.Path,
301+
}
302+
303+
// Function to execute the schedule
304+
executeSchedule := func() error {
305+
req, err := http.NewRequest(http.MethodPost, url.String(), nil)
306+
if err != nil {
307+
return fmt.Errorf("error creating request: %w", err)
308+
}
309+
310+
resp, err := http.DefaultClient.Do(req)
311+
if err != nil {
312+
return fmt.Errorf("error sending request: %w", err)
313+
}
314+
defer resp.Body.Close()
315+
316+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
317+
return fmt.Errorf("request returned status %d", resp.StatusCode)
318+
}
319+
320+
return nil
321+
}
322+
323+
if async {
324+
// Asynchronous execution
325+
go func() {
326+
_ = executeSchedule()
327+
}()
328+
return nil
329+
}
330+
331+
// Synchronous execution
332+
return executeSchedule()
333+
}
334+
280335
func NewServiceSimulation(name string, intent schema.ServiceIntent, port netx.ReservedPort, apiPort netx.ReservedPort) (*ServiceSimulation, <-chan ServiceEvent, error) {
281336
if intent.Dev == nil {
282337
return nil, nil, fmt.Errorf("service does not have a dev configuration and cannot be started")

cli/internal/simulation/simulation.go

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"sync"
1717

1818
"github.com/nitrictech/suga/cli/internal/netx"
19+
"github.com/nitrictech/suga/cli/internal/scheduleserver"
1920
"github.com/nitrictech/suga/cli/internal/simulation/database"
2021
"github.com/nitrictech/suga/cli/internal/simulation/middleware"
2122
"github.com/nitrictech/suga/cli/internal/simulation/service"
@@ -37,11 +38,12 @@ type SimulationServer struct {
3738
storagepb.UnimplementedStorageServer
3839
pubsubpb.UnimplementedPubsubServer
3940

40-
apiPort netx.ReservedPort
41-
fileServerPort int
42-
services map[string]*service.ServiceSimulation
43-
databaseManager *database.DatabaseManager
44-
servicesWg sync.WaitGroup
41+
apiPort netx.ReservedPort
42+
fileServerPort int
43+
services map[string]*service.ServiceSimulation
44+
databaseManager *database.DatabaseManager
45+
servicesWg sync.WaitGroup
46+
scheduleTriggerSv *scheduleserver.Server
4547
}
4648

4749
const (
@@ -408,6 +410,64 @@ func styledName(name string, styleFunc func(...string) string) string {
408410
return styledNames[name]
409411
}
410412

413+
func (s *SimulationServer) startScheduleTriggerServer(output io.Writer) error {
414+
// Check if any service has schedules
415+
hasSchedules := false
416+
for _, serviceIntent := range s.appSpec.ServiceIntents {
417+
if len(serviceIntent.Schedules) > 0 {
418+
hasSchedules = true
419+
break
420+
}
421+
}
422+
423+
if !hasSchedules {
424+
return nil
425+
}
426+
427+
// Convert services map to interface map for schedule trigger server
428+
servicesWithSchedules := make(map[string]scheduleserver.ServiceWithSchedules)
429+
for name, svc := range s.services {
430+
servicesWithSchedules[name] = svc
431+
}
432+
433+
// Create and start the schedule trigger server
434+
triggerServer, err := scheduleserver.NewServer(servicesWithSchedules)
435+
if err != nil {
436+
return fmt.Errorf("failed to create schedule trigger server: %w", err)
437+
}
438+
439+
err = triggerServer.Start()
440+
if err != nil {
441+
return fmt.Errorf("failed to start schedule trigger server: %w", err)
442+
}
443+
444+
s.scheduleTriggerSv = triggerServer
445+
446+
fmt.Fprintf(output, "%s\n\n", style.Purple("Schedule Triggers"))
447+
448+
// Print clickable trigger URLs for each service's schedules
449+
for serviceName, serviceIntent := range s.appSpec.ServiceIntents {
450+
if len(serviceIntent.Schedules) == 0 {
451+
continue
452+
}
453+
454+
for i, schedule := range serviceIntent.Schedules {
455+
triggerURL := fmt.Sprintf("%s/schedules/%s/%d", triggerServer.GetURL(), serviceName, i)
456+
fmt.Fprintf(output, "%s %s schedule %d (%s -> %s)\n Trigger: %s\n",
457+
greenCheck,
458+
styledName(serviceName, style.Teal),
459+
i,
460+
style.Gray(schedule.Cron),
461+
style.Gray(fmt.Sprintf("POST %s", schedule.Path)),
462+
style.Cyan(triggerURL))
463+
}
464+
}
465+
466+
fmt.Fprint(output, "\n")
467+
468+
return nil
469+
}
470+
411471
func (s *SimulationServer) Start(output io.Writer) error {
412472
err := s.startSugaApis()
413473
if err != nil {
@@ -449,6 +509,14 @@ func (s *SimulationServer) Start(output io.Writer) error {
449509
fmt.Fprint(output, "\n")
450510
}
451511

512+
// Start schedule trigger server after services are running
513+
if len(s.services) > 0 {
514+
err = s.startScheduleTriggerServer(output)
515+
if err != nil {
516+
return err
517+
}
518+
}
519+
452520
fmt.Println(style.Gray("Use Ctrl-C to exit\n"))
453521

454522
// block on handling service outputs for now
@@ -459,6 +527,14 @@ func (s *SimulationServer) Start(output io.Writer) error {
459527

460528
// Stop gracefully shuts down the simulation server and cleans up resources
461529
func (s *SimulationServer) Stop() error {
530+
// Stop the schedule trigger server first
531+
if s.scheduleTriggerSv != nil {
532+
fmt.Println("Stopping schedule trigger server...")
533+
if err := s.scheduleTriggerSv.Stop(); err != nil {
534+
return fmt.Errorf("failed to stop schedule trigger server: %w", err)
535+
}
536+
}
537+
462538
// Stop services first before stopping database
463539
for serviceName, svc := range s.services {
464540
if svc != nil {

0 commit comments

Comments
 (0)