diff --git a/workspace-server/internal/bundle/importer.go b/workspace-server/internal/bundle/importer.go index 43ec618c..f61c7a98 100644 --- a/workspace-server/internal/bundle/importer.go +++ b/workspace-server/internal/bundle/importer.go @@ -51,7 +51,7 @@ func Import( return result } - _ = broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", wsID, map[string]interface{}{ + _ = broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), wsID, map[string]interface{}{ "name": b.Name, "tier": b.Tier, "source_bundle_id": b.ID, @@ -142,7 +142,7 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`, models.StatusFailed, msg, wsID) - broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", wsID, map[string]interface{}{ + broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ "error": msg, }) } diff --git a/workspace-server/internal/channels/manager.go b/workspace-server/internal/channels/manager.go index 0991d520..3085de35 100644 --- a/workspace-server/internal/channels/manager.go +++ b/workspace-server/internal/channels/manager.go @@ -10,6 +10,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" ) const ( @@ -304,14 +305,14 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound "parts": []map[string]interface{}{{"kind": "text", "text": msg.Text}}, }, "metadata": map[string]interface{}{ - "source": ch.ChannelType, - "channel_id": ch.ID, - "chat_id": msg.ChatID, - "user_id": msg.UserID, - "username": msg.Username, - "message_id": msg.MessageID, - "history": history, - "extra": msg.Metadata, + "source": ch.ChannelType, + "channel_id": ch.ID, + "chat_id": msg.ChatID, + "user_id": msg.UserID, + "username": msg.Username, + "message_id": msg.MessageID, + "history": history, + "extra": msg.Metadata, }, }, }) @@ -383,7 +384,7 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound // Broadcast event if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, "CHANNEL_MESSAGE", ch.WorkspaceID, map[string]interface{}{ + m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "username": msg.Username, @@ -427,7 +428,7 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin } if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, "CHANNEL_MESSAGE", ch.WorkspaceID, map[string]interface{}{ + m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "direction": "outbound", diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 10e9efc6..a0c7e0c6 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -14,10 +14,12 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" ) + // proxyDispatchBuildError is a sentinel wrapper for failures inside // http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the // "failed to create proxy request" 500 instead of the standard 502/503 paths. @@ -90,10 +92,10 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace Status: http.StatusServiceUnavailable, Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)}, Response: gin.H{ - "error": "workspace agent busy — adapter handles retry (native_session)", - "busy": true, - "retry_after": busyRetryAfterSeconds, - "native_session": true, + "error": "workspace agent busy — adapter handles retry (native_session)", + "busy": true, + "retry_after": busyRetryAfterSeconds, + "native_session": true, }, } } @@ -149,7 +151,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // Provisioner selection (mutually exclusive in production): // - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect. // - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's -// /cp/workspaces/:id/status to read the EC2 state. +// /cp/workspaces/:id/status to read the EC2 state. // // Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants // (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every @@ -191,7 +193,7 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err) } db.ClearWorkspaceKeys(ctx, workspaceID) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{}) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{}) go h.RestartByID(workspaceID) return true } @@ -272,7 +274,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle }(ctx) if callerID == "" && statusCode < 400 { - h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{ + h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ "response_body": json.RawMessage(respBody), "method": a2aMethod, "duration_ms": durationMs, diff --git a/workspace-server/internal/handlers/a2a_queue.go b/workspace-server/internal/handlers/a2a_queue.go index 6026b8f6..dd012e02 100644 --- a/workspace-server/internal/handlers/a2a_queue.go +++ b/workspace-server/internal/handlers/a2a_queue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" ) // extractIdempotencyKey pulls params.message.messageId out of an A2A JSON-RPC @@ -435,7 +436,7 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context, // "⏸ queued" line to "✓ completed" in real time. Without this the // transition only surfaces after the user reloads or polls activity. if h.broadcaster != nil { - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "response_preview": truncate(responseText, 200), diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index db63b155..cb533935 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -55,7 +55,7 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { func (h *ActivityHandler) List(c *gin.Context) { workspaceID := c.Param("id") activityType := c.Query("type") - source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL + source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL peerID := c.Query("peer_id") // optional UUID — restrict to rows where this peer is sender OR target limitStr := c.DefaultQuery("limit", "100") sinceSecsStr := c.Query("since_secs") @@ -650,7 +650,7 @@ func LogActivity(ctx context.Context, broadcaster events.EventEmitter, params Ac if respStr != nil { payload["response_body"] = json.RawMessage(respJSON) } - broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", payload) + broadcaster.BroadcastOnly(params.WorkspaceID, string(events.EventActivityLogged), payload) } } diff --git a/workspace-server/internal/handlers/agent.go b/workspace-server/internal/handlers/agent.go index 9daa0927..f98afd93 100644 --- a/workspace-server/internal/handlers/agent.go +++ b/workspace-server/internal/handlers/agent.go @@ -69,7 +69,7 @@ func (h *AgentHandler) Assign(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, "AGENT_ASSIGNED", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentAssigned), workspaceID, map[string]interface{}{ "agent_id": agentID, "model": body.Model, }) @@ -118,7 +118,7 @@ func (h *AgentHandler) Replace(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, "AGENT_REPLACED", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentReplaced), workspaceID, map[string]interface{}{ "agent_id": agentID, "model": body.Model, "old_model": oldModel, @@ -148,7 +148,7 @@ func (h *AgentHandler) Remove(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, "AGENT_REMOVED", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentRemoved), workspaceID, map[string]interface{}{ "agent_id": agentID, "model": model, }) @@ -215,21 +215,21 @@ func (h *AgentHandler) Move(c *gin.Context) { } // Broadcast on both workspaces - h.broadcaster.RecordAndBroadcast(ctx, "AGENT_MOVED", sourceID, map[string]interface{}{ - "agent_id": agentID, - "model": model, - "target_workspace_id": body.TargetWorkspaceID, + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentMoved), sourceID, map[string]interface{}{ + "agent_id": agentID, + "model": model, + "target_workspace_id": body.TargetWorkspaceID, }) - h.broadcaster.RecordAndBroadcast(ctx, "AGENT_MOVED", body.TargetWorkspaceID, map[string]interface{}{ - "agent_id": agentID, - "model": model, - "source_workspace_id": sourceID, + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentMoved), body.TargetWorkspaceID, map[string]interface{}{ + "agent_id": agentID, + "model": model, + "source_workspace_id": sourceID, }) c.JSON(http.StatusOK, gin.H{ - "agent_id": agentID, - "model": model, - "from_workspace": sourceID, - "to_workspace": body.TargetWorkspaceID, + "agent_id": agentID, + "model": model, + "from_workspace": sourceID, + "to_workspace": body.TargetWorkspaceID, }) } diff --git a/workspace-server/internal/handlers/approvals.go b/workspace-server/internal/handlers/approvals.go index 4b394c7e..1f091afa 100644 --- a/workspace-server/internal/handlers/approvals.go +++ b/workspace-server/internal/handlers/approvals.go @@ -51,7 +51,7 @@ func (h *ApprovalsHandler) Create(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, "APPROVAL_REQUESTED", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ "approval_id": approvalID, "action": body.Action, "reason": body.Reason, @@ -62,7 +62,7 @@ func (h *ApprovalsHandler) Create(c *gin.Context) { var parentID *string db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID) if parentID != nil { - h.broadcaster.RecordAndBroadcast(ctx, "APPROVAL_ESCALATED", *parentID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ "approval_id": approvalID, "from_workspace_id": workspaceID, "action": body.Action, diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index d9b66884..247fc35f 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -164,7 +164,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) { go h.executeDelegation(sourceID, body.TargetID, delegationID, a2aBody) // Broadcast event so canvas shows delegation in real-time - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_SENT", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": body.TargetID, "task_preview": truncate(body.Task, 100), @@ -317,7 +317,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s // Update status: pending → dispatched h.updateDelegationStatus(sourceID, delegationID, "dispatched", "") - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_STATUS", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "status": "dispatched", }) @@ -352,7 +352,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s log.Printf("Delegation %s: failed to insert error log: %v", delegationID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationFailed), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "error": proxyErr.Error(), }) // RFC #2829 PR-2 result-push (see UpdateStatus for rationale). @@ -388,7 +388,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s `, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil { log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_STATUS", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "status": "queued", }) return @@ -420,7 +420,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s // delegation_ledger_integration_test.go. recordLedgerStatus(ctx, delegationID, "completed", "", responseText) h.updateDelegationStatus(sourceID, delegationID, "completed", "") - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{ "delegation_id": delegationID, "target_id": targetID, "response_preview": truncate(responseText, 200), @@ -503,7 +503,7 @@ func (h *DelegationHandler) Record(c *gin.Context) { recordLedgerInsert(ctx, sourceID, body.TargetID, body.DelegationID, body.Task, "") recordLedgerStatus(ctx, body.DelegationID, "dispatched", "", "") - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_SENT", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{ "delegation_id": body.DelegationID, "target_id": body.TargetID, "task_preview": truncate(body.Task, 100), @@ -558,7 +558,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) { `, sourceID, sourceID, "Delegation completed ("+truncate(body.ResponsePreview, 80)+")", string(respJSON)); err != nil { log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{ "delegation_id": delegationID, "response_preview": truncate(body.ResponsePreview, 200), }) @@ -570,7 +570,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) { // the result instead of holding open an HTTP connection. pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", body.ResponsePreview, "") } else { - h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_FAILED", sourceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationFailed), sourceID, map[string]interface{}{ "delegation_id": delegationID, "error": body.Error, }) diff --git a/workspace-server/internal/handlers/external_rotate.go b/workspace-server/internal/handlers/external_rotate.go index 887bddd5..ce029958 100644 --- a/workspace-server/internal/handlers/external_rotate.go +++ b/workspace-server/internal/handlers/external_rotate.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" ) @@ -100,7 +101,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) { // see when credentials were rotated. No PII; the token plaintext // is NOT logged. if h.broadcaster != nil { - h.broadcaster.RecordAndBroadcast(ctx, "EXTERNAL_CREDENTIALS_ROTATED", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventExternalCredentialsRotated), id, map[string]interface{}{ "workspace_id": id, }) } diff --git a/workspace-server/internal/handlers/org_import.go b/workspace-server/internal/handlers/org_import.go index 3dfe2fbd..60cac720 100644 --- a/workspace-server/internal/handlers/org_import.go +++ b/workspace-server/internal/handlers/org_import.go @@ -20,12 +20,14 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/channels" "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" "github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler" "github.com/google/uuid" ) + // createWorkspaceTree recursively materialises an OrgWorkspace (and its // descendants) into the workspaces + canvas_layouts tables and kicks off // Docker provisioning. absX/absY are THIS workspace's absolute canvas @@ -227,7 +229,7 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX if parentID != nil { payload["parent_id"] = *parentID } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", id, payload) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), id, payload) // Seed initial memories from workspace config or defaults (issue #1050). // Per-workspace initial_memories override defaults; if workspace has none, @@ -243,7 +245,7 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, url = $2 WHERE id = $3`, models.StatusOnline, ws.URL, id); err != nil { log.Printf("Org import: external workspace status update failed for %s: %v", ws.Name, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), id, map[string]interface{}{ "name": ws.Name, "external": true, }) } else if h.workspace.HasProvisioner() { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 8960170c..84333985 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -414,7 +414,7 @@ func (h *RegistryHandler) Register(c *gin.Context) { } // Broadcast WORKSPACE_ONLINE - if err := h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.ID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.ID, map[string]interface{}{ "url": cachedURL, "agent_card": payload.AgentCard, "delivery_mode": effectiveMode, @@ -572,7 +572,7 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { // Broadcast current task update only when it changed (avoid spamming on every heartbeat) if payload.CurrentTask != prevTask { - h.broadcaster.BroadcastOnly(payload.WorkspaceID, "TASK_UPDATED", map[string]interface{}{ + h.broadcaster.BroadcastOnly(payload.WorkspaceID, string(events.EventTaskUpdated), map[string]interface{}{ "current_task": payload.CurrentTask, "active_tasks": payload.ActiveTasks, }) @@ -593,7 +593,7 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { // so per-heartbeat cost is one in-memory channel send per active // SSE subscriber and one WS hub fan-out. At 30s heartbeat cadence // this is far below any noise floor on either path. - h.broadcaster.BroadcastOnly(payload.WorkspaceID, "WORKSPACE_HEARTBEAT", map[string]interface{}{ + h.broadcaster.BroadcastOnly(payload.WorkspaceID, string(events.EventWorkspaceHeartbeat), map[string]interface{}{ "active_tasks": payload.ActiveTasks, "uptime_seconds": payload.UptimeSeconds, }) @@ -678,7 +678,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea if err != nil { log.Printf("Heartbeat: failed to mark %s degraded (wedged): %v", payload.WorkspaceID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_DEGRADED", payload.WorkspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceDegraded), payload.WorkspaceID, map[string]interface{}{ "runtime_state": "wedged", "sample_error": payload.SampleError, }) @@ -699,7 +699,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusDegraded, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to mark %s degraded: %v", payload.WorkspaceID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_DEGRADED", payload.WorkspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceDegraded), payload.WorkspaceID, map[string]interface{}{ "error_rate": payload.ErrorRate, "sample_error": payload.SampleError, }) @@ -718,7 +718,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusOnline, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{}) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{}) } // Recovery: if workspace was offline but is now sending heartbeats, bring it back online. @@ -728,7 +728,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'offline'`, models.StatusOnline, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to recover %s from offline: %v", payload.WorkspaceID, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{}) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{}) } // Auto-recovery: if a workspace is marked "provisioning" but is actively sending @@ -743,7 +743,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea } else { log.Printf("Heartbeat: transitioned %s from provisioning to online (heartbeat received)", payload.WorkspaceID) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{ "recovered_from": currentStatus, }) } @@ -771,7 +771,7 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea } else { log.Printf("Heartbeat: transitioned %s from awaiting_agent to online (heartbeat received)", payload.WorkspaceID) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{ "recovered_from": currentStatus, }) } @@ -820,7 +820,7 @@ func (h *RegistryHandler) UpdateCard(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(c.Request.Context(), "AGENT_CARD_UPDATED", payload.WorkspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(c.Request.Context(), string(events.EventAgentCardUpdated), payload.WorkspaceID, map[string]interface{}{ "agent_card": payload.AgentCard, }) diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index cf210342..a163cee9 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -112,7 +112,6 @@ func (h *WorkspaceHandler) SetCPProvisioner(cp provisioner.CPProvisionerAPI) { h.cpProv = cp } - // SetEnvMutators wires a provisionhook.Registry into the handler. Plugins // living in separate repos register on the same Registry instance during // boot (see cmd/server/main.go) and main.go calls this setter once before @@ -361,7 +360,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { // populate the Runtime pill on the side panel immediately — without it // the node lives as "runtime: unknown" until something refetches the // workspace row (which nothing does during provisioning). - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), id, map[string]interface{}{ "name": payload.Name, "tier": payload.Tier, "runtime": payload.Runtime, @@ -388,7 +387,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { if err := db.CacheURL(ctx, id, payload.URL); err != nil { log.Printf("External workspace: failed to cache URL for %s: %v", id, err) } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), id, map[string]interface{}{ "name": payload.Name, "external": true, }) } else { @@ -407,7 +406,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { } else { connectionToken = tok } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_AWAITING_AGENT", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceAwaitingAgent), id, map[string]interface{}{ "name": payload.Name, "external": true, }) } @@ -539,24 +538,24 @@ func scanWorkspaceRow(rows interface { } ws := map[string]interface{}{ - "id": id, - "name": name, - "tier": tier, - "status": status, - "url": url, - "parent_id": parentID, - "active_tasks": activeTasks, - "max_concurrent_tasks": maxConcurrentTasks, - "last_error_rate": errorRate, - "last_sample_error": sampleError, - "uptime_seconds": uptimeSeconds, - "current_task": currentTask, - "runtime": runtime, - "workspace_dir": nilIfEmpty(workspaceDir), - "monthly_spend": monthlySpend, - "x": x, - "y": y, - "collapsed": collapsed, + "id": id, + "name": name, + "tier": tier, + "status": status, + "url": url, + "parent_id": parentID, + "active_tasks": activeTasks, + "max_concurrent_tasks": maxConcurrentTasks, + "last_error_rate": errorRate, + "last_sample_error": sampleError, + "uptime_seconds": uptimeSeconds, + "current_task": currentTask, + "runtime": runtime, + "workspace_dir": nilIfEmpty(workspaceDir), + "monthly_spend": monthlySpend, + "x": x, + "y": y, + "collapsed": collapsed, } // budget_limit: nil when no limit set, int64 otherwise diff --git a/workspace-server/internal/handlers/workspace_bootstrap.go b/workspace-server/internal/handlers/workspace_bootstrap.go index 7c84473e..2928ffd0 100644 --- a/workspace-server/internal/handlers/workspace_bootstrap.go +++ b/workspace-server/internal/handlers/workspace_bootstrap.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/gin-gonic/gin" ) @@ -85,7 +86,7 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(c.Request.Context(), "WORKSPACE_PROVISION_FAILED", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(c.Request.Context(), string(events.EventWorkspaceProvisionFailed), id, map[string]interface{}{ "error": errMsg, "log_tail": tail, "source": "bootstrap_watcher", diff --git a/workspace-server/internal/handlers/workspace_crud.go b/workspace-server/internal/handlers/workspace_crud.go index 4e58804f..200356b1 100644 --- a/workspace-server/internal/handlers/workspace_crud.go +++ b/workspace-server/internal/handlers/workspace_crud.go @@ -16,12 +16,14 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/lib/pq" ) + // State handles GET /workspaces/:id/state — minimal status payload for // remote-agent polling (Phase 30.4). Returns `{status, paused, deleted, // workspace_id}` so a remote agent can detect pause/resume/delete @@ -380,7 +382,7 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) { pq.Array(allIDs)); err != nil { log.Printf("Delete token revocation error for %s: %v", id, err) } -// #1027: cascade-disable all schedules for the deleted workspaces so + // #1027: cascade-disable all schedules for the deleted workspaces so // the scheduler never fires a cron into a removed container. if _, err := db.DB.ExecContext(ctx, `UPDATE workspace_schedules SET enabled = false, updated_at = now() @@ -466,14 +468,14 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) { // leaving other WS clients ignorant of the cascade. The DB // row is already 'removed' so it's recoverable, but the // inconsistency is avoidable. - h.broadcaster.RecordAndBroadcast(cleanupCtx, "WORKSPACE_REMOVED", descID, map[string]interface{}{}) + h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), descID, map[string]interface{}{}) } stopAndRemove(id) db.ClearWorkspaceKeys(cleanupCtx, id) restartStates.Delete(id) // #2269: same as descendants above - h.broadcaster.RecordAndBroadcast(cleanupCtx, "WORKSPACE_REMOVED", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(cleanupCtx, string(events.EventWorkspaceRemoved), id, map[string]interface{}{ "cascade_deleted": len(descendantIDs), }) diff --git a/workspace-server/internal/handlers/workspace_provision_shared.go b/workspace-server/internal/handlers/workspace_provision_shared.go index 00e00bd0..e879521a 100644 --- a/workspace-server/internal/handlers/workspace_provision_shared.go +++ b/workspace-server/internal/handlers/workspace_provision_shared.go @@ -41,6 +41,7 @@ import ( "path/filepath" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" @@ -212,7 +213,7 @@ func (h *WorkspaceHandler) markProvisionFailed(ctx context.Context, workspaceID, } else if _, hasErr := extra["error"]; !hasErr { extra["error"] = msg } - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, extra) + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), workspaceID, extra) if _, dbErr := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $3, last_sample_error = $2, updated_at = now() WHERE id = $1`, workspaceID, msg, models.StatusFailed); dbErr != nil { diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index c5712be5..42b25f3a 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -11,6 +11,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provlog" "github.com/gin-gonic/gin" @@ -147,7 +148,7 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { // Reset to provisioning db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusProvisioning, id) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), id, map[string]interface{}{ "name": wsName, "tier": tier, "runtime": containerRuntime, @@ -341,7 +342,7 @@ func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID s } db.ClearWorkspaceKeys(ctx, workspaceID) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceHibernated), workspaceID, map[string]interface{}{ "name": wsName, "tier": tier, }) @@ -552,7 +553,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) { db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusProvisioning, workspaceID) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", workspaceID, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), workspaceID, map[string]interface{}{ "name": wsName, "tier": tier, "runtime": dbRuntime, }) @@ -640,7 +641,7 @@ func (h *WorkspaceHandler) Pause(c *gin.Context) { db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, url = '', updated_at = now() WHERE id = $2`, models.StatusPaused, ws.id) db.ClearWorkspaceKeys(ctx, ws.id) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PAUSED", ws.id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspacePaused), ws.id, map[string]interface{}{ "name": ws.name, }) } @@ -709,7 +710,7 @@ func (h *WorkspaceHandler) Resume(c *gin.Context) { for _, ws := range toResume { db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusProvisioning, ws.id) - h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", ws.id, map[string]interface{}{ + h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisioning), ws.id, map[string]interface{}{ "name": ws.name, "tier": ws.tier, "runtime": ws.runtime, }) payload := models.CreateWorkspacePayload{Name: ws.name, Tier: ws.tier, Runtime: ws.runtime} diff --git a/workspace-server/internal/registry/provisiontimeout.go b/workspace-server/internal/registry/provisiontimeout.go index 1b35798e..46b9e157 100644 --- a/workspace-server/internal/registry/provisiontimeout.go +++ b/workspace-server/internal/registry/provisiontimeout.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/models" ) @@ -197,7 +198,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter // A separate event type was considered but the UI reaction is // identical either way — operators who need to distinguish can // tell from the `source` payload field. - if emitErr := emitter.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", c.id, map[string]interface{}{ + if emitErr := emitter.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), c.id, map[string]interface{}{ "error": msg, "timeout_secs": timeoutSec, "runtime": c.runtime, diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index e098586d..a7a969ca 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -14,6 +14,7 @@ import ( cronlib "github.com/robfig/cron/v3" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised" ) @@ -541,7 +542,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { insertCancel() if s.broadcaster != nil { - s.broadcaster.RecordAndBroadcast(ctx, "CRON_EXECUTED", sched.WorkspaceID, map[string]interface{}{ + s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronExecuted), sched.WorkspaceID, map[string]interface{}{ "schedule_id": sched.ID, "schedule_name": sched.Name, "status": lastStatus, @@ -618,7 +619,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active skipInsCancel() if s.broadcaster != nil { - _ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{ + _ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{ "schedule_id": sched.ID, "schedule_name": sched.Name, "reason": reason,