diff --git a/platform/internal/handlers/restart_context.go b/platform/internal/handlers/restart_context.go new file mode 100644 index 00000000..3ae2f41d --- /dev/null +++ b/platform/internal/handlers/restart_context.go @@ -0,0 +1,240 @@ +// Package handlers — restart_context.go implements Layer 1 of issue #19: +// after a workspace is restarted and comes back online, the platform +// generates a state snapshot (timestamp, previous session end, env-var +// keys now available) and delivers it as a synthetic A2A message/send +// so the agent sees what changed across the restart boundary. +// +// Layer 2 (user-defined restart_prompt via config.yaml / org.yaml) is +// out of scope for this file — tracked as a separate follow-up issue. +package handlers + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "sort" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/google/uuid" +) + +// restartContextOnlineTimeout bounds how long we wait for a workspace +// to re-register after restart before dropping the context message. +// The Restart HTTP handler has already returned 200 by the time this +// waiter runs, so a timeout here is purely a best-effort skip. +const restartContextOnlineTimeout = 30 * time.Second + +// restartContextOnlinePollInterval is the poll cadence while waiting +// for WORKSPACE_ONLINE. 500ms keeps the typical-case latency low +// without hammering Postgres. +const restartContextOnlinePollInterval = 500 * time.Millisecond + +// restartContextData captures the platform-computed snapshot that will +// be rendered into a human-readable message. Keeping it as a struct +// (rather than building the string inline) makes the builder +// unit-testable without stubbing time/DB calls. 
type restartContextData struct {
	// RestartAt is the instant the snapshot refers to; it is rendered
	// in UTC.
	RestartAt time.Time
	// PrevSessionAt is when the previous session ended. The zero value
	// means no prior session is on record.
	PrevSessionAt time.Time
	// EnvKeys lists env-var key names only (never values), sorted.
	EnvKeys []string
}

// buildRestartContextMessage formats d as the fixed five-line restart
// banner: a header, the restart time, the previous-session line, the
// env-var line and a footer. Each line ends in a newline. Missing data
// is rendered as a placeholder, so the shape of the message never
// changes.
func buildRestartContextMessage(d restartContextData) string {
	restartLine := fmt.Sprintf("Restart at: %s\n", d.RestartAt.UTC().Format(time.RFC3339))

	// Start from the placeholder and overwrite it when a previous
	// session is on record.
	prevLine := "Previous session ended: (no prior session on record)\n"
	if !d.PrevSessionAt.IsZero() {
		elapsed := d.RestartAt.Sub(d.PrevSessionAt)
		prevLine = fmt.Sprintf("Previous session ended: %s (%s ago)\n",
			d.PrevSessionAt.UTC().Format(time.RFC3339),
			humanDuration(elapsed))
	}

	envLine := "Env vars now available: (none)\n"
	if len(d.EnvKeys) > 0 {
		// Comma-separate the keys in the order given.
		keys := d.EnvKeys[0]
		for _, k := range d.EnvKeys[1:] {
			keys += ", " + k
		}
		envLine = fmt.Sprintf("Env vars now available: %s\n", keys)
	}

	return "=== WORKSPACE RESTART CONTEXT ===\n" +
		restartLine +
		prevLine +
		envLine +
		"=== END RESTART CONTEXT ===\n"
}

// humanDuration renders d tersely for the restart banner. The value is
// rounded to the nearest second, then shown as hours and minutes
// ("2h14m"), minutes and seconds ("3m0s"), or bare seconds ("45s"),
// depending on magnitude. Zero and negative durations render as "0s".
func humanDuration(d time.Duration) string {
	if d <= 0 {
		return "0s"
	}

	rounded := d.Round(time.Second)
	hours := int(rounded / time.Hour)
	minutes := int((rounded % time.Hour) / time.Minute)
	seconds := int((rounded % time.Minute) / time.Second)

	if hours > 0 {
		// Seconds are intentionally dropped once hours are shown.
		return fmt.Sprintf("%dh%dm", hours, minutes)
	}
	if minutes > 0 {
		return fmt.Sprintf("%dm%ds", minutes, seconds)
	}
	return fmt.Sprintf("%ds", seconds)
}

