test(handlers): migrate 4x TestExecuteDelegation tests to real-Postgres integration (mc#664 Class 1)

The pre-migration sqlmock-based tests in delegation_test.go (4 tests +
3 helpers expectExecuteDelegationBase/Success/Failed) had been failing
silently for weeks behind `Platform (Go)`'s continue-on-error: true.
The root cause is structural, not a missing expectation: every new DB
query the production executeDelegation path picks up imposes a fresh
sqlmock-expectation tax, and the helpers fell behind on five recent
additions:

  1. last_outbound_at UPDATE   (a2a_proxy_helpers.go logA2ASuccess)
  2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit)
  3. lookupRuntime SELECT      (a2a_proxy.go mock-runtime short-circuit)
  4. a2a_receive INSERT into activity_logs (LogActivity goroutine)
  5. recordLedgerStatus writes (delegation.go + delegation_ledger.go)

Per feedback_real_subprocess_test_for_boot_path +
feedback_local_must_mimic_production +
feedback_mandatory_local_e2e_before_ship, the right fix isn't to patch
the helpers — it's to run these tests against a real Postgres so the
downstream queries fire for real and the test signal tracks production
drift automatically. That eliminates the structural anti-pattern.

This commit:

  - Deletes the 4 sqlmock TestExecuteDelegation_* tests and the 3
    helpers entirely from delegation_test.go (no backward-compat shim;
    the helpers were the failure surface, not load-bearing).
  - Adds 4 TestIntegration_ExecuteDelegation_* tests in a new file
    delegation_executor_integration_test.go under `//go:build integration`
    so the existing `Handlers Postgres Integration` CI job picks them
    up via its `-run "^TestIntegration_"` runner.
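For reference, a hedged sketch of how that selection is expected to work locally (flags mirror the `-tags`/`-run` filter named above; the connection string is illustrative, not the workflow's actual configuration):

```shell
# Sketch: replicate the Handlers Postgres Integration job's selection.
cd workspace-server
INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
  go test -tags=integration -timeout 5m ./internal/handlers/ \
  -run '^TestIntegration_' -v
```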

The new tests seed real workspaces + delegations rows, run
executeDelegation end-to-end (including the production retry path and
the ledger gate flipped on via DELEGATION_LEDGER_WRITE=1), and assert
both activity_logs and delegations.status land as expected.

Wall-clock: ~9s × 3 partial-body tests + ~1s clean ≈ 28s end-to-end,
well within CI's 5-minute timeout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Molecule AI · core-be 2026-05-11 23:03:36 -07:00 committed by molecule-operator
parent b462270201
commit 54ee033973
2 changed files with 407 additions and 315 deletions


@@ -0,0 +1,407 @@
//go:build integration
// +build integration
// delegation_executor_integration_test.go — REAL Postgres integration tests
// for executeDelegation's delivery-confirmed proxy error regression path
// (issue #159 + mc#664 Class 1 follow-up).
//
// Background — mc#664 cascade root cause
// --------------------------------------
// Pre-mc#664 these 4 cases lived in delegation_test.go as sqlmock-based
// unit tests, driven by 3 helpers (expectExecuteDelegationBase /
// expectExecuteDelegationSuccess / expectExecuteDelegationFailed).
// They went stale as production code added new DB queries to
// executeDelegation's downstream paths:
//
// 1. last_outbound_at UPDATE (a2a_proxy_helpers.go logA2ASuccess)
// 2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit)
// 3. lookupRuntime SELECT (a2a_proxy.go mock-runtime short-circuit)
// 4. a2a_receive INSERT into activity_logs (LogActivity goroutine)
// 5. recordLedgerStatus writes (delegation.go + delegation_ledger.go)
//
// Each new query was a fresh sqlmock-expectation tax on the helpers, and
// the helpers fell behind. The mismatched expectations broke the 4 tests,
// and their failures were masked for weeks behind `Platform (Go)`'s
// continue-on-error: true.
//
// Right fix per
// - feedback_real_subprocess_test_for_boot_path
// - feedback_local_must_mimic_production
// - feedback_mandatory_local_e2e_before_ship
// is to migrate these tests to real Postgres so the downstream queries
// run for real and the test signal tracks production drift automatically.
// That eliminates the structural anti-pattern — every new query the
// production code adds is automatically covered by these tests with no
// helper-maintenance tax.
//
// Why these tests are SLOW (~9s each for the partial-body cases)
// --------------------------------------------------------------
// executeDelegation's retry path (delegation.go:334) waits 8 seconds
// between the first failed proxy attempt and the retry — the production
// `delegationRetryDelay` const. The pre-migration sqlmock tests appear to
// have been broken in part because they set up the listener to handle a
// SINGLE Accept; the retry then connected to a dead socket and the rest
// of the test went off-rails. The integration version uses a long-lived
// listener loop that serves the same partial-body response on every
// connection, so the retry produces the same outcome and the
// isDeliveryConfirmedSuccess gate makes a clean decision.
//
// 9s × 3 partial-body tests + ~1s for the clean path = ~28s end-to-end.
// Still well under CI's `-timeout 5m`. Local devs running `-run TestInt`
// should pass `-timeout 60s` or higher.
//
// Build tag + naming
// ------------------
// `//go:build integration` + `TestIntegration_*` prefix so the existing
// `Handlers Postgres Integration` CI workflow picks them up via its
// `-tags=integration ... -run "^TestIntegration_"` runner. The same
// shape as delegation_ledger_integration_test.go (the file these tests
// were modelled after).
//
// Run locally:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply migrations (replays the Handlers Postgres Integration loop)
// for m in workspace-server/migrations/*.sql; do
// [[ "$m" == *.down.sql ]] && continue
// PGPASSWORD=test psql -h localhost -p 55432 -U postgres -d molecule \
// -v ON_ERROR_STOP=1 -f "$m" >/dev/null 2>&1 || true
// done
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration -timeout 60s ./internal/handlers/ \
// -run TestIntegration_ExecuteDelegation -v
package handlers
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// Real UUIDs — required because workspaces.id is UUID (not TEXT). The
// pre-migration sqlmock tests passed "ws-source-159"/"ws-target-159"
// strings, which sqlmock happily accepted but a real Postgres rejects.
const (
integExecSourceID = "11111111-aaaa-aaaa-aaaa-000000000159"
integExecTargetID = "22222222-aaaa-aaaa-aaaa-000000000159"
integExecDelegationID = "del-integ-159-test"
)
// seedExecuteDelegationFixtures inserts the source + target workspace rows
// and the queued delegations ledger row that executeDelegation expects to
// observe. Mirrors the pre-fix sqlmock helper's intent but in real DB
// terms.
//
// Per-test cleanup is handled by integrationDB(t) which DELETE-purges
// delegations before each test; workspaces/activity_logs are scrubbed
// here so cross-test fixture leak doesn't surface.
func seedExecuteDelegationFixtures(t *testing.T) {
t.Helper()
conn := mdb.DB
if _, err := conn.ExecContext(context.Background(),
`DELETE FROM activity_logs WHERE workspace_id IN ($1, $2)`,
integExecSourceID, integExecTargetID,
); err != nil {
t.Fatalf("cleanup activity_logs: %v", err)
}
if _, err := conn.ExecContext(context.Background(),
`DELETE FROM workspaces WHERE id IN ($1, $2)`,
integExecSourceID, integExecTargetID,
); err != nil {
t.Fatalf("cleanup workspaces: %v", err)
}
for _, id := range []string{integExecSourceID, integExecTargetID} {
if _, err := conn.ExecContext(context.Background(),
`INSERT INTO workspaces (id, name, status) VALUES ($1, $2, 'online')`,
id, "integ-"+id[:8],
); err != nil {
t.Fatalf("seed workspaces %s: %v", id, err)
}
}
// Seed the queued delegation row so recordLedgerStatus's first
// SetStatus("dispatched", ...) has somewhere to transition from.
// Without this row the SetStatus is a defensive no-op (logs "row
// missing, skipping") — the rest of the executeDelegation path still
// runs, but ledger-side state is silently lost. We want it real.
recordLedgerInsert(context.Background(),
integExecSourceID, integExecTargetID, integExecDelegationID,
"integration-test task", "")
}
// startPartialBodyServer spins up a raw TCP listener that responds to
// every connection with the given HTTP response prefix (headers + start
// of body) and then closes the connection. Go's http.Client sees io.EOF
// when reading the body. Returns the URL + a stop func.
//
// Unlike httptest.NewServer this serves repeat connections — necessary
// because executeDelegation's #74 retry path will reconnect once.
func startPartialBodyServer(t *testing.T, responseHead string) (url string, stop func()) {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
var done int32
go func() {
for atomic.LoadInt32(&done) == 0 {
conn, err := ln.Accept()
if err != nil {
return
}
go func(c net.Conn) {
defer c.Close()
buf := make([]byte, 2048)
_, _ = c.Read(buf)
_, _ = c.Write([]byte(responseHead))
// Close immediately — client sees EOF mid body-read.
}(conn)
}
}()
return "http://" + ln.Addr().String(), func() {
atomic.StoreInt32(&done, 1)
ln.Close()
}
}
// activityRowsByStatus counts activity_logs rows that match the given
// (workspace_id, status) pair. Used to assert executeDelegation's
// INSERT INTO activity_logs landed (success path: status='completed';
// failure path: status='failed' or 'queued' depending on branch).
func activityRowsByStatus(t *testing.T, workspaceID, status string) int {
t.Helper()
var n int
if err := mdb.DB.QueryRowContext(context.Background(),
`SELECT count(*) FROM activity_logs WHERE workspace_id = $1 AND status = $2`,
workspaceID, status,
).Scan(&n); err != nil {
t.Fatalf("activity count(%s, %s): %v", workspaceID, status, err)
}
return n
}
// delegationLedgerStatus returns the current delegations.status for the
// seeded delegation_id, or "" if the row is missing. Real-Postgres
// version of "did the ledger transition we expected actually land".
func delegationLedgerStatus(t *testing.T, delegationID string) string {
t.Helper()
var s string
err := mdb.DB.QueryRowContext(context.Background(),
`SELECT status FROM delegations WHERE delegation_id = $1`, delegationID,
).Scan(&s)
if err != nil {
t.Fatalf("ledger status(%s): %v", delegationID, err)
}
return s
}
// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
// is the primary regression test for issue #159 in real-Postgres form.
// Scenario: target sends a 200 response with declared Content-Length but
// closes the connection mid-body; client gets io.EOF on body read.
// proxyA2ARequest captures status=200 + partial body + transport error;
// executeDelegation's isDeliveryConfirmedSuccess branch must route to
// handleSuccess so the row lands as 'completed' (not 'failed').
//
// Real-Postgres advantage over the sqlmock version: this test will fail
// if a future refactor adds a new DB write to the success path without
// updating any helper — sqlmock would have required reflexive expectation
// updates; real Postgres just runs.
//
// Timing: executeDelegation's first attempt returns (200, <partial>, EOF
// → BadGateway-class err). isTransientProxyError(BadGateway)=true so the
// caller sleeps `delegationRetryDelay` (8s) and retries. Our listener
// loop serves the same partial response on attempt 2, producing the
// same (200, <partial>, BadGateway) triple. isDeliveryConfirmedSuccess
// then fires (status=200 ∈ [200,300) + body > 0 + err != nil) → success.
func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// 200 OK with declared Content-Length=100 but only 74 bytes of body.
// Connection closes after the partial body → client io.EOF.
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 61 bytes, short of the declared 100
agentURL, stop := startPartialBodyServer(t, resp)
defer stop()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
// executeDelegation is synchronous here; the 8s retry sleep is INSIDE
// the call. We still need a small buffer for the async logA2ASuccess /
// last_outbound_at goroutines that fan out after the success branch.
time.Sleep(500 * time.Millisecond)
// Assert the executeDelegation success path wrote the activity_logs
// completion row + transitioned the ledger to completed.
if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 {
t.Errorf("expected 1 'completed' activity_logs row, got %d", got)
}
if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" {
t.Errorf("delegation ledger: want status=completed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed —
// 500 with partial body + connection drop. The retry produces the same
// 500 partial. isDeliveryConfirmedSuccess fails on status>=300 → falls
// through to the failure branch. Pins that the new condition didn't
// accidentally widen the success branch.
func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared 100
agentURL, stop := startPartialBodyServer(t, resp)
defer stop()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 {
t.Errorf("expected 1 'failed' activity_logs row, got %d", got)
}
if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" {
t.Errorf("delegation ledger: want status=failed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed —
// 502 Bad Gateway with empty body, normal close. proxyA2ARequest returns
// (502, "", error). isDeliveryConfirmedSuccess requires len(respBody) > 0
// → false → falls through to the failure branch. isTransientProxyError
// (BadGateway) = true so we get a retry that also fails, then 'failed'.
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 {
t.Errorf("expected 1 'failed' activity_logs row, got %d", got)
}
if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" {
t.Errorf("delegation ledger: want status=failed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged —
// baseline: clean 200 with full body, no error. proxyErr == nil so
// isDeliveryConfirmedSuccess never fires and no retry runs (fast path).
// Pins that the new error-recovery branch didn't regress the most
// common code path.
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 {
t.Errorf("expected 1 'completed' activity_logs row, got %d", got)
}
if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" {
t.Errorf("delegation ledger: want status=completed, got %q", s)
}
}
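The long-lived partial-body listener is the load-bearing trick in the file above, so here is a self-contained stdlib sketch of the same behavior outside the handler code (fetchPartial and the literals are illustrative): the server declares Content-Length: 100, sends fewer bytes, and closes, so the client observes a 2xx status, a non-empty body, and a body-read error at the same time.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// fetchPartial dials a throwaway listener that answers every connection
// with a 200 response whose declared Content-Length exceeds the bytes
// actually sent, then closes. Returns the status, the bytes read, and
// the error from reading the body.
func fetchPartial() (status int, body []byte, readErr error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func(c net.Conn) {
				defer c.Close()
				buf := make([]byte, 2048)
				_, _ = c.Read(buf) // consume the HTTP request
				// Declare 100 bytes, send 7, then close: the client's
				// body read ends before the declared length.
				_, _ = c.Write([]byte("HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\npartial"))
			}(conn)
		}
	}()
	resp, err := http.Get("http://" + ln.Addr().String())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, readErr = io.ReadAll(resp.Body)
	return resp.StatusCode, body, readErr
}

func main() {
	status, body, readErr := fetchPartial()
	fmt.Println(status, len(body) > 0, readErr != nil)
	// prints: 200 true true
}
```

That (2xx, non-empty body, err != nil) triple is exactly the shape isDeliveryConfirmedSuccess is described as gating on, and because the listener loops, the production retry sees it again on attempt 2.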


@@ -5,10 +5,8 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) {
t.Errorf("insertOutcomeUnknown must not collide with insertOK")
}
}
// ==================== executeDelegation — delivery-confirmed proxy error regression tests ====================
//
// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a
// non-empty response body with a 2xx status code, executeDelegation must treat it as success.
// The error is a delivery/transport error (e.g., connection reset after response was received).
// Previously, executeDelegation marked these as "failed" even though the work was done,
// causing retry storms and "error" rendering in canvas despite the response being available.
//
// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call
// executeDelegation directly, and verify the activity_logs status and delegation status.
const testDelegationID = "del-159-test"
const testSourceID = "ws-source-159"
const testTargetID = "ws-target-159"
// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that
// executeDelegation always makes, regardless of outcome.
func expectExecuteDelegationBase(mock sqlmock.Sqlmock) {
// updateDelegationStatus: dispatched
// Uses prefix match — sqlmock regexes match the full query string.
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("dispatched", "", testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
// CanCommunicate: source != target → fires two getWorkspaceRef lookups.
// Both test fixtures have parent_id = NULL (root-level siblings) → allowed.
// Order matches call order: source first, then target.
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id").
WithArgs(testSourceID).
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil))
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id").
WithArgs(testTargetID).
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil))
// resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = ").
WithArgs(testTargetID).
WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online"))
}
// expectExecuteDelegationSuccess sets up expectations for a completed delegation.
func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) {
// INSERT activity_logs for delegation completion (response_body status = 'completed')
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed").
WillReturnResult(sqlmock.NewResult(0, 1))
// updateDelegationStatus: completed
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("completed", "", testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// expectExecuteDelegationFailed sets up expectations for a failed delegation.
func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) {
// INSERT activity_logs for delegation failure (response_body status = 'failed')
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed").
WillReturnResult(sqlmock.NewResult(0, 1))
// updateDelegationStatus: failed
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression
// test for issue #159. The scenario:
// - Attempt 1: server sends 200 OK headers + partial body, then closes connection.
// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, <partial>, BadGateway).
// isTransientProxyError(BadGateway) = TRUE → retry.
// - Attempt 2: server does the same thing (closes after partial body).
// proxyA2ARequest: same (200, <partial>, BadGateway).
// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon,
// or we get one more attempt). For the test we let it run.
// POST-FIX: the executeDelegation new condition sees status=200, body=<partial>, err!=nil
// and routes to handleSuccess immediately.
//
// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded)
// even when the server sent 200, so the condition always failed. Post-fix, status=200 is
// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody).
// In this test the retry ultimately succeeds (server eventually sends full body), but
// the critical assertion is that a 2xx partial-body delivery-confirmed response is never
// classified as "failed" — it always routes to success.
func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server that sends a 200 response with declared Content-Length but closes
// the connection before sending all bytes. Go's http.Client sees io.EOF on
// the body read. proxyA2ARequest captures the partial body + status=200 and
// returns (200, <partial>, error). executeDelegation's new condition sees
// status=200 + body > 0 + error != nil → routes to handleSuccess.
var wg sync.WaitGroup
wg.Add(1)
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer ln.Close()
go func() {
defer wg.Done()
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
// Consume the HTTP request
buf := make([]byte, 2048)
conn.Read(buf)
// Send 200 OK with Content-Length: 100 but only 74 bytes of body
// (less than declared length → io.LimitReader returns io.EOF after reading all 74)
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 61 bytes, short of the declared 100
conn.Write([]byte(resp))
// Close immediately — client gets io.EOF on body read
}()
agentURL := "http://" + ln.Addr().String()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
// Execute synchronously (not as a goroutine) so we can check DB state immediately.
// The handler fires it as goroutine; we call it directly for deterministic testing.
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": "1",
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond) // let DB writes settle
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx
// status code (e.g., 500 Internal Server Error with partial body read before connection drop).
// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure.
func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server returns 500 with declared Content-Length but closes connection early.
// proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error.
// Returns (500, <partial_body>, BadGateway).
// New condition: status=500 is NOT >= 200 && < 300 → routes to failure.
// isTransientProxyError(500) = false → no retry.
var wg sync.WaitGroup
wg.Add(1)
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer ln.Close()
go func() {
defer wg.Done()
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
buf := make([]byte, 2048)
conn.Read(buf)
// 500 with Content-Length: 100 but only ~60 bytes of body
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared
conn.Write([]byte(resp))
// Close immediately — client gets io.EOF on body read
}()
agentURL := "http://" + ln.Addr().String()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationFailed(mock)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body.
// The new condition requires len(respBody) > 0, so empty body routes to failure.
func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil.
// New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300
// → len(respBody) == 0 → condition FALSE → falls through to failure.
// isTransientProxyError(502) is TRUE → retry → same result → failure.
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
// No body — connection closes normally
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
allowLoopbackForTest(t)
// First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase
expectExecuteDelegationBase(mock)
// Second attempt (retry): updateDelegationStatus(dispatched) again
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("dispatched", "", testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
// Failure: INSERT + UPDATE (failed)
expectExecuteDelegationFailed(mock)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response
// (no error, 200 with body) is unaffected by the new condition. This is the baseline:
// proxyErr == nil so the new condition never fires.
func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}