handlers: pass cancellable context through executeDelegation

executeDelegation previously created its own context internally, via
context.WithTimeout(context.Background(), 30*time.Minute), so
updateDelegationStatus and all DB ops ignored external cancellation. The
test helper runWithTimeout could fire its 30-second deadline, but the
goroutine kept running for up to the full 30 minutes because the
cancellation never propagated.

Fix: add ctx context.Context as the first parameter to both
executeDelegation and updateDelegationStatus. The caller now provides the
context budget: Delegate() passes c.Request.Context() (5-minute idle
timeout), and the integration tests pass context.Background(). Cancellation
now propagates through every DB op and proxy call, so a deadline carried on
the caller's context actually terminates the goroutine when it fires.
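
For illustration, a minimal sketch of the before/after pattern (not the
handler code itself; doWork is a hypothetical stand-in for the DB ops and
proxy calls shown in the diff):

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    // Before: the worker minted its own 30-minute budget, so a caller's
    // deadline or cancellation never reached the work.
    func workerBefore() {
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    	defer cancel()
    	doWork(ctx)
    }

    // After: the caller owns the budget; cancelling ctx stops the work.
    func workerAfter(ctx context.Context) {
    	doWork(ctx)
    }

    // doWork stands in for the status updates and proxy dispatch.
    func doWork(ctx context.Context) {
    	select {
    	case <-time.After(10 * time.Minute):
    		fmt.Println("finished")
    	case <-ctx.Done():
    		fmt.Println("stopped:", ctx.Err())
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	workerAfter(ctx) // prints "stopped: context deadline exceeded" after 30s
    }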

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Molecule AI · core-be 2026-05-12 16:08:54 +00:00
parent 1bd1180199
commit ce2db75fa1
2 changed files with 20 additions and 17 deletions


@@ -163,7 +163,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
})
// Fire-and-forget: send A2A in background goroutine
- go h.executeDelegation(sourceID, body.TargetID, delegationID, a2aBody)
+ go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody)
// Broadcast event so canvas shows delegation in real-time
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{
@@ -317,19 +317,22 @@ const delegationRetryDelay = 8 * time.Second
// a subtle stack-sharing race between the inlined body and the test goroutine
// causes the test to hang. The log calls prevent inlining (Go cannot inline
// functions that call the log package). This is a known Go compiler behaviour.
- // runtime.LockOSThread() below provides an additional hardening: pinning the
+ // runtime.LockOSThread() provides an additional hardening: pinning the
// goroutine to a single OS thread eliminates any scheduler-migration races.
+ // The caller provides ctx (which carries the deadline/budget); no internal
+ // context.WithTimeout is created here.
- func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID string, a2aBody []byte) {
+ // executeDelegation runs the A2A dispatch for a delegation. ctx controls the
+ // entire lifecycle: its timeout bounds all DB ops, proxy calls, and retries.
+ // Pass context.Background() when no external deadline applies (e.g. tests).
+ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, targetID, delegationID string, a2aBody []byte) {
runtime.LockOSThread() // pin to thread; prevents scheduler-migration races in integration tests
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
- defer cancel()
log.Printf("Delegation %s: %s → %s (dispatched)", delegationID, sourceID, targetID)
log.Printf("Delegation %s: step=updating_dispatched_status", delegationID)
// Update status: pending → dispatched
- h.updateDelegationStatus(sourceID, delegationID, "dispatched", "")
+ h.updateDelegationStatus(ctx, sourceID, delegationID, "dispatched", "")
log.Printf("Delegation %s: step=broadcasting_dispatched", delegationID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{
"delegation_id": delegationID, "target_id": targetID, "status": "dispatched",
@@ -372,7 +375,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
if proxyErr != nil {
log.Printf("Delegation %s: step=handling_failure err=%v", delegationID, proxyErr)
log.Printf("Delegation %s: failed — %s", delegationID, proxyErr.Error())
- h.updateDelegationStatus(sourceID, delegationID, "failed", proxyErr.Error())
+ h.updateDelegationStatus(ctx, sourceID, delegationID, "failed", proxyErr.Error())
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, status, error_detail)
@@ -404,7 +407,7 @@ handleSuccess:
// the user.
if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
- h.updateDelegationStatus(sourceID, delegationID, "queued", "")
+ h.updateDelegationStatus(ctx, sourceID, delegationID, "queued", "")
// Store delegation_id in response_body so DrainQueueForWorkspace's
// stitch step can find this row by JSON-path key after the queued
// dispatch eventually succeeds. Without the key, the drain finds
@@ -454,7 +457,7 @@ handleSuccess:
// delegation_ledger_integration_test.go.
recordLedgerStatus(ctx, delegationID, "completed", "", responseText)
log.Printf("Delegation %s: step=updating_completed_status", delegationID)
- h.updateDelegationStatus(sourceID, delegationID, "completed", "")
+ h.updateDelegationStatus(ctx, sourceID, delegationID, "completed", "")
log.Printf("Delegation %s: step=broadcasting_complete", delegationID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{
"delegation_id": delegationID,
@@ -467,8 +470,8 @@
}
// updateDelegationStatus updates the status of a delegation record in activity_logs.
- func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) {
- ctx := context.Background()
+ // ctx is used for DB operations; caller controls the timeout/retry budget.
+ func (h *DelegationHandler) updateDelegationStatus(ctx context.Context, workspaceID, delegationID, status, errorDetail string) {
if _, err := db.DB.ExecContext(ctx, `
UPDATE activity_logs
SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END
@@ -582,7 +585,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
recordLedgerStatus(ctx, delegationID, "completed", "", body.ResponsePreview)
}
- h.updateDelegationStatus(sourceID, delegationID, body.Status, body.Error)
+ h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error)
if body.Status == "completed" {
respJSON, _ := json.Marshal(map[string]interface{}{


@@ -314,7 +314,7 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
start := time.Now()
runWithTimeout(t, 30*time.Second, func() {
- dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+ dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
@@ -366,7 +366,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func() {
- dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+ dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
@@ -415,7 +415,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func() {
- dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+ dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
@@ -463,7 +463,7 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func() {
- dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+ dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
@@ -508,7 +508,7 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func() {
- dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+ dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))