forked from molecule-ai/molecule-core
Merge pull request #65 from Molecule-AI/fix/issue-19-restart-context-layer1
feat(platform): inject restart context system message (#19 Layer 1)
This commit is contained in:
commit
3ea8cda5b0
240
platform/internal/handlers/restart_context.go
Normal file
240
platform/internal/handlers/restart_context.go
Normal file
@ -0,0 +1,240 @@
|
||||
// Package handlers — restart_context.go implements Layer 1 of issue #19:
|
||||
// after a workspace is restarted and comes back online, the platform
|
||||
// generates a state snapshot (timestamp, previous session end, env-var
|
||||
// keys now available) and delivers it as a synthetic A2A message/send
|
||||
// so the agent sees what changed across the restart boundary.
|
||||
//
|
||||
// Layer 2 (user-defined restart_prompt via config.yaml / org.yaml) is
|
||||
// out of scope for this file — tracked as a separate follow-up issue.
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// restartContextOnlineTimeout bounds how long we wait for a workspace
|
||||
// to re-register after restart before dropping the context message.
|
||||
// The Restart HTTP handler has already returned 200 by the time this
|
||||
// waiter runs, so a timeout here is purely a best-effort skip.
|
||||
const restartContextOnlineTimeout = 30 * time.Second
|
||||
|
||||
// restartContextOnlinePollInterval is the poll cadence while waiting
|
||||
// for WORKSPACE_ONLINE. 500ms keeps the typical-case latency low
|
||||
// without hammering Postgres.
|
||||
const restartContextOnlinePollInterval = 500 * time.Millisecond
|
||||
|
||||
// restartContextData captures the platform-computed snapshot that will
|
||||
// be rendered into a human-readable message. Keeping it as a struct
|
||||
// (rather than building the string inline) makes the builder
|
||||
// unit-testable without stubbing time/DB calls.
|
||||
type restartContextData struct {
|
||||
RestartAt time.Time
|
||||
PrevSessionAt time.Time // zero value = no prior session recorded
|
||||
EnvKeys []string // sorted list of env-var keys (no values)
|
||||
}
|
||||
|
||||
// buildRestartContextMessage renders the restart context into the
|
||||
// exact format proposed in issue #19. Fields that have no data (e.g.
|
||||
// first-ever session) are rendered with a neutral placeholder so the
|
||||
// agent always sees a consistent shape.
|
||||
func buildRestartContextMessage(d restartContextData) string {
|
||||
msg := "=== WORKSPACE RESTART CONTEXT ===\n"
|
||||
msg += fmt.Sprintf("Restart at: %s\n", d.RestartAt.UTC().Format(time.RFC3339))
|
||||
|
||||
if d.PrevSessionAt.IsZero() {
|
||||
msg += "Previous session ended: (no prior session on record)\n"
|
||||
} else {
|
||||
delta := d.RestartAt.Sub(d.PrevSessionAt)
|
||||
msg += fmt.Sprintf("Previous session ended: %s (%s ago)\n",
|
||||
d.PrevSessionAt.UTC().Format(time.RFC3339),
|
||||
humanDuration(delta))
|
||||
}
|
||||
|
||||
if len(d.EnvKeys) == 0 {
|
||||
msg += "Env vars now available: (none)\n"
|
||||
} else {
|
||||
msg += fmt.Sprintf("Env vars now available: %s\n", joinStrings(d.EnvKeys, ", "))
|
||||
}
|
||||
|
||||
msg += "=== END RESTART CONTEXT ===\n"
|
||||
return msg
|
||||
}
|
||||
|
||||
// humanDuration formats a duration for display in the restart context.
|
||||
// Keeps the output terse ("2h14m", "38s") without pulling in a
|
||||
// humanize library. Negative/zero deltas render as "0s".
|
||||
func humanDuration(d time.Duration) string {
|
||||
if d <= 0 {
|
||||
return "0s"
|
||||
}
|
||||
d = d.Round(time.Second)
|
||||
h := int(d / time.Hour)
|
||||
m := int((d % time.Hour) / time.Minute)
|
||||
s := int((d % time.Minute) / time.Second)
|
||||
switch {
|
||||
case h > 0:
|
||||
return fmt.Sprintf("%dh%dm", h, m)
|
||||
case m > 0:
|
||||
return fmt.Sprintf("%dm%ds", m, s)
|
||||
default:
|
||||
return fmt.Sprintf("%ds", s)
|
||||
}
|
||||
}
|
||||
|
||||
// joinStrings is strings.Join — inlined to avoid an import cycle
|
||||
// concern in a file that already carries a handful of stdlib deps.
|
||||
func joinStrings(parts []string, sep string) string {
|
||||
switch len(parts) {
|
||||
case 0:
|
||||
return ""
|
||||
case 1:
|
||||
return parts[0]
|
||||
}
|
||||
n := len(sep) * (len(parts) - 1)
|
||||
for i := 0; i < len(parts); i++ {
|
||||
n += len(parts[i])
|
||||
}
|
||||
b := make([]byte, 0, n)
|
||||
b = append(b, parts[0]...)
|
||||
for _, p := range parts[1:] {
|
||||
b = append(b, sep...)
|
||||
b = append(b, p...)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// loadRestartContextData gathers the snapshot inputs from the DB.
|
||||
// Called *before* the restart mutates workspace state so the "previous
|
||||
// session ended" timestamp reflects the pre-restart heartbeat, not the
|
||||
// newly-provisioning row.
|
||||
func loadRestartContextData(ctx context.Context, workspaceID string) restartContextData {
|
||||
d := restartContextData{RestartAt: time.Now()}
|
||||
|
||||
var lastHB sql.NullTime
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&lastHB); err == nil && lastHB.Valid {
|
||||
d.PrevSessionAt = lastHB.Time
|
||||
}
|
||||
|
||||
// Env-var keys: union of global secrets + workspace-specific
|
||||
// secrets. Values are NEVER included — only keys — so the agent
|
||||
// can reason about "did my missing credential arrive?" without
|
||||
// the platform ever echoing secret material back into the
|
||||
// message bus.
|
||||
keySet := map[string]struct{}{}
|
||||
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
if rows, err := db.DB.QueryContext(ctx,
|
||||
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
|
||||
); err == nil {
|
||||
for rows.Next() {
|
||||
var k string
|
||||
if rows.Scan(&k) == nil {
|
||||
keySet[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
for k := range keySet {
|
||||
d.EnvKeys = append(d.EnvKeys, k)
|
||||
}
|
||||
sort.Strings(d.EnvKeys)
|
||||
return d
|
||||
}
|
||||
|
||||
// waitForWorkspaceOnline polls the workspaces table until the target
|
||||
// workspace's status flips to 'online' or the deadline expires.
|
||||
// Returns true on success; callers log+drop on false.
|
||||
func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout time.Duration) bool {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
var status string
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT status FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&status); err == nil && status == "online" {
|
||||
return true
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(restartContextOnlinePollInterval):
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// buildRestartA2APayload wraps the rendered context string in the
|
||||
// JSON-RPC 2.0 / A2A message/send shape that the proxy already knows
|
||||
// how to normalize. Returns the marshalled body ready for ProxyA2ARequest.
|
||||
func buildRestartA2APayload(text string) ([]byte, error) {
|
||||
payload := map[string]any{
|
||||
"jsonrpc": "2.0",
|
||||
"id": uuid.New().String(),
|
||||
"method": "message/send",
|
||||
"params": map[string]any{
|
||||
"message": map[string]any{
|
||||
"messageId": uuid.New().String(),
|
||||
"role": "user",
|
||||
"parts": []any{map[string]any{"kind": "text", "text": text}},
|
||||
"metadata": map[string]any{
|
||||
"source": "platform",
|
||||
"kind": "restart_context",
|
||||
"layer": 1,
|
||||
"restart_context": true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return json.Marshal(payload)
|
||||
}
|
||||
|
||||
// sendRestartContext is called by the Restart handler in a background
|
||||
// goroutine. It waits for the workspace to come online, then delivers
|
||||
// the snapshot via the existing A2A proxy. Failures are logged and
|
||||
// dropped — the restart itself is already considered successful at
|
||||
// this point.
|
||||
func (h *WorkspaceHandler) sendRestartContext(workspaceID string, data restartContextData) {
|
||||
// Detach from any request context — this runs after the HTTP
|
||||
// response is flushed.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), restartContextOnlineTimeout+30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if !waitForWorkspaceOnline(ctx, workspaceID, restartContextOnlineTimeout) {
|
||||
log.Printf("restart-context: workspace %s did not come online within %s — dropping context message", workspaceID, restartContextOnlineTimeout)
|
||||
return
|
||||
}
|
||||
|
||||
text := buildRestartContextMessage(data)
|
||||
body, err := buildRestartA2APayload(text)
|
||||
if err != nil {
|
||||
log.Printf("restart-context: failed to marshal payload for %s: %v", workspaceID, err)
|
||||
return
|
||||
}
|
||||
|
||||
// "system:restart-context" prefix flags this as a trusted
|
||||
// non-workspace caller — bypasses CanCommunicate and the
|
||||
// caller-token check in a2a_proxy.go.
|
||||
status, _, proxyErr := h.ProxyA2ARequest(ctx, workspaceID, body, "system:restart-context", false)
|
||||
if proxyErr != nil {
|
||||
log.Printf("restart-context: ProxyA2ARequest failed for %s (status=%d): %v", workspaceID, status, proxyErr)
|
||||
return
|
||||
}
|
||||
log.Printf("restart-context: delivered to %s (status=%d, keys=%d)", workspaceID, status, len(data.EnvKeys))
|
||||
}
|
||||
120
platform/internal/handlers/restart_context_test.go
Normal file
120
platform/internal/handlers/restart_context_test.go
Normal file
@ -0,0 +1,120 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHumanDuration(t *testing.T) {
|
||||
cases := []struct {
|
||||
in time.Duration
|
||||
want string
|
||||
}{
|
||||
{0, "0s"},
|
||||
{-5 * time.Second, "0s"},
|
||||
{45 * time.Second, "45s"},
|
||||
{3 * time.Minute, "3m0s"},
|
||||
{2*time.Hour + 14*time.Minute, "2h14m"},
|
||||
{25 * time.Hour, "25h0m"},
|
||||
}
|
||||
for _, c := range cases {
|
||||
got := humanDuration(c.in)
|
||||
if got != c.want {
|
||||
t.Errorf("humanDuration(%v) = %q, want %q", c.in, got, c.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildRestartContextMessage_NoPriorSession(t *testing.T) {
|
||||
d := restartContextData{
|
||||
RestartAt: time.Date(2026, 4, 13, 12, 0, 0, 0, time.UTC),
|
||||
PrevSessionAt: time.Time{},
|
||||
EnvKeys: nil,
|
||||
}
|
||||
msg := buildRestartContextMessage(d)
|
||||
|
||||
mustContain(t, msg, "=== WORKSPACE RESTART CONTEXT ===")
|
||||
mustContain(t, msg, "Restart at: 2026-04-13T12:00:00Z")
|
||||
mustContain(t, msg, "Previous session ended: (no prior session on record)")
|
||||
mustContain(t, msg, "Env vars now available: (none)")
|
||||
mustContain(t, msg, "=== END RESTART CONTEXT ===")
|
||||
}
|
||||
|
||||
func TestBuildRestartContextMessage_WithDataRendersKeysAndDelta(t *testing.T) {
|
||||
prev := time.Date(2026, 4, 13, 10, 0, 0, 0, time.UTC)
|
||||
d := restartContextData{
|
||||
RestartAt: prev.Add(2*time.Hour + 14*time.Minute),
|
||||
PrevSessionAt: prev,
|
||||
EnvKeys: []string{"ANTHROPIC_API_KEY", "OPENAI_API_KEY"},
|
||||
}
|
||||
msg := buildRestartContextMessage(d)
|
||||
|
||||
mustContain(t, msg, "Previous session ended: 2026-04-13T10:00:00Z (2h14m ago)")
|
||||
mustContain(t, msg, "Env vars now available: ANTHROPIC_API_KEY, OPENAI_API_KEY")
|
||||
if strings.Contains(msg, "sk-") || strings.Contains(msg, "secret-value") {
|
||||
t.Errorf("rendered message leaked secret-shaped content: %q", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildRestartA2APayload_ShapeIsJSONRPCMessageSend(t *testing.T) {
|
||||
body, err := buildRestartA2APayload("hello world")
|
||||
if err != nil {
|
||||
t.Fatalf("buildRestartA2APayload: %v", err)
|
||||
}
|
||||
|
||||
var parsed map[string]any
|
||||
if err := json.Unmarshal(body, &parsed); err != nil {
|
||||
t.Fatalf("payload is not valid JSON: %v", err)
|
||||
}
|
||||
|
||||
if parsed["jsonrpc"] != "2.0" {
|
||||
t.Errorf("jsonrpc = %v, want 2.0", parsed["jsonrpc"])
|
||||
}
|
||||
if parsed["method"] != "message/send" {
|
||||
t.Errorf("method = %v, want message/send", parsed["method"])
|
||||
}
|
||||
|
||||
params, ok := parsed["params"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("params missing or wrong type: %v", parsed["params"])
|
||||
}
|
||||
message, ok := params["message"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("params.message missing: %v", params)
|
||||
}
|
||||
if message["role"] != "user" {
|
||||
t.Errorf("role = %v, want user", message["role"])
|
||||
}
|
||||
if message["messageId"] == nil || message["messageId"] == "" {
|
||||
t.Errorf("messageId missing")
|
||||
}
|
||||
|
||||
meta, ok := message["metadata"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("metadata missing: %v", message)
|
||||
}
|
||||
if meta["kind"] != "restart_context" {
|
||||
t.Errorf("metadata.kind = %v, want restart_context", meta["kind"])
|
||||
}
|
||||
if meta["source"] != "platform" {
|
||||
t.Errorf("metadata.source = %v, want platform", meta["source"])
|
||||
}
|
||||
|
||||
parts, ok := message["parts"].([]any)
|
||||
if !ok || len(parts) != 1 {
|
||||
t.Fatalf("parts wrong shape: %v", message["parts"])
|
||||
}
|
||||
firstPart, _ := parts[0].(map[string]any)
|
||||
if firstPart["text"] != "hello world" {
|
||||
t.Errorf("parts[0].text = %v, want hello world", firstPart["text"])
|
||||
}
|
||||
}
|
||||
|
||||
func mustContain(t *testing.T, haystack, needle string) {
|
||||
t.Helper()
|
||||
if !strings.Contains(haystack, needle) {
|
||||
t.Errorf("expected output to contain %q, got:\n%s", needle, haystack)
|
||||
}
|
||||
}
|
||||
@ -159,7 +159,12 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
||||
log.Printf("Restart: reset=true — will discard claude-sessions volume for %s (%s)", wsName, id)
|
||||
}
|
||||
|
||||
// Capture restart-context data BEFORE provisionWorkspaceOpts flips
|
||||
// last_heartbeat_at with the new session. Issue #19 Layer 1.
|
||||
restartData := loadRestartContextData(ctx, id)
|
||||
|
||||
go h.provisionWorkspaceOpts(id, templatePath, configFiles, payload, resetClaudeSession)
|
||||
go h.sendRestartContext(id, restartData)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
|
||||
}
|
||||
@ -219,8 +224,13 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
||||
// Runtime from DB — no more config file parsing
|
||||
payload := models.CreateWorkspacePayload{Name: wsName, Tier: tier, Runtime: dbRuntime}
|
||||
|
||||
// Snapshot restart-context data before the new session overwrites
|
||||
// last_heartbeat_at. Issue #19 Layer 1.
|
||||
restartData := loadRestartContextData(ctx, workspaceID)
|
||||
|
||||
// On auto-restart, do NOT re-apply templates — preserve existing config volume.
|
||||
go h.provisionWorkspace(workspaceID, "", nil, payload)
|
||||
go h.sendRestartContext(workspaceID, restartData)
|
||||
}
|
||||
|
||||
// Pause handles POST /workspaces/:id/pause
|
||||
|
||||
Loading…
Reference in New Issue
Block a user