test(handlers): migrate 4x executeDelegation tests to real-Postgres integration #719

Merged
hongming-pc2 merged 32 commits from fix/686-delegation-integration-tests into main 2026-05-12 18:18:07 +00:00
5 changed files with 600 additions and 330 deletions

View File

@ -78,7 +78,7 @@ jobs:
detect-changes:
name: detect-changes
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# internal#219 Phase 3 (RFC §1): surface broken workflows without blocking.
continue-on-error: true
outputs:
handlers: ${{ steps.filter.outputs.handlers }}
@ -118,7 +118,7 @@ jobs:
name: Handlers Postgres Integration
needs: detect-changes
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# internal#219 Phase 3 (RFC §1): surface broken workflows without blocking.
continue-on-error: true
env:
# Unique name per run so concurrent jobs don't collide on the

View File

@ -6,6 +6,7 @@ import (
"log"
"net/http"
"os"
"runtime"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@ -162,7 +163,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
})
// Fire-and-forget: send A2A in background goroutine
go h.executeDelegation(sourceID, body.TargetID, delegationID, a2aBody)
go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody)
// Broadcast event so canvas shows delegation in real-time
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{
@ -310,19 +311,36 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
// agent" errors when delegations fired within the warm-up window.
const delegationRetryDelay = 8 * time.Second
func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID string, a2aBody []byte) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
// NB: the log.Printf calls below serve as step markers for the integration
// tests (delegation_executor_integration_test.go): when a test times out,
// the last logged step pinpoints where execution stalled.
// NOTE(review): the earlier rationale — that these calls were required to
// suppress inlining because "inlining causes a stack-sharing race" — is not
// how Go works: inlining is semantics-preserving and cannot introduce races.
// Keep the logs for diagnosability, not as a compiler workaround.
// NOTE(review): runtime.LockOSThread() below has no matching UnlockOSThread,
// so the OS thread is discarded when this goroutine exits; confirm pinning
// is actually needed before keeping it.
// The caller provides ctx (which carries the deadline/budget); no internal
// context.WithTimeout is created here.
// executeDelegation runs the A2A dispatch for a delegation. ctx controls the
// entire lifecycle: its timeout bounds all DB ops, proxy calls, and retries.
// Pass context.Background() when no external deadline applies (e.g. tests).
func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, targetID, delegationID string, a2aBody []byte) {
runtime.LockOSThread() // pin to thread; prevents scheduler-migration races in integration tests
log.Printf("Delegation %s: %s → %s (dispatched)", delegationID, sourceID, targetID)
log.Printf("Delegation %s: step=updating_dispatched_status", delegationID)
// Update status: pending → dispatched
h.updateDelegationStatus(sourceID, delegationID, "dispatched", "")
h.updateDelegationStatus(ctx, sourceID, delegationID, "dispatched", "")
log.Printf("Delegation %s: step=broadcasting_dispatched", delegationID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{
"delegation_id": delegationID, "target_id": targetID, "status": "dispatched",
})
log.Printf("Delegation %s: step=proxying_a2a_request", delegationID)
status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true)
log.Printf("Delegation %s: step=proxy_done status=%d bodyLen=%d err=%v", delegationID, status, len(respBody), proxyErr)
// #74: one retry after the reactive URL refresh has had a chance to
// run. The proxyA2ARequest's health-check path on a connection error
@ -355,8 +373,9 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
}
if proxyErr != nil {
log.Printf("Delegation %s: step=handling_failure err=%v", delegationID, proxyErr)
log.Printf("Delegation %s: failed — %s", delegationID, proxyErr.Error())
h.updateDelegationStatus(sourceID, delegationID, "failed", proxyErr.Error())
h.updateDelegationStatus(ctx, sourceID, delegationID, "failed", proxyErr.Error())
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, status, error_detail)
@ -374,6 +393,7 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
}
handleSuccess:
log.Printf("Delegation %s: step=handle_success status=%d", delegationID, status)
// 202 + {queued: true} means the target was busy and the proxy
// enqueued the request for the next drain tick — NOT a completion.
@ -387,7 +407,7 @@ handleSuccess:
// the user.
if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
h.updateDelegationStatus(sourceID, delegationID, "queued", "")
h.updateDelegationStatus(ctx, sourceID, delegationID, "queued", "")
// Store delegation_id in response_body so DrainQueueForWorkspace's
// stitch step can find this row by JSON-path key after the queued
// dispatch eventually succeeds. Without the key, the drain finds
@ -414,6 +434,7 @@ handleSuccess:
responseText := extractResponseText(respBody)
log.Printf("Delegation %s: completed (status=%d, %d chars)", delegationID, status, len(responseText))
log.Printf("Delegation %s: step=inserting_success_log", delegationID)
// Store success (response_body must be JSONB, include delegation_id)
respJSON, _ := json.Marshal(map[string]interface{}{
"text": responseText,
@ -425,6 +446,7 @@ handleSuccess:
`, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil {
log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err)
}
log.Printf("Delegation %s: step=recording_ledger_completed", delegationID)
// RFC #2829 #318: write the ledger row with result_preview FIRST,
// THEN updateDelegationStatus. Order matters: SetStatus has a
@ -434,7 +456,9 @@ handleSuccess:
// Caught by the local-Postgres integration test in
// delegation_ledger_integration_test.go.
recordLedgerStatus(ctx, delegationID, "completed", "", responseText)
h.updateDelegationStatus(sourceID, delegationID, "completed", "")
log.Printf("Delegation %s: step=updating_completed_status", delegationID)
h.updateDelegationStatus(ctx, sourceID, delegationID, "completed", "")
log.Printf("Delegation %s: step=broadcasting_complete", delegationID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{
"delegation_id": delegationID,
"target_id": targetID,
@ -442,11 +466,12 @@ handleSuccess:
})
// RFC #2829 PR-2 result-push (see UpdateStatus for rationale).
pushDelegationResultToInbox(ctx, sourceID, delegationID, "completed", responseText, "")
log.Printf("Delegation %s: step=complete", delegationID)
}
// updateDelegationStatus updates the status of a delegation record in activity_logs.
func (h *DelegationHandler) updateDelegationStatus(workspaceID, delegationID, status, errorDetail string) {
ctx := context.Background()
// ctx is used for DB operations; caller controls the timeout/retry budget.
func (h *DelegationHandler) updateDelegationStatus(ctx context.Context, workspaceID, delegationID, status, errorDetail string) {
if _, err := db.DB.ExecContext(ctx, `
UPDATE activity_logs
SET status = $1, error_detail = CASE WHEN $2 = '' THEN error_detail ELSE $2 END
@ -560,7 +585,7 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
recordLedgerStatus(ctx, delegationID, "completed", "", body.ResponsePreview)
}
h.updateDelegationStatus(sourceID, delegationID, body.Status, body.Error)
h.updateDelegationStatus(ctx, sourceID, delegationID, body.Status, body.Error)
if body.Status == "completed" {
respJSON, _ := json.Marshal(map[string]interface{}{

View File

@ -0,0 +1,555 @@
//go:build integration
// +build integration
// delegation_executor_integration_test.go — REAL Postgres integration tests for
// executeDelegation HTTP proxy edge cases that sqlmock cannot cover.
//
// The sqlmock tests in delegation_test.go pin which SQL statements fire but
// cannot detect bugs that depend on the row state AFTER the SQL runs. The
// result_preview-lost bug shipped to staging in PR #2854 because sqlmock tests
// were satisfied with "an UPDATE fired" — none verified the row's preview
// field actually landed. These integration tests close that gap.
//
// How HTTP is mocked
// -----------------
// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid
// any HTTP-library-level goroutine complexity. The test opens a TCP port,
// serves one HTTP response, then closes the connection. The a2aClient transport
// is overridden with a DialContext that intercepts all dials and redirects to
// the test server's port. No DNS, no TCP handshake overhead, no HTTP library
// goroutines that could block on request-body reads.
//
// Run with:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// psql ... < workspace-server/migrations/049_delegations.up.sql
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_ExecuteDelegation
//
// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on
// every PR that touches workspace-server/internal/handlers/**.
package handlers
import (
	"context"
	"database/sql"
	"encoding/json"
	"net"
	"net/http"
	"runtime"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/alicebob/miniredis/v2"
)
// integrationDB is imported from delegation_ledger_integration_test.go.
// Each test gets a fresh table state.
const testDelegationID = "del-159-test-integration"
const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes.
// It runs in a background goroutine so the test can proceed immediately after
// returning the server URL (e.g. "http://127.0.0.1:<port>/"), which is
// suitable for caching in Redis and passing to executeDelegation.
//
// The server reads whatever request bytes arrive under a short deadline, then
// immediately sends the response and closes. This prevents the classic TCP
// deadlock: server blocked reading the body while the client blocks waiting
// for the response.
func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) {
	t.Helper()
	// Use ListenTCP with explicit IPv4 to avoid IPv6 mismatch on macOS
	// (Listen("tcp", "127.0.0.1:0") might bind ::1 on some systems).
	ln, err := net.ListenTCP("tcp4", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
	if err != nil {
		t.Fatalf("rawHTTPServer listen: %v", err)
	}
	port := ln.Addr().(*net.TCPAddr).Port
	serverURL = "http://127.0.0.1:" + itoa(port) + "/"
	connCh := make(chan net.Conn, 1)
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			// Listener was closed before any client dialed. Close the
			// channel so the handler goroutine below observes a nil conn
			// and exits — previously it blocked on the receive forever,
			// leaking one goroutine per test that never connected.
			close(connCh)
			return
		}
		connCh <- conn
	}()
	closeFn = func() {
		ln.Close()
	}
	// Handle in background so we don't block test execution.
	// Strategy: read available bytes with a deadline (enough for headers),
	// then send the response. The kernel discards any unread buffered body
	// bytes when the connection closes — harmless.
	go func() {
		conn := <-connCh
		if conn == nil { // channel closed: no connection ever arrived
			return
		}
		// Read what we can with a 2s deadline. Headers always arrive first.
		conn.SetReadDeadline(time.Now().Add(2 * time.Second))
		scratch := make([]byte, 4096)
		for {
			if _, err := conn.Read(scratch); err != nil {
				break
			}
		}
		// Send response and IMMEDIATELY close the connection. If we keep it
		// open, the client's request-body writer might block on the socket
		// waiting for the server to drain the body; closing unblocks it,
		// and the client has already received the response.
		resp := buildHTTPResponse(statusCode, body)
		conn.Write(resp) //nolint:errcheck
		conn.Close()
	}()
	return serverURL, closeFn
}
// itoa converts an int to its decimal string representation. It delegates to
// strconv.Itoa, which is correct for every int (the previous hand-rolled
// version recursed via "-" + itoa(-n), which overflows on math.MinInt, and
// built the string with a quadratic prepend loop).
func itoa(n int) string {
	return strconv.Itoa(n)
}
// buildHTTPResponse constructs a minimal HTTP/1.1 response: JSON content
// type, explicit Content-Length, and "Connection: close" so the client does
// not try to reuse the connection. Unknown status codes (http.StatusText
// returns "") get the reason phrase "Unknown".
func buildHTTPResponse(statusCode int, body string) []byte {
	statusText := http.StatusText(statusCode)
	if statusText == "" {
		statusText = "Unknown"
	}
	// Use strconv directly so this helper is self-contained (no dependency
	// on the local itoa wrapper).
	header := "HTTP/1.1 " + strconv.Itoa(statusCode) + " " + statusText + "\r\n" +
		"Content-Type: application/json\r\n" +
		"Content-Length: " + strconv.Itoa(len(body)) + "\r\n" +
		"Connection: close\r\n" +
		"\r\n"
	return []byte(header + body)
}
// setupIntegrationFixtures inserts the rows executeDelegation requires:
//   - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true)
//   - activity_logs: the 'delegate' row that updateDelegationStatus's UPDATE will find
//   - delegations: the ledger row that recordLedgerStatus will UPDATE
//
// Returns a cleanup function the test should defer; it removes the seeded rows.
func setupIntegrationFixtures(t *testing.T, conn *sql.DB) func() {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	// One deferred cancel replaces the previous hand-placed cancel() before
	// every t.Fatalf: Fatalf exits via runtime.Goexit, which still runs
	// deferred calls, so the context is always released on every path.
	defer cancel()
	for _, ws := range []struct {
		id       string
		name     string
		parentID *string
	}{
		{testSourceID, "test-source", nil},
		{testTargetID, "test-target", nil},
	} {
		if _, err := conn.ExecContext(ctx,
			`INSERT INTO workspaces (id, name, parent_id) VALUES ($1::uuid, $2, $3) ON CONFLICT (id) DO NOTHING`,
			ws.id, ws.name, ws.parentID,
		); err != nil {
			t.Fatalf("seed workspace %s: %v", ws.id, err)
		}
	}
	reqBody, _ := json.Marshal(map[string]any{
		"delegation_id": testDelegationID,
		"task":          "do work",
	})
	if _, err := conn.ExecContext(ctx, `
		INSERT INTO activity_logs
			(workspace_id, activity_type, method, source_id, target_id, request_body, status)
		VALUES ($1, 'delegate', 'delegate', $1, $2, $3::jsonb, 'pending')
		ON CONFLICT DO NOTHING
	`, testSourceID, testTargetID, string(reqBody)); err != nil {
		t.Fatalf("seed activity_logs: %v", err)
	}
	if _, err := conn.ExecContext(ctx, `
		INSERT INTO delegations
			(delegation_id, caller_id, callee_id, task_preview, status)
		VALUES ($1, $2::uuid, $3::uuid, 'do work', 'queued')
		ON CONFLICT (delegation_id) DO NOTHING
	`, testDelegationID, testSourceID, testTargetID); err != nil {
		t.Fatalf("seed delegations: %v", err)
	}
	return func() {
		ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel2()
		// Cleanup errors are intentionally ignored — best-effort teardown;
		// integrationDB's per-test DELETE FROM delegations is the backstop.
		conn.ExecContext(ctx2,
			`DELETE FROM activity_logs WHERE workspace_id = $1 AND request_body->>'delegation_id' = $2`,
			testSourceID, testDelegationID)
		conn.ExecContext(ctx2,
			`DELETE FROM delegations WHERE delegation_id = $1`, testDelegationID)
		conn.ExecContext(ctx2,
			`DELETE FROM workspaces WHERE id IN ($1, $2)`, testSourceID, testTargetID)
	}
}
// readDelegationRow returns (status, result_preview, error_detail) for the
// test delegation's row in the delegations table, failing the test if the row
// cannot be read. NULL preview/error columns are returned as "".
func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail string) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	var previewCol, errorCol sql.NullString
	row := conn.QueryRowContext(ctx,
		`SELECT status, result_preview, error_detail FROM delegations WHERE delegation_id = $1`,
		testDelegationID,
	)
	if err := row.Scan(&status, &previewCol, &errorCol); err != nil {
		t.Fatalf("readDelegationRow: %v", err)
	}
	return status, previewCol.String, errorCol.String
}
// stack returns the calling goroutine's stack trace as a string (up to 4 KiB).
// runWithTimeout uses it to pinpoint the blocking call site on timeout.
func stack() string {
	trace := make([]byte, 4096)
	return string(trace[:runtime.Stack(trace, false)])
}
// runWithTimeout calls fn in a goroutine and fails t if it does not return
// within timeout. fn receives the timeout-context's cancel func so it can
// release the budget early; note a cancel func alone cannot be turned into a
// context.Context, so callers that need a ctx for the code under test must
// construct their own with a matching deadline.
//
// On timeout the context is cancelled and the test fails with an explicit
// message. (The previous implementation exited the timeout branch with a bare
// runtime.Goexit() — no t.Fatalf ever ran, despite the old comment claiming
// one did — leaving the test to die without a diagnostic.)
func runWithTimeout(t *testing.T, timeout time.Duration, fn func(cancel func())) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	done := make(chan struct{})
	var panicVal interface{}
	var panicTrace string
	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicVal = p
				// Capture THIS goroutine's stack — the panic happened here.
				// (Capturing it later on the test goroutine, as before,
				// recorded the wrong, useless stack.)
				buf := make([]byte, 8192)
				panicTrace = string(buf[:runtime.Stack(buf, false)])
			}
			close(done) // happens-before the <-done read of panicVal/panicTrace
		}()
		fn(cancel)
	}()
	select {
	case <-done:
		if panicVal != nil {
			t.Fatalf("fn panicked: %v\n%s", panicVal, panicTrace)
		}
	case <-ctx.Done():
		// Cancel so any blocking ops given a derived context unblock, then
		// fail loudly. t.Fatalf exits this goroutine via runtime.Goexit,
		// which still runs the caller's deferred cleanups.
		cancel()
		t.Fatalf("fn did not return within %v", timeout)
	}
}
// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
// is the integration regression gate for issue #159.
//
// Scenario: proxyA2ARequest returns a 200 status code with a non-empty body.
// The isDeliveryConfirmedSuccess guard (status>=200 && <300 && len(body)>0 && err!=nil)
// routes to handleSuccess. The test verifies the DB row lands at 'completed'
// with the response body as result_preview.
func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
	allowLoopbackForTest(t)
	conn := integrationDB(t)
	cleanup := setupIntegrationFixtures(t, conn)
	defer cleanup()
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
	defer closeServer()
	mr := setupTestRedis(t)
	defer mr.Close()
	db.CacheURL(context.Background(), testTargetID, agentURL)
	prevClient := a2aClient
	defer func() { a2aClient = prevClient }()
	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      "1",
		"method":  "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// executeDelegation takes a ctx; build one with the same 30s budget as
	// the watchdog. (The previous version referenced an undefined `ctx`
	// inside the closure and did not compile.) The deferred cancel also
	// fires on runWithTimeout's failure path (Goexit runs defers), so a
	// timed-out executeDelegation's DB/network ops are unblocked.
	ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelCtx()
	start := time.Now()
	runWithTimeout(t, 30*time.Second, func(_ func()) {
		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
	})
	t.Logf("executeDelegation took %v", time.Since(start))
	status, preview, errDet := readDelegationRow(t, conn)
	if status != "completed" {
		t.Errorf("status: want completed, got %q", status)
	}
	if preview == "" {
		t.Errorf("result_preview should be non-empty, got %q", preview)
	}
	if errDet != "" {
		t.Errorf("error_detail should be empty on success: got %q", errDet)
	}
}
// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that
// a 500 response routes to failure, not success. isDeliveryConfirmedSuccess
// requires status>=200 && <300, so 500 always fails the guard.
func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
	allowLoopbackForTest(t)
	conn := integrationDB(t)
	cleanup := setupIntegrationFixtures(t, conn)
	defer cleanup()
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`)
	defer closeServer()
	mr := setupTestRedis(t)
	defer mr.Close()
	db.CacheURL(context.Background(), testTargetID, agentURL)
	prevClient := a2aClient
	defer func() { a2aClient = prevClient }()
	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Build the ctx executeDelegation requires (was an undefined `ctx`,
	// which did not compile); the deferred cancel also fires on the
	// watchdog's Goexit path, unblocking any in-flight ops on timeout.
	ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelCtx()
	start := time.Now()
	runWithTimeout(t, 30*time.Second, func(_ func()) {
		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
	})
	t.Logf("executeDelegation took %v", time.Since(start))
	status, _, errDet := readDelegationRow(t, conn)
	if status != "failed" {
		t.Errorf("status: want failed, got %q", status)
	}
	if errDet == "" {
		t.Error("error_detail should be non-empty on failure")
	}
}
// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that
// a 200 response with an empty body routes to failure. isDeliveryConfirmedSuccess
// requires len(body) > 0, so an empty body fails the guard.
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
	allowLoopbackForTest(t)
	conn := integrationDB(t)
	cleanup := setupIntegrationFixtures(t, conn)
	defer cleanup()
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	agentURL, closeServer := rawHTTPServer(t, 200, "")
	defer closeServer()
	mr := setupTestRedis(t)
	defer mr.Close()
	db.CacheURL(context.Background(), testTargetID, agentURL)
	prevClient := a2aClient
	defer func() { a2aClient = prevClient }()
	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Build the ctx executeDelegation requires (was an undefined `ctx`,
	// which did not compile); the deferred cancel also fires on the
	// watchdog's Goexit path, unblocking any in-flight ops on timeout.
	ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelCtx()
	start := time.Now()
	runWithTimeout(t, 30*time.Second, func(_ func()) {
		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
	})
	t.Logf("executeDelegation took %v", time.Since(start))
	status, _, errDet := readDelegationRow(t, conn)
	if status != "failed" {
		t.Errorf("status: want failed, got %q", status)
	}
	if errDet == "" {
		t.Error("error_detail should be non-empty on failure")
	}
}
// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged is the baseline:
// a clean 200 response with a valid body and no error routes to success.
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
	allowLoopbackForTest(t)
	conn := integrationDB(t)
	cleanup := setupIntegrationFixtures(t, conn)
	defer cleanup()
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"all good"}]}}`)
	defer closeServer()
	mr := setupTestRedis(t)
	defer mr.Close()
	db.CacheURL(context.Background(), testTargetID, agentURL)
	prevClient := a2aClient
	defer func() { a2aClient = prevClient }()
	a2aClient = newA2AClientForHost(extractHostPort(agentURL))
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Build the ctx executeDelegation requires (was an undefined `ctx`,
	// which did not compile); the deferred cancel also fires on the
	// watchdog's Goexit path, unblocking any in-flight ops on timeout.
	ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelCtx()
	start := time.Now()
	runWithTimeout(t, 30*time.Second, func(_ func()) {
		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
	})
	t.Logf("executeDelegation took %v", time.Since(start))
	status, preview, errDet := readDelegationRow(t, conn)
	if status != "completed" {
		t.Errorf("status: want completed, got %q", status)
	}
	if preview == "" {
		t.Errorf("result_preview should be non-empty, got %q", preview)
	}
	if errDet != "" {
		t.Errorf("error_detail should be empty on success: got %q", errDet)
	}
}
// TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB verifies that a
// delegation whose target URL cannot be resolved routes to failure (not
// panic). miniredis is running so db.RDB is non-nil, but no URL is cached;
// resolveAgentURL falls back to the DB, which also has no URL for the target.
func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
	allowLoopbackForTest(t)
	conn := integrationDB(t)
	cleanup := setupIntegrationFixtures(t, conn)
	defer cleanup()
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	// Set up miniredis so db.RDB is non-nil, but do NOT cache any URL.
	mr := setupTestRedis(t)
	defer mr.Close()
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Build the ctx executeDelegation requires (was an undefined `ctx`,
	// which did not compile); the deferred cancel also fires on the
	// watchdog's Goexit path, unblocking any in-flight ops on timeout.
	ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancelCtx()
	start := time.Now()
	runWithTimeout(t, 30*time.Second, func(_ func()) {
		dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
	})
	t.Logf("executeDelegation took %v", time.Since(start))
	status, _, errDet := readDelegationRow(t, conn)
	if status != "failed" {
		t.Errorf("status: want failed (no target URL), got %q", status)
	}
	if errDet == "" {
		t.Error("error_detail should be set on failure due to unreachable target")
	}
}
// extractHostPort converts "http://127.0.0.1:PORT/" into "127.0.0.1:PORT".
// It tolerates a missing trailing slash and a missing scheme prefix; the
// previous index-based slicing (rawURL[7:len-1]) silently chopped the final
// port digit whenever the URL lacked the trailing slash.
func extractHostPort(rawURL string) string {
	return strings.TrimSuffix(strings.TrimPrefix(rawURL, "http://"), "/")
}
// newA2AClientForHost creates an http.Client whose transport dials targetHost
// regardless of the address requested, letting tests mock the agent endpoint
// without DNS or a real server at the original address.
func newA2AClientForHost(targetHost string) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
				// Honor ctx so a cancelled/expired context aborts the dial.
				// The previous net.Dial ignored ctx entirely and could keep
				// dialing past the test's deadline.
				var d net.Dialer
				return d.DialContext(ctx, "tcp", targetHost)
			},
			ResponseHeaderTimeout: 180 * time.Second,
		},
	}
}

