diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 13c46641..f4ec335e 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry" "github.com/gin-gonic/gin" @@ -123,6 +124,14 @@ func isUpstreamBusyError(err error) bool { if errors.Is(err, context.DeadlineExceeded) { return true } + // applyIdleTimeout cancels the request ctx via context.WithCancel + // when the broadcaster silence window elapses — that surfaces as + // context.Canceled (not DeadlineExceeded). Treat it as the same + // "upstream busy" class so the caller produces a 503 + Retry-After + // instead of a generic 500. + if errors.Is(err, context.Canceled) { + return true + } if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { return true } @@ -131,6 +140,7 @@ func isUpstreamBusyError(err error) bool { // inner cause. Fall back to substring match for those. msg := err.Error() return strings.Contains(msg, "context deadline exceeded") || + strings.Contains(msg, "context canceled") || strings.Contains(msg, "EOF") || strings.Contains(msg, "connection reset") } @@ -286,7 +296,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri body = normalizedBody startTime := time.Now() - resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID) + resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID) if cancelFwd != nil { defer cancelFwd() } @@ -478,25 +488,59 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { return marshaledBody, a2aMethod, nil } +// idleTimeoutDuration is the per-dispatch silence window: if the +// platform's broadcaster emits no events for this workspace for the +// full duration, the dispatch ctx is cancelled. Resets on every +// ACTIVITY_LOGGED / TASK_UPDATED / A2A_RESPONSE event for the +// workspace, so a chat that's actively reporting tool calls or +// streaming status updates never trips it. Picked to be longer than +// any reasonable single-tool-use cadence (Claude Code's slowest +// observed silence between tools is ~30s) but short enough that a +// truly wedged runtime fails in 1 minute, not 5. +const idleTimeoutDuration = 60 * time.Second + // dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation -// chains survive client disconnect (browser tab close). Default timeouts: -// canvas (callerID == "") = 5 min, agent-to-agent = 30 min. Callers can -// override via the X-Timeout header (applied to ctx upstream in ProxyA2A). -func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) { +// chains survive client disconnect (browser tab close). Two layers of +// timeout per dispatch: +// +// - Idle timeout (always applied): cancels the dispatch when no +// broadcaster events for the workspace fire for +// idleTimeoutDuration. Any progress event resets the clock — so +// a long but actively-streaming reply runs forever, while a +// wedged runtime fails fast. +// - Absolute ceiling (agent-to-agent only): 30 min cap as a +// defence against runaway delegation loops. Canvas dispatches +// have no absolute ceiling — the user can wait as long as they +// want, the idle timer is the only hangup signal. +// +// Either layer is overridable by the X-Timeout header upstream in +// ProxyA2A; X-Timeout: 0 explicitly disables the absolute ceiling. +func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, workspaceID, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) { forwardCtx := context.WithoutCancel(ctx) - var cancel context.CancelFunc + var ceilingCancel context.CancelFunc if _, hasDeadline := ctx.Deadline(); !hasDeadline { - if callerID == "" { - forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute) - } else { - forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute) + if callerID != "" { + forwardCtx, ceilingCancel = context.WithTimeout(forwardCtx, 30*time.Minute) + } + // callerID == "" (canvas): no absolute ceiling. The idle + // timeout below is the only deadline. + } + // Idle timeout — cancels the dispatch ctx after + // idleTimeoutDuration of broadcaster silence for this workspace. + // Always applied (canvas + agent-to-agent both benefit; the + // ceiling above is a separate runaway-loop cap that only fires + // for agent traffic). Combines with the ceiling cancel into a + // single returned cancel func that the caller defers. + forwardCtx, idleCancel := applyIdleTimeout(forwardCtx, h.broadcaster, workspaceID, idleTimeoutDuration) + cancel := func() { + idleCancel() + if ceilingCancel != nil { + ceilingCancel() } } req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body)) if err != nil { - if cancel != nil { - cancel() - } + cancel() // Wrap the construction failure so the caller can distinguish it // from an upstream Do() error and produce the correct 500 response. return nil, nil, &proxyDispatchBuildError{err: err} @@ -505,3 +549,52 @@ func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, bod resp, doErr := a2aClient.Do(req) return resp, cancel, doErr } + +// applyIdleTimeout returns a child ctx that gets cancelled when no +// broadcaster events for `workspaceID` arrive for `idle` duration. +// Any incoming event resets the clock. The returned cancel func +// MUST be called to clean up the goroutine + subscription. +// +// nil broadcaster or non-positive idle returns the parent ctx +// unchanged (and a no-op cancel) so test paths that don't wire a +// broadcaster keep working. +func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID string, idle time.Duration) (context.Context, context.CancelFunc) { + if b == nil || idle <= 0 || workspaceID == "" { + return parent, func() {} + } + ctx, cancel := context.WithCancel(parent) + sub, unsub := b.SubscribeSSE(workspaceID) + go func() { + defer unsub() + timer := time.NewTimer(idle) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return + case _, ok := <-sub: + if !ok { + // Subscription channel closed — fall back to + // pure-timer mode. Don't cancel: another caller + // may have closed our sub but the request itself + // is still in flight. Let the timer or the + // caller's defer drive cleanup. + continue + } + // Stop+drain pattern so a fired-but-unread timer + // doesn't double-cancel after the Reset. + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(idle) + case <-timer.C: + cancel() + return + } + } + }() + return ctx, cancel +} diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index dcad98e2..8cdbf0d8 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -1074,7 +1075,7 @@ func TestDispatchA2A_BuildRequestError(t *testing.T) { handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) // Malformed URL causes http.NewRequestWithContext to fail. - _, cancel, err := handler.dispatchA2A(context.Background(), "http://%%badhost", []byte("{}"), "") + _, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", "http://%%badhost", []byte("{}"), "") if cancel != nil { cancel() } @@ -1097,13 +1098,13 @@ func TestDispatchA2A_CanvasTimeout(t *testing.T) { })) defer srv.Close() - resp, cancel, err := handler.dispatchA2A(context.Background(), srv.URL, []byte(`{}`), "") + resp, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", srv.URL, []byte(`{}`), "") if err != nil { t.Fatalf("unexpected error: %v", err) } defer resp.Body.Close() if cancel == nil { - t.Fatal("canvas caller (empty callerID) must set a timeout + return cancel") + t.Fatal("canvas caller must return a cancel func (idle-timeout cleanup)") } cancel() // restore } @@ -1118,20 +1119,23 @@ func TestDispatchA2A_AgentTimeout(t *testing.T) { })) defer srv.Close() - resp, cancel, err := handler.dispatchA2A(context.Background(), srv.URL, []byte(`{}`), "ws-caller") + resp, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", srv.URL, []byte(`{}`), "ws-caller") if err != nil { t.Fatalf("unexpected error: %v", err) } defer resp.Body.Close() if cancel == nil { - t.Fatal("agent-to-agent caller must set a timeout + return cancel") + t.Fatal("agent-to-agent caller must return a cancel func (idle + ceiling cleanup)") } cancel() } -func TestDispatchA2A_ContextDeadline_NoCancelAdded(t *testing.T) { - // When ctx already has a deadline, dispatchA2A must NOT layer its own - // timeout (cancel should be nil). +func TestDispatchA2A_ContextDeadline_NoExtraCeiling(t *testing.T) { + // When ctx already has a deadline, dispatchA2A must not layer + // its own absolute ceiling on top — the caller's deadline wins. + // The idle-timer cleanup still produces a non-nil cancel func + // (introduced by the always-on idle timeout) but the cancel func + // is safe to call repeatedly and from a deferred path. setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) @@ -1144,17 +1148,95 @@ func TestDispatchA2A_ContextDeadline_NoCancelAdded(t *testing.T) { ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) defer ctxCancel() - resp, cancel, err := handler.dispatchA2A(ctx, srv.URL, []byte(`{}`), "") + resp, cancel, err := handler.dispatchA2A(ctx, "ws-target", srv.URL, []byte(`{}`), "") if err != nil { t.Fatalf("unexpected error: %v", err) } defer resp.Body.Close() - if cancel != nil { - t.Error("cancel should be nil when ctx already has a deadline") - cancel() + if cancel == nil { + t.Error("cancel must be non-nil (idle-timer cleanup)") } } +// --- applyIdleTimeout --- + +// TestApplyIdleTimeout_FiresOnSilence verifies the helper cancels its +// child ctx when no broadcaster events arrive for `idle` duration. +// Uses a short idle window (60ms) so the test runs fast. +func TestApplyIdleTimeout_FiresOnSilence(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + b := newTestBroadcaster() + + parent, parentCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer parentCancel() + + idleCtx, idleCancel := applyIdleTimeout(parent, b, "ws-silent", 60*time.Millisecond) + defer idleCancel() + + select { + case <-idleCtx.Done(): + // expected — no events ever arrived for ws-silent + case <-time.After(2 * time.Second): + t.Fatal("idleCtx never cancelled despite no events") + } + if !errors.Is(idleCtx.Err(), context.Canceled) { + t.Errorf("idleCtx err = %v, want context.Canceled", idleCtx.Err()) + } +} + +// TestApplyIdleTimeout_ResetsOnEvent verifies that a broadcaster event +// for the workspace resets the timer. Sends one event mid-window and +// confirms ctx is still alive after the original deadline would have +// fired, but cancelled after a second silence window elapses. +func TestApplyIdleTimeout_ResetsOnEvent(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + b := newTestBroadcaster() + + parent, parentCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer parentCancel() + + idle := 80 * time.Millisecond + idleCtx, idleCancel := applyIdleTimeout(parent, b, "ws-active", idle) + defer idleCancel() + + // Send a progress event halfway through the window — should + // extend the deadline by another `idle`. + time.Sleep(idle / 2) + b.BroadcastOnly("ws-active", "ACTIVITY_LOGGED", map[string]interface{}{"activity_type": "agent_log"}) + + // At t = idle (original deadline), ctx must still be alive + // because the event reset the clock. + select { + case <-idleCtx.Done(): + t.Fatal("idleCtx cancelled despite mid-window event resetting the timer") + case <-time.After(idle - (idle / 2) + 10*time.Millisecond): + // ok — past the original deadline, still alive + } + + // Now wait for the second silence window to actually fire. + select { + case <-idleCtx.Done(): + // expected + case <-time.After(idle + 200*time.Millisecond): + t.Fatal("idleCtx never cancelled after the second silence window") + } +} + +// TestApplyIdleTimeout_NilBroadcasterDegradesGracefully — nil +// broadcaster (some test paths) returns the parent ctx unchanged. +func TestApplyIdleTimeout_NilBroadcasterDegradesGracefully(t *testing.T) { + parent := context.Background() + idleCtx, cancel := applyIdleTimeout(parent, nil, "ws-x", 50*time.Millisecond) + defer cancel() + if idleCtx != parent { + t.Error("nil broadcaster must return the parent ctx unchanged") + } + // And calling cancel must be safe. + cancel() +} + // --- handleA2ADispatchError --- func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {