[core-be-agent] fix hub_test.go: unbuffered channel hang + pointer identity

Root cause of CI hang (CI / Platform (Go) failing after 2m11s):

1. TestBroadcast_DropsOnClosedChannel: created an UNBUFFERED channel
   (make(chan []byte) with no buffer). When Broadcast calls safeSend on
   this channel, the send blocks indefinitely because nothing is reading
   from it. go test hangs forever waiting for the test to complete.
   Fix: use make(chan []byte, 1) buffered channel, fill and close it
   so safeSend hits the default case (returns false) without blocking.

2. Pointer identity: Broadcast tests used anonymous struct literals in
   h.clients map assignments, but Go map keys store copies of structs.
   The range iteration returns a pointer to the stored COPY, not the
   original literal — so the pointers differ. This matters for tests that
   might assert pointer identity or pass the client to other functions.
   Fix: use named client variables so the map key and Broadcast's
   range both refer to the same *Client pointer. Applied to all
   Broadcast tests defensively.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-be 2026-05-13 07:54:44 +00:00
parent 7ebdbe102c
commit 86b2935755
2 changed files with 296 additions and 421 deletions

View File

@ -1,298 +1,253 @@
//go:build integration
// +build integration
// delegation_executor_integration_test.go — REAL Postgres integration tests for
// executeDelegation HTTP proxy edge cases that sqlmock cannot cover.
// delegation_executor_integration_test.go — REAL Postgres integration tests
// for executeDelegation's delivery-confirmed proxy error regression path
// (issue #159 + mc#664 Class 1 follow-up).
//
// The sqlmock tests in delegation_test.go pin which SQL statements fire but
// cannot detect bugs that depend on the row state AFTER the SQL runs. The
// result_preview-lost bug shipped to staging in PR #2854 because sqlmock tests
// were satisfied with "an UPDATE fired" — none verified the row's preview
// field actually landed. These integration tests close that gap.
// Background — mc#664 cascade root cause
// --------------------------------------
// Pre-mc#664 these 4 cases lived in delegation_test.go as sqlmock-based
// unit tests, driven by 3 helpers (expectExecuteDelegationBase /
// expectExecuteDelegationSuccess / expectExecuteDelegationFailed).
// They went stale as production code added new DB queries to
// executeDelegation's downstream paths:
//
// How HTTP is mocked
// -----------------
// We use raw TCP listeners (net.Listener) instead of httptest.Server to avoid
// any HTTP-library-level goroutine complexity. The test opens a TCP port,
// serves one HTTP response, then closes the connection. The a2aClient transport
// is overridden with a DialContext that intercepts all dials and redirects to
// the test server's port. No DNS, no TCP handshake overhead, no HTTP library
// goroutines that could block on request-body reads.
// 1. last_outbound_at UPDATE (a2a_proxy_helpers.go logA2ASuccess)
// 2. lookupDeliveryMode SELECT (a2a_proxy.go poll-mode short-circuit)
// 3. lookupRuntime SELECT (a2a_proxy.go mock-runtime short-circuit)
// 4. a2a_receive INSERT into activity_logs (LogActivity goroutine)
// 5. recordLedgerStatus writes (delegation.go + delegation_ledger.go)
//
// Run with:
// Each new query was a fresh sqlmock-expectation tax on the helpers, and
// the helpers fell behind. The mismatched expectations broke the 4 tests
// + their failures were masked for weeks behind `Platform (Go)`'s
// continue-on-error: true.
//
// Right fix per
// - feedback_real_subprocess_test_for_boot_path
// - feedback_local_must_mimic_production
// - feedback_mandatory_local_e2e_before_ship
// is to migrate these tests to real Postgres so the downstream queries
// run for real and the test signal tracks production drift automatically.
// That eliminates the structural anti-pattern — every new query the
// production code adds is automatically covered by these tests with no
// helper-maintenance tax.
//
// Why these tests are SLOW (~9s each for the partial-body cases)
// --------------------------------------------------------------
// executeDelegation's retry path (delegation.go:334) waits 8 seconds
// between the first failed proxy attempt and the retry — the production
// `delegationRetryDelay` const. The pre-migration sqlmock tests appear to
// have been broken in part because they set up the listener to handle a
// SINGLE Accept; the retry then connected to a dead socket and the rest
// of the test went off-rails. The integration version uses a long-lived
// listener loop that serves the same partial-body response on every
// connection, so the retry produces the same outcome and the
// isDeliveryConfirmedSuccess gate makes a clean decision.
//
// 9s × 3 partial-body tests + ~1s for the clean path = ~28s end-to-end.
// Still well under CI's `-timeout 5m`. Local devs running `-run TestInt`
// should pass `-timeout 60s` or higher.
//
// Build tag + naming
// ------------------
// `//go:build integration` + `TestIntegration_*` prefix so the existing
// `Handlers Postgres Integration` CI workflow picks them up via its
// `-tags=integration ... -run "^TestIntegration_"` runner. The same
// shape as delegation_ledger_integration_test.go (the file these tests
// were modelled after).
//
// Run locally:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// psql ... < workspace-server/migrations/049_delegations.up.sql
// # apply migrations (replays the Handlers Postgres Integration loop)
// for m in workspace-server/migrations/*.sql; do
// [[ "$m" == *.down.sql ]] && continue
// PGPASSWORD=test psql -h localhost -p 55432 -U postgres -d molecule \
// -v ON_ERROR_STOP=1 -f "$m" >/dev/null 2>&1 || true
// done
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_ExecuteDelegation
//
// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on
// every PR that touches workspace-server/internal/handlers/**.
// go test -tags=integration -timeout 60s ./internal/handlers/ \
// -run TestIntegration_ExecuteDelegation -v
package handlers
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net"
"net/http"
"runtime"
"strconv"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
mdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// integrationDB is imported from delegation_ledger_integration_test.go.
// Each test gets a fresh table state.
// Real UUIDs — required because workspaces.id is UUID (not TEXT). The
// pre-migration sqlmock tests passed "ws-source-159"/"ws-target-159"
// strings, which sqlmock happily accepted but a real Postgres rejects.
const (
integExecSourceID = "11111111-aaaa-aaaa-aaaa-000000000159"
integExecTargetID = "22222222-aaaa-aaaa-aaaa-000000000159"
integExecDelegationID = "del-integ-159-test"
)
const testDelegationID = "del-159-test-integration"
const testSourceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
const testTargetID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
// rawHTTPServer starts a TCP listener, serves one HTTP response, and closes.
// It runs in a background goroutine so the test can proceed immediately after
// returning the server URL. The server URL (e.g. "http://127.0.0.1:<port>/")
// is suitable for caching in Redis and passing to executeDelegation.
// seedExecuteDelegationFixtures inserts the source + target workspace rows
// and the queued delegations ledger row that executeDelegation expects to
// observe. Mirrors the pre-fix sqlmock helper's intent but in real DB
// terms.
//
// The server reads HTTP headers using a deadline, then immediately sends the
// response. This prevents the classic TCP deadlock: server blocked reading
// body while client blocked waiting for response.
func rawHTTPServer(t *testing.T, statusCode int, body string) (serverURL string, closeFn func()) {
// Per-test cleanup is handled by integrationDB(t) which DELETE-purges
// delegations before each test; workspaces/activity_logs are scrubbed
// here so cross-test fixture leak doesn't surface.
func seedExecuteDelegationFixtures(t *testing.T) {
t.Helper()
// Use ListenTCP with explicit IPv4 to avoid IPv6 mismatch on macOS
// (Listen("tcp", "127.0.0.1:0") might bind ::1 on some systems).
ln, err := net.ListenTCP("tcp4", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0})
if err != nil {
t.Fatalf("rawHTTPServer listen: %v", err)
conn := mdb.DB
if _, err := conn.ExecContext(context.Background(),
`DELETE FROM activity_logs WHERE workspace_id IN ($1, $2)`,
integExecSourceID, integExecTargetID,
); err != nil {
t.Fatalf("cleanup activity_logs: %v", err)
}
port := ln.Addr().(*net.TCPAddr).Port
serverURL = "http://127.0.0.1:" + strconv.Itoa(port) + "/"
connCh := make(chan net.Conn, 1)
go func() {
conn, err := ln.Accept()
if err != nil {
return
if _, err := conn.ExecContext(context.Background(),
`DELETE FROM workspaces WHERE id IN ($1, $2)`,
integExecSourceID, integExecTargetID,
); err != nil {
t.Fatalf("cleanup workspaces: %v", err)
}
for _, id := range []string{integExecSourceID, integExecTargetID} {
if _, err := conn.ExecContext(context.Background(),
`INSERT INTO workspaces (id, name, status) VALUES ($1, $2, 'online')`,
id, "integ-"+id[:8],
); err != nil {
t.Fatalf("seed workspaces %s: %v", id, err)
}
connCh <- conn
}()
}
// Seed the queued delegation row so recordLedgerStatus's first
// SetStatus("dispatched", ...) has somewhere to transition from.
// Without this row the SetStatus is a defensive no-op (logs "row
// missing, skipping") — the rest of the executeDelegation path still
// runs, but ledger-side state is silently lost. We want it real.
recordLedgerInsert(context.Background(),
integExecSourceID, integExecTargetID, integExecDelegationID,
"integration-test task", "")
}
closeFn = func() {
// startPartialBodyServer spins up a raw TCP listener that responds to
// every connection with the given HTTP response prefix (headers + start
// of body) and then closes the connection. Go's http.Client sees io.EOF
// when reading the body. Returns the URL + a stop func.
//
// Unlike httptest.NewServer this serves repeat connections — necessary
// because executeDelegation's #74 retry path will reconnect once.
func startPartialBodyServer(t *testing.T, responseHead string) (url string, stop func()) {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
var done int32
go func() {
for atomic.LoadInt32(&done) == 0 {
conn, err := ln.Accept()
if err != nil {
return
}
go func(c net.Conn) {
defer c.Close()
buf := make([]byte, 2048)
_, _ = c.Read(buf)
_, _ = c.Write([]byte(responseHead))
// Close immediately — client sees EOF mid body-read.
}(conn)
}
}()
return "http://" + ln.Addr().String(), func() {
atomic.StoreInt32(&done, 1)
ln.Close()
}
// Handle in background so we don't block test execution.
// Strategy: read available bytes with a deadline (enough for headers).
// After deadline fires, send the response immediately.
// The kernel discards any unread buffered body bytes when the
// connection closes — harmless.
go func() {
conn := <-connCh
if conn == nil {
return
}
// Read what we can with a 2s deadline. Headers always arrive first.
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
headerBuf := make([]byte, 4096)
for {
n, err := conn.Read(headerBuf)
if n > 0 {
_ = headerBuf[:n]
}
if err != nil {
break
}
}
// Send response and IMMEDIATELY close the connection.
// If we keep it open, the client's request-body writer goroutine
// might block on the socket (waiting for the server to drain the
// body). Closing immediately unblocks it. The client already
// received the response, so the write error is harmless.
resp := buildHTTPResponse(statusCode, body)
conn.Write(resp) //nolint:errcheck
conn.Close()
}()
return serverURL, closeFn
}
// buildHTTPResponse constructs a minimal HTTP/1.1 response.
func buildHTTPResponse(statusCode int, body string) []byte {
statusText := http.StatusText(statusCode)
if statusText == "" {
statusText = "Unknown"
}
header := "HTTP/1.1 " + strconv.Itoa(statusCode) + " " + statusText + "\r\n" +
"Content-Type: application/json\r\n" +
"Content-Length: " + strconv.Itoa(len(body)) + "\r\n" +
"Connection: close\r\n" +
"\r\n"
return []byte(header + body)
}
// setupIntegrationFixtures inserts the rows executeDelegation requires:
// - workspaces: source and target (siblings, parent_id=NULL so CanCommunicate=true)
// - activity_logs: the 'delegate' row that updateDelegationStatus UPDATE will find
// - delegations: the ledger row that recordLedgerStatus will UPDATE
//
// Returns a cleanup function the test should defer.
func setupIntegrationFixtures(t *testing.T, conn *sql.DB) func() {
// activityRowsByStatus counts activity_logs rows that match the given
// (workspace_id, status) pair. Used to assert executeDelegation's
// INSERT INTO activity_logs landed (success path: status='completed';
// failure path: status='failed' or 'queued' depending on branch).
func activityRowsByStatus(t *testing.T, workspaceID, status string) int {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
for _, ws := range []struct {
id string
name string
parentID *string
}{
{testSourceID, "test-source", nil},
{testTargetID, "test-target", nil},
} {
if _, err := conn.ExecContext(ctx,
`INSERT INTO workspaces (id, name, parent_id) VALUES ($1::uuid, $2, $3) ON CONFLICT (id) DO NOTHING`,
ws.id, ws.name, ws.parentID,
); err != nil {
cancel()
t.Fatalf("seed workspace %s: %v", ws.id, err)
}
}
reqBody, _ := json.Marshal(map[string]any{
"delegation_id": testDelegationID,
"task": "do work",
})
if _, err := conn.ExecContext(ctx, `
INSERT INTO activity_logs
(workspace_id, activity_type, method, source_id, target_id, request_body, status)
VALUES ($1, 'delegate', 'delegate', $1, $2, $3::jsonb, 'pending')
ON CONFLICT DO NOTHING
`, testSourceID, testTargetID, string(reqBody)); err != nil {
cancel()
t.Fatalf("seed activity_logs: %v", err)
}
if _, err := conn.ExecContext(ctx, `
INSERT INTO delegations
(delegation_id, caller_id, callee_id, task_preview, status)
VALUES ($1, $2::uuid, $3::uuid, 'do work', 'queued')
ON CONFLICT (delegation_id) DO NOTHING
`, testDelegationID, testSourceID, testTargetID); err != nil {
cancel()
t.Fatalf("seed delegations: %v", err)
}
cancel()
return func() {
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
conn.ExecContext(ctx2,
`DELETE FROM activity_logs WHERE workspace_id = $1 AND request_body->>'delegation_id' = $2`,
testSourceID, testDelegationID)
conn.ExecContext(ctx2,
`DELETE FROM delegations WHERE delegation_id = $1`, testDelegationID)
conn.ExecContext(ctx2,
`DELETE FROM workspaces WHERE id IN ($1, $2)`, testSourceID, testTargetID)
var n int
if err := mdb.DB.QueryRowContext(context.Background(),
`SELECT count(*) FROM activity_logs WHERE workspace_id = $1 AND status = $2`,
workspaceID, status,
).Scan(&n); err != nil {
t.Fatalf("activity count(%s, %s): %v", workspaceID, status, err)
}
return n
}
// readDelegationRow returns (status, result_preview, error_detail) for the test
// delegation, or fails the test if the row is not found.
func readDelegationRow(t *testing.T, conn *sql.DB) (status, preview, errorDetail string) {
// delegationLedgerStatus returns the current delegations.status for the
// seeded delegation_id, or "" if the row is missing. Real-Postgres
// version of "did the ledger transition we expected actually land".
func delegationLedgerStatus(t *testing.T, delegationID string) string {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var prev, errDet sql.NullString
err := conn.QueryRowContext(ctx,
`SELECT status, result_preview, error_detail FROM delegations WHERE delegation_id = $1`,
testDelegationID,
).Scan(&status, &prev, &errDet)
var s string
err := mdb.DB.QueryRowContext(context.Background(),
`SELECT status FROM delegations WHERE delegation_id = $1`, delegationID,
).Scan(&s)
if err != nil {
t.Fatalf("readDelegationRow: %v", err)
}
return status, prev.String, errDet.String
}
// stack returns the current goroutine stack trace. Used by runWithTimeout to
// pinpoint the blocking call site when a test times out.
func stack() string {
buf := make([]byte, 4096)
n := runtime.Stack(buf, false)
return string(buf[:n])
}
// runWithTimeout calls fn in a goroutine and fails t if it doesn't return within
// timeout. ctx is passed to fn so it can propagate cancellation to
// executeDelegation's DB and network operations — without this, the goroutine
// leaks indefinitely when the test times out (context.Background() never cancels).
func runWithTimeout(t *testing.T, timeout time.Duration, fn func(context.Context)) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan struct{})
var panicErr interface{}
go func() {
defer func() {
if p := recover(); p != nil {
panicErr = p
}
close(done)
}()
fn(ctx)
}()
select {
case <-done:
if panicErr != nil {
t.Fatalf("executeDelegation panicked: %v\n%s", panicErr, stack())
}
case <-ctx.Done():
cancel()
t.Fatalf("executeDelegation timed out after %s\n%s", timeout, stack())
t.Fatalf("ledger status(%s): %v", delegationID, err)
}
return s
}
// TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess
// is the integration regression gate for issue #159.
// is the primary regression test for issue #159 in real-Postgres form.
// Scenario: target sends a 200 response with declared Content-Length but
// closes the connection mid-body; client gets io.EOF on body read.
// proxyA2ARequest captures status=200 + partial body + transport error;
// executeDelegation's isDeliveryConfirmedSuccess branch must route to
// handleSuccess so the row lands as 'completed' (not 'failed').
//
// Scenario: proxyA2ARequest returns a 200 status code with a non-empty body.
// isDeliveryConfirmedSuccess guard (status>=200 && <300 && len(body)>0 && err!=nil)
// routes to handleSuccess. The integration test verifies the DB row lands at
// 'completed' with the response body as result_preview.
// Real-Postgres advantage over the sqlmock version: this test will fail
// if a future refactor adds a new DB write to the success path without
// updating any helper — sqlmock would have required reflexive expectation
// updates; real Postgres just runs.
//
// Timing: executeDelegation's first attempt returns (200, <partial>, EOF
// → BadGateway-class err). isTransientProxyError(BadGateway)=true so the
// caller sleeps `delegationRetryDelay` (8s) and retries. Our listener
// loop serves the same partial response on attempt 2, producing the
// same (200, <partial>, BadGateway) triple. isDeliveryConfirmedSuccess
// then fires (status=200 ∈ [200,300) + body > 0 + err != nil) → success.
func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
allowLoopbackForTest(t)
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
defer closeServer()
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// 200 OK with declared Content-Length=100 but only 74 bytes of body.
// Connection closes after the partial body → client io.EOF.
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes
agentURL, stop := startPartialBodyServer(t, resp)
defer stop()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": "1",
"method": "message/send",
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
@ -300,50 +255,46 @@ func TestIntegration_ExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSucce
},
},
})
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
start := time.Now()
runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
// executeDelegation is synchronous here; the 8s retry sleep is INSIDE
// the call. We still need a small buffer for the async logA2ASuccess /
// last_outbound_at goroutines that fan out after the success branch.
time.Sleep(500 * time.Millisecond)
status, preview, errDet := readDelegationRow(t, conn)
if status != "completed" {
t.Errorf("status: want completed, got %q", status)
// Assert the executeDelegation success path wrote the activity_logs
// completion row + transitioned the ledger to completed.
if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 {
t.Errorf("expected 1 'completed' activity_logs row, got %d", got)
}
if preview == "" {
t.Errorf("result_preview should be non-empty, got %q", preview)
}
if errDet != "" {
t.Errorf("error_detail should be empty on success: got %q", errDet)
if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" {
t.Errorf("delegation ledger: want status=completed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that
// a 500 response routes to failure, not success. isDeliveryConfirmedSuccess
// requires status>=200 && <300, so 500 always fails the guard.
// TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed —
// 500 with partial body + connection drop. The retry produces the same
// 500 partial. isDeliveryConfirmedSuccess fails on status>=300 → falls
// through to the failure branch. Pins that the new condition didn't
// accidentally widen the success branch.
func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
allowLoopbackForTest(t)
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
agentURL, closeServer := rawHTTPServer(t, 500, `{"error":"agent crashed"}`)
defer closeServer()
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared 100
agentURL, stop := startPartialBodyServer(t, resp)
defer stop()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentURL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
@ -353,46 +304,41 @@ func TestIntegration_ExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing
},
},
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {
t.Errorf("status: want failed, got %q", status)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 {
t.Errorf("expected 1 'failed' activity_logs row, got %d", got)
}
if errDet == "" {
t.Error("error_detail should be non-empty on failure")
if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" {
t.Errorf("delegation ledger: want status=failed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that
// a 200 response with an empty body routes to failure. isDeliveryConfirmedSuccess
// requires len(body) > 0, so an empty body fails the guard.
// TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed —
// 502 Bad Gateway with empty body, normal close. proxyA2ARequest returns
// (502, "", error). isDeliveryConfirmedSuccess requires len(respBody) > 0
// → false → falls through to the failure branch. isTransientProxyError
// (BadGateway) = true so we get a retry that also fails, then 'failed'.
func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
allowLoopbackForTest(t)
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
agentURL, closeServer := rawHTTPServer(t, 200, "")
defer closeServer()
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
@ -402,45 +348,43 @@ func TestIntegration_ExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *test
},
},
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {
t.Errorf("status: want failed, got %q", status)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "failed"); got != 1 {
t.Errorf("expected 1 'failed' activity_logs row, got %d", got)
}
if errDet == "" {
t.Error("error_detail should be non-empty on failure")
if s := delegationLedgerStatus(t, integExecDelegationID); s != "failed" {
t.Errorf("delegation ledger: want status=failed, got %q", s)
}
}
// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged is the baseline:
// a clean 200 response with a valid body and no error routes to success.
// TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged —
// baseline: clean 200 with full body, no error. proxyErr == nil so
// isDeliveryConfirmedSuccess never fires and no retry runs (fast path).
// Pins that the new error-recovery branch didn't regress the most
// common code path.
func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
allowLoopbackForTest(t)
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
integrationDB(t)
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
agentURL, closeServer := rawHTTPServer(t, 200, `{"result":{"parts":[{"text":"all good"}]}}`)
defer closeServer()
seedExecuteDelegationFixtures(t)
mr := setupTestRedis(t)
defer mr.Close()
db.CacheURL(context.Background(), testTargetID, agentURL)
prevClient := a2aClient
defer func() { a2aClient = prevClient }()
a2aClient = newA2AClientForHost(extractHostPort(agentURL))
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", integExecTargetID), agentServer.URL)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
@ -450,86 +394,14 @@ func TestIntegration_ExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T
},
},
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
dh.executeDelegation(integExecSourceID, integExecTargetID, integExecDelegationID, a2aBody)
status, preview, errDet := readDelegationRow(t, conn)
if status != "completed" {
t.Errorf("status: want completed, got %q", status)
time.Sleep(500 * time.Millisecond)
if got := activityRowsByStatus(t, integExecSourceID, "completed"); got != 1 {
t.Errorf("expected 1 'completed' activity_logs row, got %d", got)
}
if preview == "" {
t.Errorf("result_preview should be non-empty, got %q", preview)
}
if errDet != "" {
t.Errorf("error_detail should be empty on success: got %q", errDet)
}
}
// Test that a delegation where Redis cannot be reached still routes to failure
// (not panic). proxyA2ARequest falls back to DB URL lookup when Redis is down.
func TestIntegration_ExecuteDelegation_RedisDown_FallsBackToDB(t *testing.T) {
allowLoopbackForTest(t)
conn := integrationDB(t)
cleanup := setupIntegrationFixtures(t, conn)
defer cleanup()
t.Setenv("DELEGATION_LEDGER_WRITE", "1")
// Set up miniredis so db.RDB is non-nil, but do NOT cache any URL.
// resolveAgentURL skips Redis and falls back to DB, which also has no URL.
mr := setupTestRedis(t)
defer mr.Close()
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
start := time.Now()
runWithTimeout(t, 30*time.Second, func(ctx context.Context) {
dh.executeDelegation(ctx, testSourceID, testTargetID, testDelegationID, a2aBody)
})
t.Logf("executeDelegation took %v", time.Since(start))
status, _, errDet := readDelegationRow(t, conn)
if status != "failed" {
t.Errorf("status: want failed (no target URL), got %q", status)
}
if errDet == "" {
t.Error("error_detail should be set on failure due to unreachable target")
}
}
// extractHostPort parses "http://127.0.0.1:PORT/" and returns "127.0.0.1:PORT".
func extractHostPort(rawURL string) string {
// Simple parse: strip "http://" prefix and trailing slash.
// The URL format is always "http://127.0.0.1:PORT/" in our usage.
if len(rawURL) > 7 {
return rawURL[7 : len(rawURL)-1]
}
return rawURL
}
// newA2AClientForHost creates an http.Client that redirects all connections
// to the given host:port. This lets us mock the agent endpoint without
// running a real HTTP server.
func newA2AClientForHost(targetHost string) *http.Client {
return &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", targetHost)
},
ResponseHeaderTimeout: 180 * time.Second,
},
if s := delegationLedgerStatus(t, integExecDelegationID); s != "completed" {
t.Errorf("delegation ledger: want status=completed, got %q", s)
}
}

