fix(workspace-server): a2a-proxy preflight container check (closes #36)
Some checks failed
Retarget main PRs to staging / Retarget to staging (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
CI / Detect changes (pull_request) Successful in 8s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 8s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 7s
Harness Replays / detect-changes (pull_request) Successful in 7s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Python Lint & Test (pull_request) Successful in 5s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Harness Replays / Harness Replays (pull_request) Failing after 56s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Failing after 1m25s
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Failing after 1m25s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Failing after 1m37s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m38s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m46s
CI / Platform (Go) (pull_request) Successful in 2m44s
Same SSOT-divergence shape as #10 / fixed in #12, but on the a2a-proxy code path. The plugin handler was routed through `provisioner.RunningContainerName`; a2a-proxy was forwarding optimistically and only catching missing containers REACTIVELY via `maybeMarkContainerDead` after the network call timed out. Result on tenants whose agent containers had been recycled (e.g. post-EC2 replace from molecule-controlplane#20): canvas waits 2-30s for the network forward to fail before getting a 503, and the workspace-server logs only "ProxyA2A forward error" without the "container is dead" signal.

This PR adds a proactive `Provisioner.IsRunning` check in `proxyA2ARequest` between `resolveAgentURL` and `dispatchA2A`, gated on the conditions where we know we're talking to a sibling Docker container we own (`h.provisioner != nil` AND `platformInDocker` AND the URL was rewritten to Docker-DNS form). Three outcomes via the SSOT helper:

* (true, nil) → forward as today
* (false, nil) → fast-503 with `error="workspace container not running — restart triggered"`, `restarting=true`, `preflight=true`, plus the same offline-flip + WORKSPACE_OFFLINE broadcast + async restart that `maybeMarkContainerDead` produces
* (true, err) → fall through to optimistic forward (matches IsRunning's "fail-soft as alive" contract — a flaky daemon must not trigger a restart cascade)

The `preflight=true` flag in the response distinguishes the proactive short-circuit from the reactive `maybeMarkContainerDead` path, so canvas or downstream callers can render distinct messages later.

* `internal/handlers/a2a_proxy.go` — preflight call site between resolveAgentURL and dispatchA2A; gated on `h.provisioner != nil && platformInDocker && url == http://<ContainerName(id)>:port`.
* `internal/handlers/a2a_proxy_helpers.go` — `preflightContainerHealth` helper. Routes through `h.provisioner.IsRunning` (which itself wraps `RunningContainerName`). Identical offline-flip side-effects to `maybeMarkContainerDead` for the dead-container case.
* `internal/handlers/a2a_proxy_preflight_test.go` — 4 tests: running → nil; not-running → structured 503 + sqlmock expectations on the offline-flip + structure_events insert; transient error → nil (fail-soft); AST gate pinning the SSOT routing (mirror of #12's gate).

Mutation-tested: removing the `if running { return nil }` guard makes the production code fail to compile (unused var). A subtler mutation (replacing the !running branch with `return nil`) would make TestPreflight_ContainerNotRunning_StructuredFastFail fail at runtime with sqlmock's "expected DB call did not occur."

Refs: molecule-core#36. Companion to #12 (issue #10).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent b9ca4ad84a
commit be5fbb5ad3
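The (true, err) fail-soft contract the message leans on is not shown in this diff; `Provisioner.IsRunning` lives in `internal/provisioner`. As a hedged sketch only, assuming IsRunning wraps `RunningContainerName` exactly as described above and that the provisioner holds a Docker client in a field named `docker` (an assumption, not confirmed by this diff), the wrapper would look roughly like this:

package provisioner

import (
	"context"

	"github.com/docker/docker/client"
)

type LocalProvisioner struct {
	docker *client.Client // assumed field name; the real struct is not in this diff
}

// IsRunning (sketch, not the shipped code) illustrates the fail-soft
// contract: a transient daemon error is reported as (true, err) so callers
// fall through to the optimistic forward instead of cascading restarts;
// only a clean "no container" answer yields (false, nil).
// RunningContainerName is the existing SSOT lookup in this package.
func (p *LocalProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
	name, err := RunningContainerName(ctx, p.docker, workspaceID)
	if err != nil {
		return true, err // fail-soft as alive
	}
	return name != "", nil
}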
@@ -435,6 +435,34 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
 		return 0, nil, proxyErr
 	}
 
+	// Pre-flight container-health check (#36). The dispatchA2A path below
+	// does Docker-DNS forwarding to `ws-<wsShort>:8000` and only catches a
+	// missing/dead container REACTIVELY via maybeMarkContainerDead in
+	// handleA2ADispatchError. That works but costs the caller a full
+	// network-timeout (2-30s) before the structured 503 surfaces.
+	//
+	// When we KNOW the workspace is container-backed (h.provisioner != nil +
+	// platformInDocker + we rewrite to Docker-DNS form below), do a single
+	// proactive RunningContainerName lookup. If the container is genuinely
+	// missing, short-circuit with the same structured 503 + async restart
+	// that maybeMarkContainerDead would produce — but immediately, without
+	// the network round-trip.
+	//
+	// Three outcomes of provisioner.RunningContainerName(ctx, h.docker, id):
+	//   ("ws-<id>", nil) → forward as today.
+	//   ("", nil)        → container is genuinely not running. Fast-503.
+	//   ("", err)        → transient daemon error. Fall through to optimistic
+	//                      forward — matches Provisioner.IsRunning's
+	//                      (true, err) "fail-soft as alive" contract.
+	//
+	// Same SSOT as findRunningContainer (#10/#12). See AST gate
+	// TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT.
+	if h.provisioner != nil && platformInDocker && strings.HasPrefix(agentURL, "http://"+provisioner.ContainerName(workspaceID)+":") {
+		if proxyErr := h.preflightContainerHealth(ctx, workspaceID); proxyErr != nil {
+			return 0, nil, proxyErr
+		}
+	}
+
 	startTime := time.Now()
 	resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
 	if cancelFwd != nil {
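Since the reactive containerDead 503 carries no `preflight` field, a JSON decode leaves it false, so callers can branch on presence-as-truth. A hedged illustration of the consumer side (the payload fields come from the handler's gin.H; the `a2aclient` package and `classifyA2A503` function are hypothetical, not part of this PR):

package a2aclient

import (
	"encoding/json"
	"net/http"
)

// a2aUnavailable mirrors the structured 503 body the workspace-server
// emits on both the preflight and reactive paths.
type a2aUnavailable struct {
	Error      string `json:"error"`
	Restarting bool   `json:"restarting"`
	Preflight  bool   `json:"preflight"`
}

// classifyA2A503 reports whether a 503 came from the proactive preflight
// short-circuit (preflight=true) rather than the reactive
// maybeMarkContainerDead path. ok is false for non-503 or unparseable bodies.
func classifyA2A503(resp *http.Response) (preflight, ok bool) {
	if resp.StatusCode != http.StatusServiceUnavailable {
		return false, false
	}
	var body a2aUnavailable
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return false, false
	}
	return body.Preflight, true
}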
@@ -198,6 +198,60 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
 	return true
 }
 
+// preflightContainerHealth runs a proactive Provisioner.IsRunning check
+// (#36) before dispatching the a2a forward. Routed through provisioner's
+// SSOT IsRunning, which itself wraps RunningContainerName — same source
+// as findRunningContainer in the plugins handler (#10/#12).
+//
+// Returns nil when the forward should proceed:
+//   - container is running, OR
+//   - daemon errored transiently (matches IsRunning's (true, err)
+//     "fail-soft as alive" contract — let the optimistic forward run
+//     and reactive maybeMarkContainerDead catch a real failure).
+//
+// Returns a structured 503 + triggers the same async restart that
+// maybeMarkContainerDead would produce, when:
+//   - container is genuinely not running (NotFound / Exited / Created…).
+//
+// The point of running this BEFORE the forward is to save the caller
+// 2-30s of network-timeout cost when the container is missing — a common
+// shape post-EC2-replace (see molecule-controlplane#20 incident
+// 2026-05-07) where the reconciler hasn't respawned the agent yet.
+func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspaceID string) *proxyA2AError {
+	running, err := h.provisioner.IsRunning(ctx, workspaceID)
+	if err != nil {
+		// Transient daemon error. Provisioner.IsRunning returns (true, err)
+		// in this case — fall through to the optimistic forward, reactive
+		// maybeMarkContainerDead handles a real failure later.
+		log.Printf("ProxyA2A preflight: IsRunning transient error for %s: %v (proceeding with forward)", workspaceID, err)
+		return nil
+	}
+	if running {
+		// Container is running — forward as today.
+		return nil
+	}
+	// Container is genuinely not running. Mark offline + trigger restart
+	// (same effect as maybeMarkContainerDead's branch), and return the
+	// structured 503 immediately so the caller skips the forward.
+	log.Printf("ProxyA2A preflight: container for %s is not running — marking offline and triggering restart (#36)", workspaceID)
+	if _, dbErr := db.DB.ExecContext(ctx,
+		`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`,
+		models.StatusOffline, workspaceID); dbErr != nil {
+		log.Printf("ProxyA2A preflight: failed to mark workspace %s offline: %v", workspaceID, dbErr)
+	}
+	db.ClearWorkspaceKeys(ctx, workspaceID)
+	h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
+	go h.RestartByID(workspaceID)
+	return &proxyA2AError{
+		Status: http.StatusServiceUnavailable,
+		Response: gin.H{
+			"error":      "workspace container not running — restart triggered",
+			"restarting": true,
+			"preflight":  true, // distinguishes from reactive containerDead path
+		},
+	}
+}
+
 // logA2AFailure records a failed A2A attempt to activity_logs in a detached
 // goroutine (the request context may already be done by the time it runs).
 func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
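The hunk stops at returning the structured error; how it reaches the wire is outside this diff. Assuming proxyA2AError carries exactly the Status and gin.H Response fields used above, the gin-level caller presumably does little more than the following (a hypothetical sketch, not code from this PR):

package handlers

import "github.com/gin-gonic/gin"

// writeProxyA2AError (illustrative only) surfaces the structured error:
// the stored status code plus the JSON body, passed through unchanged.
func writeProxyA2AError(c *gin.Context, perr *proxyA2AError) {
	c.JSON(perr.Status, perr.Response)
}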
194  workspace-server/internal/handlers/a2a_proxy_preflight_test.go (new file)
@@ -0,0 +1,194 @@
package handlers

import (
	"context"
	"errors"
	"go/ast"
	"go/parser"
	"go/token"
	"testing"

	"github.com/DATA-DOG/go-sqlmock"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
)

// preflightLocalProv is a controllable LocalProvisionerAPI stub for the
// preflight tests (#36). Other API methods panic to guard against tests
// that should be using a different stub.
type preflightLocalProv struct {
	running    bool
	err        error
	calls      int
	calledWith []string
}

func (p *preflightLocalProv) IsRunning(_ context.Context, workspaceID string) (bool, error) {
	p.calls++
	p.calledWith = append(p.calledWith, workspaceID)
	return p.running, p.err
}
func (p *preflightLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
	panic("preflightLocalProv: Start not implemented")
}
func (p *preflightLocalProv) Stop(_ context.Context, _ string) error {
	panic("preflightLocalProv: Stop not implemented")
}
func (p *preflightLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
	panic("preflightLocalProv: ExecRead not implemented")
}
func (p *preflightLocalProv) RemoveVolume(_ context.Context, _ string) error {
	panic("preflightLocalProv: RemoveVolume not implemented")
}
func (p *preflightLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
	panic("preflightLocalProv: VolumeHasFile not implemented")
}
func (p *preflightLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
	panic("preflightLocalProv: WriteAuthTokenToVolume not implemented")
}

// TestPreflight_ContainerRunning_ReturnsNil — IsRunning(true,nil): forward
// proceeds. preflight returns nil → caller continues to dispatchA2A.
func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
	_ = setupTestDB(t)
	stub := &preflightLocalProv{running: true, err: nil}
	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
	h.provisioner = stub

	if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
		t.Fatalf("preflight should return nil when container running, got %+v", err)
	}
	if stub.calls != 1 {
		t.Errorf("IsRunning should be called exactly once, got %d", stub.calls)
	}
	if len(stub.calledWith) != 1 || stub.calledWith[0] != "ws-running-123" {
		t.Errorf("IsRunning should be called with workspace id, got %v", stub.calledWith)
	}
}

// TestPreflight_ContainerNotRunning_StructuredFastFail — IsRunning(false,nil):
// preflight returns structured 503 with restarting=true + preflight=true, AND
// triggers the offline-flip + WORKSPACE_OFFLINE broadcast + async restart.
// This is the load-bearing case — saves the caller 2-30s of network timeout.
func TestPreflight_ContainerNotRunning_StructuredFastFail(t *testing.T) {
	mock := setupTestDB(t)
	_ = setupTestRedis(t)
	stub := &preflightLocalProv{running: false, err: nil}
	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
	h.provisioner = stub

	// Expect the offline-flip UPDATE.
	mock.ExpectExec(`UPDATE workspaces SET status =`).
		WithArgs(models.StatusOffline, "ws-dead-456").
		WillReturnResult(sqlmock.NewResult(0, 1))
	// Broadcaster's INSERT INTO structure_events fires too — best-effort
	// log entry for the WORKSPACE_OFFLINE event. Match permissively.
	mock.ExpectExec(`INSERT INTO structure_events`).
		WillReturnResult(sqlmock.NewResult(0, 1))

	proxyErr := h.preflightContainerHealth(context.Background(), "ws-dead-456")
	if proxyErr == nil {
		t.Fatal("preflight should return *proxyA2AError when container not running")
	}
	if proxyErr.Status != 503 {
		t.Errorf("expected 503, got %d", proxyErr.Status)
	}
	if got := proxyErr.Response["restarting"]; got != true {
		t.Errorf("response should mark restarting=true, got %v", got)
	}
	if got := proxyErr.Response["preflight"]; got != true {
		t.Errorf("response should mark preflight=true so callers can distinguish from reactive containerDead, got %v", got)
	}
	if got := proxyErr.Response["error"]; got != "workspace container not running — restart triggered" {
		t.Errorf("error message mismatch, got %q", got)
	}

	// Note: broadcaster firing is exercised by the production path's
	// h.broadcaster.RecordAndBroadcast call but not asserted here — the
	// real *events.Broadcaster doesn't expose received events for inspection.
	// The DB UPDATE expectation is sufficient to pin the offline-flip path.
}

// TestPreflight_TransientError_FailsSoftAsAlive — IsRunning(true,err): the
// (true, err) "fail-soft" contract — preflight returns nil so the optimistic
// forward runs; reactive maybeMarkContainerDead handles a real failure later.
// This pin is critical: a flaky daemon must NOT trigger a restart cascade.
func TestPreflight_TransientError_FailsSoftAsAlive(t *testing.T) {
	_ = setupTestDB(t)
	stub := &preflightLocalProv{running: true, err: errors.New("docker daemon EOF")}
	h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
	h.provisioner = stub

	if err := h.preflightContainerHealth(context.Background(), "ws-flaky-789"); err != nil {
		t.Fatalf("preflight should return nil on transient error (fail-soft), got %+v", err)
	}
	// No DB UPDATE expected — sqlmock would complain about unexpected calls
	// at test cleanup if the offline-flip path fired.
}

// TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT — AST gate (#36 mirror
// of #12's gate). Pins the invariant that preflightContainerHealth uses the
// SSOT Provisioner.IsRunning helper, NOT a parallel docker.ContainerInspect
// of its own.
//
// Mutation invariant: if a future PR replaces h.provisioner.IsRunning with
// a direct cli.ContainerInspect call, this test fails. That's the signal to
// either (a) extend Provisioner.IsRunning's contract OR (b) document why
// this call site needs to differ. Either way, the drift gets a reviewer's
// attention instead of shipping silently.
func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
	fset := token.NewFileSet()
	file, err := parser.ParseFile(fset, "a2a_proxy_helpers.go", nil, parser.ParseComments)
	if err != nil {
		t.Fatalf("parse a2a_proxy_helpers.go: %v", err)
	}

	var fn *ast.FuncDecl
	ast.Inspect(file, func(n ast.Node) bool {
		f, ok := n.(*ast.FuncDecl)
		if !ok || f.Name.Name != "preflightContainerHealth" {
			return true
		}
		fn = f
		return false
	})
	if fn == nil {
		t.Fatal("preflightContainerHealth not found — was it renamed? update this gate or the SSOT routing assumption")
	}

	var (
		callsIsRunning                  bool
		callsContainerInspectRaw        bool
		callsRunningContainerNameDirect bool
	)
	ast.Inspect(fn.Body, func(n ast.Node) bool {
		call, ok := n.(*ast.CallExpr)
		if !ok {
			return true
		}
		sel, ok := call.Fun.(*ast.SelectorExpr)
		if !ok {
			return true
		}
		switch sel.Sel.Name {
		case "IsRunning":
			callsIsRunning = true
		case "ContainerInspect":
			callsContainerInspectRaw = true
		case "RunningContainerName":
			// Direct RunningContainerName is also acceptable SSOT — but
			// preferring IsRunning keeps the (bool, error) contract that
			// already exists in the helper API surface.
			callsRunningContainerNameDirect = true
		}
		return true
	})

	if !callsIsRunning && !callsRunningContainerNameDirect {
		t.Errorf("preflightContainerHealth must call provisioner.IsRunning OR provisioner.RunningContainerName for the SSOT health check — see molecule-core#36. Found neither.")
	}
	if callsContainerInspectRaw {
		t.Errorf("preflightContainerHealth carries a direct ContainerInspect call. This is the parallel-impl drift molecule-core#36 fixed. " +
			"Either route through provisioner.IsRunning OR — if a new use case truly needs a different inspect — extend the helper's contract first and update this gate to allow the specific delta.")
	}
}
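The tests lean on shared package helpers (setupTestDB, setupTestRedis, newTestBroadcaster) that are not part of this file. For orientation only, here is a minimal sqlmock-backed sketch of what setupTestDB presumably does, including the cleanup-time expectation check the fail-soft test's comment relies on; the real helper (and the internal/db import path, assumed here from the sibling imports) may differ:

package handlers

import (
	"testing"

	"github.com/DATA-DOG/go-sqlmock"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db" // assumed path
)

// setupTestDB (sketch, not the actual shared helper) swaps db.DB for a
// sqlmock connection and verifies at cleanup that every declared
// expectation was met. Unexpected statements already error out at call
// time, which is what fails a test whose offline-flip path fires
// without a matching ExpectExec.
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
	t.Helper()
	conn, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	prev := db.DB
	db.DB = conn
	t.Cleanup(func() {
		db.DB = prev
		_ = conn.Close()
		if err := mock.ExpectationsWereMet(); err != nil {
			t.Errorf("unmet sqlmock expectations: %v", err)
		}
	})
	return mock
}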