Merge pull request #2362 from Molecule-AI/auto/a2a-upstream-5xx-mark-dead
fix(a2a): detect dead EC2 agents on upstream 5xx + reactive auto-restart for SaaS
This commit is contained in:
commit
344e3e8914
@ -13,6 +13,7 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
@ -92,13 +93,47 @@ func isSystemCaller(callerID string) bool {
|
||||
const maxProxyResponseBody = 10 << 20
|
||||
|
||||
// a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
|
||||
// No client-level timeout — timeouts are enforced per-request via context
|
||||
// deadlines: canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap). Do NOT
|
||||
// set a Client.Timeout here: it is enforced independently of ctx deadlines and
|
||||
// would pre-empt legitimate slow cold-start flows (e.g. Claude Code first-token
|
||||
// over OAuth can take 30-60s on boot). Callers that want a safety net should
|
||||
// build a context.WithTimeout themselves.
|
||||
var a2aClient = &http.Client{}
|
||||
//
|
||||
// Timeout model — three independent budgets, none of which gets in each other's way:
|
||||
//
|
||||
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
|
||||
// the entire request including streamed body reads, and would pre-empt
|
||||
// legitimate slow cold-start flows (Claude Code first-token over OAuth
|
||||
// can take 30-60s on boot; long-running agent synthesis can stream
|
||||
// tokens for minutes). Total-request budget is enforced per-request
|
||||
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
|
||||
//
|
||||
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
|
||||
// black-holes TCP connects (instance terminated mid-flight, security group
|
||||
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
|
||||
// enough that Cloudflare's ~100s edge timeout can fire first and surface
|
||||
// a generic 502 page to canvas. 10s is well above realistic intra-region
|
||||
// latencies and well below CF's edge timeout.
|
||||
//
|
||||
// 3. Transport.ResponseHeaderTimeout — 60s. From request-body-end to
|
||||
// response-headers-start. Covers cold-start first-byte (the 30-60s OAuth
|
||||
// flow above), with margin. Body streaming after headers is governed by
|
||||
// the per-request context deadline, NOT this timeout — so multi-minute
|
||||
// agent responses still work fine.
|
||||
//
|
||||
// The point of (2) and (3) is to surface a *structured* 503 from
|
||||
// handleA2ADispatchError when the workspace agent is unreachable, so canvas
|
||||
// gets `{"error":"workspace agent unreachable","restarting":true}` instead
|
||||
// of Cloudflare's opaque 502 error page. Without these, dead workspaces hang
|
||||
// long enough that CF gives up first and shows its own page.
|
||||
var a2aClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: 60 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
// MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent
|
||||
// fan-in is bounded by the platform's broadcaster fan-out, not by
|
||||
// connection-pool sizing.
|
||||
},
|
||||
}
|
||||
|
||||
type proxyA2AError struct {
|
||||
Status int
|
||||
@ -422,6 +457,51 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
if errMsg == "" {
|
||||
errMsg = http.StatusText(resp.StatusCode)
|
||||
}
|
||||
|
||||
// Upstream returned 502/503/504 (gateway/proxy failure). This is
|
||||
// the "agent process is dead but the tunnel between us and the
|
||||
// workspace is still up" signal — handleA2ADispatchError's
|
||||
// network-error path doesn't run because Do() succeeded at the
|
||||
// HTTP layer. Without this branch, the dead-agent failure mode
|
||||
// surfaces to canvas as a generic 502 (and CF in front of the
|
||||
// platform masks it with its own error page, hiding any
|
||||
// structured response we might write).
|
||||
//
|
||||
// Treatment matches handleA2ADispatchError's container-dead path:
|
||||
// 1. Probe IsRunning via maybeMarkContainerDead. If the
|
||||
// container truly is dead, mark workspace offline + kick
|
||||
// a restart goroutine.
|
||||
// 2. Return a structured 503 with restarting=true + Retry-After
|
||||
// so canvas shows a useful "agent is restarting" message
|
||||
// (and CF doesn't intercept the 503 the way it does 502).
|
||||
// If IsRunning reports the container is alive, we leave the
|
||||
// upstream status untouched — the agent legitimately returned
|
||||
// 502/503/504 (e.g. it's returning its own Bad-Gateway from
|
||||
// some downstream call) and we shouldn't mistakenly recycle it.
|
||||
//
|
||||
// Empty body is the strong signal here — a CF-tunnel "no-origin"
|
||||
// 502 has 0 bytes; an agent-authored 502 typically has a JSON
|
||||
// error body. We probe IsRunning regardless (it's the
|
||||
// authoritative check) but the empty-body case is what makes
|
||||
// this fix necessary.
|
||||
// 524 = Cloudflare "origin timed out" — origin accepted the
|
||||
// connection but didn't return headers within ~100s. Same
|
||||
// signal as 504/502 for our purposes: the upstream agent is
|
||||
// not responsive. We probe IsRunning to confirm before
|
||||
// triggering a restart.
|
||||
if resp.StatusCode == http.StatusBadGateway ||
|
||||
resp.StatusCode == http.StatusServiceUnavailable ||
|
||||
resp.StatusCode == http.StatusGatewayTimeout ||
|
||||
resp.StatusCode == 524 {
|
||||
if h.maybeMarkContainerDead(ctx, workspaceID) {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": "15"},
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true, "retry_after": 15},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resp.StatusCode, respBody, &proxyA2AError{
|
||||
Status: resp.StatusCode,
|
||||
Response: gin.H{"error": errMsg},
|
||||
|
||||
@ -141,22 +141,46 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
}
|
||||
|
||||
// maybeMarkContainerDead runs the reactive health check after a forward error.
|
||||
// If the workspace's Docker container is no longer running (and the workspace
|
||||
// isn't external), it marks the workspace offline, clears Redis state,
|
||||
// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true
|
||||
// when the container was found dead.
|
||||
// If the workspace's compute (Docker container OR EC2 instance) is no longer
|
||||
// running (and the workspace isn't external), it marks the workspace offline,
|
||||
// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async
|
||||
// restart. Returns true when the compute was found dead.
|
||||
//
|
||||
// Provisioner selection (mutually exclusive in production):
|
||||
// - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect.
|
||||
// - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's
|
||||
// /cp/workspaces/:id/status to read the EC2 state.
|
||||
//
|
||||
// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants
|
||||
// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every
|
||||
// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas
|
||||
// with no auto-recovery and Cloudflare in front would mask the response with
|
||||
// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
|
||||
// canvas-chat-to-dead-workspace incident traces to exactly this gap.
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if h.provisioner == nil || wsRuntime == "external" {
|
||||
if wsRuntime == "external" {
|
||||
return false
|
||||
}
|
||||
running, inspectErr := h.provisioner.IsRunning(ctx, workspaceID)
|
||||
if h.provisioner == nil && h.cpProv == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var running bool
|
||||
var inspectErr error
|
||||
if h.provisioner != nil {
|
||||
running, inspectErr = h.provisioner.IsRunning(ctx, workspaceID)
|
||||
} else {
|
||||
// SaaS path: ask the CP about the EC2 state. Same (true, err) on
|
||||
// transport errors contract — keeps the caller on the alive path
|
||||
// instead of triggering a restart cascade on a flaky CP call.
|
||||
running, inspectErr = h.cpProv.IsRunning(ctx, workspaceID)
|
||||
}
|
||||
if inspectErr != nil {
|
||||
// Transient Docker-daemon error (timeout, socket EOF, etc.). Post-
|
||||
// #386, IsRunning returns (true, err) in this case — caller stays
|
||||
// on the alive path and does not trigger a restart cascade. Log
|
||||
// so the defect is visible without being destructive.
|
||||
// Transient backend error (Docker daemon EOF, CP HTTP 5xx, etc.).
|
||||
// IsRunning's contract returns (true, err) in this case so we stay
|
||||
// on the alive path without triggering a restart cascade.
|
||||
log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
|
||||
}
|
||||
if running {
|
||||
|
||||
@ -16,6 +16,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
@ -244,6 +245,117 @@ func TestProxyA2A_AgentReturnsError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_Upstream502_TriggersContainerDeadCheck — when the agent
|
||||
// tunnel returns 502 (the "tunnel up but no origin" failure mode that
|
||||
// surfaces a Cloudflare error page to canvas), proxyA2A must consult
|
||||
// IsRunning on cpProv. If the EC2 instance truly is dead, the response
|
||||
// becomes a structured 503 with restarting=true (not the upstream 502
|
||||
// which CF would mask), and the workspace flips to status='offline' so
|
||||
// the next reactive poll sees the right state. This is the
|
||||
// 2026-04-30 hongmingwang.moleculesai.app canvas-chat-to-dead-workspace
|
||||
// regression: upstream 502 was previously propagated as-is, CF masked
|
||||
// it, and no auto-restart fired.
|
||||
func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
// Agent tunnel returns 502 with empty body — the CF "no-origin" shape.
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", "ws-tunnel-dead"), agentServer.URL)
|
||||
expectBudgetCheck(mock, "ws-tunnel-dead")
|
||||
// Activity log fires (delivery_confirmed is true on Do() success regardless
|
||||
// of upstream status — handler's existing logA2ASuccess path runs first
|
||||
// and logs as success because the dispatch did get a response).
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// maybeMarkContainerDead's runtime lookup, then the offline-flip UPDATE.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-tunnel-dead").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
|
||||
WithArgs("ws-tunnel-dead").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-tunnel-dead"}}
|
||||
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-tunnel-dead/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
// Caller sees a structured 503 (NOT the upstream 502 which CF would mask).
|
||||
if w.Code != http.StatusServiceUnavailable {
|
||||
t.Fatalf("upstream 502 should translate to 503 once cpProv reports dead; got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "restarting") {
|
||||
t.Errorf("response body should mention restart trigger; got %s", w.Body.String())
|
||||
}
|
||||
if w.Header().Get("Retry-After") != "15" {
|
||||
t.Errorf("Retry-After header should be 15 to throttle canvas-side retry loop; got %q", w.Header().Get("Retry-After"))
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("cpProv.IsRunning must be consulted exactly once; got %d calls", cp.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs — the safety check:
|
||||
// if cpProv reports the EC2 IS running, the upstream 502 is propagated
|
||||
// as-is. Don't recycle a healthy agent on a transient hiccup — the agent
|
||||
// might have legitimately returned 502 (e.g. a downstream service it
|
||||
// called returned 502 and it forwarded). Net behavior matches pre-fix
|
||||
// for the alive-agent case.
|
||||
func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{running: true}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
fmt.Fprint(w, `{"error":"downstream service returned 502"}`)
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", "ws-alive-502"), agentServer.URL)
|
||||
expectBudgetCheck(mock, "ws-alive-502")
|
||||
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// IsRunning runtime lookup runs but no UPDATE follows (running=true).
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-alive-502").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-alive-502"}}
|
||||
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-alive-502/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if w.Code != http.StatusBadGateway {
|
||||
t.Fatalf("alive agent 502 should propagate as 502; got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== ProxyA2A — messageId injection ====================
|
||||
|
||||
func TestProxyA2A_MessageIDInjected(t *testing.T) {
|
||||
@ -1640,6 +1752,136 @@ func TestMaybeMarkContainerDead_NilProvisioner(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// SaaS path: h.provisioner=nil but h.cpProv is wired and reports the EC2
|
||||
// instance is NOT running. maybeMarkContainerDead must consult cpProv,
|
||||
// flip the workspace to status='offline', clear keys, broadcast OFFLINE,
|
||||
// and return true so the caller surfaces the structured 503. Pre-fix
|
||||
// (#NNN) it returned false unconditionally on h.provisioner==nil, so
|
||||
// dead EC2 agents leaked upstream 502 to canvas with no recovery.
|
||||
func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-dead").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status = 'offline'`).
|
||||
WithArgs("ws-saas-dead").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-dead")
|
||||
if !got {
|
||||
t.Fatal("expected true (cpProv reports not running) — without cpProv consultation, SaaS dead-agent recovery is impossible")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// SaaS path: h.cpProv reports running=true → maybeMarkContainerDead must
|
||||
// return false (don't restart a healthy agent on a transient upstream
|
||||
// hiccup). This is the safety check that prevents over-eager recycling.
|
||||
func TestMaybeMarkContainerDead_CPOnly_Running(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{running: true}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-alive").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-alive"); got {
|
||||
t.Error("expected false when cpProv reports running — must not recycle a healthy agent")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("expected exactly 1 IsRunning call on cpProv; got %d", cp.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// SaaS-path runRestartCycle: when h.provisioner is nil and h.cpProv is set,
|
||||
// the auto-restart cycle MUST call cpProv.Stop (not Docker provisioner.Stop).
|
||||
// Pre-fix this dispatched only to h.provisioner.Stop, NPE'd on nil, was
|
||||
// silently swallowed by coalesceRestart's recover-without-re-raise, and
|
||||
// left the workspace stuck in status='provisioning' forever — making
|
||||
// reactive auto-restart on SaaS effectively dead code. The independent
|
||||
// review of PR #2362 caught this gap.
|
||||
//
|
||||
// We drive runRestartCycle directly (not via RestartByID/coalesceRestart)
|
||||
// so we don't fight the goroutine's timing in a unit test. The full
|
||||
// restart chain (provisionWorkspaceCP) needs its own mocked DB rows that
|
||||
// would explode the surface area of this test; what we care about here
|
||||
// is the dispatch decision, which is observable on cpProv.stopCalls.
|
||||
// stopForRestart is the dispatch helper extracted from runRestartCycle so the
|
||||
// branch logic can be tested without spawning the async sendRestartContext
|
||||
// goroutine that the full cycle fires. Pre-fix runRestartCycle's Stop dispatch
|
||||
// only called the Docker path, so on SaaS (h.provisioner=nil) the cycle NPE'd
|
||||
// silently and left the workspace stuck in status='provisioning'.
|
||||
func TestStopForRestart_SaaSPath_DispatchesViaCPProv(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
handler.stopForRestart(context.Background(), "ws-saas-restart")
|
||||
|
||||
if cp.stopCalls != 1 {
|
||||
t.Fatalf("expected cpProv.Stop to be called once on SaaS auto-restart; got %d", cp.stopCalls)
|
||||
}
|
||||
if cp.startCalls != 0 {
|
||||
t.Fatalf("expected cpProv.Start NOT to be called by stopForRestart; got %d", cp.startCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// Both nil → no-op, no panic. Defensive guard against the dispatcher being
|
||||
// invoked on a misconfigured handler.
|
||||
func TestStopForRestart_NoProvisioner_NoOp(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
// no provisioner, no cpProv
|
||||
handler.stopForRestart(context.Background(), "ws-orphan")
|
||||
// no panic, no calls — assertion is reaching this line
|
||||
}
|
||||
|
||||
// fakeCPProv satisfies provisioner.CPProvisionerAPI for tests that exercise
|
||||
// the SaaS / EC2-backed reactive-health path.
|
||||
//
|
||||
// Methods all record calls. Start/Stop/GetConsoleOutput return nil/empty by
|
||||
// default — the maybeMarkContainerDead happy path triggers an async
|
||||
// `go h.RestartByID(...)` which calls Stop, so the previous "panic on
|
||||
// unexpected call" pattern was unsafe (the panic fires on a goroutine,
|
||||
// after the assertions ran). Tests that want to ASSERT a method is unused
|
||||
// can check `calls == 0` after a sync barrier.
|
||||
type fakeCPProv struct {
|
||||
running bool
|
||||
calls int
|
||||
stopCalls int
|
||||
startCalls int
|
||||
}
|
||||
|
||||
func (f *fakeCPProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
|
||||
f.startCalls++
|
||||
return "", nil
|
||||
}
|
||||
func (f *fakeCPProv) Stop(_ context.Context, _ string) error {
|
||||
f.stopCalls++
|
||||
return nil
|
||||
}
|
||||
func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
|
||||
f.calls++
|
||||
return f.running, nil
|
||||
}
|
||||
|
||||
// external runtime → false regardless of provisioner.
|
||||
func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
@ -1206,6 +1206,10 @@ func (s *stubFailingCPProv) GetConsoleOutput(_ context.Context, _ string) (strin
|
||||
panic("stubFailingCPProv.GetConsoleOutput not expected on the provisionWorkspaceCP failure path")
|
||||
}
|
||||
|
||||
func (s *stubFailingCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
|
||||
panic("stubFailingCPProv.IsRunning not expected on the provisionWorkspaceCP failure path")
|
||||
}
|
||||
|
||||
// TestProvisionWorkspaceCP_NoInternalErrorsInBroadcast asserts that
|
||||
// provisionWorkspaceCP never leaks err.Error() in
|
||||
// WORKSPACE_PROVISION_FAILED broadcasts. Regression test for #1206.
|
||||
|
||||
@ -326,7 +326,12 @@ func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID s
|
||||
// in-flight runner picks up the pending request after its current cycle
|
||||
// completes, so writes that committed mid-restart are guaranteed to land.
|
||||
func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
||||
if h.provisioner == nil {
|
||||
// At least one of the two provisioners must be wired. Pre-fix this
|
||||
// short-circuited on h.provisioner==nil alone, which silently disabled
|
||||
// reactive auto-restart on every SaaS tenant (where the local Docker
|
||||
// provisioner is intentionally nil). The runRestartCycle below now
|
||||
// branches on which one is set for the Stop call.
|
||||
if h.provisioner == nil && h.cpProv == nil {
|
||||
return
|
||||
}
|
||||
coalesceRestart(workspaceID, func() { h.runRestartCycle(workspaceID) })
|
||||
@ -391,6 +396,25 @@ func coalesceRestart(workspaceID string, cycle func()) {
|
||||
}
|
||||
}
|
||||
|
||||
// stopForRestart dispatches Stop to whichever provisioner is wired (Docker or
|
||||
// CP/EC2 — mutually exclusive in production). Docker provisioner.Stop kills
|
||||
// the local container; CP provisioner.Stop calls DELETE /cp/workspaces/:id
|
||||
// which terminates the EC2 instance. Pre-fix runRestartCycle only called the
|
||||
// Docker path, so on SaaS (h.provisioner=nil) the auto-restart cycle silently
|
||||
// NPE'd before reaching the reprovision step — which is why every SaaS dead-
|
||||
// agent incident pre-this-fix required manual restart from canvas.
|
||||
func (h *WorkspaceHandler) stopForRestart(ctx context.Context, workspaceID string) {
|
||||
if h.provisioner != nil {
|
||||
h.provisioner.Stop(ctx, workspaceID)
|
||||
return
|
||||
}
|
||||
if h.cpProv != nil {
|
||||
if err := h.cpProv.Stop(ctx, workspaceID); err != nil {
|
||||
log.Printf("Auto-restart: cpProv.Stop(%s) failed: %v (continuing to reprovision)", workspaceID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runRestartCycle does the actual stop+provision work for one restart
|
||||
// iteration. Synchronous (waits for provisionWorkspace to complete) so the
|
||||
// outer pending-flag loop in RestartByID can correctly coalesce — if this
|
||||
@ -426,7 +450,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
|
||||
|
||||
log.Printf("Auto-restart: restarting %s (%s) runtime=%q (was: %s)", wsName, workspaceID, dbRuntime, status)
|
||||
|
||||
h.provisioner.Stop(ctx, workspaceID)
|
||||
h.stopForRestart(ctx, workspaceID)
|
||||
|
||||
db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = 'provisioning', url = '', updated_at = now() WHERE id = $1`, workspaceID)
|
||||
@ -445,7 +469,21 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
|
||||
// SYNCHRONOUS provisionWorkspace: returns when the new container is up
|
||||
// (or has failed). The outer loop relies on this to know when it's safe
|
||||
// to start another restart cycle without racing this one's Stop call.
|
||||
h.provisionWorkspace(workspaceID, "", nil, payload)
|
||||
//
|
||||
// Branch on which provisioner is wired — same dispatch as the other call
|
||||
// sites in this package (workspace.go:431-433, workspace_restart.go:197+596).
|
||||
// Pre-fix this only called the Docker variant, so on SaaS the auto-restart
|
||||
// cycle would NPE inside provisionWorkspace's `h.provisioner.VolumeHasFile`
|
||||
// call, get swallowed by coalesceRestart's recover()-without-re-raise (a
|
||||
// platform-stability safeguard), and leave the workspace permanently
|
||||
// stuck in status='provisioning' (the UPDATE above already ran). User-
|
||||
// observable result before this fix on SaaS: dead workspace → manual
|
||||
// canvas restart was the only recovery path.
|
||||
if h.cpProv != nil {
|
||||
h.provisionWorkspaceCP(workspaceID, "", nil, payload)
|
||||
} else {
|
||||
h.provisionWorkspace(workspaceID, "", nil, payload)
|
||||
}
|
||||
// sendRestartContext is a one-way notification to the new container; safe
|
||||
// to fire async — the next restart cycle won't depend on it completing.
|
||||
go h.sendRestartContext(workspaceID, restartData)
|
||||
|
||||
@ -29,6 +29,16 @@ type CPProvisionerAPI interface {
|
||||
Start(ctx context.Context, cfg WorkspaceConfig) (string, error)
|
||||
Stop(ctx context.Context, workspaceID string) error
|
||||
GetConsoleOutput(ctx context.Context, workspaceID string) (string, error)
|
||||
// IsRunning reports whether the workspace's compute (EC2 instance) is
|
||||
// currently in the running state. Surfaced on the interface (rather than
|
||||
// only on *CPProvisioner) so the a2a-proxy reactive-health path can
|
||||
// detect dead EC2 agents the same way it detects dead Docker containers.
|
||||
// Pre-#NNN, maybeMarkContainerDead only consulted the local Docker
|
||||
// provisioner — for SaaS tenants (h.provisioner=nil) the check was a
|
||||
// no-op, so a dead EC2 agent would leak 502/503 to canvas with no
|
||||
// auto-recovery. (true, err) on transport errors keeps callers on the
|
||||
// alive path; (false, nil) is the only definitive "dead" signal.
|
||||
IsRunning(ctx context.Context, workspaceID string) (bool, error)
|
||||
}
|
||||
|
||||
// Compile-time assertion: *CPProvisioner satisfies CPProvisionerAPI.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user