diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 3389cb96..3ce8f4f5 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -13,6 +13,7 @@ import ( "errors" "io" "log" + "net" "net/http" "os" "strconv" @@ -92,13 +93,47 @@ func isSystemCaller(callerID string) bool { const maxProxyResponseBody = 10 << 20 // a2aClient is a shared HTTP client for proxying A2A requests to workspace agents. -// No client-level timeout — timeouts are enforced per-request via context -// deadlines: canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap). Do NOT -// set a Client.Timeout here: it is enforced independently of ctx deadlines and -// would pre-empt legitimate slow cold-start flows (e.g. Claude Code first-token -// over OAuth can take 30-60s on boot). Callers that want a safety net should -// build a context.WithTimeout themselves. -var a2aClient = &http.Client{} +// +// Timeout model — three independent budgets, none of which gets in each other's way: +// +// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on +// the entire request including streamed body reads, and would pre-empt +// legitimate slow cold-start flows (Claude Code first-token over OAuth +// can take 30-60s on boot; long-running agent synthesis can stream +// tokens for minutes). Total-request budget is enforced per-request +// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling). +// +// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2 +// black-holes TCP connects (instance terminated mid-flight, security group +// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long +// enough that Cloudflare's ~100s edge timeout can fire first and surface +// a generic 502 page to canvas. 10s is well above realistic intra-region +// latencies and well below CF's edge timeout. +// +// 3. 
Transport.ResponseHeaderTimeout — 60s. From request-body-end to +// response-headers-start. Covers cold-start first-byte (the 30-60s OAuth +// flow above), with margin. Body streaming after headers is governed by +// the per-request context deadline, NOT this timeout — so multi-minute +// agent responses still work fine. +// +// The point of (2) and (3) is to surface a *structured* 503 from +// handleA2ADispatchError when the workspace agent is unreachable, so canvas +// gets `{"error":"workspace agent unreachable","restarting":true}` instead +// of Cloudflare's opaque 502 error page. Without these, dead workspaces hang +// long enough that CF gives up first and shows its own page. +var a2aClient = &http.Client{ + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ResponseHeaderTimeout: 60 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + // MaxIdleConns / IdleConnTimeout: left at zero, i.e. no limit — NOT + // http.DefaultTransport's 100 / 90s; agent fan-in is bounded by the + // platform's broadcaster fan-out, not by connection-pool sizing. + }, +} type proxyA2AError struct { Status int @@ -422,6 +457,45 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri if errMsg == "" { errMsg = http.StatusText(resp.StatusCode) } + + // Upstream returned 502/503/504 (gateway/proxy failure). This is + // the "agent process is dead but the tunnel between us and the + // workspace is still up" signal — handleA2ADispatchError's + // network-error path doesn't run because Do() succeeded at the + // HTTP layer. Without this branch, the dead-agent failure mode + // surfaces to canvas as a generic 502 (and CF in front of the + // platform masks it with its own error page, hiding any + // structured response we might write). + // + // Treatment matches handleA2ADispatchError's container-dead path: + // 1. Probe IsRunning via maybeMarkContainerDead. 
If the + // container truly is dead, mark workspace offline + kick + // a restart goroutine. + // 2. Return a structured 503 with restarting=true + Retry-After + // so canvas shows a useful "agent is restarting" message + // (and CF doesn't intercept the 503 the way it does 502). + // If IsRunning reports the container is alive, we leave the + // upstream status untouched — the agent legitimately returned + // 502/503/504 (e.g. it's returning its own Bad-Gateway from + // some downstream call) and we shouldn't mistakenly recycle it. + // + // Empty body is the strong signal here — a CF-tunnel "no-origin" + // 502 has 0 bytes; an agent-authored 502 typically has a JSON + // error body. We probe IsRunning regardless (it's the + // authoritative check) but the empty-body case is what makes + // this fix necessary. + if resp.StatusCode == http.StatusBadGateway || + resp.StatusCode == http.StatusServiceUnavailable || + resp.StatusCode == http.StatusGatewayTimeout { + if h.maybeMarkContainerDead(ctx, workspaceID) { + return 0, nil, &proxyA2AError{ + Status: http.StatusServiceUnavailable, + Headers: map[string]string{"Retry-After": "15"}, + Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true, "retry_after": 15}, + } + } + } + return resp.StatusCode, respBody, &proxyA2AError{ Status: resp.StatusCode, Response: gin.H{"error": errMsg}, diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index d0ccea86..1bcf9d4b 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -141,22 +141,46 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace } // maybeMarkContainerDead runs the reactive health check after a forward error. 
-// If the workspace's Docker container is no longer running (and the workspace -// isn't external), it marks the workspace offline, clears Redis state, -// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true -// when the container was found dead. +// If the workspace's compute (Docker container OR EC2 instance) is no longer +// running (and the workspace isn't external), it marks the workspace offline, +// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async +// restart. Returns true when the compute was found dead. +// +// Provisioner selection (mutually exclusive in production): +// - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect. +// - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's +// /cp/workspaces/:id/status to read the EC2 state. +// +// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants +// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every +// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas +// with no auto-recovery and Cloudflare in front would mask the response with +// its own error page. The 2026-04-30 hongmingwang.moleculesai.app +// canvas-chat-to-dead-workspace incident traces to exactly this gap. func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool { var wsRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime) - if h.provisioner == nil || wsRuntime == "external" { + if wsRuntime == "external" { return false } - running, inspectErr := h.provisioner.IsRunning(ctx, workspaceID) + if h.provisioner == nil && h.cpProv == nil { + return false + } + + var running bool + var inspectErr error + if h.provisioner != nil { + running, inspectErr = h.provisioner.IsRunning(ctx, workspaceID) + } else { + // SaaS path: ask the CP about the EC2 state. 
Same contract: (true, err) on + // transport errors — keeps the caller on the alive path + // instead of triggering a restart cascade on a flaky CP call. + running, inspectErr = h.cpProv.IsRunning(ctx, workspaceID) + } if inspectErr != nil { - // Transient Docker-daemon error (timeout, socket EOF, etc.). Post- - // #386, IsRunning returns (true, err) in this case — caller stays - // on the alive path and does not trigger a restart cascade. Log - // so the defect is visible without being destructive. + // Transient backend error (Docker daemon EOF, CP HTTP 5xx, etc.). + // IsRunning's contract returns (true, err) in this case so we stay + // on the alive path without triggering a restart cascade. log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr) } if running { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 9d0b6e28..53d7daf9 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/gin-gonic/gin" ) @@ -244,6 +245,117 @@ func TestProxyA2A_AgentReturnsError(t *testing.T) { } } +// TestProxyA2A_Upstream502_TriggersContainerDeadCheck — when the agent +// tunnel returns 502 (the "tunnel up but no origin" failure mode that +// surfaces a Cloudflare error page to canvas), proxyA2A must consult +// IsRunning on cpProv. If the EC2 instance truly is dead, the response +// becomes a structured 503 with restarting=true (not the upstream 502 +// which CF would mask), and the workspace flips to status='offline' so +// the next reactive poll sees the right state. 
This is the +// 2026-04-30 hongmingwang.moleculesai.app canvas-chat-to-dead-workspace +// regression: upstream 502 was previously propagated as-is, CF masked +// it, and no auto-restart fired. +func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{running: false} + handler.SetCPProvisioner(cp) + + // Agent tunnel returns 502 with empty body — the CF "no-origin" shape. + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-tunnel-dead"), agentServer.URL) + expectBudgetCheck(mock, "ws-tunnel-dead") + // Activity log fires (delivery_confirmed is true on Do() success regardless + // of upstream status — handler's existing logA2ASuccess path runs first + // and logs as success because the dispatch did get a response). + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // maybeMarkContainerDead's runtime lookup, then the offline-flip UPDATE. + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`). + WithArgs("ws-tunnel-dead"). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`). + WithArgs("ws-tunnel-dead"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-tunnel-dead"}} + body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-tunnel-dead/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + + time.Sleep(80 * time.Millisecond) + + // Caller sees a structured 503 (NOT the upstream 502 which CF would mask). + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("upstream 502 should translate to 503 once cpProv reports dead; got %d: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "restarting") { + t.Errorf("response body should mention restart trigger; got %s", w.Body.String()) + } + if w.Header().Get("Retry-After") != "15" { + t.Errorf("Retry-After header should be 15 to throttle canvas-side retry loop; got %q", w.Header().Get("Retry-After")) + } + if cp.calls != 1 { + t.Errorf("cpProv.IsRunning must be consulted exactly once; got %d calls", cp.calls) + } +} + +// TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs — the safety check: +// if cpProv reports the EC2 IS running, the upstream 502 is propagated +// as-is. Don't recycle a healthy agent on a transient hiccup — the agent +// might have legitimately returned 502 (e.g. a downstream service it +// called returned 502 and it forwarded). Net behavior matches pre-fix +// for the alive-agent case. 
+func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{running: true} + handler.SetCPProvisioner(cp) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadGateway) + fmt.Fprint(w, `{"error":"downstream service returned 502"}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-alive-502"), agentServer.URL) + expectBudgetCheck(mock, "ws-alive-502") + mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1)) + // maybeMarkContainerDead's runtime lookup runs but no UPDATE follows (running=true). + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`). + WithArgs("ws-alive-502"). 
+ WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-alive-502"}} + body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-alive-502/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.ProxyA2A(c) + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusBadGateway { + t.Fatalf("alive agent 502 should propagate as 502; got %d: %s", w.Code, w.Body.String()) + } +} + // ==================== ProxyA2A — messageId injection ==================== func TestProxyA2A_MessageIDInjected(t *testing.T) { @@ -1640,6 +1752,84 @@ func TestMaybeMarkContainerDead_NilProvisioner(t *testing.T) { } } +// SaaS path: h.provisioner=nil but h.cpProv is wired and reports the EC2 +// instance is NOT running. maybeMarkContainerDead must consult cpProv, +// flip the workspace to status='offline', clear keys, broadcast OFFLINE, +// and return true so the caller surfaces the structured 503. Pre-fix +// (#NNN) it returned false unconditionally on h.provisioner==nil, so +// dead EC2 agents leaked upstream 502 to canvas with no recovery. +func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{running: false} + handler.SetCPProvisioner(cp) + + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`). + WithArgs("ws-saas-dead"). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`). + WithArgs("ws-saas-dead"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-dead") + if !got { + t.Fatal("expected true (cpProv reports not running) — without cpProv consultation, SaaS dead-agent recovery is impossible") + } + if cp.calls != 1 { + t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// SaaS path: h.cpProv reports running=true → maybeMarkContainerDead must +// return false (don't restart a healthy agent on a transient upstream +// hiccup). This is the safety check that prevents over-eager recycling. +func TestMaybeMarkContainerDead_CPOnly_Running(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{running: true} + handler.SetCPProvisioner(cp) + + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`). + WithArgs("ws-saas-alive"). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + + if got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-alive"); got { + t.Error("expected false when cpProv reports running — must not recycle a healthy agent") + } + if cp.calls != 1 { + t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls) + } +} + +// fakeCPProv satisfies provisioner.CPProvisionerAPI for tests that need +// to exercise the SaaS / EC2-backed path. Only IsRunning is used by the +// reactive-health path under test; Start/Stop/GetConsoleOutput panic so +// an unexpected reach into them is a loud test failure rather than a +// silent passthrough. 
+type fakeCPProv struct { + running bool + calls int +} + +func (f *fakeCPProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) { + panic("fakeCPProv.Start not expected on the maybeMarkContainerDead path") +} +func (f *fakeCPProv) Stop(_ context.Context, _ string) error { + panic("fakeCPProv.Stop not expected on the maybeMarkContainerDead path") +} +func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) { + panic("fakeCPProv.GetConsoleOutput not expected on the maybeMarkContainerDead path") +} +func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) { + f.calls++ + return f.running, nil +} + // external runtime → false regardless of provisioner. func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) { mock := setupTestDB(t) diff --git a/workspace-server/internal/handlers/workspace_provision_test.go b/workspace-server/internal/handlers/workspace_provision_test.go index 4cbe30a7..b5124e37 100644 --- a/workspace-server/internal/handlers/workspace_provision_test.go +++ b/workspace-server/internal/handlers/workspace_provision_test.go @@ -1206,6 +1206,10 @@ func (s *stubFailingCPProv) GetConsoleOutput(_ context.Context, _ string) (strin panic("stubFailingCPProv.GetConsoleOutput not expected on the provisionWorkspaceCP failure path") } +func (s *stubFailingCPProv) IsRunning(_ context.Context, _ string) (bool, error) { + panic("stubFailingCPProv.IsRunning not expected on the provisionWorkspaceCP failure path") +} + // TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast asserts that // provisionWorkspaceCP never leaks err.Error() in // WORKSPACE_PROVISION_FAILED broadcasts. Regression test for #1206. 
diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index d986a2c0..29d8a94e 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -326,7 +326,12 @@ func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID s // in-flight runner picks up the pending request after its current cycle // completes, so writes that committed mid-restart are guaranteed to land. func (h *WorkspaceHandler) RestartByID(workspaceID string) { - if h.provisioner == nil { + // At least one of the two provisioners must be wired. Pre-fix this + // short-circuited on h.provisioner==nil alone, which silently disabled + // reactive auto-restart on every SaaS tenant (where the local Docker + // provisioner is intentionally nil). The runRestartCycle below now + // branches on which one is set for the Stop call. + if h.provisioner == nil && h.cpProv == nil { return } coalesceRestart(workspaceID, func() { h.runRestartCycle(workspaceID) }) @@ -426,7 +431,20 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) { log.Printf("Auto-restart: restarting %s (%s) runtime=%q (was: %s)", wsName, workspaceID, dbRuntime, status) - h.provisioner.Stop(ctx, workspaceID) + // Stop existing compute. Branch on which provisioner is wired (mutually + // exclusive in production): Docker provisioner.Stop kills the local + // container; CP provisioner.Stop calls DELETE /cp/workspaces/:id which + // terminates the EC2 instance. Pre-fix this only called the Docker path, + // so on SaaS (h.provisioner=nil) the auto-restart cycle silently NPE'd + // before reaching the reprovision step — which is why every SaaS dead- + // agent incident pre-this-fix required manual restart from canvas. 
+ if h.provisioner != nil { + h.provisioner.Stop(ctx, workspaceID) + } else if h.cpProv != nil { + if err := h.cpProv.Stop(ctx, workspaceID); err != nil { + log.Printf("Auto-restart: cpProv.Stop(%s) failed: %v (continuing to reprovision)", workspaceID, err) + } + } db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'provisioning', url = '', updated_at = now() WHERE id = $1`, workspaceID) diff --git a/workspace-server/internal/provisioner/cp_provisioner.go b/workspace-server/internal/provisioner/cp_provisioner.go index fd7e640d..1ad45f21 100644 --- a/workspace-server/internal/provisioner/cp_provisioner.go +++ b/workspace-server/internal/provisioner/cp_provisioner.go @@ -29,6 +29,16 @@ type CPProvisionerAPI interface { Start(ctx context.Context, cfg WorkspaceConfig) (string, error) Stop(ctx context.Context, workspaceID string) error GetConsoleOutput(ctx context.Context, workspaceID string) (string, error) + // IsRunning reports whether the workspace's compute (EC2 instance) is + // currently in the running state. Surfaced on the interface (rather than + // only on *CPProvisioner) so the a2a-proxy reactive-health path can + // detect dead EC2 agents the same way it detects dead Docker containers. + // Pre-#NNN, maybeMarkContainerDead only consulted the local Docker + // provisioner — for SaaS tenants (h.provisioner=nil) the check was a + // no-op, so a dead EC2 agent would leak 502/503 to canvas with no + // auto-recovery. (true, err) on transport errors keeps callers on the + // alive path; (false, nil) is the only definitive "dead" signal. + IsRunning(ctx context.Context, workspaceID string) (bool, error) } // Compile-time assertion: *CPProvisioner satisfies CPProvisionerAPI.