Merge pull request #1073 from Molecule-AI/fix/isrunning-alive-on-transient

fix(cp_provisioner): IsRunning returns (true, err) on transient failures
This commit is contained in:
Hongming Wang 2026-04-20 08:58:44 -07:00 committed by GitHub
commit 35f7193ca9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 110 additions and 18 deletions

View File

@ -169,29 +169,44 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
// IsRunning checks workspace EC2 instance state via the control plane.
//
// Contract (matches the Docker Provisioner.IsRunning contract —
// critical for a2a_proxy's alive-on-transient-error path):
//
//   - request construction error → (true, error)
//   - transport error            → (true, error)
//   - non-2xx HTTP response      → (true, error)
//   - JSON decode failure        → (true, error)
//   - 2xx with state!="running"  → (false, nil)
//   - 2xx with state=="running"  → (true, nil)
//
// Why "true on error": a2a_proxy inspects (running, err) and only
// triggers the restart cascade when running==false. Returning false
// on a transient CP outage would cause every brief CP blip to
// stampede every workspace into a restart storm. Returning true
// with the error preserves the signal for logging while keeping the
// workspace on the alive path.
//
// healthsweep.go takes the mirror stance: `if err != nil { continue }`,
// so it skips uncertain results and never marks a workspace offline
// on transport error regardless of the running bool.
//
// Both callers are happy with (true, err); callers that need the
// previous (false, err) shape must inspect err themselves.
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
	// NOTE(review): instance_id is populated with the workspace ID —
	// confirm this is what the control plane expects.
	statusURL := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)

	// A request that cannot be built tells us nothing about the
	// workspace, so it follows the same "alive with error" rule as
	// every other failure instead of handing a nil request onward.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
	if err != nil {
		return true, fmt.Errorf("cp provisioner: status: %w", err)
	}
	p.authHeaders(req)

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return true, fmt.Errorf("cp provisioner: status: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Don't leak the body — upstream errors may echo headers.
		return true, fmt.Errorf("cp provisioner: status: unexpected %d", resp.StatusCode)
	}

	var result struct {
		State string `json:"state"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return true, fmt.Errorf("cp provisioner: status decode: %w", err)
	}

	// Only a successfully decoded 2xx response is allowed to report
	// the workspace as not running.
	return result.State == "running", nil
}

View File

@ -320,10 +320,15 @@ func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
}
}
// TestIsRunning_TransportErrorReturnsFalse — when the CP is
// unreachable, IsRunning must not claim the workspace is running
// (that'd mislead the sweeper into leaving a dead row in place).
func TestIsRunning_TransportErrorReturnsFalse(t *testing.T) {
// TestIsRunning_TransportErrorReturnsTrue — when the CP is
// unreachable, IsRunning must return (true, err) — matching the
// Docker provisioner contract so a2a_proxy stays on the alive path
// during a transient CP outage. Returning false here would trigger
// restart cascades on every brief CP blip.
//
// The sweeper (healthsweep.go) inspects err independently and skips
// on any error, so (true, err) is equally safe for that caller.
func TestIsRunning_TransportErrorReturnsTrue(t *testing.T) {
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
@ -333,8 +338,8 @@ func TestIsRunning_TransportErrorReturnsFalse(t *testing.T) {
if err == nil {
t.Errorf("expected transport error, got nil (got=%v)", got)
}
if got {
t.Errorf("transport failure must not report running=true")
if !got {
t.Errorf("transport failure must report running=true so a2a_proxy stays on the alive path (matches Docker provisioner contract); got false")
}
}
@ -367,8 +372,8 @@ func TestIsRunning_Non2xxSurfacesError(t *testing.T) {
if err == nil {
t.Errorf("status %d: expected error, got nil", tc.status)
}
if got {
t.Errorf("status %d: must not report running=true on non-2xx", tc.status)
if !got {
t.Errorf("status %d: must report running=true on non-2xx so a2a_proxy stays on alive path; got false", tc.status)
}
// Error must NOT echo the upstream body — CP 5xx bodies
// can contain echoed headers and we don't want logs to
@ -396,11 +401,83 @@ func TestIsRunning_MalformedJSONBodyReturnsError(t *testing.T) {
if err == nil {
t.Errorf("malformed body: expected error, got nil (got=%v)", got)
}
if got {
t.Errorf("malformed body must not report running=true")
if !got {
t.Errorf("malformed body must report running=true so a2a_proxy stays on alive path; got false")
}
}
// TestIsRunning_ContractCompat_A2AProxy pins the (running, err) pairs
// that IsRunning documents for each outcome. a2a_proxy relies on
// running==true during transient CP errors to skip its restart
// cascade; if a refactor ever returns false on error, every brief CP
// outage would turn into a workspace restart storm.
//
// This is a regression guard over the contract values only — it does
// not simulate the a2a_proxy flow itself.
func TestIsRunning_ContractCompat_A2AProxy(t *testing.T) {
	// reply builds a handler that writes a fixed status code and, when
	// body is non-empty, that body.
	reply := func(status int, body string) http.HandlerFunc {
		return func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(status)
			if body != "" {
				_, _ = io.WriteString(w, body)
			}
		}
	}

	scenarios := []struct {
		name string
		// handler serves the fake CP; nil means "no server", which
		// exercises the transport-error path against a dead port.
		handler     http.HandlerFunc
		wantRunning bool
		wantErr     bool
		// failFmt is the failure message format for this scenario.
		failFmt string
	}{
		// Every error path must report running==true.
		{
			name:        "transport error",
			handler:     nil,
			wantRunning: true,
			wantErr:     true,
			failFmt:     "want (true, err); got (%v, %v)",
		},
		{
			name:        "CP 500 response",
			handler:     reply(500, ""),
			wantRunning: true,
			wantErr:     true,
			failFmt:     "want (true, err); got (%v, %v)",
		},
		{
			name:        "malformed 200 body",
			handler:     reply(200, "garbage"),
			wantRunning: true,
			wantErr:     true,
			failFmt:     "want (true, err); got (%v, %v)",
		},
		// And the non-error paths must still report the truth.
		{
			name:        "2xx stopped → false nil",
			handler:     reply(200, `{"state":"stopped"}`),
			wantRunning: false,
			wantErr:     false,
			failFmt:     "want (false, nil); got (%v, %v)",
		},
		{
			name:        "2xx running → true nil",
			handler:     reply(200, `{"state":"running"}`),
			wantRunning: true,
			wantErr:     false,
			failFmt:     "want (true, nil); got (%v, %v)",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			var p *CPProvisioner
			if sc.handler == nil {
				p = &CPProvisioner{
					baseURL: "http://127.0.0.1:1", orgID: "org-1",
					httpClient: &http.Client{Timeout: 500 * time.Millisecond},
				}
			} else {
				srv := httptest.NewServer(sc.handler)
				defer srv.Close()
				p = &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
			}

			running, err := p.IsRunning(context.Background(), "ws-1")
			gotErr := err != nil
			if gotErr != sc.wantErr || running != sc.wantRunning {
				t.Errorf(sc.failFmt, running, err)
			}
		})
	}
}
// TestClose_Noop — explicit contract: Close has no side effects and
// no error. Exists for the Provisioner interface; compliance guard.
func TestClose_Noop(t *testing.T) {