diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 887a2057..ebbd642d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -161,6 +161,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle }() } summary := a2aMethod + " → " + wsNameForLog + toolTrace := extractToolTrace(respBody) go func(parent context.Context) { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() @@ -173,6 +174,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle Summary: &summary, RequestBody: json.RawMessage(body), ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, DurationMs: &durationMs, Status: logStatus, }) @@ -234,6 +236,39 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e // matching (the wsauth errors are typed for the invalid case). var errInvalidCallerToken = errors.New("missing caller auth token") +// extractToolTrace pulls metadata.tool_trace from an A2A JSON-RPC response. +// Returns nil when absent or malformed — callers can pass it straight through. 
// extractToolTrace returns the raw JSON stored at result.metadata.tool_trace
// in an A2A JSON-RPC response body.
//
// It returns nil when respBody is empty or not valid JSON, when any level of
// the path (result, metadata, tool_trace) is missing or is not a JSON object
// where one is required, or when tool_trace is present but is the JSON
// literal null. A null trace carries no information, and returning it would
// let callers that gate on len(trace) > 0 treat it as a real trace.
//
// The returned bytes are passed through undecoded; the shape of the trace
// itself is not validated here.
func extractToolTrace(respBody []byte) json.RawMessage {
	if len(respBody) == 0 {
		return nil
	}

	// Descend one object level per key, decoding lazily so only the
	// envelope along the path is parsed and the trace stays raw.
	cur := json.RawMessage(respBody)
	for _, key := range [...]string{"result", "metadata", "tool_trace"} {
		var obj map[string]json.RawMessage
		if err := json.Unmarshal(cur, &obj); err != nil {
			return nil
		}
		// A JSON null at this level decodes to a nil map without error;
		// the lookup below then reports the key as absent.
		next, ok := obj[key]
		if !ok {
			return nil
		}
		cur = next
	}

	// An explicit null is equivalent to "no trace".
	if len(cur) == 0 || string(cur) == "null" {
		return nil
	}
	return cur
}
&respBody, &durationMs, &status, &errorDetail, &createdAt); err != nil { + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil { log.Printf("Activity scan error: %v", err) continue } @@ -104,6 +104,9 @@ func (h *ActivityHandler) List(c *gin.Context) { if respBody != nil { entry["response_body"] = json.RawMessage(respBody) } + if toolTrace != nil { + entry["tool_trace"] = json.RawMessage(toolTrace) + } activities = append(activities, entry) } if err := rows.Err(); err != nil { @@ -382,7 +385,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac respJSON = []byte("null") } - var reqStr, respStr *string + var reqStr, respStr, traceStr *string if params.RequestBody != nil { s := string(reqJSON) reqStr = &s @@ -391,12 +394,16 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac s := string(respJSON) respStr = &s } + if len(params.ToolTrace) > 0 { + s := string(params.ToolTrace) + traceStr = &s + } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, duration_ms, status, error_detail) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11) + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12) `, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID, - params.Method, params.Summary, reqStr, respStr, + params.Method, params.Summary, reqStr, respStr, traceStr, params.DurationMs, params.Status, params.ErrorDetail) if err != nil { log.Printf("LogActivity insert error: %v", err) @@ -405,7 +412,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac // Broadcast ACTIVITY_LOGGED event if broadcaster 
!= nil { - broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", map[string]interface{}{ + payload := map[string]interface{}{ "activity_type": params.ActivityType, "method": params.Method, "summary": params.Summary, @@ -413,7 +420,11 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac "source_id": params.SourceID, "target_id": params.TargetID, "duration_ms": params.DurationMs, - }) + } + if len(params.ToolTrace) > 0 { + payload["tool_trace"] = json.RawMessage(params.ToolTrace) + } + broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", payload) } } @@ -426,6 +437,7 @@ type ActivityParams struct { Summary *string RequestBody interface{} ResponseBody interface{} + ToolTrace json.RawMessage // tools/commands the agent actually invoked DurationMs *int Status string // ok, error, timeout ErrorDetail *string diff --git a/workspace-server/migrations/039_activity_tool_trace.down.sql b/workspace-server/migrations/039_activity_tool_trace.down.sql new file mode 100644 index 00000000..73691b56 --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_activity_logs_tool_trace; +ALTER TABLE activity_logs DROP COLUMN IF EXISTS tool_trace; diff --git a/workspace-server/migrations/039_activity_tool_trace.up.sql b/workspace-server/migrations/039_activity_tool_trace.up.sql new file mode 100644 index 00000000..03dd0a4c --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.up.sql @@ -0,0 +1,9 @@ +-- Add tool_trace column to activity_logs for platform-level observability. +-- Stores the list of tools/commands an agent actually invoked during an A2A +-- call, extracted from the A2A response metadata. Enables verifying agent +-- claims ("I checked X") against what tools were actually called. 
+ALTER TABLE activity_logs ADD COLUMN IF NOT EXISTS tool_trace JSONB; + +-- Index for querying which agents used specific tools +CREATE INDEX IF NOT EXISTS idx_activity_logs_tool_trace + ON activity_logs USING gin (tool_trace) WHERE tool_trace IS NOT NULL; diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 81b17a35..13631ed1 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -304,6 +304,10 @@ class LangGraphA2AExecutor(AgentExecutor): else None ) + # ── Tool trace: collect every tool invocation for + # platform-level observability ──────────────────── + tool_trace: list[dict] = [] + async for event in self.agent.astream_events( {"messages": messages}, config=run_config, @@ -334,7 +338,12 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") + tool_input = event.get("data", {}).get("input", "") logger.debug("SSE: tool start — %s", tool_name) + tool_trace.append({ + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + }) if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -342,7 +351,11 @@ class LangGraphA2AExecutor(AgentExecutor): ) elif kind == "on_tool_end": - logger.debug("SSE: tool end — %s", event.get("name", "?")) + tool_end_name = event.get("name", "?") + tool_output = event.get("data", {}).get("output", "") + logger.debug("SSE: tool end — %s", tool_end_name) + if tool_trace and tool_trace[-1]["tool"] == tool_end_name: + tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry @@ -383,9 +396,10 @@ class LangGraphA2AExecutor(AgentExecutor): # Non-streaming: ResultAggregator.consume_all() returns this # immediately as the response (a2a_client.py reads .parts[0].text). # Streaming: yielded as the last SSE event in the stream. 
- await event_queue.enqueue_event( - new_agent_text_message(final_text, task_id=task_id, context_id=context_id) - ) + msg = new_agent_text_message(final_text, task_id=task_id, context_id=context_id) + if tool_trace: + msg.metadata = {"tool_trace": tool_trace} + await event_queue.enqueue_event(msg) _result = final_text except Exception as e: