forked from molecule-ai/molecule-core
fix(a2a): detect dead EC2 agents on upstream 5xx + reactive auto-restart for SaaS
Class-of-bugs fix surfaced on 2026-04-30, when a canvas chat on
hongmingwang.moleculesai.app hit a dead workspace and got back a generic
Cloudflare 502 page. Three independent gaps in the reactive-health path
together leak dead-agent failures to canvas with no auto-recovery.
## Bug 1 — maybeMarkContainerDead is a no-op for SaaS tenants
`maybeMarkContainerDead` only consulted `h.provisioner` (local Docker
provisioner). SaaS tenants set `h.cpProv` (CP-backed EC2 provisioner)
and leave `h.provisioner` nil — so the function early-returned false
on every call and dead EC2 agents never triggered the offline-flip /
broadcast / restart cascade.
Fix: extend `CPProvisionerAPI` interface with `IsRunning(ctx, id)
(bool, error)` (already implemented on `*CPProvisioner`; just needs
to surface on the interface). `maybeMarkContainerDead` now branches:
local-Docker path uses `h.provisioner.IsRunning`; SaaS path uses
`h.cpProv.IsRunning` which calls the CP's `/cp/workspaces/:id/status`
endpoint to read the EC2 state.
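The provisioner selection can be sketched as follows. This is a minimal illustration of the branch, not the real molecule-core code: `prober`, `computeDead`, and the fake provisioners are hypothetical stand-ins for `CPProvisionerAPI`, `maybeMarkContainerDead`, and the `h.provisioner` / `h.cpProv` fields.

```go
package main

import (
	"context"
	"fmt"
)

// prober is the single method the health check needs from either
// provisioner; this commit surfaces it on the CP-backed interface too.
type prober interface {
	IsRunning(ctx context.Context, id string) (bool, error)
}

type fakeDocker struct{ up bool }

func (d fakeDocker) IsRunning(context.Context, string) (bool, error) { return d.up, nil }

type fakeCP struct{ up bool }

func (c fakeCP) IsRunning(context.Context, string) (bool, error) { return c.up, nil }

// computeDead mirrors the post-fix branch: local Docker path when that
// provisioner is wired, SaaS/EC2 path when only the CP-backed one is,
// and "not dead" when neither is wired. The pre-fix gate consulted only
// the Docker provisioner, so the SaaS case always fell into the false
// branch — exactly the no-op this commit fixes.
func computeDead(docker, cp prober, id string) bool {
	var p prober
	switch {
	case docker != nil:
		p = docker
	case cp != nil:
		p = cp
	default:
		return false // nothing wired: never flip offline
	}
	running, err := p.IsRunning(context.Background(), id)
	if err != nil {
		return false // transient probe error: stay on the alive path
	}
	return !running
}

func main() {
	fmt.Println(computeDead(nil, fakeCP{up: false}, "ws-1")) // SaaS, dead EC2
	fmt.Println(computeDead(nil, fakeCP{up: true}, "ws-2"))  // SaaS, healthy
	fmt.Println(computeDead(nil, nil, "ws-3"))               // nothing wired
}
```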
## Bug 2 — RestartByID short-circuits on `h.provisioner == nil`
Same shape as Bug 1: the auto-restart cascade triggered by
`maybeMarkContainerDead` calls `RestartByID` which short-circuited
when the local Docker provisioner was missing. So even if Bug 1 were
fixed, the workspace-offline state would never recover.
Fix: change the gate to `h.provisioner == nil && h.cpProv == nil`
and update `runRestartCycle` to branch on which provisioner is
wired for the Stop call. (The HTTP `Restart` handler already does
this branching correctly — we're just bringing the auto-restart path
to parity.)
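A minimal sketch of the corrected gate and the Stop selection, with booleans standing in for the `h.provisioner` / `h.cpProv` nil-checks (the function names and string labels are illustrative, not the real handler code):

```go
package main

import "fmt"

// restartGate mirrors RestartByID's post-fix guard: the auto-restart
// cycle may run when EITHER provisioner is wired. The pre-fix gate
// required the local Docker provisioner specifically, so SaaS tenants
// (dockerWired=false, cpWired=true) never auto-restarted.
func restartGate(dockerWired, cpWired bool) bool {
	return dockerWired || cpWired // pre-fix: return dockerWired
}

// stopTarget mirrors runRestartCycle's post-fix Stop branching: pick
// whichever provisioner is wired (mutually exclusive in production).
// The returned strings are labels for illustration only.
func stopTarget(dockerWired, cpWired bool) string {
	switch {
	case dockerWired:
		return "provisioner.Stop" // local Docker: kill the container
	case cpWired:
		return "cpProv.Stop" // SaaS: terminate via the CP
	default:
		return "none"
	}
}

func main() {
	fmt.Println(restartGate(false, true), stopTarget(false, true)) // SaaS tenant
	fmt.Println(restartGate(false, false))                         // nothing wired
}
```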
## Bug 3 — upstream 502/503/504 propagated as-is, masked by Cloudflare
When the agent's tunnel returns 5xx (the "tunnel up but no origin"
shape — agent process dead but cloudflared connection still healthy),
`dispatchA2A` returns successfully at the HTTP layer with a 5xx body.
`handleA2ADispatchError`'s reactive-health path doesn't run because
that path is only triggered on transport-level errors. The pre-fix
code propagated the 502 status to canvas; Cloudflare in front of the
platform then masked the 502 with its own opaque "error code: 502"
page, hiding any structured response and any Retry-After hint.
Fix: in `proxyA2ARequest`, when the upstream returns 502/503/504, run
`maybeMarkContainerDead` BEFORE propagating. If IsRunning confirms
the agent is dead → return a structured 503 with restarting=true +
Retry-After (CF doesn't mask 503s the same way). If running, propagate
the original status (don't recycle a healthy agent on a transient
hiccup — it might have legitimately returned 502).
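The status translation can be sketched as below. `translateUpstream` is a hypothetical helper for illustration, and `agentDead` stands in for the `maybeMarkContainerDead` result; the real code inlines this in `proxyA2ARequest`.

```go
package main

import (
	"fmt"
	"net/http"
)

// translateUpstream sketches the post-fix decision: gateway-class
// upstream statuses (502/503/504) trigger the dead-agent probe. A
// confirmed-dead agent becomes a structured 503 plus Retry-After
// (Cloudflare does not mask 503s the way it masks 502s); everything
// else propagates untouched, so a healthy agent is never recycled on
// a transient 5xx it legitimately returned.
func translateUpstream(status int, agentDead bool) (code int, retryAfter string) {
	switch status {
	case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
		if agentDead {
			return http.StatusServiceUnavailable, "15"
		}
	}
	return status, "" // alive agent or non-gateway status: propagate as-is
}

func main() {
	fmt.Println(translateUpstream(502, true))  // dead agent: structured 503
	fmt.Println(translateUpstream(502, false)) // alive agent: 502 propagated
	fmt.Println(translateUpstream(500, false)) // non-gateway 5xx: untouched
}
```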
## Drive-by — a2aClient transport timeouts
a2aClient was `&http.Client{}` with no Transport timeouts. When a
workspace's EC2 black-holes TCP connects (instance terminated mid-flight,
SG flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS —
long enough for Cloudflare's ~100s edge timeout to fire first and
surface a generic 502. Added DialContext (10s connect), TLSHandshake
(10s), and ResponseHeaderTimeout (60s). Client.Timeout DELIBERATELY
unset — that would pre-empt slow-cold-start flows (Claude Code OAuth
first-token, multi-minute agent synthesis). Long-tail body streaming
is still governed by per-request context deadline.
## Tests
- `TestMaybeMarkContainerDead_CPOnly_NotRunning` — IsRunning(false) →
marks workspace offline, returns true.
- `TestMaybeMarkContainerDead_CPOnly_Running` — IsRunning(true) →
no offline-flip, returns false (don't recycle a healthy agent).
- `TestProxyA2A_Upstream502_TriggersContainerDeadCheck` — agent server
returns 502 + cpProv reports dead → caller gets 503 with restarting=
true and Retry-After: 15.
- `TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs` — same upstream
502 but cpProv reports running → propagates 502 (existing behavior;
safety check that prevents over-eager recycling).
- Existing `TestMaybeMarkContainerDead_NilProvisioner` /
`TestMaybeMarkContainerDead_ExternalRuntime` still pass.
- Full handlers + provisioner test suites pass.
## Impact
Pre-fix: dead EC2 agent on a SaaS tenant → CF-masked 502 to canvas, no
auto-recovery, manual restart from canvas required.
Post-fix: dead EC2 agent on a SaaS tenant → structured 503 with
restarting=true + Retry-After to canvas, workspace flipped to offline,
auto-restart cycle triggered. Canvas can show a user-actionable
"agent is restarting, please wait" message instead of a generic 502.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 92a29bb37c
commit 9f35788aee
@@ -13,6 +13,7 @@ import (
 	"errors"
 	"io"
 	"log"
+	"net"
 	"net/http"
 	"os"
 	"strconv"
@@ -92,13 +93,47 @@ func isSystemCaller(callerID string) bool {
 const maxProxyResponseBody = 10 << 20
 
 // a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
-// No client-level timeout — timeouts are enforced per-request via context
-// deadlines: canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap). Do NOT
-// set a Client.Timeout here: it is enforced independently of ctx deadlines and
-// would pre-empt legitimate slow cold-start flows (e.g. Claude Code first-token
-// over OAuth can take 30-60s on boot). Callers that want a safety net should
-// build a context.WithTimeout themselves.
-var a2aClient = &http.Client{}
+//
+// Timeout model — three independent budgets, none of which gets in each other's way:
+//
+// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
+//    the entire request including streamed body reads, and would pre-empt
+//    legitimate slow cold-start flows (Claude Code first-token over OAuth
+//    can take 30-60s on boot; long-running agent synthesis can stream
+//    tokens for minutes). Total-request budget is enforced per-request
+//    via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
+//
+// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
+//    black-holes TCP connects (instance terminated mid-flight, security group
+//    flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
+//    enough that Cloudflare's ~100s edge timeout can fire first and surface
+//    a generic 502 page to canvas. 10s is well above realistic intra-region
+//    latencies and well below CF's edge timeout.
+//
+// 3. Transport.ResponseHeaderTimeout — 60s. From request-body-end to
+//    response-headers-start. Covers cold-start first-byte (the 30-60s OAuth
+//    flow above), with margin. Body streaming after headers is governed by
+//    the per-request context deadline, NOT this timeout — so multi-minute
+//    agent responses still work fine.
+//
+// The point of (2) and (3) is to surface a *structured* 503 from
+// handleA2ADispatchError when the workspace agent is unreachable, so canvas
+// gets `{"error":"workspace agent unreachable","restarting":true}` instead
+// of Cloudflare's opaque 502 error page. Without these, dead workspaces hang
+// long enough that CF gives up first and shows its own page.
+var a2aClient = &http.Client{
+	Transport: &http.Transport{
+		DialContext: (&net.Dialer{
+			Timeout:   10 * time.Second,
+			KeepAlive: 30 * time.Second,
+		}).DialContext,
+		ResponseHeaderTimeout: 60 * time.Second,
+		TLSHandshakeTimeout:   10 * time.Second,
+		// MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent
+		// fan-in is bounded by the platform's broadcaster fan-out, not by
+		// connection-pool sizing.
+	},
+}
 
 type proxyA2AError struct {
 	Status int
@@ -422,6 +457,45 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
 	if errMsg == "" {
 		errMsg = http.StatusText(resp.StatusCode)
 	}
 
+	// Upstream returned 502/503/504 (gateway/proxy failure). This is
+	// the "agent process is dead but the tunnel between us and the
+	// workspace is still up" signal — handleA2ADispatchError's
+	// network-error path doesn't run because Do() succeeded at the
+	// HTTP layer. Without this branch, the dead-agent failure mode
+	// surfaces to canvas as a generic 502 (and CF in front of the
+	// platform masks it with its own error page, hiding any
+	// structured response we might write).
+	//
+	// Treatment matches handleA2ADispatchError's container-dead path:
+	//  1. Probe IsRunning via maybeMarkContainerDead. If the
+	//     container truly is dead, mark workspace offline + kick
+	//     a restart goroutine.
+	//  2. Return a structured 503 with restarting=true + Retry-After
+	//     so canvas shows a useful "agent is restarting" message
+	//     (and CF doesn't intercept the 503 the way it does 502).
+	// If IsRunning reports the container is alive, we leave the
+	// upstream status untouched — the agent legitimately returned
+	// 502/503/504 (e.g. it's returning its own Bad-Gateway from
+	// some downstream call) and we shouldn't mistakenly recycle it.
+	//
+	// Empty body is the strong signal here — a CF-tunnel "no-origin"
+	// 502 has 0 bytes; an agent-authored 502 typically has a JSON
+	// error body. We probe IsRunning regardless (it's the
+	// authoritative check) but the empty-body case is what makes
+	// this fix necessary.
+	if resp.StatusCode == http.StatusBadGateway ||
+		resp.StatusCode == http.StatusServiceUnavailable ||
+		resp.StatusCode == http.StatusGatewayTimeout {
+		if h.maybeMarkContainerDead(ctx, workspaceID) {
+			return 0, nil, &proxyA2AError{
+				Status:   http.StatusServiceUnavailable,
+				Headers:  map[string]string{"Retry-After": "15"},
+				Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true, "retry_after": 15},
+			}
+		}
+	}
+
 	return resp.StatusCode, respBody, &proxyA2AError{
 		Status:   resp.StatusCode,
 		Response: gin.H{"error": errMsg},
@@ -141,22 +141,46 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
 }
 
 // maybeMarkContainerDead runs the reactive health check after a forward error.
-// If the workspace's Docker container is no longer running (and the workspace
-// isn't external), it marks the workspace offline, clears Redis state,
-// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true
-// when the container was found dead.
+// If the workspace's compute (Docker container OR EC2 instance) is no longer
+// running (and the workspace isn't external), it marks the workspace offline,
+// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async
+// restart. Returns true when the compute was found dead.
+//
+// Provisioner selection (mutually exclusive in production):
+//   - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect.
+//   - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's
+//     /cp/workspaces/:id/status to read the EC2 state.
+//
+// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants
+// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every
+// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas
+// with no auto-recovery and Cloudflare in front would mask the response with
+// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
+// canvas-chat-to-dead-workspace incident traces to exactly this gap.
 func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
 	var wsRuntime string
 	db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
-	if h.provisioner == nil || wsRuntime == "external" {
+	if wsRuntime == "external" {
 		return false
 	}
-	running, inspectErr := h.provisioner.IsRunning(ctx, workspaceID)
+	if h.provisioner == nil && h.cpProv == nil {
+		return false
+	}
+
+	var running bool
+	var inspectErr error
+	if h.provisioner != nil {
+		running, inspectErr = h.provisioner.IsRunning(ctx, workspaceID)
+	} else {
+		// SaaS path: ask the CP about the EC2 state. Same (true, err) on
+		// transport errors contract — keeps the caller on the alive path
+		// instead of triggering a restart cascade on a flaky CP call.
+		running, inspectErr = h.cpProv.IsRunning(ctx, workspaceID)
+	}
 	if inspectErr != nil {
-		// Transient Docker-daemon error (timeout, socket EOF, etc.). Post-
-		// #386, IsRunning returns (true, err) in this case — caller stays
-		// on the alive path and does not trigger a restart cascade. Log
-		// so the defect is visible without being destructive.
+		// Transient backend error (Docker daemon EOF, CP HTTP 5xx, etc.).
+		// IsRunning's contract returns (true, err) in this case so we stay
+		// on the alive path without triggering a restart cascade.
 		log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
 	}
 	if running {
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
 	"github.com/gin-gonic/gin"
 )
@@ -244,6 +245,117 @@ func TestProxyA2A_AgentReturnsError(t *testing.T) {
 	}
 }
 
+// TestProxyA2A_Upstream502_TriggersContainerDeadCheck — when the agent
+// tunnel returns 502 (the "tunnel up but no origin" failure mode that
+// surfaces a Cloudflare error page to canvas), proxyA2A must consult
+// IsRunning on cpProv. If the EC2 instance truly is dead, the response
+// becomes a structured 503 with restarting=true (not the upstream 502
+// which CF would mask), and the workspace flips to status='offline' so
+// the next reactive poll sees the right state. This is the
+// 2026-04-30 hongmingwang.moleculesai.app canvas-chat-to-dead-workspace
+// regression: upstream 502 was previously propagated as-is, CF masked
+// it, and no auto-restart fired.
+func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) {
+	mock := setupTestDB(t)
+	mr := setupTestRedis(t)
+	allowLoopbackForTest(t)
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+	cp := &fakeCPProv{running: false}
+	handler.SetCPProvisioner(cp)
+
+	// Agent tunnel returns 502 with empty body — the CF "no-origin" shape.
+	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusBadGateway)
+	}))
+	defer agentServer.Close()
+
+	mr.Set(fmt.Sprintf("ws:%s:url", "ws-tunnel-dead"), agentServer.URL)
+	expectBudgetCheck(mock, "ws-tunnel-dead")
+	// Activity log fires (delivery_confirmed is true on Do() success regardless
+	// of upstream status — handler's existing logA2ASuccess path runs first
+	// and logs as success because the dispatch did get a response).
+	mock.ExpectExec("INSERT INTO activity_logs").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+	// maybeMarkContainerDead's runtime lookup, then the offline-flip UPDATE.
+	mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
+		WithArgs("ws-tunnel-dead").
+		WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
+	mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
+		WithArgs("ws-tunnel-dead").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: "ws-tunnel-dead"}}
+	body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
+	c.Request = httptest.NewRequest("POST", "/workspaces/ws-tunnel-dead/a2a", bytes.NewBufferString(body))
+	c.Request.Header.Set("Content-Type", "application/json")
+
+	handler.ProxyA2A(c)
+
+	time.Sleep(80 * time.Millisecond)
+
+	// Caller sees a structured 503 (NOT the upstream 502 which CF would mask).
+	if w.Code != http.StatusServiceUnavailable {
+		t.Fatalf("upstream 502 should translate to 503 once cpProv reports dead; got %d: %s", w.Code, w.Body.String())
+	}
+	if !strings.Contains(w.Body.String(), "restarting") {
+		t.Errorf("response body should mention restart trigger; got %s", w.Body.String())
+	}
+	if w.Header().Get("Retry-After") != "15" {
+		t.Errorf("Retry-After header should be 15 to throttle canvas-side retry loop; got %q", w.Header().Get("Retry-After"))
+	}
+	if cp.calls != 1 {
+		t.Errorf("cpProv.IsRunning must be consulted exactly once; got %d calls", cp.calls)
+	}
+}
+
+// TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs — the safety check:
+// if cpProv reports the EC2 IS running, the upstream 502 is propagated
+// as-is. Don't recycle a healthy agent on a transient hiccup — the agent
+// might have legitimately returned 502 (e.g. a downstream service it
+// called returned 502 and it forwarded). Net behavior matches pre-fix
+// for the alive-agent case.
+func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) {
+	mock := setupTestDB(t)
+	mr := setupTestRedis(t)
+	allowLoopbackForTest(t)
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+	cp := &fakeCPProv{running: true}
+	handler.SetCPProvisioner(cp)
+
+	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusBadGateway)
+		fmt.Fprint(w, `{"error":"downstream service returned 502"}`)
+	}))
+	defer agentServer.Close()
+
+	mr.Set(fmt.Sprintf("ws:%s:url", "ws-alive-502"), agentServer.URL)
+	expectBudgetCheck(mock, "ws-alive-502")
+	mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
+	// IsRunning runtime lookup runs but no UPDATE follows (running=true).
+	mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
+		WithArgs("ws-alive-502").
+		WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: "ws-alive-502"}}
+	body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
+	c.Request = httptest.NewRequest("POST", "/workspaces/ws-alive-502/a2a", bytes.NewBufferString(body))
+	c.Request.Header.Set("Content-Type", "application/json")
+
+	handler.ProxyA2A(c)
+	time.Sleep(50 * time.Millisecond)
+
+	if w.Code != http.StatusBadGateway {
+		t.Fatalf("alive agent 502 should propagate as 502; got %d: %s", w.Code, w.Body.String())
+	}
+}
+
 // ==================== ProxyA2A — messageId injection ====================
 
 func TestProxyA2A_MessageIDInjected(t *testing.T) {
@@ -1640,6 +1752,84 @@ func TestMaybeMarkContainerDead_NilProvisioner(t *testing.T) {
 	}
 }
 
+// SaaS path: h.provisioner=nil but h.cpProv is wired and reports the EC2
+// instance is NOT running. maybeMarkContainerDead must consult cpProv,
+// flip the workspace to status='offline', clear keys, broadcast OFFLINE,
+// and return true so the caller surfaces the structured 503. Pre-fix
+// (#NNN) it returned false unconditionally on h.provisioner==nil, so
+// dead EC2 agents leaked upstream 502 to canvas with no recovery.
+func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+	cp := &fakeCPProv{running: false}
+	handler.SetCPProvisioner(cp)
+
+	mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
+		WithArgs("ws-saas-dead").
+		WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
+	mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
+		WithArgs("ws-saas-dead").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-dead")
+	if !got {
+		t.Fatal("expected true (cpProv reports not running) — without cpProv consultation, SaaS dead-agent recovery is impossible")
+	}
+	if cp.calls != 1 {
+		t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls)
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: %v", err)
+	}
+}
+
+// SaaS path: h.cpProv reports running=true → maybeMarkContainerDead must
+// return false (don't restart a healthy agent on a transient upstream
+// hiccup). This is the safety check that prevents over-eager recycling.
+func TestMaybeMarkContainerDead_CPOnly_Running(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
+	cp := &fakeCPProv{running: true}
+	handler.SetCPProvisioner(cp)
+
+	mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
+		WithArgs("ws-saas-alive").
+		WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
+
+	if got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-alive"); got {
+		t.Error("expected false when cpProv reports running — must not recycle a healthy agent")
+	}
+	if cp.calls != 1 {
+		t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls)
+	}
+}
+
+// fakeCPProv satisfies provisioner.CPProvisionerAPI for tests that need
+// to exercise the SaaS / EC2-backed path. Only IsRunning is used by the
+// reactive-health path under test; Start/Stop/GetConsoleOutput panic so
+// an unexpected reach into them is a loud test failure rather than a
+// silent passthrough.
+type fakeCPProv struct {
+	running bool
+	calls   int
+}
+
+func (f *fakeCPProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
+	panic("fakeCPProv.Start not expected on the maybeMarkContainerDead path")
+}
+func (f *fakeCPProv) Stop(_ context.Context, _ string) error {
+	panic("fakeCPProv.Stop not expected on the maybeMarkContainerDead path")
+}
+func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
+	panic("fakeCPProv.GetConsoleOutput not expected on the maybeMarkContainerDead path")
+}
+func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
+	f.calls++
+	return f.running, nil
+}
+
 // external runtime → false regardless of provisioner.
 func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) {
 	mock := setupTestDB(t)
@@ -1206,6 +1206,10 @@ func (s *stubFailingCPProv) GetConsoleOutput(_ context.Context, _ string) (strin
 	panic("stubFailingCPProv.GetConsoleOutput not expected on the provisionWorkspaceCP failure path")
 }
 
+func (s *stubFailingCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
+	panic("stubFailingCPProv.IsRunning not expected on the provisionWorkspaceCP failure path")
+}
+
 // TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast asserts that
 // provisionWorkspaceCP never leaks err.Error() in
 // WORKSPACE_PROVISION_FAILED broadcasts. Regression test for #1206.
@@ -326,7 +326,12 @@ func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID s
 // in-flight runner picks up the pending request after its current cycle
 // completes, so writes that committed mid-restart are guaranteed to land.
 func (h *WorkspaceHandler) RestartByID(workspaceID string) {
-	if h.provisioner == nil {
+	// At least one of the two provisioners must be wired. Pre-fix this
+	// short-circuited on h.provisioner==nil alone, which silently disabled
+	// reactive auto-restart on every SaaS tenant (where the local Docker
+	// provisioner is intentionally nil). The runRestartCycle below now
+	// branches on which one is set for the Stop call.
+	if h.provisioner == nil && h.cpProv == nil {
 		return
 	}
 	coalesceRestart(workspaceID, func() { h.runRestartCycle(workspaceID) })
@@ -426,7 +431,20 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
 
 	log.Printf("Auto-restart: restarting %s (%s) runtime=%q (was: %s)", wsName, workspaceID, dbRuntime, status)
 
-	h.provisioner.Stop(ctx, workspaceID)
+	// Stop existing compute. Branch on which provisioner is wired (mutually
+	// exclusive in production): Docker provisioner.Stop kills the local
+	// container; CP provisioner.Stop calls DELETE /cp/workspaces/:id which
+	// terminates the EC2 instance. Pre-fix this only called the Docker path,
+	// so on SaaS (h.provisioner=nil) the auto-restart cycle silently NPE'd
+	// before reaching the reprovision step — which is why every SaaS dead-
+	// agent incident pre-this-fix required manual restart from canvas.
+	if h.provisioner != nil {
+		h.provisioner.Stop(ctx, workspaceID)
+	} else if h.cpProv != nil {
+		if err := h.cpProv.Stop(ctx, workspaceID); err != nil {
+			log.Printf("Auto-restart: cpProv.Stop(%s) failed: %v (continuing to reprovision)", workspaceID, err)
+		}
+	}
 
 	db.DB.ExecContext(ctx,
 		`UPDATE workspaces SET status = 'provisioning', url = '', updated_at = now() WHERE id = $1`, workspaceID)
@@ -29,6 +29,16 @@ type CPProvisionerAPI interface {
 	Start(ctx context.Context, cfg WorkspaceConfig) (string, error)
 	Stop(ctx context.Context, workspaceID string) error
 	GetConsoleOutput(ctx context.Context, workspaceID string) (string, error)
+	// IsRunning reports whether the workspace's compute (EC2 instance) is
+	// currently in the running state. Surfaced on the interface (rather than
+	// only on *CPProvisioner) so the a2a-proxy reactive-health path can
+	// detect dead EC2 agents the same way it detects dead Docker containers.
+	// Pre-#NNN, maybeMarkContainerDead only consulted the local Docker
+	// provisioner — for SaaS tenants (h.provisioner=nil) the check was a
+	// no-op, so a dead EC2 agent would leak 502/503 to canvas with no
+	// auto-recovery. (true, err) on transport errors keeps callers on the
+	// alive path; (false, nil) is the only definitive "dead" signal.
+	IsRunning(ctx context.Context, workspaceID string) (bool, error)
 }
 
 // Compile-time assertion: *CPProvisioner satisfies CPProvisionerAPI.