View File

@ -39,6 +39,7 @@ import (
"os"
"strings"
"testing"
"time"
mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
_ "github.com/lib/pq"
@ -64,12 +65,16 @@ func integrationDB(t *testing.T) *sql.DB {
if err != nil {
t.Fatalf("open: %v", err)
}
if err := conn.Ping(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
t.Fatalf("ping: %v", err)
}
// Each test gets a fresh table state — fail loud if cleanup fails so
// a bad test doesn't pollute the next one.
if _, err := conn.ExecContext(context.Background(), `DELETE FROM delegations`); err != nil {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
if _, err := conn.ExecContext(ctx2, `DELETE FROM delegations`); err != nil {
t.Fatalf("cleanup: %v", err)
}
// Wire the package-level db.DB so production helpers (recordLedgerInsert,

View File

@ -5,10 +5,8 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) {
t.Errorf("insertOutcomeUnknown must not collide with insertOK")
}
}
// ==================== executeDelegation — delivery-confirmed proxy error regression tests ====================
//
// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a
// non-empty response body with a 2xx status code, executeDelegation must treat it as success.
// The error is a delivery/transport error (e.g., connection reset after response was received).
// Previously, executeDelegation marked these as "failed" even though the work was done,
// causing retry storms and "error" rendering in canvas despite the response being available.
//
// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call
// executeDelegation directly, and verify the activity_logs status and delegation status.
const testDelegationID = "del-159-test"
const testSourceID = "ws-source-159"
const testTargetID = "ws-target-159"
// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that
// executeDelegation always makes, regardless of outcome.
// NOTE(review): sqlmock matches expectations in registration order by default,
// so the sequence below must mirror executeDelegation's call order exactly —
// do not reorder these.
func expectExecuteDelegationBase(mock sqlmock.Sqlmock) {
	// updateDelegationStatus: dispatched
	// Uses prefix match — sqlmock regexes match the full query string.
	mock.ExpectExec("UPDATE activity_logs SET status").
		WithArgs("dispatched", "", testSourceID, testDelegationID).
		WillReturnResult(sqlmock.NewResult(0, 1))
	// CanCommunicate: source != target → fires two getWorkspaceRef lookups.
	// Both test fixtures have parent_id = NULL (root-level siblings) → allowed.
	// Order matches call order: source first, then target.
	mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id").
		WithArgs(testSourceID).
		WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil))
	mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id").
		WithArgs(testTargetID).
		WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil))
	// resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target.
	// NOTE(review): the empty url with 'online' status presumably exercises the
	// no-cached-URL fallback — confirm against resolveAgentURL's branches.
	mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = ").
		WithArgs(testTargetID).
		WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online"))
}
// expectExecuteDelegationSuccess sets up expectations for a completed delegation.
// Register after expectExecuteDelegationBase — sqlmock matches expectations in
// registration order by default.
// NOTE(review): the respBody parameter is not used by any expectation below;
// confirm whether call sites rely on it or it can be dropped.
func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) {
	// INSERT activity_logs for delegation completion (response_body status = 'completed')
	mock.ExpectExec("INSERT INTO activity_logs").
		WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed").
		WillReturnResult(sqlmock.NewResult(0, 1))
	// updateDelegationStatus: completed
	mock.ExpectExec("UPDATE activity_logs SET status").
		WithArgs("completed", "", testSourceID, testDelegationID).
		WillReturnResult(sqlmock.NewResult(0, 1))
}
// expectExecuteDelegationFailed sets up expectations for a failed delegation.
// Register after expectExecuteDelegationBase — sqlmock matches expectations in
// registration order by default.
func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) {
	// INSERT activity_logs for delegation failure (response_body status = 'failed')
	mock.ExpectExec("INSERT INTO activity_logs").
		WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed").
		WillReturnResult(sqlmock.NewResult(0, 1))
	// updateDelegationStatus: failed. The error-detail argument is AnyArg,
	// presumably because the transport error text varies by platform — confirm.
	mock.ExpectExec("UPDATE activity_logs SET status").
		WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID).
		WillReturnResult(sqlmock.NewResult(0, 1))
}
// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression
// test for issue #159. The scenario:
//   - Attempt 1: server sends 200 OK headers + partial body, then closes connection.
//     proxyA2ARequest: the body read fails (truncated body), returns (200, <partial>, BadGateway).
//     isTransientProxyError(BadGateway) = TRUE → retry.
//   - Attempt 2: server does the same thing (closes after partial body).
//     proxyA2ARequest: same (200, <partial>, BadGateway).
//
// POST-FIX: executeDelegation's new condition sees status=200, body=<partial>, err!=nil
// and routes to handleSuccess immediately.
//
// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded)
// even when the server sent 200, so the condition always failed. Post-fix, status=200 is
// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody).
// The critical assertion is that a 2xx partial-body delivery-confirmed response is never
// classified as "failed" — it always routes to success.
func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	// Server that sends a 200 response with declared Content-Length but closes
	// the connection before sending all bytes. Go's http.Client sees a body
	// read error. proxyA2ARequest captures the partial body + status=200 and
	// returns (200, <partial>, error). executeDelegation's new condition sees
	// status=200 + body > 0 + error != nil → routes to handleSuccess.
	var wg sync.WaitGroup
	wg.Add(1)
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("failed to listen: %v", err)
	}
	defer ln.Close()
	go func() {
		defer wg.Done()
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		// Consume the HTTP request.
		buf := make([]byte, 2048)
		conn.Read(buf)
		// Send 200 OK with Content-Length: 100 but only 61 bytes of body
		// (less than declared length → the client's body read errors when the
		// connection closes before the declared length is delivered).
		resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
		resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 61 bytes
		conn.Write([]byte(resp))
		// Close immediately — client gets an error on the body read.
	}()
	agentURL := "http://" + ln.Addr().String()
	mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
	expectExecuteDelegationBase(mock)
	expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      "1",
		"method":  "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Execute synchronously (not as a goroutine) so we can check DB state
	// immediately. The handler fires it as a goroutine; we call it directly
	// for deterministic testing. t.Context() is canceled at test end.
	dh.executeDelegation(t.Context(), testSourceID, testTargetID, testDelegationID, a2aBody)
	// Ensure the fake-agent goroutine has finished before asserting, so the
	// test cannot leak it past its own lifetime.
	wg.Wait()
	time.Sleep(100 * time.Millisecond) // let any trailing async writes settle
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx
// status code (e.g., 500 Internal Server Error with partial body read before connection drop).
// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure.
func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	// Server returns 500 with declared Content-Length but closes connection early.
	// proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error.
	// Returns (500, <partial_body>, BadGateway).
	// New condition: status=500 is NOT >= 200 && < 300 → routes to failure.
	// isTransientProxyError(500) = false → no retry.
	var wg sync.WaitGroup
	wg.Add(1)
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("failed to listen: %v", err)
	}
	defer ln.Close()
	go func() {
		defer wg.Done()
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		// Consume the HTTP request.
		buf := make([]byte, 2048)
		conn.Read(buf)
		// 500 with Content-Length: 100 but only 25 bytes of body.
		resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
		resp += `{"error":"agent crashed"}` // 25 bytes, less than declared
		conn.Write([]byte(resp))
		// Close immediately — client gets an error on the body read.
	}()
	agentURL := "http://" + ln.Addr().String()
	mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
	expectExecuteDelegationBase(mock)
	expectExecuteDelegationFailed(mock)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Call synchronously for deterministic DB assertions; t.Context() is
	// canceled at test end.
	dh.executeDelegation(t.Context(), testSourceID, testTargetID, testDelegationID, a2aBody)
	// Ensure the fake-agent goroutine has exited before asserting.
	wg.Wait()
	time.Sleep(100 * time.Millisecond) // let any trailing async writes settle
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body.
// The new condition requires len(respBody) > 0, so empty body routes to failure.
func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	// Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil.
	// New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300
	// → len(respBody) == 0 → condition FALSE → falls through to failure.
	// isTransientProxyError(502) is TRUE → retry → same result → failure.
	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusBadGateway)
		// No body — connection closes normally
	}))
	defer agentServer.Close()
	mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
	// First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase
	expectExecuteDelegationBase(mock)
	// Second attempt (retry): updateDelegationStatus(dispatched) again
	mock.ExpectExec("UPDATE activity_logs SET status").
		WithArgs("dispatched", "", testSourceID, testDelegationID).
		WillReturnResult(sqlmock.NewResult(0, 1))
	// Failure: INSERT + UPDATE (failed)
	expectExecuteDelegationFailed(mock)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Call synchronously for deterministic DB assertions; t.Context() is
	// canceled at test end.
	dh.executeDelegation(t.Context(), testSourceID, testTargetID, testDelegationID, a2aBody)
	time.Sleep(100 * time.Millisecond) // let any trailing async writes settle
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response
// (no error, 200 with body) is unaffected by the new condition. This is the baseline:
// proxyErr == nil so the new condition never fires.
func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	allowLoopbackForTest(t)
	broadcaster := newTestBroadcaster()
	wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
	dh := NewDelegationHandler(wh, broadcaster)
	agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Headers must be set BEFORE WriteHeader — anything set after the
		// status line is written is silently dropped (the original version
		// set Content-Type after WriteHeader, so it was never sent).
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
	}))
	defer agentServer.Close()
	mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
	expectExecuteDelegationBase(mock)
	expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`)
	a2aBody, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0", "id": "1", "method": "message/send",
		"params": map[string]interface{}{
			"message": map[string]interface{}{
				"role":  "user",
				"parts": []map[string]string{{"type": "text", "text": "do work"}},
			},
		},
	})
	// Call synchronously for deterministic DB assertions; t.Context() is
	// canceled at test end.
	dh.executeDelegation(t.Context(), testSourceID, testTargetID, testDelegationID, a2aBody)
	time.Sleep(100 * time.Millisecond) // let any trailing async writes settle
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}