// joinStrings is strings.Join — inlined to avoid an import cycle
// concern in a file that already carries a handful of stdlib deps.
+func joinStrings(parts []string, sep string) string { + switch len(parts) { + case 0: + return "" + case 1: + return parts[0] + } + n := len(sep) * (len(parts) - 1) + for i := 0; i < len(parts); i++ { + n += len(parts[i]) + } + b := make([]byte, 0, n) + b = append(b, parts[0]...) + for _, p := range parts[1:] { + b = append(b, sep...) + b = append(b, p...) + } + return string(b) +} + +// loadRestartContextData gathers the snapshot inputs from the DB. +// Called *before* the restart mutates workspace state so the "previous +// session ended" timestamp reflects the pre-restart heartbeat, not the +// newly-provisioning row. +func loadRestartContextData(ctx context.Context, workspaceID string) restartContextData { + d := restartContextData{RestartAt: time.Now()} + + var lastHB sql.NullTime + if err := db.DB.QueryRowContext(ctx, + `SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&lastHB); err == nil && lastHB.Valid { + d.PrevSessionAt = lastHB.Time + } + + // Env-var keys: union of global secrets + workspace-specific + // secrets. Values are NEVER included — only keys — so the agent + // can reason about "did my missing credential arrive?" without + // the platform ever echoing secret material back into the + // message bus. 
+ keySet := map[string]struct{}{} + if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil { + for rows.Next() { + var k string + if rows.Scan(&k) == nil { + keySet[k] = struct{}{} + } + } + rows.Close() + } + if rows, err := db.DB.QueryContext(ctx, + `SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID, + ); err == nil { + for rows.Next() { + var k string + if rows.Scan(&k) == nil { + keySet[k] = struct{}{} + } + } + rows.Close() + } + for k := range keySet { + d.EnvKeys = append(d.EnvKeys, k) + } + sort.Strings(d.EnvKeys) + return d +} + +// waitForWorkspaceOnline polls the workspaces table until the target +// workspace's status flips to 'online' or the deadline expires. +// Returns true on success; callers log+drop on false. +func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + var status string + if err := db.DB.QueryRowContext(ctx, + `SELECT status FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&status); err == nil && status == "online" { + return true + } + select { + case <-ctx.Done(): + return false + case <-time.After(restartContextOnlinePollInterval): + } + } + return false +} + +// buildRestartA2APayload wraps the rendered context string in the +// JSON-RPC 2.0 / A2A message/send shape that the proxy already knows +// how to normalize. Returns the marshalled body ready for ProxyA2ARequest. 
+func buildRestartA2APayload(text string) ([]byte, error) { + payload := map[string]any{ + "jsonrpc": "2.0", + "id": uuid.New().String(), + "method": "message/send", + "params": map[string]any{ + "message": map[string]any{ + "messageId": uuid.New().String(), + "role": "user", + "parts": []any{map[string]any{"kind": "text", "text": text}}, + "metadata": map[string]any{ + "source": "platform", + "kind": "restart_context", + "layer": 1, + "restart_context": true, + }, + }, + }, + } + return json.Marshal(payload) +} + +// sendRestartContext is started in a background goroutine by both +// Restart and RestartByID. It waits for the workspace to come online, then delivers +// the snapshot via the existing A2A proxy. Failures are logged and +// dropped — the restart itself is already considered successful at +// this point. +// +// NOTE(review): the wait returns as soon as the workspace row reads +// "online" — confirm the restart path has already moved the row out of +// "online" before this goroutine starts polling, otherwise the message +// could be delivered to the pre-restart session. +func (h *WorkspaceHandler) sendRestartContext(workspaceID string, data restartContextData) { + // Detach from any request context — this runs after the HTTP + // response is flushed. The deadline is the online-wait budget plus + // 30s, and the same context is passed to the proxy call below. + ctx, cancel := context.WithTimeout(context.Background(), restartContextOnlineTimeout+30*time.Second) + defer cancel() + + if !waitForWorkspaceOnline(ctx, workspaceID, restartContextOnlineTimeout) { + log.Printf("restart-context: workspace %s did not come online within %s — dropping context message", workspaceID, restartContextOnlineTimeout) + return + } + + text := buildRestartContextMessage(data) + body, err := buildRestartA2APayload(text) + if err != nil { + log.Printf("restart-context: failed to marshal payload for %s: %v", workspaceID, err) + return + } + + // "system:restart-context" prefix flags this as a trusted + // non-workspace caller — bypasses CanCommunicate and the + // caller-token check in a2a_proxy.go. 
+ status, _, proxyErr := h.ProxyA2ARequest(ctx, workspaceID, body, "system:restart-context", false) + if proxyErr != nil { + log.Printf("restart-context: ProxyA2ARequest failed for %s (status=%d): %v", workspaceID, status, proxyErr) + return + } + log.Printf("restart-context: delivered to %s (status=%d, keys=%d)", workspaceID, status, len(data.EnvKeys)) +} diff --git a/platform/internal/handlers/restart_context_test.go b/platform/internal/handlers/restart_context_test.go new file mode 100644 index 00000000..75c5e7e8 --- /dev/null +++ b/platform/internal/handlers/restart_context_test.go @@ -0,0 +1,120 @@ +package handlers + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +func TestHumanDuration(t *testing.T) { + cases := []struct { + in time.Duration + want string + }{ + {0, "0s"}, + {-5 * time.Second, "0s"}, + {45 * time.Second, "45s"}, + {3 * time.Minute, "3m0s"}, + {2*time.Hour + 14*time.Minute, "2h14m"}, + {25 * time.Hour, "25h0m"}, + } + for _, c := range cases { + got := humanDuration(c.in) + if got != c.want { + t.Errorf("humanDuration(%v) = %q, want %q", c.in, got, c.want) + } + } +} + +func TestBuildRestartContextMessage_NoPriorSession(t *testing.T) { + d := restartContextData{ + RestartAt: time.Date(2026, 4, 13, 12, 0, 0, 0, time.UTC), + PrevSessionAt: time.Time{}, + EnvKeys: nil, + } + msg := buildRestartContextMessage(d) + + mustContain(t, msg, "=== WORKSPACE RESTART CONTEXT ===") + mustContain(t, msg, "Restart at: 2026-04-13T12:00:00Z") + mustContain(t, msg, "Previous session ended: (no prior session on record)") + mustContain(t, msg, "Env vars now available: (none)") + mustContain(t, msg, "=== END RESTART CONTEXT ===") +} + +func TestBuildRestartContextMessage_WithDataRendersKeysAndDelta(t *testing.T) { + prev := time.Date(2026, 4, 13, 10, 0, 0, 0, time.UTC) + d := restartContextData{ + RestartAt: prev.Add(2*time.Hour + 14*time.Minute), + PrevSessionAt: prev, + EnvKeys: []string{"ANTHROPIC_API_KEY", "OPENAI_API_KEY"}, + } + msg := 
buildRestartContextMessage(d) + + mustContain(t, msg, "Previous session ended: 2026-04-13T10:00:00Z (2h14m ago)") + mustContain(t, msg, "Env vars now available: ANTHROPIC_API_KEY, OPENAI_API_KEY") + if strings.Contains(msg, "sk-") || strings.Contains(msg, "secret-value") { + t.Errorf("rendered message leaked secret-shaped content: %q", msg) + } +} + +func TestBuildRestartA2APayload_ShapeIsJSONRPCMessageSend(t *testing.T) { + body, err := buildRestartA2APayload("hello world") + if err != nil { + t.Fatalf("buildRestartA2APayload: %v", err) + } + + var parsed map[string]any + if err := json.Unmarshal(body, &parsed); err != nil { + t.Fatalf("payload is not valid JSON: %v", err) + } + + if parsed["jsonrpc"] != "2.0" { + t.Errorf("jsonrpc = %v, want 2.0", parsed["jsonrpc"]) + } + if parsed["method"] != "message/send" { + t.Errorf("method = %v, want message/send", parsed["method"]) + } + + params, ok := parsed["params"].(map[string]any) + if !ok { + t.Fatalf("params missing or wrong type: %v", parsed["params"]) + } + message, ok := params["message"].(map[string]any) + if !ok { + t.Fatalf("params.message missing: %v", params) + } + if message["role"] != "user" { + t.Errorf("role = %v, want user", message["role"]) + } + if message["messageId"] == nil || message["messageId"] == "" { + t.Errorf("messageId missing") + } + + meta, ok := message["metadata"].(map[string]any) + if !ok { + t.Fatalf("metadata missing: %v", message) + } + if meta["kind"] != "restart_context" { + t.Errorf("metadata.kind = %v, want restart_context", meta["kind"]) + } + if meta["source"] != "platform" { + t.Errorf("metadata.source = %v, want platform", meta["source"]) + } + + parts, ok := message["parts"].([]any) + if !ok || len(parts) != 1 { + t.Fatalf("parts wrong shape: %v", message["parts"]) + } + firstPart, _ := parts[0].(map[string]any) + if firstPart["text"] != "hello world" { + t.Errorf("parts[0].text = %v, want hello world", firstPart["text"]) + } +} + +func mustContain(t *testing.T, haystack, 
needle string) { + t.Helper() + if !strings.Contains(haystack, needle) { + t.Errorf("expected output to contain %q, got:\n%s", needle, haystack) + } +} diff --git a/platform/internal/handlers/workspace_restart.go b/platform/internal/handlers/workspace_restart.go index e40f9670..503c6d68 100644 --- a/platform/internal/handlers/workspace_restart.go +++ b/platform/internal/handlers/workspace_restart.go @@ -159,7 +159,12 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { log.Printf("Restart: reset=true — will discard claude-sessions volume for %s (%s)", wsName, id) } + // Capture restart-context data BEFORE provisionWorkspaceOpts flips + // last_heartbeat_at with the new session. Issue #19 Layer 1. + restartData := loadRestartContextData(ctx, id) + go h.provisionWorkspaceOpts(id, templatePath, configFiles, payload, resetClaudeSession) + go h.sendRestartContext(id, restartData) c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession}) } @@ -219,8 +224,13 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) { // Runtime from DB — no more config file parsing payload := models.CreateWorkspacePayload{Name: wsName, Tier: tier, Runtime: dbRuntime} + // Snapshot restart-context data before the new session overwrites + // last_heartbeat_at. Issue #19 Layer 1. + restartData := loadRestartContextData(ctx, workspaceID) + // On auto-restart, do NOT re-apply templates — preserve existing config volume. go h.provisionWorkspace(workspaceID, "", nil, payload) + go h.sendRestartContext(workspaceID, restartData) } // Pause handles POST /workspaces/:id/pause