diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go
index 79b8c5f9..7399f54c 100644
--- a/workspace-server/internal/handlers/delegation.go
+++ b/workspace-server/internal/handlers/delegation.go
@@ -163,7 +163,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
 	})
 
 	// Fire-and-forget: send A2A in background goroutine
-	go h.executeDelegation(sourceID, body.TargetID, delegationID, a2aBody)
+	go h.executeDelegation(context.WithoutCancel(ctx), sourceID, body.TargetID, delegationID, a2aBody)
 
 	// Broadcast event so canvas shows delegation in real-time
 	h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{
@@ -317,19 +317,22 @@ const delegationRetryDelay = 8 * time.Second
 // a subtle stack-sharing race between the inlined body and the test goroutine
 // causes the test to hang. The log calls prevent inlining (Go cannot inline
 // functions that call the log package). This is a known Go compiler behaviour.
-// runtime.LockOSThread() below provides an additional hardening: pinning the
+// runtime.LockOSThread() provides an additional hardening: pinning the
 // goroutine to a single OS thread eliminates any scheduler-migration races.
+// The Delegate call site detaches the request context with
+// context.WithoutCancel, so HTTP completion cannot cancel this work.
-func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID string, a2aBody []byte) {
+// executeDelegation runs the A2A dispatch for a delegation. ctx bounds the
+// entire lifecycle: any deadline on it limits DB ops, proxy calls, retries.
+// Pass context.Background() when no external deadline applies (e.g. tests).
+func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, targetID, delegationID string, a2aBody []byte) {
 	runtime.LockOSThread() // pin to thread; prevents scheduler-migration races in integration tests
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
-	defer cancel()
 	log.Printf("Delegation %s: %s → %s (dispatched)", delegationID, sourceID, targetID)
 
 	log.Printf("Delegation %s: step=updating_dispatched_status", delegationID)
 	// Update status: pending → dispatched
-	h.updateDelegationStatus(sourceID, delegationID, "dispatched", "")
+	h.updateDelegationStatus(ctx, sourceID, delegationID, "dispatched", "")
 	log.Printf("Delegation %s: step=broadcasting_dispatched", delegationID)
 	h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{
 		"delegation_id": delegationID,
 		"target_id": targetID,
 		"status": "dispatched",
@@ -372,7 +375,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
 	if proxyErr != nil {
 		log.Printf("Delegation %s: step=handling_failure err=%v", delegationID, proxyErr)
 		log.Printf("Delegation %s: failed — %s", delegationID, proxyErr.Error())
-		h.updateDelegationStatus(sourceID, delegationID, "failed", proxyErr.Error())
+		h.updateDelegationStatus(ctx, sourceID, delegationID, "failed", proxyErr.Error())
 
 		if _, err := db.DB.ExecContext(ctx, `
 			INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, status, error_detail)
@@ -404,7 +407,7 @@ handleSuccess:
 	// the user.
 	if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
 		log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
-		h.updateDelegationStatus(sourceID, delegationID, "queued", "")
+		h.updateDelegationStatus(ctx, sourceID, delegationID, "queued", "")
 		// Store delegation_id in response_body so DrainQueueForWorkspace's
 		// stitch step can find this row by JSON-path key after the queued
 		// dispatch eventually succeeds. Without the key, the drain finds
@@ -454,7 +457,7 @@ handleSuccess:
 	// delegation_ledger_integration_test.go.
 	recordLedgerStatus(ctx, delegationID, "completed", "", responseText)
 	log.Printf("Delegation %s: step=updating_completed_status", delegationID)
-	h.updateDelegationStatus(sourceID, delegationID, "completed", "")
+	h.updateDelegationStatus(ctx, sourceID, delegationID, "completed", "")
 	log.Printf("Delegation %s: step=broadcasting_complete", delegationID)
 	h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{
 		"delegation_id": delegationID,
@@ -467,8 +470,8 @@ handleSuccess:
 }
 
 // updateDelegationStatus updates the status of a delegation record in activity_logs.
-func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) {
-	ctx := context.Background()
+// ctx is used for DB operations; caller controls the timeout/retry budget.
+func (h *DelegationHandler) updateDelegationStatus(ctx context.Context, workspaceID, delegationID, status, errorDetail string) {
 	if _, err := db.DB.ExecContext(ctx, `
 		UPDATE activity_logs
 		SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END
@@ -582,7 +585,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
 		recordLedgerStatus(ctx, delegationID, "completed", "", body.ResponsePreview)
 	}
 
-	h.updateDelegationStatus(sourceID, delegationID, body.Status, body.Error)
+	h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error)
 
 	if body.Status == "completed" {
 		respJSON, _ := json.Marshal(map[string]interface{}{
diff --git a/workspace-server/internal/handlers/delegation_executor_integration_test.go b/workspace-server/internal/handlers/delegation_executor_integration_test.go
index f86dfebe..a021a373 100644
--- a/workspace-server/internal/handlers/delegation_executor_integration_test.go
+++ b/workspace-server/internal/handlers/delegation_executor_integration_test.go
@@ -314,7 +314,7 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
 
 	start := time.Now()
 	runWithTimeout(t, 30*time.Second, func() {
-		dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+		dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
 	})
 	t.Logf("executeDelegation took %v", time.Since(start))
 
@@ -366,7 +366,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
 	})
 	start := time.Now()
 	runWithTimeout(t, 30*time.Second, func() {
-		dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+		dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
 	})
 	t.Logf("executeDelegation took %v", time.Since(start))
 
@@ -415,7 +415,7 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
 	})
 	start := time.Now()
 	runWithTimeout(t, 30*time.Second, func() {
-		dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+		dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
 	})
 	t.Logf("executeDelegation took %v", time.Since(start))
 
@@ -463,7 +463,7 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
 	})
 	start := time.Now()
 	runWithTimeout(t, 30*time.Second, func() {
-		dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+		dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
 	})
 	t.Logf("executeDelegation took %v", time.Since(start))
 
@@ -508,7 +508,7 @@ func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
 	})
 	start := time.Now()
 	runWithTimeout(t, 30*time.Second, func() {
-		dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
+		dh.executeDelegation(context.Background(), testSourceID, testTargetID, testDelegationID, a2aBody)
 	})
 	t.Logf("executeDelegation took %v", time.Since(start))