From 6c618c9c3fc6649331bb7322326e975ae474e0db Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 15:07:25 -0700 Subject: [PATCH] feat: add tool_trace to activity_logs for platform-level agent observability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every A2A response now includes a tool_trace — the list of tools/commands the agent actually invoked during execution. This enables verifying agent claims against what they actually did, catches hallucinated "I checked X" responses, and provides an audit trail for the CEO to control hundreds of agents by checking the top-level PM's trace. Changes: - Python runtime: collect tool name/input/output_preview on every on_tool_start/on_tool_end event, embed in Message.metadata.tool_trace - Go platform: extract tool_trace from A2A response metadata, store in new activity_logs.tool_trace JSONB column with GIN index - Activity API: expose tool_trace in List and broadcast endpoints - Migration 039: adds tool_trace column + GIN index Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/a2a_proxy_helpers.go | 35 +++++++++++++++++++ .../internal/handlers/activity.go | 30 +++++++++++----- .../039_activity_tool_trace.down.sql | 2 ++ .../migrations/039_activity_tool_trace.up.sql | 9 +++++ workspace/a2a_executor.py | 22 +++++++++--- 5 files changed, 85 insertions(+), 13 deletions(-) create mode 100644 workspace-server/migrations/039_activity_tool_trace.down.sql create mode 100644 workspace-server/migrations/039_activity_tool_trace.up.sql diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 887a2057..ebbd642d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -161,6 +161,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle }() } summary := a2aMethod + " → " + wsNameForLog + toolTrace := extractToolTrace(respBody) go func(parent context.Context) { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() @@ -173,6 +174,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle Summary: &summary, RequestBody: json.RawMessage(body), ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, DurationMs: &durationMs, Status: logStatus, }) @@ -234,6 +236,39 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e // matching (the wsauth errors are typed for the invalid case). var errInvalidCallerToken = errors.New("missing caller auth token") +// extractToolTrace pulls metadata.tool_trace from an A2A JSON-RPC response. +// Returns nil when absent or malformed — callers can pass it straight through. +func extractToolTrace(respBody []byte) json.RawMessage { + if len(respBody) == 0 { + return nil + } + var top map[string]json.RawMessage + if err := json.Unmarshal(respBody, &top); err != nil { + return nil + } + rawResult, ok := top["result"] + if !ok { + return nil + } + var result map[string]json.RawMessage + if err := json.Unmarshal(rawResult, &result); err != nil { + return nil + } + rawMeta, ok := result["metadata"] + if !ok { + return nil + } + var meta map[string]json.RawMessage + if err := json.Unmarshal(rawMeta, &meta); err != nil { + return nil + } + trace, ok := meta["tool_trace"] + if !ok || len(trace) == 0 { + return nil + } + return trace +} + // extractAndUpsertTokenUsage parses LLM usage from a raw A2A response body // and persists it via upsertTokenUsage. Safe to call in a goroutine — logs // errors but never panics. ctx must already be detached from the request. diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 8ff6e984..4d98e9fa 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -40,7 +40,7 @@ func (h *ActivityHandler) List(c *gin.Context) { // Build query with optional filters query := `SELECT id, workspace_id, activity_type, source_id, target_id, method, - summary, request_body, response_body, duration_ms, status, error_detail, created_at + summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at FROM activity_logs WHERE workspace_id = $1` args := []interface{}{workspaceID} argIdx := 2 @@ -75,12 +75,12 @@ func (h *ActivityHandler) List(c *gin.Context) { for rows.Next() { var id, wsID, actType, status string var sourceID, targetID, method, summary, errorDetail *string - var reqBody, respBody []byte + var reqBody, respBody, toolTrace []byte var durationMs *int var createdAt time.Time if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, - &summary, &reqBody, &respBody, &durationMs, &status, &errorDetail, &createdAt); err != nil { + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil { log.Printf("Activity scan error: %v", err) continue } @@ -104,6 +104,9 @@ func (h *ActivityHandler) List(c *gin.Context) { if respBody != nil { entry["response_body"] = json.RawMessage(respBody) } + if toolTrace != nil { + entry["tool_trace"] = json.RawMessage(toolTrace) + } activities = append(activities, entry) } if err := rows.Err(); err != nil { @@ -382,7 +385,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac respJSON = []byte("null") } - var reqStr, respStr *string + var reqStr, respStr, traceStr *string if params.RequestBody != nil { s := string(reqJSON) reqStr = &s @@ -391,12 +394,16 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac s := string(respJSON) respStr = &s } + if len(params.ToolTrace) > 0 { + s := string(params.ToolTrace) + traceStr = &s + } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, duration_ms, status, error_detail) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11) + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12) `, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID, - params.Method, params.Summary, reqStr, respStr, + params.Method, params.Summary, reqStr, respStr, traceStr, params.DurationMs, params.Status, params.ErrorDetail) if err != nil { log.Printf("LogActivity insert error: %v", err) @@ -405,7 +412,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac // Broadcast ACTIVITY_LOGGED event if broadcaster != nil { - broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", map[string]interface{}{ + payload := map[string]interface{}{ "activity_type": params.ActivityType, "method": params.Method, "summary": params.Summary, @@ -413,7 +420,11 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac "source_id": params.SourceID, "target_id": params.TargetID, "duration_ms": params.DurationMs, - }) + } + if len(params.ToolTrace) > 0 { + payload["tool_trace"] = json.RawMessage(params.ToolTrace) + } + broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", payload) } } @@ -426,6 +437,7 @@ type ActivityParams struct { Summary *string RequestBody interface{} ResponseBody interface{} + ToolTrace json.RawMessage // tools/commands the agent actually invoked DurationMs *int Status string // ok, error, timeout ErrorDetail *string diff --git a/workspace-server/migrations/039_activity_tool_trace.down.sql b/workspace-server/migrations/039_activity_tool_trace.down.sql new file mode 100644 index 00000000..73691b56 --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_activity_logs_tool_trace; +ALTER TABLE activity_logs DROP COLUMN IF EXISTS tool_trace; diff --git a/workspace-server/migrations/039_activity_tool_trace.up.sql b/workspace-server/migrations/039_activity_tool_trace.up.sql new file mode 100644 index 00000000..03dd0a4c --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.up.sql @@ -0,0 +1,9 @@ +-- Add tool_trace column to activity_logs for platform-level observability. +-- Stores the list of tools/commands an agent actually invoked during an A2A +-- call, extracted from the A2A response metadata. Enables verifying agent +-- claims ("I checked X") against what tools were actually called. +ALTER TABLE activity_logs ADD COLUMN IF NOT EXISTS tool_trace JSONB; + +-- Index for querying which agents used specific tools +CREATE INDEX IF NOT EXISTS idx_activity_logs_tool_trace + ON activity_logs USING gin (tool_trace) WHERE tool_trace IS NOT NULL; diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 81b17a35..13631ed1 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -304,6 +304,10 @@ class LangGraphA2AExecutor(AgentExecutor): else None ) + # ── Tool trace: collect every tool invocation for + # platform-level observability ──────────────────── + tool_trace: list[dict] = [] + async for event in self.agent.astream_events( {"messages": messages}, config=run_config, @@ -334,7 +338,12 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") + tool_input = event.get("data", {}).get("input", "") logger.debug("SSE: tool start — %s", tool_name) + tool_trace.append({ + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + }) if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -342,7 +351,11 @@ class LangGraphA2AExecutor(AgentExecutor): ) elif kind == "on_tool_end": - logger.debug("SSE: tool end — %s", event.get("name", "?")) + tool_end_name = event.get("name", "?") + tool_output = event.get("data", {}).get("output", "") + logger.debug("SSE: tool end — %s", tool_end_name) + if tool_trace and tool_trace[-1]["tool"] == tool_end_name: + tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry @@ -383,9 +396,10 @@ class LangGraphA2AExecutor(AgentExecutor): # Non-streaming: ResultAggregator.consume_all() returns this # immediately as the response (a2a_client.py reads .parts[0].text). # Streaming: yielded as the last SSE event in the stream. - await event_queue.enqueue_event( - new_agent_text_message(final_text, task_id=task_id, context_id=context_id) - ) + msg = new_agent_text_message(final_text, task_id=task_id, context_id=context_id) + if tool_trace: + msg.metadata = {"tool_trace": tool_trace} + await event_queue.enqueue_event(msg) _result = final_text except Exception as e: