From 518a09a9110e197cdaabd734174e10b2598a4f7e Mon Sep 17 00:00:00 2001 From: Molecule AI Backend Engineer Date: Fri, 17 Apr 2026 05:16:51 +0000 Subject: [PATCH] feat(platform): AG-UI compatible SSE endpoint for streaming agent events (#590) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add in-process SSE subscription mechanism to Broadcaster (SubscribeSSE, deliverToSSE) so both RecordAndBroadcast *and* BroadcastOnly fan out to SSE subscribers — critical because BroadcastOnly skips Redis pub/sub and would be invisible to a Redis-only subscriber (AGENT_MESSAGE, A2A_RESPONSE, TASK_UPDATED are all BroadcastOnly events). - Add handlers/sse.go: SSEHandler.StreamEvents sets text/event-stream headers, checks workspace existence (404 if missing), subscribes via broadcaster, and wraps each WSMessage in an AG-UI envelope: data: {"type":"","timestamp":,"data":{...}}\n\n - Register wsAuth.GET("/workspaces/:id/events/stream") behind existing WorkspaceAuth middleware — bearer token bound to :id. - Add 6 tests: Content-Type, initial ping, AG-UI format, workspace filter (cross-workspace events not leaked), 404 on missing workspace, multiple sequential events. All 19 packages pass. Build clean. Co-Authored-By: Claude Sonnet 4.6 --- platform/internal/events/broadcaster.go | 61 +++++- platform/internal/handlers/sse.go | 107 +++++++++++ platform/internal/handlers/sse_test.go | 237 ++++++++++++++++++++++++ platform/internal/router/router.go | 5 + 4 files changed, 409 insertions(+), 1 deletion(-) create mode 100644 platform/internal/handlers/sse.go create mode 100644 platform/internal/handlers/sse_test.go diff --git a/platform/internal/events/broadcaster.go b/platform/internal/events/broadcaster.go index 91fc8b2e..514d9781 100644 --- a/platform/internal/events/broadcaster.go +++ b/platform/internal/events/broadcaster.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "log" + "sync" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" @@ -14,8 +15,17 @@ import ( const broadcastChannel = "events:broadcast" +// sseSubscription is a single in-process SSE subscriber. +// deliverToSSE writes to ch; StreamEvents reads from it. +type sseSubscription struct { + workspaceID string + ch chan models.WSMessage +} + type Broadcaster struct { - hub *ws.Hub + hub *ws.Hub + ssesMu sync.RWMutex + sses []*sseSubscription } func NewBroadcaster(hub *ws.Hub) *Broadcaster { @@ -59,6 +69,9 @@ func (b *Broadcaster) RecordAndBroadcast(ctx context.Context, eventType string, // Broadcast to local WebSocket clients b.hub.Broadcast(msg) + // Fan out to in-process SSE subscribers (e.g. GET /events/stream). + b.deliverToSSE(msg) + return nil } @@ -79,6 +92,52 @@ func (b *Broadcaster) BroadcastOnly(workspaceID string, eventType string, payloa } b.hub.Broadcast(msg) + + // Fan out to in-process SSE subscribers. + b.deliverToSSE(msg) +} + +// SubscribeSSE registers a per-workspace in-process channel for SSE streaming. +// The caller MUST invoke the returned cancel func when it disconnects so the +// subscription is removed and the channel is not leaked. +func (b *Broadcaster) SubscribeSSE(workspaceID string) (<-chan models.WSMessage, func()) { + sub := &sseSubscription{ + workspaceID: workspaceID, + ch: make(chan models.WSMessage, 64), + } + b.ssesMu.Lock() + b.sses = append(b.sses, sub) + b.ssesMu.Unlock() + + cancel := func() { + b.ssesMu.Lock() + defer b.ssesMu.Unlock() + for i, s := range b.sses { + if s == sub { + b.sses = append(b.sses[:i], b.sses[i+1:]...) + break + } + } + } + return sub.ch, cancel +} + +// deliverToSSE fans msg out to every in-process SSE subscriber watching the +// same workspace. Non-blocking: if a subscriber's buffer is full the event is +// dropped with a log line (the WebSocket path still delivers it). +func (b *Broadcaster) deliverToSSE(msg models.WSMessage) { + b.ssesMu.RLock() + defer b.ssesMu.RUnlock() + for _, s := range b.sses { + if s.workspaceID != msg.WorkspaceID { + continue + } + select { + case s.ch <- msg: + default: + log.Printf("SSE: subscriber buffer full for workspace %s, dropping event %s", msg.WorkspaceID, msg.Event) + } + } } // Subscribe listens to Redis pub/sub and relays events to the WebSocket hub. diff --git a/platform/internal/handlers/sse.go b/platform/internal/handlers/sse.go new file mode 100644 index 00000000..5e578b15 --- /dev/null +++ b/platform/internal/handlers/sse.go @@ -0,0 +1,107 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/gin-gonic/gin" +) + +// aguiEvent is the AG-UI envelope written to the SSE stream. +// Spec: {"type":"","timestamp":,"data":{...}} +type aguiEvent struct { + Type string `json:"type"` + Timestamp int64 `json:"timestamp"` // Unix milliseconds + Data json.RawMessage `json:"data"` +} + +// SSEHandler streams workspace events as AG-UI-compatible Server-Sent Events. +type SSEHandler struct { + broadcaster *events.Broadcaster +} + +// NewSSEHandler returns an SSEHandler that sources events from b. +func NewSSEHandler(b *events.Broadcaster) *SSEHandler { + return &SSEHandler{broadcaster: b} +} + +// StreamEvents handles GET /workspaces/:id/events/stream. +// +// Authentication is enforced by the upstream WorkspaceAuth middleware (bearer +// token bound to :id). This handler only needs to: +// 1. Verify the workspace exists (returns 404 if not). +// 2. Set SSE headers. +// 3. Subscribe to the in-process broadcaster and relay events until the +// client disconnects (context cancellation). +// +// AG-UI envelope per event: +// +// data: {"type":"","timestamp":,"data":{...}}\n\n +func (h *SSEHandler) StreamEvents(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + // Verify the workspace exists — 404 early rather than serving an empty stream. + var exists bool + if err := db.DB.QueryRowContext(ctx, + `SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1)`, + workspaceID, + ).Scan(&exists); err != nil { + log.Printf("SSE: workspace existence check failed for %s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to verify workspace"}) + return + } + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return + } + + // SSE response headers. + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + // Instruct nginx / reverse-proxies to disable buffering so events reach + // the client immediately rather than being held in a proxy buffer. + c.Header("X-Accel-Buffering", "no") + + flusher, ok := c.Writer.(http.Flusher) + if !ok { + // Should never happen with gin's responseWriter, but guard defensively. + c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming not supported"}) + return + } + + ch, cancel := h.broadcaster.SubscribeSSE(workspaceID) + defer cancel() + + // Send an initial SSE comment so the client knows the stream is live. + fmt.Fprintf(c.Writer, ": ping\n\n") + flusher.Flush() + + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-ch: + if !ok { + return + } + env := aguiEvent{ + Type: msg.Event, + Timestamp: msg.Timestamp.UnixMilli(), + Data: msg.Payload, + } + b, err := json.Marshal(env) + if err != nil { + log.Printf("SSE: marshal error for workspace %s event %s: %v", workspaceID, msg.Event, err) + continue + } + fmt.Fprintf(c.Writer, "data: %s\n\n", b) + flusher.Flush() + } + } +} diff --git a/platform/internal/handlers/sse_test.go b/platform/internal/handlers/sse_test.go new file mode 100644 index 00000000..b2d4264b --- /dev/null +++ b/platform/internal/handlers/sse_test.go @@ -0,0 +1,237 @@ +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// expectWorkspaceExists queues the EXISTS query that StreamEvents fires first. +func expectWorkspaceExists(mock sqlmock.Sqlmock, workspaceID string, exists bool) { + rows := sqlmock.NewRows([]string{"exists"}).AddRow(exists) + mock.ExpectQuery(`SELECT EXISTS`). + WithArgs(workspaceID). + WillReturnRows(rows) +} + +// runSSEHandler starts StreamEvents in a background goroutine using a +// cancellable context, waits waitAfterStart for the handler to subscribe, +// then returns a drain function (cancel + wait for goroutine exit). +func runSSEHandler(t *testing.T, h *SSEHandler, workspaceID string) ( + w *httptest.ResponseRecorder, + inject func(), // call to cancel immediately + done <-chan struct{}, +) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + w = httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: workspaceID}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+workspaceID+"/events/stream", nil).WithContext(ctx) + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + h.StreamEvents(c) + }() + + return w, cancel, doneCh +} + +// TestSSE_ContentType verifies the handler sets text/event-stream on the response. +func TestSSE_ContentType(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "ws-1", true) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w, cancel, done := runSSEHandler(t, h, "ws-1") + + // Allow the handler to subscribe, then tear it down. + time.Sleep(30 * time.Millisecond) + cancel() + <-done + + ct := w.Header().Get("Content-Type") + if !strings.HasPrefix(ct, "text/event-stream") { + t.Errorf("expected Content-Type text/event-stream, got %q", ct) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet DB expectations: %v", err) + } +} + +// TestSSE_InitialPing verifies the handler emits the ": ping" SSE comment on connect. +func TestSSE_InitialPing(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "ws-1", true) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w, cancel, done := runSSEHandler(t, h, "ws-1") + time.Sleep(30 * time.Millisecond) + cancel() + <-done + + body := w.Body.String() + if !strings.Contains(body, ": ping") { + t.Errorf("expected SSE ping comment, body was:\n%s", body) + } +} + +// TestSSE_AGUIFormat verifies that a broadcast event is wrapped in the AG-UI envelope. +func TestSSE_AGUIFormat(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "ws-1", true) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w, cancel, done := runSSEHandler(t, h, "ws-1") + + // Wait for the handler goroutine to reach its select loop. + time.Sleep(30 * time.Millisecond) + b.BroadcastOnly("ws-1", "TASK_UPDATED", map[string]string{"status": "running"}) + time.Sleep(30 * time.Millisecond) + cancel() + <-done + + body := w.Body.String() + // Find the first "data: ..." line. + var dataLine string + for _, line := range strings.Split(body, "\n") { + if strings.HasPrefix(line, "data: ") { + dataLine = strings.TrimPrefix(line, "data: ") + break + } + } + if dataLine == "" { + t.Fatalf("no data: line found in SSE response:\n%s", body) + } + + var env struct { + Type string `json:"type"` + Timestamp int64 `json:"timestamp"` + Data json.RawMessage `json:"data"` + } + if err := json.Unmarshal([]byte(dataLine), &env); err != nil { + t.Fatalf("invalid AG-UI envelope JSON %q: %v", dataLine, err) + } + if env.Type != "TASK_UPDATED" { + t.Errorf("expected type TASK_UPDATED, got %q", env.Type) + } + if env.Timestamp <= 0 { + t.Errorf("expected positive timestamp, got %d", env.Timestamp) + } + if len(env.Data) == 0 || string(env.Data) == "null" { + t.Errorf("expected non-null data field, got %q", string(env.Data)) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet DB expectations: %v", err) + } +} + +// TestSSE_WorkspaceFilter verifies that events for a different workspace are NOT delivered. +func TestSSE_WorkspaceFilter(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "ws-1", true) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w, cancel, done := runSSEHandler(t, h, "ws-1") + + time.Sleep(30 * time.Millisecond) + // Broadcast to a completely different workspace. + b.BroadcastOnly("ws-99", "AGENT_MESSAGE", map[string]string{"text": "secret"}) + time.Sleep(30 * time.Millisecond) + cancel() + <-done + + body := w.Body.String() + for _, line := range strings.Split(body, "\n") { + if strings.HasPrefix(line, "data: ") { + t.Errorf("expected no data: events for different workspace, got: %s", line) + } + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet DB expectations: %v", err) + } +} + +// TestSSE_WorkspaceNotFound verifies a 404 is returned when the workspace does not exist. +func TestSSE_WorkspaceNotFound(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "missing-ws", false) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "missing-ws"}} + c.Request = httptest.NewRequest("GET", "/workspaces/missing-ws/events/stream", nil) + + h.StreamEvents(c) + + if w.Code != http.StatusNotFound { + t.Fatalf("expected 404 for missing workspace, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet DB expectations: %v", err) + } +} + +// TestSSE_MultipleEventsDelivered verifies multiple sequential broadcasts all arrive. +func TestSSE_MultipleEventsDelivered(t *testing.T) { + mock := setupTestDB(t) + expectWorkspaceExists(mock, "ws-1", true) + + b := newTestBroadcaster() + h := NewSSEHandler(b) + + w, cancel, done := runSSEHandler(t, h, "ws-1") + + time.Sleep(30 * time.Millisecond) + b.BroadcastOnly("ws-1", "AGENT_MESSAGE", map[string]string{"msg": "one"}) + b.BroadcastOnly("ws-1", "TASK_UPDATED", map[string]string{"status": "done"}) + b.BroadcastOnly("ws-1", "A2A_RESPONSE", map[string]string{"result": "ok"}) + time.Sleep(50 * time.Millisecond) + cancel() + <-done + + body := w.Body.String() + var dataLines []string + for _, line := range strings.Split(body, "\n") { + if strings.HasPrefix(line, "data: ") { + dataLines = append(dataLines, line) + } + } + if len(dataLines) != 3 { + t.Errorf("expected 3 data: lines, got %d:\n%s", len(dataLines), body) + } + + // Verify event types appear in order. + expectedTypes := []string{"AGENT_MESSAGE", "TASK_UPDATED", "A2A_RESPONSE"} + for i, dl := range dataLines { + var env struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(strings.TrimPrefix(dl, "data: ")), &env); err != nil { + t.Fatalf("line %d: invalid JSON: %v", i, err) + } + if env.Type != expectedTypes[i] { + t.Errorf("line %d: expected type %s, got %s", i, expectedTypes[i], env.Type) + } + } +} diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go index 5a76f640..a4e80a33 100644 --- a/platform/internal/router/router.go +++ b/platform/internal/router/router.go @@ -408,6 +408,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi r.POST("/channels/discover", middleware.AdminAuth(db.DB), chh.Discover) r.POST("/webhooks/:type", chh.Webhook) + // SSE — AG-UI compatible event stream per workspace (#590). + // WorkspaceAuth middleware (on wsAuth) binds the bearer token to :id. + sseh := handlers.NewSSEHandler(broadcaster) + wsAuth.GET("/events/stream", sseh.StreamEvents) + // WebSocket sh := handlers.NewSocketHandler(hub) r.GET("/ws", sh.HandleConnect)