forked from molecule-ai/molecule-core
Merge pull request #601 from Molecule-AI/feat/issue-590-agui-sse-endpoint
feat(platform): AG-UI compatible SSE endpoint for streaming agent events
This commit is contained in:
commit
3bb3737d5c
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
@ -14,8 +15,17 @@ import (
|
||||
|
||||
const broadcastChannel = "events:broadcast"
|
||||
|
||||
// sseSubscription is a single in-process SSE subscriber.
|
||||
// deliverToSSE writes to ch; StreamEvents reads from it.
|
||||
type sseSubscription struct {
|
||||
workspaceID string
|
||||
ch chan models.WSMessage
|
||||
}
|
||||
|
||||
type Broadcaster struct {
|
||||
hub *ws.Hub
|
||||
hub *ws.Hub
|
||||
ssesMu sync.RWMutex
|
||||
sses []*sseSubscription
|
||||
}
|
||||
|
||||
func NewBroadcaster(hub *ws.Hub) *Broadcaster {
|
||||
@ -59,6 +69,9 @@ func (b *Broadcaster) RecordAndBroadcast(ctx context.Context, eventType string,
|
||||
// Broadcast to local WebSocket clients
|
||||
b.hub.Broadcast(msg)
|
||||
|
||||
// Fan out to in-process SSE subscribers (e.g. GET /events/stream).
|
||||
b.deliverToSSE(msg)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -79,6 +92,52 @@ func (b *Broadcaster) BroadcastOnly(workspaceID string, eventType string, payloa
|
||||
}
|
||||
|
||||
b.hub.Broadcast(msg)
|
||||
|
||||
// Fan out to in-process SSE subscribers.
|
||||
b.deliverToSSE(msg)
|
||||
}
|
||||
|
||||
// SubscribeSSE registers a per-workspace in-process channel for SSE streaming.
|
||||
// The caller MUST invoke the returned cancel func when it disconnects so the
|
||||
// subscription is removed and the channel is not leaked.
|
||||
func (b *Broadcaster) SubscribeSSE(workspaceID string) (<-chan models.WSMessage, func()) {
|
||||
sub := &sseSubscription{
|
||||
workspaceID: workspaceID,
|
||||
ch: make(chan models.WSMessage, 64),
|
||||
}
|
||||
b.ssesMu.Lock()
|
||||
b.sses = append(b.sses, sub)
|
||||
b.ssesMu.Unlock()
|
||||
|
||||
cancel := func() {
|
||||
b.ssesMu.Lock()
|
||||
defer b.ssesMu.Unlock()
|
||||
for i, s := range b.sses {
|
||||
if s == sub {
|
||||
b.sses = append(b.sses[:i], b.sses[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return sub.ch, cancel
|
||||
}
|
||||
|
||||
// deliverToSSE fans msg out to every in-process SSE subscriber watching the
|
||||
// same workspace. Non-blocking: if a subscriber's buffer is full the event is
|
||||
// dropped with a log line (the WebSocket path still delivers it).
|
||||
func (b *Broadcaster) deliverToSSE(msg models.WSMessage) {
|
||||
b.ssesMu.RLock()
|
||||
defer b.ssesMu.RUnlock()
|
||||
for _, s := range b.sses {
|
||||
if s.workspaceID != msg.WorkspaceID {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case s.ch <- msg:
|
||||
default:
|
||||
log.Printf("SSE: subscriber buffer full for workspace %s, dropping event %s", msg.WorkspaceID, msg.Event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe listens to Redis pub/sub and relays events to the WebSocket hub.
|
||||
|
||||
107
platform/internal/handlers/sse.go
Normal file
107
platform/internal/handlers/sse.go
Normal file
@ -0,0 +1,107 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// aguiEvent is the AG-UI envelope written to the SSE stream.
|
||||
// Spec: {"type":"<event_name>","timestamp":<unix_ms>,"data":{...}}
|
||||
type aguiEvent struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp int64 `json:"timestamp"` // Unix milliseconds
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
// SSEHandler streams workspace events as AG-UI-compatible Server-Sent Events.
|
||||
type SSEHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
}
|
||||
|
||||
// NewSSEHandler returns an SSEHandler that sources events from b.
|
||||
func NewSSEHandler(b *events.Broadcaster) *SSEHandler {
|
||||
return &SSEHandler{broadcaster: b}
|
||||
}
|
||||
|
||||
// StreamEvents handles GET /workspaces/:id/events/stream.
|
||||
//
|
||||
// Authentication is enforced by the upstream WorkspaceAuth middleware (bearer
|
||||
// token bound to :id). This handler only needs to:
|
||||
// 1. Verify the workspace exists (returns 404 if not).
|
||||
// 2. Set SSE headers.
|
||||
// 3. Subscribe to the in-process broadcaster and relay events until the
|
||||
// client disconnects (context cancellation).
|
||||
//
|
||||
// AG-UI envelope per event:
|
||||
//
|
||||
// data: {"type":"<event>","timestamp":<unix_ms>,"data":{...}}\n\n
|
||||
func (h *SSEHandler) StreamEvents(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Verify the workspace exists — 404 early rather than serving an empty stream.
|
||||
var exists bool
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1)`,
|
||||
workspaceID,
|
||||
).Scan(&exists); err != nil {
|
||||
log.Printf("SSE: workspace existence check failed for %s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to verify workspace"})
|
||||
return
|
||||
}
|
||||
if !exists {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
|
||||
// SSE response headers.
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
c.Header("Connection", "keep-alive")
|
||||
// Instruct nginx / reverse-proxies to disable buffering so events reach
|
||||
// the client immediately rather than being held in a proxy buffer.
|
||||
c.Header("X-Accel-Buffering", "no")
|
||||
|
||||
flusher, ok := c.Writer.(http.Flusher)
|
||||
if !ok {
|
||||
// Should never happen with gin's responseWriter, but guard defensively.
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming not supported"})
|
||||
return
|
||||
}
|
||||
|
||||
ch, cancel := h.broadcaster.SubscribeSSE(workspaceID)
|
||||
defer cancel()
|
||||
|
||||
// Send an initial SSE comment so the client knows the stream is live.
|
||||
fmt.Fprintf(c.Writer, ": ping\n\n")
|
||||
flusher.Flush()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
env := aguiEvent{
|
||||
Type: msg.Event,
|
||||
Timestamp: msg.Timestamp.UnixMilli(),
|
||||
Data: msg.Payload,
|
||||
}
|
||||
b, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
log.Printf("SSE: marshal error for workspace %s event %s: %v", workspaceID, msg.Event, err)
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(c.Writer, "data: %s\n\n", b)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
237
platform/internal/handlers/sse_test.go
Normal file
237
platform/internal/handlers/sse_test.go
Normal file
@ -0,0 +1,237 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// expectWorkspaceExists queues the EXISTS query that StreamEvents fires first.
|
||||
func expectWorkspaceExists(mock sqlmock.Sqlmock, workspaceID string, exists bool) {
|
||||
rows := sqlmock.NewRows([]string{"exists"}).AddRow(exists)
|
||||
mock.ExpectQuery(`SELECT EXISTS`).
|
||||
WithArgs(workspaceID).
|
||||
WillReturnRows(rows)
|
||||
}
|
||||
|
||||
// runSSEHandler starts StreamEvents in a background goroutine using a
|
||||
// cancellable context, waits waitAfterStart for the handler to subscribe,
|
||||
// then returns a drain function (cancel + wait for goroutine exit).
|
||||
func runSSEHandler(t *testing.T, h *SSEHandler, workspaceID string) (
|
||||
w *httptest.ResponseRecorder,
|
||||
inject func(), // call to cancel immediately
|
||||
done <-chan struct{},
|
||||
) {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
w = httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: workspaceID}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/"+workspaceID+"/events/stream", nil).WithContext(ctx)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
h.StreamEvents(c)
|
||||
}()
|
||||
|
||||
return w, cancel, doneCh
|
||||
}
|
||||
|
||||
// TestSSE_ContentType verifies the handler sets text/event-stream on the response.
|
||||
func TestSSE_ContentType(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "ws-1", true)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w, cancel, done := runSSEHandler(t, h, "ws-1")
|
||||
|
||||
// Allow the handler to subscribe, then tear it down.
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
ct := w.Header().Get("Content-Type")
|
||||
if !strings.HasPrefix(ct, "text/event-stream") {
|
||||
t.Errorf("expected Content-Type text/event-stream, got %q", ct)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSE_InitialPing verifies the handler emits the ": ping" SSE comment on connect.
|
||||
func TestSSE_InitialPing(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "ws-1", true)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w, cancel, done := runSSEHandler(t, h, "ws-1")
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
body := w.Body.String()
|
||||
if !strings.Contains(body, ": ping") {
|
||||
t.Errorf("expected SSE ping comment, body was:\n%s", body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSE_AGUIFormat verifies that a broadcast event is wrapped in the AG-UI envelope.
|
||||
func TestSSE_AGUIFormat(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "ws-1", true)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w, cancel, done := runSSEHandler(t, h, "ws-1")
|
||||
|
||||
// Wait for the handler goroutine to reach its select loop.
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
b.BroadcastOnly("ws-1", "TASK_UPDATED", map[string]string{"status": "running"})
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
body := w.Body.String()
|
||||
// Find the first "data: ..." line.
|
||||
var dataLine string
|
||||
for _, line := range strings.Split(body, "\n") {
|
||||
if strings.HasPrefix(line, "data: ") {
|
||||
dataLine = strings.TrimPrefix(line, "data: ")
|
||||
break
|
||||
}
|
||||
}
|
||||
if dataLine == "" {
|
||||
t.Fatalf("no data: line found in SSE response:\n%s", body)
|
||||
}
|
||||
|
||||
var env struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataLine), &env); err != nil {
|
||||
t.Fatalf("invalid AG-UI envelope JSON %q: %v", dataLine, err)
|
||||
}
|
||||
if env.Type != "TASK_UPDATED" {
|
||||
t.Errorf("expected type TASK_UPDATED, got %q", env.Type)
|
||||
}
|
||||
if env.Timestamp <= 0 {
|
||||
t.Errorf("expected positive timestamp, got %d", env.Timestamp)
|
||||
}
|
||||
if len(env.Data) == 0 || string(env.Data) == "null" {
|
||||
t.Errorf("expected non-null data field, got %q", string(env.Data))
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSE_WorkspaceFilter verifies that events for a different workspace are NOT delivered.
|
||||
func TestSSE_WorkspaceFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "ws-1", true)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w, cancel, done := runSSEHandler(t, h, "ws-1")
|
||||
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
// Broadcast to a completely different workspace.
|
||||
b.BroadcastOnly("ws-99", "AGENT_MESSAGE", map[string]string{"text": "secret"})
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
body := w.Body.String()
|
||||
for _, line := range strings.Split(body, "\n") {
|
||||
if strings.HasPrefix(line, "data: ") {
|
||||
t.Errorf("expected no data: events for different workspace, got: %s", line)
|
||||
}
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSE_WorkspaceNotFound verifies a 404 is returned when the workspace does not exist.
|
||||
func TestSSE_WorkspaceNotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "missing-ws", false)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "missing-ws"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/missing-ws/events/stream", nil)
|
||||
|
||||
h.StreamEvents(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("expected 404 for missing workspace, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSSE_MultipleEventsDelivered verifies multiple sequential broadcasts all arrive.
|
||||
func TestSSE_MultipleEventsDelivered(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
expectWorkspaceExists(mock, "ws-1", true)
|
||||
|
||||
b := newTestBroadcaster()
|
||||
h := NewSSEHandler(b)
|
||||
|
||||
w, cancel, done := runSSEHandler(t, h, "ws-1")
|
||||
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
b.BroadcastOnly("ws-1", "AGENT_MESSAGE", map[string]string{"msg": "one"})
|
||||
b.BroadcastOnly("ws-1", "TASK_UPDATED", map[string]string{"status": "done"})
|
||||
b.BroadcastOnly("ws-1", "A2A_RESPONSE", map[string]string{"result": "ok"})
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
body := w.Body.String()
|
||||
var dataLines []string
|
||||
for _, line := range strings.Split(body, "\n") {
|
||||
if strings.HasPrefix(line, "data: ") {
|
||||
dataLines = append(dataLines, line)
|
||||
}
|
||||
}
|
||||
if len(dataLines) != 3 {
|
||||
t.Errorf("expected 3 data: lines, got %d:\n%s", len(dataLines), body)
|
||||
}
|
||||
|
||||
// Verify event types appear in order.
|
||||
expectedTypes := []string{"AGENT_MESSAGE", "TASK_UPDATED", "A2A_RESPONSE"}
|
||||
for i, dl := range dataLines {
|
||||
var env struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(strings.TrimPrefix(dl, "data: ")), &env); err != nil {
|
||||
t.Fatalf("line %d: invalid JSON: %v", i, err)
|
||||
}
|
||||
if env.Type != expectedTypes[i] {
|
||||
t.Errorf("line %d: expected type %s, got %s", i, expectedTypes[i], env.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -419,6 +419,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
r.POST("/channels/discover", middleware.AdminAuth(db.DB), chh.Discover)
|
||||
r.POST("/webhooks/:type", chh.Webhook)
|
||||
|
||||
// SSE — AG-UI compatible event stream per workspace (#590).
|
||||
// WorkspaceAuth middleware (on wsAuth) binds the bearer token to :id.
|
||||
sseh := handlers.NewSSEHandler(broadcaster)
|
||||
wsAuth.GET("/events/stream", sseh.StreamEvents)
|
||||
|
||||
// WebSocket
|
||||
sh := handlers.NewSocketHandler(hub)
|
||||
r.GET("/ws", sh.HandleConnect)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user