8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
187 lines
5.6 KiB
Go
187 lines
5.6 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/ws"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
const broadcastChannel = "events:broadcast"
|
|
|
|
// EventEmitter is the contract handler code needs from a broadcaster.
|
|
// Defining it here lets tests substitute a capture-only stub instead
|
|
// of standing up the full Redis + WebSocket hub topology that the
|
|
// concrete *Broadcaster builds (and that previously blocked
|
|
// TestProvisionWorkspace_* regression tests on issue #1814).
|
|
//
|
|
// Includes BroadcastOnly because the activity-log + A2A-response paths
|
|
// inside the handler package fan out via that method — narrowing
|
|
// further would force production callers back to the concrete type.
|
|
//
|
|
// *Broadcaster satisfies this interface trivially. Production code that
|
|
// needs the wider surface (SubscribeSSE, Subscribe) keeps using the
|
|
// concrete *Broadcaster type — sse.go + cmd/server/main.go are the
|
|
// only such call sites today.
|
|
type EventEmitter interface {
|
|
RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error
|
|
BroadcastOnly(workspaceID string, eventType string, payload interface{})
|
|
}
|
|
|
|
// Compile-time assertion: a renamed/reshaped Broadcaster method that
|
|
// silently broke this interface would fail to build here.
|
|
var _ EventEmitter = (*Broadcaster)(nil)
|
|
|
|
// sseSubscription is a single in-process SSE subscriber.
|
|
// deliverToSSE writes to ch; StreamEvents reads from it.
|
|
type sseSubscription struct {
|
|
workspaceID string
|
|
ch chan models.WSMessage
|
|
}
|
|
|
|
type Broadcaster struct {
|
|
hub *ws.Hub
|
|
ssesMu sync.RWMutex
|
|
sses []*sseSubscription
|
|
}
|
|
|
|
func NewBroadcaster(hub *ws.Hub) *Broadcaster {
|
|
return &Broadcaster{hub: hub}
|
|
}
|
|
|
|
// RecordAndBroadcast inserts a structure event into Postgres and publishes to Redis pub/sub.
|
|
func (b *Broadcaster) RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error {
|
|
payloadJSON, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert into structure_events — cast to jsonb explicitly
|
|
_, err = db.DB.ExecContext(ctx, `
|
|
INSERT INTO structure_events (event_type, workspace_id, payload)
|
|
VALUES ($1, $2, $3::jsonb)
|
|
`, eventType, workspaceID, string(payloadJSON))
|
|
if err != nil {
|
|
log.Printf("RecordAndBroadcast: insert event error: %v", err)
|
|
return err
|
|
}
|
|
|
|
// Build WebSocket message
|
|
msg := models.WSMessage{
|
|
Event: eventType,
|
|
WorkspaceID: workspaceID,
|
|
Timestamp: time.Now().UTC(),
|
|
Payload: payloadJSON,
|
|
}
|
|
|
|
// Publish to Redis pub/sub for multi-instance support
|
|
msgJSON, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := db.RDB.Publish(ctx, broadcastChannel, msgJSON).Err(); err != nil {
|
|
log.Printf("Warning: Redis publish failed: %v", err)
|
|
}
|
|
|
|
// Broadcast to local WebSocket clients
|
|
b.hub.Broadcast(msg)
|
|
|
|
// Fan out to in-process SSE subscribers (e.g. GET /events/stream).
|
|
b.deliverToSSE(msg)
|
|
|
|
return nil
|
|
}
|
|
|
|
// BroadcastOnly sends a WebSocket event without recording in structure_events.
|
|
// Used for high-frequency events like activity logs that have their own table.
|
|
func (b *Broadcaster) BroadcastOnly(workspaceID string, eventType string, payload interface{}) {
|
|
payloadJSON, err := json.Marshal(payload)
|
|
if err != nil {
|
|
log.Printf("BroadcastOnly: marshal error: %v", err)
|
|
return
|
|
}
|
|
|
|
msg := models.WSMessage{
|
|
Event: eventType,
|
|
WorkspaceID: workspaceID,
|
|
Timestamp: time.Now().UTC(),
|
|
Payload: payloadJSON,
|
|
}
|
|
|
|
b.hub.Broadcast(msg)
|
|
|
|
// Fan out to in-process SSE subscribers.
|
|
b.deliverToSSE(msg)
|
|
}
|
|
|
|
// SubscribeSSE registers a per-workspace in-process channel for SSE streaming.
|
|
// The caller MUST invoke the returned cancel func when it disconnects so the
|
|
// subscription is removed and the channel is not leaked.
|
|
func (b *Broadcaster) SubscribeSSE(workspaceID string) (<-chan models.WSMessage, func()) {
|
|
sub := &sseSubscription{
|
|
workspaceID: workspaceID,
|
|
ch: make(chan models.WSMessage, 64),
|
|
}
|
|
b.ssesMu.Lock()
|
|
b.sses = append(b.sses, sub)
|
|
b.ssesMu.Unlock()
|
|
|
|
cancel := func() {
|
|
b.ssesMu.Lock()
|
|
defer b.ssesMu.Unlock()
|
|
for i, s := range b.sses {
|
|
if s == sub {
|
|
b.sses = append(b.sses[:i], b.sses[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return sub.ch, cancel
|
|
}
|
|
|
|
// deliverToSSE fans msg out to every in-process SSE subscriber watching the
|
|
// same workspace. Non-blocking: if a subscriber's buffer is full the event is
|
|
// dropped with a log line (the WebSocket path still delivers it).
|
|
func (b *Broadcaster) deliverToSSE(msg models.WSMessage) {
|
|
b.ssesMu.RLock()
|
|
defer b.ssesMu.RUnlock()
|
|
for _, s := range b.sses {
|
|
if s.workspaceID != msg.WorkspaceID {
|
|
continue
|
|
}
|
|
select {
|
|
case s.ch <- msg:
|
|
default:
|
|
log.Printf("SSE: subscriber buffer full for workspace %s, dropping event %s", msg.WorkspaceID, msg.Event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Subscribe listens to Redis pub/sub and relays events to the WebSocket hub.
|
|
func (b *Broadcaster) Subscribe(ctx context.Context) {
|
|
sub := db.RDB.Subscribe(ctx, broadcastChannel)
|
|
ch := sub.Channel(redis.WithChannelHealthCheckInterval(30 * time.Second))
|
|
|
|
log.Println("Subscribed to Redis broadcast channel")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
sub.Close()
|
|
return
|
|
case redisMsg := <-ch:
|
|
if redisMsg == nil {
|
|
continue
|
|
}
|
|
// In single-instance mode, RecordAndBroadcast already calls hub.Broadcast().
|
|
// This subscriber becomes relevant in multi-instance deployments.
|
|
_ = redisMsg
|
|
}
|
|
}
|
|
}
|