diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go
index 1a73b28e..6143982e 100644
--- a/workspace-server/internal/handlers/a2a_proxy.go
+++ b/workspace-server/internal/handlers/a2a_proxy.go
@@ -435,6 +435,34 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
 		return 0, nil, proxyErr
 	}
 
+	// Pre-flight container-health check (#36). The dispatchA2A path below
+	// does Docker-DNS forwarding to `ws-<id>:8000` and only catches a
+	// missing/dead container REACTIVELY via maybeMarkContainerDead in
+	// handleA2ADispatchError. That works, but it costs the caller a full
+	// network timeout (2-30s) before the structured 503 surfaces.
+	//
+	// When we KNOW the workspace is container-backed (h.docker != nil + we
+	// rewrite to Docker-DNS form below), do a single proactive
+	// RunningContainerName lookup. If the container is genuinely missing,
+	// short-circuit with the same structured 503 + async restart that
+	// maybeMarkContainerDead would produce — but immediately, without the
+	// network round-trip.
+	//
+	// Three outcomes of provisioner.RunningContainerName(ctx, h.docker, id):
+	//	("ws-<id>", nil) → forward as today.
+	//	("", nil)        → container is genuinely not running. Fast-503.
+	//	("", err)        → transient daemon error. Fall through to the
+	//	                   optimistic forward — matches Provisioner.IsRunning's
+	//	                   (true, err) "fail-soft as alive" contract.
+	//
+	// Same SSOT as findRunningContainer (#10/#12). See AST gate
+	// TestProxyA2A_RoutesThroughProvisionerSSOT.
+	if h.provisioner != nil && platformInDocker && strings.HasPrefix(agentURL, "http://"+provisioner.ContainerName(workspaceID)+":") {
+		if proxyErr := h.preflightContainerHealth(ctx, workspaceID); proxyErr != nil {
+			return 0, nil, proxyErr
+		}
+	}
+
 	startTime := time.Now()
 	resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
 	if cancelFwd != nil {
diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go
index a0c7e0c6..ded26ec5 100644
--- a/workspace-server/internal/handlers/a2a_proxy_helpers.go
+++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go
@@ -198,6 +198,60 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
 	return true
 }
 
+// preflightContainerHealth runs a proactive Provisioner.IsRunning check
+// (#36) before dispatching the a2a forward. It routes through provisioner's
+// SSOT IsRunning, which itself wraps RunningContainerName — the same source
+// as findRunningContainer in the plugins handler (#10/#12).
+//
+// Returns nil when the forward should proceed:
+//   - the container is running, OR
+//   - the daemon errored transiently (matches IsRunning's (true, err)
+//     "fail-soft as alive" contract — let the optimistic forward run
+//     and let reactive maybeMarkContainerDead catch a real failure).
+//
+// Returns a structured 503 and triggers the same async restart that
+// maybeMarkContainerDead would produce when:
+//   - the container is genuinely not running (NotFound / Exited / Created…).
+//
+// The point of running this BEFORE the forward is to save the caller
+// 2-30s of network-timeout cost when the container is missing — a common
+// shape post-EC2-replace (see molecule-controlplane#20 incident
+// 2026-05-07) where the reconciler hasn't respawned the agent yet.
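+//
+// For reference, the fast-fail body this helper returns looks like this
+// (shape only — these are exactly the fields the gin.H literal below sets):
+//
+//	{"error": "workspace container not running — restart triggered",
+//	 "restarting": true, "preflight": true}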
+func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspaceID string) *proxyA2AError {
+	running, err := h.provisioner.IsRunning(ctx, workspaceID)
+	if err != nil {
+		// Transient daemon error. Provisioner.IsRunning returns (true, err)
+		// in this case — fall through to the optimistic forward; reactive
+		// maybeMarkContainerDead handles a real failure later.
+		log.Printf("ProxyA2A preflight: IsRunning transient error for %s: %v (proceeding with forward)", workspaceID, err)
+		return nil
+	}
+	if running {
+		// Container is running — forward as today.
+		return nil
+	}
+	// Container is genuinely not running. Mark it offline and trigger a
+	// restart (the same effect as maybeMarkContainerDead's branch), and
+	// return the structured 503 immediately so the caller skips the forward.
+	log.Printf("ProxyA2A preflight: container for %s is not running — marking offline and triggering restart (#36)", workspaceID)
+	if _, dbErr := db.DB.ExecContext(ctx,
+		`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`,
+		models.StatusOffline, workspaceID); dbErr != nil {
+		log.Printf("ProxyA2A preflight: failed to mark workspace %s offline: %v", workspaceID, dbErr)
+	}
+	db.ClearWorkspaceKeys(ctx, workspaceID)
+	h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
+	go h.RestartByID(workspaceID)
+	return &proxyA2AError{
+		Status: http.StatusServiceUnavailable,
+		Response: gin.H{
+			"error":      "workspace container not running — restart triggered",
+			"restarting": true,
+			"preflight":  true, // distinguishes from the reactive containerDead path
+		},
+	}
+}
+
 // logA2AFailure records a failed A2A attempt to activity_logs in a detached
 // goroutine (the request context may already be done by the time it runs).
 func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
diff --git a/workspace-server/internal/handlers/a2a_proxy_preflight_test.go b/workspace-server/internal/handlers/a2a_proxy_preflight_test.go
new file mode 100644
index 00000000..f9639162
--- /dev/null
+++ b/workspace-server/internal/handlers/a2a_proxy_preflight_test.go
@@ -0,0 +1,194 @@
+package handlers
+
+import (
+	"context"
+	"errors"
+	"go/ast"
+	"go/parser"
+	"go/token"
+	"testing"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
+)
+
+// preflightLocalProv is a controllable LocalProvisionerAPI stub for the
+// preflight tests (#36). The other API methods panic to guard against tests
+// that should be using a different stub.
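+//
+// Typical wiring, mirroring the tests below:
+//
+//	stub := &preflightLocalProv{running: false, err: nil} // "genuinely not running"
+//	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+//	h.provisioner = stub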
+type preflightLocalProv struct {
+	running    bool
+	err        error
+	calls      int
+	calledWith []string
+}
+
+func (p *preflightLocalProv) IsRunning(_ context.Context, workspaceID string) (bool, error) {
+	p.calls++
+	p.calledWith = append(p.calledWith, workspaceID)
+	return p.running, p.err
+}
+func (p *preflightLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
+	panic("preflightLocalProv: Start not implemented")
+}
+func (p *preflightLocalProv) Stop(_ context.Context, _ string) error {
+	panic("preflightLocalProv: Stop not implemented")
+}
+func (p *preflightLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
+	panic("preflightLocalProv: ExecRead not implemented")
+}
+func (p *preflightLocalProv) RemoveVolume(_ context.Context, _ string) error {
+	panic("preflightLocalProv: RemoveVolume not implemented")
+}
+func (p *preflightLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
+	panic("preflightLocalProv: VolumeHasFile not implemented")
+}
+func (p *preflightLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
+	panic("preflightLocalProv: WriteAuthTokenToVolume not implemented")
+}
+
+// TestPreflight_ContainerRunning_ReturnsNil — IsRunning(true, nil): the
+// forward proceeds. preflight returns nil → the caller continues to dispatchA2A.
+func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
+	_ = setupTestDB(t)
+	stub := &preflightLocalProv{running: true, err: nil}
+	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+	h.provisioner = stub
+
+	if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
+		t.Fatalf("preflight should return nil when container running, got %+v", err)
+	}
+	if stub.calls != 1 {
+		t.Errorf("IsRunning should be called exactly once, got %d", stub.calls)
+	}
+	if len(stub.calledWith) != 1 || stub.calledWith[0] != "ws-running-123" {
+		t.Errorf("IsRunning should be called with the workspace id, got %v", stub.calledWith)
+	}
+}
+
+// TestPreflight_ContainerNotRunning_StructuredFastFail — IsRunning(false, nil):
+// preflight returns a structured 503 with restarting=true + preflight=true, AND
+// triggers the offline-flip + WORKSPACE_OFFLINE broadcast + async restart.
+// This is the load-bearing case — it saves the caller 2-30s of network timeout.
+func TestPreflight_ContainerNotRunning_StructuredFastFail(t *testing.T) {
+	mock := setupTestDB(t)
+	_ = setupTestRedis(t)
+	stub := &preflightLocalProv{running: false, err: nil}
+	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+	h.provisioner = stub
+
+	// Expect the offline-flip UPDATE.
+	mock.ExpectExec(`UPDATE workspaces SET status =`).
+		WithArgs(models.StatusOffline, "ws-dead-456").
+		WillReturnResult(sqlmock.NewResult(0, 1))
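+	// (go-sqlmock matches expectations in order by default, so the UPDATE
+	// above must be observed before the INSERT expected below.)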
+	// Broadcaster's INSERT INTO structure_events fires too — a best-effort
+	// log entry for the WORKSPACE_OFFLINE event. Match it permissively.
+	mock.ExpectExec(`INSERT INTO structure_events`).
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	proxyErr := h.preflightContainerHealth(context.Background(), "ws-dead-456")
+	if proxyErr == nil {
+		t.Fatal("preflight should return *proxyA2AError when container not running")
+	}
+	if proxyErr.Status != 503 {
+		t.Errorf("expected 503, got %d", proxyErr.Status)
+	}
+	if got := proxyErr.Response["restarting"]; got != true {
+		t.Errorf("response should mark restarting=true, got %v", got)
+	}
+	if got := proxyErr.Response["preflight"]; got != true {
+		t.Errorf("response should mark preflight=true so callers can distinguish it from the reactive containerDead path, got %v", got)
+	}
+	if got := proxyErr.Response["error"]; got != "workspace container not running — restart triggered" {
+		t.Errorf("error message mismatch, got %q", got)
+	}
+
+	// Note: broadcaster firing is exercised by the production path's
+	// h.broadcaster.RecordAndBroadcast call but not asserted here — the
+	// real *events.Broadcaster doesn't expose received events for inspection.
+	// The DB UPDATE expectation is sufficient to pin the offline-flip path.
+}
+
+// TestPreflight_TransientError_FailsSoftAsAlive — IsRunning(true, err): the
+// (true, err) "fail-soft" contract — preflight returns nil so the optimistic
+// forward runs; reactive maybeMarkContainerDead handles a real failure later.
+// This pin is critical: a flaky daemon must NOT trigger a restart cascade.
+func TestPreflight_TransientError_FailsSoftAsAlive(t *testing.T) {
+	_ = setupTestDB(t)
+	stub := &preflightLocalProv{running: true, err: errors.New("docker daemon EOF")}
+	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+	h.provisioner = stub
+
+	if err := h.preflightContainerHealth(context.Background(), "ws-flaky-789"); err != nil {
+		t.Fatalf("preflight should return nil on transient error (fail-soft), got %+v", err)
+	}
+	// No DB UPDATE expected — sqlmock would complain about unexpected calls
+	// at test cleanup if the offline-flip path fired.
+}
+
+// TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT — AST gate (the #36
+// mirror of #12's gate). Pins the invariant that preflightContainerHealth
+// uses the SSOT Provisioner.IsRunning helper, NOT a parallel
+// docker.ContainerInspect of its own.
+//
+// Mutation invariant: if a future PR replaces h.provisioner.IsRunning with
+// a direct cli.ContainerInspect call, this test fails. That's the signal to
+// either (a) extend Provisioner.IsRunning's contract OR (b) document why
+// this call site needs to differ. Either way, the drift gets a reviewer's
+// attention instead of shipping silently.
+func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
+	fset := token.NewFileSet()
+	file, err := parser.ParseFile(fset, "a2a_proxy_helpers.go", nil, parser.ParseComments)
+	if err != nil {
+		t.Fatalf("parse a2a_proxy_helpers.go: %v", err)
+	}
+
+	var fn *ast.FuncDecl
+	ast.Inspect(file, func(n ast.Node) bool {
+		f, ok := n.(*ast.FuncDecl)
+		if !ok || f.Name.Name != "preflightContainerHealth" {
+			return true
+		}
+		fn = f
+		return false
+	})
+	if fn == nil {
+		t.Fatal("preflightContainerHealth not found — was it renamed? update this gate or the SSOT routing assumption")
+	}
+
+	var (
+		callsIsRunning                  bool
+		callsContainerInspectRaw        bool
+		callsRunningContainerNameDirect bool
+	)
+	ast.Inspect(fn.Body, func(n ast.Node) bool {
+		call, ok := n.(*ast.CallExpr)
+		if !ok {
+			return true
+		}
+		sel, ok := call.Fun.(*ast.SelectorExpr)
+		if !ok {
+			return true
+		}
+		switch sel.Sel.Name {
+		case "IsRunning":
+			callsIsRunning = true
+		case "ContainerInspect":
+			callsContainerInspectRaw = true
+		case "RunningContainerName":
+			// Direct RunningContainerName is also acceptable SSOT — but
+			// preferring IsRunning keeps the (bool, error) contract that
+			// already exists in the helper API surface.
+			callsRunningContainerNameDirect = true
+		}
+		return true
+	})
+
+	if !callsIsRunning && !callsRunningContainerNameDirect {
+		t.Errorf("preflightContainerHealth must call provisioner.IsRunning OR provisioner.RunningContainerName for the SSOT health check — see molecule-core#36. Found neither.")
+	}
+	if callsContainerInspectRaw {
+		t.Errorf("preflightContainerHealth carries a direct ContainerInspect call. This is the parallel-impl drift molecule-core#36 fixed. " +
+			"Either route through provisioner.IsRunning OR — if a new use case truly needs a different inspect — extend the helper's contract first and update this gate to allow the specific delta.")
+	}
+}