feat(platform): idle-based A2A timeout, drop 5-min canvas hardcode
The previous canvas-default 5-min absolute deadline pre-empted any
chat that legitimately ran longer (multi-turn tool use, large
synthesis tasks) and made every wedged-SDK call burn 5 full minutes
before the user saw anything. Replaced with a per-dispatch idle
timeout: cancel the request only when the broadcaster has been
silent for `idleTimeoutDuration` (60s). Any progress event for the
workspace — agent_log tool-use rows, task_update, a2a_send,
a2a_receive — resets the clock.
Mechanics:
- new applyIdleTimeout helper subscribes to events.Broadcaster's
per-workspace SSE channel, drains its messages, resets a
time.Timer on each one, cancels the wrapped ctx when the timer
fires. The watcher goroutine + subscription live until the ctx
is done: returned cancel func called, parent cancelled, or idle timer fired.
- dispatchA2A now takes workspaceID as a parameter, applies the
idle timeout always (canvas + agent), and combines its cancel
with the existing 30-min agent-to-agent ceiling cancel into one
func the caller defers.
- Canvas dispatches no longer have an absolute ceiling at all —
the idle timer is the only "give up" signal. A healthy chat
reporting tool-use telemetry every few seconds runs forever;
a wedged runtime fails in 60s instead of 5 min.
- isUpstreamBusyError now also recognises context.Canceled (the
error class our idle cancel produces, distinct from
DeadlineExceeded). Same 503-busy retry semantics.
Tests:
- TestApplyIdleTimeout_FiresOnSilence — 60ms idle, no events,
ctx cancels with context.Canceled.
- TestApplyIdleTimeout_ResetsOnEvent — event mid-window extends
the deadline; ctx alive past original deadline, then cancels
on the second silence window.
- TestApplyIdleTimeout_NilBroadcasterDegradesGracefully — defensive
no-op for paths that don't wire a broadcaster.
- 3 existing dispatchA2A tests updated for the new workspaceID
param + the always-non-nil cancel return shape.
This pairs with Piece 1's per-tool-use telemetry (166c7f77): the
broadcaster events that reset the idle timer ARE the agent_log
rows the workspace started emitting per tool call. So the same
event stream feeds both the chat progress feed AND the proxy's
deadline.
Full Go test suite passes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
166c7f77af
commit
bf1dc6b6a5
@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -123,6 +124,14 @@ func isUpstreamBusyError(err error) bool {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
// applyIdleTimeout cancels the request ctx via context.WithCancel
|
||||
// when the broadcaster silence window elapses — that surfaces as
|
||||
// context.Canceled (not DeadlineExceeded). Treat it as the same
|
||||
// "upstream busy" class so the caller produces a 503 + Retry-After
|
||||
// instead of a generic 500.
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return true
|
||||
}
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
return true
|
||||
}
|
||||
@ -131,6 +140,7 @@ func isUpstreamBusyError(err error) bool {
|
||||
// inner cause. Fall back to substring match for those.
|
||||
msg := err.Error()
|
||||
return strings.Contains(msg, "context deadline exceeded") ||
|
||||
strings.Contains(msg, "context canceled") ||
|
||||
strings.Contains(msg, "EOF") ||
|
||||
strings.Contains(msg, "connection reset")
|
||||
}
|
||||
@ -286,7 +296,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
body = normalizedBody
|
||||
|
||||
startTime := time.Now()
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID)
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
|
||||
if cancelFwd != nil {
|
||||
defer cancelFwd()
|
||||
}
|
||||
@ -478,25 +488,59 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
return marshaledBody, a2aMethod, nil
|
||||
}
|
||||
|
||||
// idleTimeoutDuration is the per-dispatch silence window: if the
|
||||
// platform's broadcaster emits no events for this workspace for the
|
||||
// full duration, the dispatch ctx is cancelled. Resets on every
|
||||
// broadcaster event (e.g. ACTIVITY_LOGGED, TASK_UPDATED, A2A_RESPONSE) for the
|
||||
// workspace, so a chat that's actively reporting tool calls or
|
||||
// streaming status updates never trips it. Picked to be longer than
|
||||
// any reasonable single-tool-use cadence (Claude Code's slowest
|
||||
// observed silence between tools is ~30s) but short enough that a
|
||||
// truly wedged runtime fails in 1 minute, not 5.
|
||||
const idleTimeoutDuration = 60 * time.Second
|
||||
|
||||
// dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation
|
||||
// chains survive client disconnect (browser tab close). Default timeouts:
|
||||
// canvas (callerID == "") = 5 min, agent-to-agent = 30 min. Callers can
|
||||
// override via the X-Timeout header (applied to ctx upstream in ProxyA2A).
|
||||
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
|
||||
// chains survive client disconnect (browser tab close). Two layers of
|
||||
// timeout per dispatch:
|
||||
//
|
||||
// - Idle timeout (always applied): cancels the dispatch when no
|
||||
// broadcaster events for the workspace fire for
|
||||
// idleTimeoutDuration. Any progress event resets the clock — so
|
||||
// a long but actively-streaming reply runs forever, while a
|
||||
// wedged runtime fails fast.
|
||||
// - Absolute ceiling (agent-to-agent only): 30 min cap as a
|
||||
// defence against runaway delegation loops. Canvas dispatches
|
||||
// have no absolute ceiling — the user can wait as long as they
|
||||
// want, the idle timer is the only hangup signal.
|
||||
//
|
||||
// Only the absolute ceiling yields to a deadline set via the X-Timeout
|
||||
// header upstream in ProxyA2A (X-Timeout: 0 explicitly disables it); the idle timeout is always applied.
|
||||
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, workspaceID, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
|
||||
forwardCtx := context.WithoutCancel(ctx)
|
||||
var cancel context.CancelFunc
|
||||
var ceilingCancel context.CancelFunc
|
||||
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
|
||||
if callerID == "" {
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute)
|
||||
} else {
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute)
|
||||
if callerID != "" {
|
||||
forwardCtx, ceilingCancel = context.WithTimeout(forwardCtx, 30*time.Minute)
|
||||
}
|
||||
// callerID == "" (canvas): no absolute ceiling. The idle
|
||||
// timeout below is the only deadline.
|
||||
}
|
||||
// Idle timeout — cancels the dispatch ctx after
|
||||
// idleTimeoutDuration of broadcaster silence for this workspace.
|
||||
// Always applied (canvas + agent-to-agent both benefit; the
|
||||
// ceiling above is a separate runaway-loop cap that only fires
|
||||
// for agent traffic). Combines with the ceiling cancel into a
|
||||
// single returned cancel func that the caller defers.
|
||||
forwardCtx, idleCancel := applyIdleTimeout(forwardCtx, h.broadcaster, workspaceID, idleTimeoutDuration)
|
||||
cancel := func() {
|
||||
idleCancel()
|
||||
if ceilingCancel != nil {
|
||||
ceilingCancel()
|
||||
}
|
||||
}
|
||||
req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
cancel()
|
||||
// Wrap the construction failure so the caller can distinguish it
|
||||
// from an upstream Do() error and produce the correct 500 response.
|
||||
return nil, nil, &proxyDispatchBuildError{err: err}
|
||||
@ -505,3 +549,52 @@ func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, bod
|
||||
resp, doErr := a2aClient.Do(req)
|
||||
return resp, cancel, doErr
|
||||
}
|
||||
|
||||
// applyIdleTimeout returns a child ctx that gets cancelled when no
|
||||
// broadcaster events for `workspaceID` arrive for `idle` duration.
|
||||
// Any incoming event resets the clock. The returned cancel func
|
||||
// MUST be called to clean up the goroutine + subscription.
|
||||
//
|
||||
// nil broadcaster or non-positive idle returns the parent ctx
|
||||
// unchanged (and a no-op cancel) so test paths that don't wire a
|
||||
// broadcaster keep working.
|
||||
func applyIdleTimeout(parent context.Context, b *events.Broadcaster, workspaceID string, idle time.Duration) (context.Context, context.CancelFunc) {
|
||||
if b == nil || idle <= 0 || workspaceID == "" {
|
||||
return parent, func() {}
|
||||
}
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
sub, unsub := b.SubscribeSSE(workspaceID)
|
||||
go func() {
|
||||
defer unsub()
|
||||
timer := time.NewTimer(idle)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case _, ok := <-sub:
|
||||
if !ok {
|
||||
// Subscription channel closed — fall back to
|
||||
// pure-timer mode. Don't cancel: another caller
|
||||
// may have closed our sub but the request itself
|
||||
// is still in flight. Let the timer or the
|
||||
// caller's defer drive cleanup.
|
||||
continue
|
||||
}
|
||||
// Stop+drain pattern so a fired-but-unread timer
|
||||
// doesn't double-cancel after the Reset.
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
timer.Reset(idle)
|
||||
case <-timer.C:
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -1074,7 +1075,7 @@ func TestDispatchA2A_BuildRequestError(t *testing.T) {
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Malformed URL causes http.NewRequestWithContext to fail.
|
||||
_, cancel, err := handler.dispatchA2A(context.Background(), "http://%%badhost", []byte("{}"), "")
|
||||
_, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", "http://%%badhost", []byte("{}"), "")
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
@ -1097,13 +1098,13 @@ func TestDispatchA2A_CanvasTimeout(t *testing.T) {
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
resp, cancel, err := handler.dispatchA2A(context.Background(), srv.URL, []byte(`{}`), "")
|
||||
resp, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", srv.URL, []byte(`{}`), "")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if cancel == nil {
|
||||
t.Fatal("canvas caller (empty callerID) must set a timeout + return cancel")
|
||||
t.Fatal("canvas caller must return a cancel func (idle-timeout cleanup)")
|
||||
}
|
||||
cancel() // restore
|
||||
}
|
||||
@ -1118,20 +1119,23 @@ func TestDispatchA2A_AgentTimeout(t *testing.T) {
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
resp, cancel, err := handler.dispatchA2A(context.Background(), srv.URL, []byte(`{}`), "ws-caller")
|
||||
resp, cancel, err := handler.dispatchA2A(context.Background(), "ws-target", srv.URL, []byte(`{}`), "ws-caller")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if cancel == nil {
|
||||
t.Fatal("agent-to-agent caller must set a timeout + return cancel")
|
||||
t.Fatal("agent-to-agent caller must return a cancel func (idle + ceiling cleanup)")
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestDispatchA2A_ContextDeadline_NoCancelAdded(t *testing.T) {
|
||||
// When ctx already has a deadline, dispatchA2A must NOT layer its own
|
||||
// timeout (cancel should be nil).
|
||||
func TestDispatchA2A_ContextDeadline_NoExtraCeiling(t *testing.T) {
|
||||
// When ctx already has a deadline, dispatchA2A must not layer
|
||||
// its own absolute ceiling on top — the caller's deadline wins.
|
||||
// The idle-timer cleanup still produces a non-nil cancel func
|
||||
// (introduced by the always-on idle timeout) but the cancel func
|
||||
// is safe to call repeatedly and from a deferred path.
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
@ -1144,17 +1148,95 @@ func TestDispatchA2A_ContextDeadline_NoCancelAdded(t *testing.T) {
|
||||
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer ctxCancel()
|
||||
|
||||
resp, cancel, err := handler.dispatchA2A(ctx, srv.URL, []byte(`{}`), "")
|
||||
resp, cancel, err := handler.dispatchA2A(ctx, "ws-target", srv.URL, []byte(`{}`), "")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if cancel != nil {
|
||||
t.Error("cancel should be nil when ctx already has a deadline")
|
||||
cancel()
|
||||
if cancel == nil {
|
||||
t.Error("cancel must be non-nil (idle-timer cleanup)")
|
||||
}
|
||||
}
|
||||
|
||||
// --- applyIdleTimeout ---
|
||||
|
||||
// TestApplyIdleTimeout_FiresOnSilence verifies the helper cancels its
|
||||
// child ctx when no broadcaster events arrive for `idle` duration.
|
||||
// Uses a short idle window (60ms) so the test runs fast.
|
||||
func TestApplyIdleTimeout_FiresOnSilence(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
b := newTestBroadcaster()
|
||||
|
||||
parent, parentCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer parentCancel()
|
||||
|
||||
idleCtx, idleCancel := applyIdleTimeout(parent, b, "ws-silent", 60*time.Millisecond)
|
||||
defer idleCancel()
|
||||
|
||||
select {
|
||||
case <-idleCtx.Done():
|
||||
// expected — no events ever arrived for ws-silent
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("idleCtx never cancelled despite no events")
|
||||
}
|
||||
if !errors.Is(idleCtx.Err(), context.Canceled) {
|
||||
t.Errorf("idleCtx err = %v, want context.Canceled", idleCtx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
// TestApplyIdleTimeout_ResetsOnEvent verifies that a broadcaster event
|
||||
// for the workspace resets the timer. Sends one event mid-window and
|
||||
// confirms ctx is still alive after the original deadline would have
|
||||
// fired, but cancelled after a second silence window elapses.
|
||||
func TestApplyIdleTimeout_ResetsOnEvent(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
b := newTestBroadcaster()
|
||||
|
||||
parent, parentCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer parentCancel()
|
||||
|
||||
idle := 80 * time.Millisecond
|
||||
idleCtx, idleCancel := applyIdleTimeout(parent, b, "ws-active", idle)
|
||||
defer idleCancel()
|
||||
|
||||
// Send a progress event halfway through the window — should
|
||||
// extend the deadline by another `idle`.
|
||||
time.Sleep(idle / 2)
|
||||
b.BroadcastOnly("ws-active", "ACTIVITY_LOGGED", map[string]interface{}{"activity_type": "agent_log"})
|
||||
|
||||
// At t = idle (original deadline), ctx must still be alive
|
||||
// because the event reset the clock.
|
||||
select {
|
||||
case <-idleCtx.Done():
|
||||
t.Fatal("idleCtx cancelled despite mid-window event resetting the timer")
|
||||
case <-time.After(idle - (idle / 2) + 10*time.Millisecond):
|
||||
// ok — past the original deadline, still alive
|
||||
}
|
||||
|
||||
// Now wait for the second silence window to actually fire.
|
||||
select {
|
||||
case <-idleCtx.Done():
|
||||
// expected
|
||||
case <-time.After(idle + 200*time.Millisecond):
|
||||
t.Fatal("idleCtx never cancelled after the second silence window")
|
||||
}
|
||||
}
|
||||
|
||||
// TestApplyIdleTimeout_NilBroadcasterDegradesGracefully — nil
|
||||
// broadcaster (some test paths) returns the parent ctx unchanged.
|
||||
func TestApplyIdleTimeout_NilBroadcasterDegradesGracefully(t *testing.T) {
|
||||
parent := context.Background()
|
||||
idleCtx, cancel := applyIdleTimeout(parent, nil, "ws-x", 50*time.Millisecond)
|
||||
defer cancel()
|
||||
if idleCtx != parent {
|
||||
t.Error("nil broadcaster must return the parent ctx unchanged")
|
||||
}
|
||||
// And calling cancel must be safe.
|
||||
cancel()
|
||||
}
|
||||
|
||||
// --- handleA2ADispatchError ---
|
||||
|
||||
func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user