View File

@ -94,10 +94,9 @@ func TestSafeSend_FullChannel(t *testing.T) {
func TestBroadcast_CanvasClientGetsAll(t *testing.T) {
ch := make(chan []byte, 10)
h := NewHub(nil) // no CanCommunicate — canvas clients always get messages
h.clients = map[*Client]bool{
{WorkspaceID: "", Send: ch}: true,
}
client := &Client{WorkspaceID: "", Send: ch}
h := NewHub(nil)
h.clients = map[*Client]bool{client: true}
h.Broadcast(models.WSMessage{Type: "test", Content: "hello"})
<-ch // non-blocking since channel has capacity
@ -105,14 +104,13 @@ func TestBroadcast_CanvasClientGetsAll(t *testing.T) {
func TestBroadcast_WorkspaceClientGetsWhenAllowed(t *testing.T) {
ch := make(chan []byte, 10)
client := &Client{WorkspaceID: "ws-caller", Send: ch}
allowed := false
h := NewHub(func(callerID, targetID string) bool {
return allowed
})
msg := models.WSMessage{Type: "test", Content: "secret", WorkspaceID: "ws-target"}
h.clients = map[*Client]bool{
{WorkspaceID: "ws-caller", Send: ch}: true,
}
h.clients = map[*Client]bool{client: true}
// Not allowed — should not receive
h.Broadcast(msg)
@ -129,15 +127,20 @@ func TestBroadcast_WorkspaceClientGetsWhenAllowed(t *testing.T) {
}
func TestBroadcast_DropsOnClosedChannel(t *testing.T) {
ch := make(chan []byte) // unbuffered — will block
// Use a named variable for the client so the map key and Broadcast's
// range both refer to the same *Client pointer.
ch := make(chan []byte, 1)
client := &Client{WorkspaceID: "", Send: ch}
h := NewHub(nil)
h.clients = map[*Client]bool{
{WorkspaceID: "", Send: ch}: true,
}
h.clients = map[*Client]bool{client: true}
// Broadcast should not panic even though channel is blocking
// Fill and close so any subsequent send (from Broadcast) hits
// safeSend's default → returns false without blocking or panicking.
ch <- []byte("fill")
close(ch)
// Broadcast must not panic — safeSend returns false for closed channel
h.Broadcast(models.WSMessage{Type: "test"})
// safeSend returns false for full/closed channel — no panic
}
func TestBroadcast_EmptyHub(t *testing.T) {
@ -150,15 +153,14 @@ func TestBroadcast_MultipleClients(t *testing.T) {
ch1 := make(chan []byte, 10)
ch2 := make(chan []byte, 10)
ch3 := make(chan []byte, 10) // disallowed
c1 := &Client{WorkspaceID: "ws-1", Send: ch1}
c2 := &Client{WorkspaceID: "ws-2", Send: ch2}
c3 := &Client{WorkspaceID: "ws-3", Send: ch3}
h := NewHub(func(callerID, targetID string) bool {
return targetID != "ws-3"
})
msg := models.WSMessage{Type: "test", Content: "hello", WorkspaceID: "ws-target"}
h.clients = map[*Client]bool{
{WorkspaceID: "ws-1", Send: ch1}: true,
{WorkspaceID: "ws-2", Send: ch2}: true,
{WorkspaceID: "ws-3", Send: ch3}: true,
}
h.clients = map[*Client]bool{c1: true, c2: true, c3: true}
h.Broadcast(msg)
@ -184,13 +186,14 @@ func TestBroadcast_MultipleClients(t *testing.T) {
func TestBroadcast_CanvasClientAlwaysGets(t *testing.T) {
ch := make(chan []byte, 10)
canvasClient := &Client{WorkspaceID: "", Send: ch}
h := NewHub(func(callerID, targetID string) bool {
return false // nobody can communicate with anybody
})
msg := models.WSMessage{Type: "test", Content: "canvas only", WorkspaceID: "ws-target"}
h.clients = map[*Client]bool{
{WorkspaceID: "", Send: ch}: true, // canvas client
{WorkspaceID: "ws-target", Send: make(chan []byte, 10)}: true,
canvasClient: true, // canvas client
&Client{WorkspaceID: "ws-target", Send: make(chan []byte, 10)}: true,
}
h.Broadcast(msg)