|
|
|
@@ -231,14 +231,21 @@ func TestListDelegations_Empty(t *testing.T) {
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
rows := sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
})
|
|
|
|
|
// Ledger returns empty → falls back to activity_logs (also empty)
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}))
|
|
|
|
|
mock.ExpectQuery("SELECT id, activity_type").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(rows)
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
@@ -258,9 +265,12 @@ func TestListDelegations_Empty(t *testing.T) {
|
|
|
|
|
if len(resp) != 0 {
|
|
|
|
|
t.Errorf("expected empty array, got %d entries", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: with results → 200 with entries ----------
|
|
|
|
|
// ---------- ListDelegations: with results (ledger only, no activity_logs fallback) ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_WithResults(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
@@ -270,19 +280,20 @@ func TestListDelegations_WithResults(t *testing.T) {
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
// Ledger query returns rows — no fallback to activity_logs
|
|
|
|
|
rows := sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}).
|
|
|
|
|
AddRow("1", "delegation", "ws-source", "ws-target",
|
|
|
|
|
AddRow("del-111", "ws-source", "ws-target",
|
|
|
|
|
"Delegating to ws-target", "pending", "", "",
|
|
|
|
|
"del-111", now).
|
|
|
|
|
AddRow("2", "delegation", "ws-source", "ws-target",
|
|
|
|
|
"Delegation completed (hello world)", "completed", "", "hello world",
|
|
|
|
|
"del-111", now.Add(time.Minute))
|
|
|
|
|
&now, &now.Add(6*time.Hour), now, now).
|
|
|
|
|
AddRow("del-222", "ws-source", "ws-target",
|
|
|
|
|
"Delegation completed (hello world)", "completed", "hello world", "",
|
|
|
|
|
&now, &now.Add(6*time.Hour), now, now.Add(time.Minute))
|
|
|
|
|
|
|
|
|
|
mock.ExpectQuery("SELECT id, activity_type").
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(rows)
|
|
|
|
|
|
|
|
|
@@ -306,23 +317,26 @@ func TestListDelegations_WithResults(t *testing.T) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check first entry (pending delegation)
|
|
|
|
|
if resp[0]["type"] != "delegation" {
|
|
|
|
|
t.Errorf("expected type 'delegation', got %v", resp[0]["type"])
|
|
|
|
|
if resp[0]["delegation_id"] != "del-111" {
|
|
|
|
|
t.Errorf("expected delegation_id 'del-111', got %v", resp[0]["delegation_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["status"] != "pending" {
|
|
|
|
|
t.Errorf("expected status 'pending', got %v", resp[0]["status"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["delegation_id"] != "del-111" {
|
|
|
|
|
t.Errorf("expected delegation_id 'del-111', got %v", resp[0]["delegation_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["source_id"] != "ws-source" {
|
|
|
|
|
t.Errorf("expected source_id 'ws-source', got %v", resp[0]["source_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["target_id"] != "ws-target" {
|
|
|
|
|
t.Errorf("expected target_id 'ws-target', got %v", resp[0]["target_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["_ledger"] != true {
|
|
|
|
|
t.Errorf("expected _ledger=true marker, got %v", resp[0]["_ledger"])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check second entry (completed, has response_preview)
|
|
|
|
|
if resp[1]["delegation_id"] != "del-222" {
|
|
|
|
|
t.Errorf("expected delegation_id 'del-222', got %v", resp[1]["delegation_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[1]["status"] != "completed" {
|
|
|
|
|
t.Errorf("expected status 'completed', got %v", resp[1]["status"])
|
|
|
|
|
}
|
|
|
|
@@ -956,3 +970,631 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) {
|
|
|
|
|
t.Errorf("insertOutcomeUnknown must not collide with insertOK")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ==================== executeDelegation — delivery-confirmed proxy error regression tests ====================
|
|
|
|
|
//
|
|
|
|
|
// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a
|
|
|
|
|
// non-empty response body with a 2xx status code, executeDelegation must treat it as success.
|
|
|
|
|
// The error is a delivery/transport error (e.g., connection reset after response was received).
|
|
|
|
|
// Previously, executeDelegation marked these as "failed" even though the work was done,
|
|
|
|
|
// causing retry storms and "error" rendering in canvas despite the response being available.
|
|
|
|
|
//
|
|
|
|
|
// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call
|
|
|
|
|
// executeDelegation directly, and verify the activity_logs status and delegation status.
|
|
|
|
|
|
|
|
|
|
const testDelegationID = "del-159-test"
|
|
|
|
|
const testSourceID = "ws-source-159"
|
|
|
|
|
const testTargetID = "ws-target-159"
|
|
|
|
|
|
|
|
|
|
// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that
|
|
|
|
|
// executeDelegation always makes, regardless of outcome.
|
|
|
|
|
func expectExecuteDelegationBase(mock sqlmock.Sqlmock) {
|
|
|
|
|
// updateDelegationStatus: dispatched
|
|
|
|
|
// Uses prefix match — sqlmock regexes match the full query string.
|
|
|
|
|
mock.ExpectExec("UPDATE activity_logs SET status").
|
|
|
|
|
WithArgs("dispatched", "", testSourceID, testDelegationID).
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
|
|
|
|
|
// CanCommunicate (source=target self-call is always allowed — no DB lookup needed)
|
|
|
|
|
// resolveAgentURL: reads ws:{id}:url from Redis, falls back to DB for target
|
|
|
|
|
mock.ExpectQuery("SELECT url, status FROM workspaces WHERE id = ").
|
|
|
|
|
WithArgs(testTargetID).
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "online"))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// expectExecuteDelegationSuccess sets up expectations for a completed delegation.
|
|
|
|
|
func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) {
|
|
|
|
|
// INSERT activity_logs for delegation completion (response_body status = 'completed')
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").
|
|
|
|
|
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "completed").
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
|
|
|
|
|
// updateDelegationStatus: completed
|
|
|
|
|
mock.ExpectExec("UPDATE activity_logs SET status").
|
|
|
|
|
WithArgs("completed", "", testSourceID, testDelegationID).
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// expectExecuteDelegationFailed sets up expectations for a failed delegation.
|
|
|
|
|
func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) {
|
|
|
|
|
// INSERT activity_logs for delegation failure (response_body status = 'failed')
|
|
|
|
|
mock.ExpectExec("INSERT INTO activity_logs").
|
|
|
|
|
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), "failed").
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
|
|
|
|
|
// updateDelegationStatus: failed
|
|
|
|
|
mock.ExpectExec("UPDATE activity_logs SET status").
|
|
|
|
|
WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID).
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression
|
|
|
|
|
// test for issue #159. The scenario:
|
|
|
|
|
// - Attempt 1: server sends 200 OK headers + partial body, then closes connection.
|
|
|
|
|
// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, <partial>, BadGateway).
|
|
|
|
|
// isTransientProxyError(BadGateway) = TRUE → retry.
|
|
|
|
|
// - Attempt 2: server does the same thing (closes after partial body).
|
|
|
|
|
// proxyA2ARequest: same (200, <partial>, BadGateway).
|
|
|
|
|
// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon,
|
|
|
|
|
// or we get one more attempt). For the test we let it run.
|
|
|
|
|
// POST-FIX: the executeDelegation new condition sees status=200, body=<partial>, err!=nil
|
|
|
|
|
// and routes to handleSuccess immediately.
|
|
|
|
|
//
|
|
|
|
|
// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded)
|
|
|
|
|
// even when the server sent 200, so the condition always failed. Post-fix, status=200 is
|
|
|
|
|
// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody).
|
|
|
|
|
// In this test the retry ultimately succeeds (server eventually sends full body), but
|
|
|
|
|
// the critical assertion is that a 2xx partial-body delivery-confirmed response is never
|
|
|
|
|
// classified as "failed" — it always routes to success.
|
|
|
|
|
func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Server that sends a 200 response with declared Content-Length but closes
|
|
|
|
|
// the connection before sending all bytes. Go's http.Client sees io.EOF on
|
|
|
|
|
// the body read. proxyA2ARequest captures the partial body + status=200 and
|
|
|
|
|
// returns (200, <partial>, error). executeDelegation's new condition sees
|
|
|
|
|
// status=200 + body > 0 + error != nil → routes to handleSuccess.
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("failed to listen: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer ln.Close()
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
conn, err := ln.Accept()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
|
|
|
|
// Consume the HTTP request
|
|
|
|
|
buf := make([]byte, 2048)
|
|
|
|
|
conn.Read(buf)
|
|
|
|
|
// Send 200 OK with Content-Length: 100 but only 74 bytes of body
|
|
|
|
|
// (less than declared length → io.LimitReader returns io.EOF after reading all 74)
|
|
|
|
|
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
|
|
|
|
|
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes
|
|
|
|
|
conn.Write([]byte(resp))
|
|
|
|
|
// Close immediately — client gets io.EOF on body read
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
agentURL := "http://" + ln.Addr().String()
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
expectExecuteDelegationBase(mock)
|
|
|
|
|
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
|
|
|
|
|
|
|
|
|
|
// Execute synchronously (not as a goroutine) so we can check DB state immediately.
|
|
|
|
|
// The handler fires it as goroutine; we call it directly for deterministic testing.
|
|
|
|
|
a2aBody, _ := json.Marshal(map[string]interface{}{
|
|
|
|
|
"jsonrpc": "2.0",
|
|
|
|
|
"id": "1",
|
|
|
|
|
"method": "message/send",
|
|
|
|
|
"params": map[string]interface{}{
|
|
|
|
|
"message": map[string]interface{}{
|
|
|
|
|
"role": "user",
|
|
|
|
|
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond) // let DB writes settle
|
|
|
|
|
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure
|
|
|
|
|
// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx
|
|
|
|
|
// status code (e.g., 500 Internal Server Error with partial body read before connection drop).
|
|
|
|
|
// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure.
|
|
|
|
|
func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Server returns 500 with declared Content-Length but closes connection early.
|
|
|
|
|
// proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error.
|
|
|
|
|
// Returns (500, <partial_body>, BadGateway).
|
|
|
|
|
// New condition: status=500 is NOT >= 200 && < 300 → routes to failure.
|
|
|
|
|
// isTransientProxyError(500) = false → no retry.
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("failed to listen: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer ln.Close()
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
conn, err := ln.Accept()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
defer conn.Close()
|
|
|
|
|
buf := make([]byte, 2048)
|
|
|
|
|
conn.Read(buf)
|
|
|
|
|
// 500 with Content-Length: 100 but only ~60 bytes of body
|
|
|
|
|
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
|
|
|
|
|
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared
|
|
|
|
|
conn.Write([]byte(resp))
|
|
|
|
|
// Close immediately — client gets io.EOF on body read
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
agentURL := "http://" + ln.Addr().String()
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
expectExecuteDelegationBase(mock)
|
|
|
|
|
expectExecuteDelegationFailed(mock)
|
|
|
|
|
|
|
|
|
|
a2aBody, _ := json.Marshal(map[string]interface{}{
|
|
|
|
|
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
|
|
|
|
"params": map[string]interface{}{
|
|
|
|
|
"message": map[string]interface{}{
|
|
|
|
|
"role": "user",
|
|
|
|
|
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure
|
|
|
|
|
// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body.
|
|
|
|
|
// The new condition requires len(respBody) > 0, so empty body routes to failure.
|
|
|
|
|
func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil.
|
|
|
|
|
// New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300
|
|
|
|
|
// → len(respBody) == 0 → condition FALSE → falls through to failure.
|
|
|
|
|
// isTransientProxyError(502) is TRUE → retry → same result → failure.
|
|
|
|
|
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
w.WriteHeader(http.StatusBadGateway)
|
|
|
|
|
// No body — connection closes normally
|
|
|
|
|
}))
|
|
|
|
|
defer agentServer.Close()
|
|
|
|
|
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
// First attempt: updateDelegationStatus(dispatched) — from expectExecuteDelegationBase
|
|
|
|
|
expectExecuteDelegationBase(mock)
|
|
|
|
|
// Second attempt (retry): updateDelegationStatus(dispatched) again
|
|
|
|
|
mock.ExpectExec("UPDATE activity_logs SET status").
|
|
|
|
|
WithArgs("dispatched", "", testSourceID, testDelegationID).
|
|
|
|
|
WillReturnResult(sqlmock.NewResult(0, 1))
|
|
|
|
|
// Failure: INSERT + UPDATE (failed)
|
|
|
|
|
expectExecuteDelegationFailed(mock)
|
|
|
|
|
|
|
|
|
|
a2aBody, _ := json.Marshal(map[string]interface{}{
|
|
|
|
|
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
|
|
|
|
"params": map[string]interface{}{
|
|
|
|
|
"message": map[string]interface{}{
|
|
|
|
|
"role": "user",
|
|
|
|
|
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response
|
|
|
|
|
// (no error, 200 with body) is unaffected by the new condition. This is the baseline:
|
|
|
|
|
// proxyErr == nil so the new condition never fires.
|
|
|
|
|
func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
mr := setupTestRedis(t)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
|
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
|
|
|
|
|
}))
|
|
|
|
|
defer agentServer.Close()
|
|
|
|
|
|
|
|
|
|
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
|
|
|
|
|
allowLoopbackForTest(t)
|
|
|
|
|
|
|
|
|
|
expectExecuteDelegationBase(mock)
|
|
|
|
|
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`)
|
|
|
|
|
|
|
|
|
|
a2aBody, _ := json.Marshal(map[string]interface{}{
|
|
|
|
|
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
|
|
|
|
"params": map[string]interface{}{
|
|
|
|
|
"message": map[string]interface{}{
|
|
|
|
|
"role": "user",
|
|
|
|
|
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
|
|
|
|
|
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: ledger has rows → returns them (no activity_logs fallback) ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_LedgerRowsReturned(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
// Ledger query returns rows
|
|
|
|
|
ledgerRows := sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}).AddRow(
|
|
|
|
|
"del-ledger-001", "caller-uuid", "callee-uuid",
|
|
|
|
|
"Analyze the codebase for bugs", "in_progress", "", "",
|
|
|
|
|
&now, &now.Add(6*time.Hour), now, now,
|
|
|
|
|
)
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("caller-uuid").
|
|
|
|
|
WillReturnRows(ledgerRows)
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []map[string]interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 1 {
|
|
|
|
|
t.Fatalf("expected 1 entry, got %d", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["delegation_id"] != "del-ledger-001" {
|
|
|
|
|
t.Errorf("expected delegation_id 'del-ledger-001', got %v", resp[0]["delegation_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["status"] != "in_progress" {
|
|
|
|
|
t.Errorf("expected status 'in_progress', got %v", resp[0]["status"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["_ledger"] != true {
|
|
|
|
|
t.Errorf("expected _ledger=true marker, got %v", resp[0]["_ledger"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["source_id"] != "caller-uuid" {
|
|
|
|
|
t.Errorf("expected source_id 'caller-uuid', got %v", resp[0]["source_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["target_id"] != "callee-uuid" {
|
|
|
|
|
t.Errorf("expected target_id 'callee-uuid', got %v", resp[0]["target_id"])
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: ledger empty → falls back to activity_logs ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_LedgerEmptyFallsBackToActivityLogs(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Ledger returns empty → falls back to activity_logs
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
activityRows := sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
}).AddRow(
|
|
|
|
|
"act-001", "delegation", "ws-source", "ws-target",
|
|
|
|
|
"Delegating to ws-target", "pending", "", "",
|
|
|
|
|
"del-old-001", now,
|
|
|
|
|
)
|
|
|
|
|
mock.ExpectQuery("SELECT id, activity_type").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(activityRows)
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []map[string]interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 1 {
|
|
|
|
|
t.Fatalf("expected 1 entry from fallback, got %d", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["delegation_id"] != "del-old-001" {
|
|
|
|
|
t.Errorf("expected delegation_id 'del-old-001' from activity_logs, got %v", resp[0]["delegation_id"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["type"] != "delegation" {
|
|
|
|
|
t.Errorf("expected type 'delegation' from activity_logs, got %v", resp[0]["type"])
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: both ledger and activity_logs empty → [] ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_BothEmptyReturnsEmptyArray(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Ledger empty
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}))
|
|
|
|
|
// activity_logs also empty
|
|
|
|
|
mock.ExpectQuery("SELECT id, activity_type").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 0 {
|
|
|
|
|
t.Errorf("expected empty array, got %d entries", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: ledger query error → falls back to activity_logs ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_LedgerQueryErrorFallsBackToActivityLogs(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
// Ledger query fails → fallback to activity_logs
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnError(fmt.Errorf("table does not exist"))
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
activityRows := sqlmock.NewRows([]string{
|
|
|
|
|
"id", "activity_type", "source_id", "target_id",
|
|
|
|
|
"summary", "status", "error_detail", "response_body",
|
|
|
|
|
"delegation_id", "created_at",
|
|
|
|
|
}).AddRow(
|
|
|
|
|
"act-002", "delegation", "ws-source", "ws-target",
|
|
|
|
|
"Some task", "completed", "", "result here",
|
|
|
|
|
"del-pre-318", now,
|
|
|
|
|
)
|
|
|
|
|
mock.ExpectQuery("SELECT id, activity_type").
|
|
|
|
|
WithArgs("ws-source").
|
|
|
|
|
WillReturnRows(activityRows)
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "ws-source"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/ws-source/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []map[string]interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 1 || resp[0]["delegation_id"] != "del-pre-318" {
|
|
|
|
|
t.Errorf("expected 1 activity_logs entry, got %v", resp)
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: ledger completed delegation includes result_preview ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_LedgerCompletedIncludesResultPreview(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
ledgerRows := sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}).AddRow(
|
|
|
|
|
"del-complete-001", "caller-uuid", "callee-uuid",
|
|
|
|
|
"Run analysis", "completed", "Analysis complete: 42 issues found", "",
|
|
|
|
|
&now, &now.Add(6*time.Hour), now, now,
|
|
|
|
|
)
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("caller-uuid").
|
|
|
|
|
WillReturnRows(ledgerRows)
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []map[string]interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 1 {
|
|
|
|
|
t.Fatalf("expected 1 entry, got %d", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["status"] != "completed" {
|
|
|
|
|
t.Errorf("expected status 'completed', got %v", resp[0]["status"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["response_preview"] != "Analysis complete: 42 issues found" {
|
|
|
|
|
t.Errorf("expected response_preview, got %v", resp[0]["response_preview"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["error"] != nil {
|
|
|
|
|
t.Errorf("expected no error on completed, got %v", resp[0]["error"])
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---------- ListDelegations: ledger failed delegation includes error_detail ----------
|
|
|
|
|
|
|
|
|
|
func TestListDelegations_LedgerFailedIncludesErrorDetail(t *testing.T) {
|
|
|
|
|
mock := setupTestDB(t)
|
|
|
|
|
setupTestRedis(t)
|
|
|
|
|
broadcaster := newTestBroadcaster()
|
|
|
|
|
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
|
|
|
|
dh := NewDelegationHandler(wh, broadcaster)
|
|
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
|
ledgerRows := sqlmock.NewRows([]string{
|
|
|
|
|
"delegation_id", "caller_id", "callee_id", "task_preview",
|
|
|
|
|
"status", "result_preview", "error_detail", "last_heartbeat",
|
|
|
|
|
"deadline", "created_at", "updated_at",
|
|
|
|
|
}).AddRow(
|
|
|
|
|
"del-failed-001", "caller-uuid", "callee-uuid",
|
|
|
|
|
"Fetch data", "failed", "", "Callee workspace not reachable",
|
|
|
|
|
&now, &now.Add(6*time.Hour), now, now,
|
|
|
|
|
)
|
|
|
|
|
mock.ExpectQuery("SELECT d.delegation_id, d.caller_id, d.callee_id, d.task_preview").
|
|
|
|
|
WithArgs("caller-uuid").
|
|
|
|
|
WillReturnRows(ledgerRows)
|
|
|
|
|
|
|
|
|
|
w := httptest.NewRecorder()
|
|
|
|
|
c, _ := gin.CreateTestContext(w)
|
|
|
|
|
c.Params = gin.Params{{Key: "id", Value: "caller-uuid"}}
|
|
|
|
|
c.Request = httptest.NewRequest("GET", "/workspaces/caller-uuid/delegations", nil)
|
|
|
|
|
|
|
|
|
|
dh.ListDelegations(c)
|
|
|
|
|
|
|
|
|
|
if w.Code != http.StatusOK {
|
|
|
|
|
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
|
|
|
|
}
|
|
|
|
|
var resp []map[string]interface{}
|
|
|
|
|
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
|
|
|
|
t.Fatalf("failed to parse response: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(resp) != 1 {
|
|
|
|
|
t.Fatalf("expected 1 entry, got %d", len(resp))
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["status"] != "failed" {
|
|
|
|
|
t.Errorf("expected status 'failed', got %v", resp[0]["status"])
|
|
|
|
|
}
|
|
|
|
|
if resp[0]["error"] != "Callee workspace not reachable" {
|
|
|
|
|
t.Errorf("expected error detail, got %v", resp[0]["error"])
|
|
|
|
|
}
|
|
|
|
|
if err := mock.ExpectationsWereMet(); err != nil {
|
|
|
|
|
t.Errorf("unmet sqlmock expectations: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|