Merge pull request 'fix(#376): store proxy-path delegation results in activity_logs' (#483) from fix/376-activity-delegation-polling into staging
All checks were successful
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
All checks were successful
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
This commit is contained in:
commit
8ca7576567
@ -512,6 +512,13 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
// Fix #376: when the proxied method is 'delegate_result', also write
|
||||
// the delegation row so heartbeat delegation polling can find it.
|
||||
// Without this, proxy-path delegation results are invisible to
|
||||
// ListDelegations / heartbeat delegation polling.
|
||||
if a2aMethod == "delegate_result" {
|
||||
h.logA2ADelegationResult(ctx, workspaceID, callerID, body, respBody, resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// Track LLM token usage for cost transparency (#593).
|
||||
|
||||
@ -336,6 +336,93 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
}
|
||||
}
|
||||
|
||||
// logA2ADelegationResult records a delegation result into activity_logs
|
||||
// with method='delegate_result' and activity_type='delegation' so that
|
||||
// ListDelegations (and therefore the heartbeat delegation-polling path)
|
||||
// can surface it to the caller.
|
||||
//
|
||||
// This bridges the gap for proxy-path delegations: when a workspace
|
||||
// sends a delegate_task via POST /workspaces/:id/a2a, the proxy stores
|
||||
// the response here with the correct method so heartbeat polling finds it.
|
||||
// (The non-proxy path via executeDelegation already writes correctly via
|
||||
// its own INSERT at delegation.go:422.)
|
||||
//
|
||||
// Fire-and-forget: runs in a goroutine so it never adds latency to the
|
||||
// critical A2A response path. Errors are logged but non-fatal.
|
||||
func (h *WorkspaceHandler) logA2ADelegationResult(ctx context.Context, callerID, targetID string, reqBody, respBody []byte, statusCode int) {
|
||||
// Extract delegation_id from the request body (JSON-RPC delegate_result).
|
||||
var req struct {
|
||||
Params struct {
|
||||
Data struct {
|
||||
DelegationID string `json:"delegation_id"`
|
||||
} `json:"data"`
|
||||
} `json:"params"`
|
||||
}
|
||||
if err := json.Unmarshal(reqBody, &req); err != nil {
|
||||
log.Printf("logA2ADelegationResult: failed to parse req body: %v", err)
|
||||
return
|
||||
}
|
||||
delegationID := req.Params.Data.DelegationID
|
||||
if delegationID == "" {
|
||||
log.Printf("logA2ADelegationResult: no delegation_id in request body")
|
||||
return
|
||||
}
|
||||
|
||||
// Extract text from the response body — the delegate_result response
|
||||
// carries the agent's answer in result.data.text or result.text.
|
||||
var responseText string
|
||||
var respTop map[string]json.RawMessage
|
||||
if json.Unmarshal(respBody, &respTop) == nil {
|
||||
if result, ok := respTop["result"]; ok {
|
||||
var resultObj map[string]json.RawMessage
|
||||
if json.Unmarshal(result, &resultObj) == nil {
|
||||
if textRaw, ok := resultObj["text"]; ok {
|
||||
json.Unmarshal(textRaw, &responseText)
|
||||
} else if dataRaw, ok := resultObj["data"]; ok {
|
||||
var dataObj map[string]json.RawMessage
|
||||
if json.Unmarshal(dataRaw, &dataObj) == nil {
|
||||
if textRaw, ok := dataObj["text"]; ok {
|
||||
json.Unmarshal(textRaw, &responseText)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if responseText == "" {
|
||||
if textRaw, ok := respTop["text"]; ok {
|
||||
json.Unmarshal(textRaw, &responseText)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
status := "completed"
|
||||
if statusCode >= 300 {
|
||||
status = "failed"
|
||||
}
|
||||
|
||||
summary := "Delegation completed"
|
||||
if status == "failed" {
|
||||
summary = "Delegation failed"
|
||||
}
|
||||
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
respJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"text": responseText,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
if _, err := db.DB.ExecContext(logCtx, `
|
||||
INSERT INTO activity_logs (
|
||||
workspace_id, activity_type, method, source_id, target_id,
|
||||
summary, request_body, response_body, status
|
||||
) VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, $6::jsonb, $7)
|
||||
`, callerID, callerID, targetID, summary, string(reqBody), string(respJSON), status); err != nil {
|
||||
log.Printf("logA2ADelegationResult: INSERT failed for delegation %s: %v", delegationID, err)
|
||||
}
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) *string {
|
||||
if s == "" {
|
||||
return nil
|
||||
|
||||
@ -2017,6 +2017,131 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// logA2ADelegationResult — fix #376: proxy-path delegation results
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TestLogA2ADelegationResult_Smoke verifies that a successful delegation result
|
||||
// fires an INSERT with activity_type='delegation', method='delegate_result',
|
||||
// and status='completed'. The response text is extracted from result.data.text.
|
||||
func TestLogA2ADelegationResult_Smoke(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// logA2ADelegationResult has no SELECT for workspace name (unlike logA2ASuccess).
|
||||
// It fires the INSERT directly in a goroutine.
|
||||
mock.ExpectExec(`^INSERT INTO activity_logs`).
|
||||
WithArgs(
|
||||
"ws-caller", // workspace_id ($1)
|
||||
"ws-caller", // source_id ($2)
|
||||
"ws-target", // target_id ($3)
|
||||
"Delegation completed", // summary ($4)
|
||||
sqlmock.AnyArg(), // request_body ($5)
|
||||
sqlmock.AnyArg(), // response_body ($6)
|
||||
"completed", // status ($7)
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ADelegationResult(
|
||||
context.Background(),
|
||||
"ws-caller", "ws-target",
|
||||
[]byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-abc123"}}}`),
|
||||
[]byte(`{"jsonrpc":"2.0","id":"1","result":{"data":{"text":"the answer"}}}`),
|
||||
200,
|
||||
)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLogA2ADelegationResult_FailedStatus verifies that a 4xx/5xx response
|
||||
// from the target is recorded with status='failed' and summary='Delegation failed'.
|
||||
func TestLogA2ADelegationResult_FailedStatus(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectExec(`^INSERT INTO activity_logs`).
|
||||
WithArgs(
|
||||
"ws-a", "ws-a", "ws-b",
|
||||
"Delegation failed",
|
||||
sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(),
|
||||
"failed",
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ADelegationResult(
|
||||
context.Background(),
|
||||
"ws-a", "ws-b",
|
||||
[]byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-xyz"}}}`),
|
||||
[]byte(`{"jsonrpc":"2.0","id":"2","error":{"code":-32600,"message":"bad request"}}`),
|
||||
400,
|
||||
)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLogA2ADelegationResult_NoDelegationID skips the INSERT when the
|
||||
// request body carries no delegation_id (logically impossible but defensive).
|
||||
func TestLogA2ADelegationResult_NoDelegationID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// No ExpectExec — the function must return early without any DB write.
|
||||
|
||||
handler.logA2ADelegationResult(
|
||||
context.Background(),
|
||||
"ws-x", "ws-y",
|
||||
[]byte(`{"method":"delegate_task","params":{"data":{}}}`),
|
||||
[]byte(`{}`),
|
||||
200,
|
||||
)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB call: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLogA2ADelegationResult_TextFromResultText verifies that when the
|
||||
// response text lives at result.text (flat JSON-RPC), it is still captured.
|
||||
func TestLogA2ADelegationResult_TextFromResultText(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectExec(`^INSERT INTO activity_logs`).
|
||||
WithArgs(
|
||||
"ws-1", "ws-1", "ws-2",
|
||||
"Delegation completed",
|
||||
sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(),
|
||||
"completed",
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ADelegationResult(
|
||||
context.Background(),
|
||||
"ws-1", "ws-2",
|
||||
[]byte(`{"method":"delegate_task","params":{"data":{"delegation_id":"del-flat"}}}`),
|
||||
[]byte(`{"jsonrpc":"2.0","id":"3","result":{"text":"flat response"}}`),
|
||||
200,
|
||||
)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// A2A auto-wake: hibernated workspace (#711)
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
Loading…
Reference in New Issue
Block a user