fix(chat): reconstruct agent tool-chain from agent_log rows when tool_trace empty (core#2636 follow-up) #2639

Merged
devops-engineer merged 2 commits from feat/2636b-reconstruct-tooltrace-from-agentlog into main 2026-06-12 11:21:06 +00:00
4 changed files with 280 additions and 9 deletions
@@ -45,8 +45,12 @@ export function ToolTraceChips({ trace }: { trace: ToolTraceEntry[] }) {
* inline chip and never render raw secrets (input is a stringified
* preview, not the live arg values). */
function formatTool(t: ToolTraceEntry): string {
const tool = t.tool.trim();
const input = (t.input ?? "").trim();
if (!input) return `${t.tool}(…)`;
// Reconstructed entries (agent_log source) already carry the full
// "name(…)" summary and no input — render as-is. Tool-trace-column
// entries carry a bare tool name + an input preview.
if (!input) return tool;
const preview = input.length > 60 ? `${input.slice(0, 60)}` : input;
return `${t.tool}(${preview})`;
return `${tool}(${preview})`;
}
@@ -32,3 +32,23 @@ describe("ToolTraceChips (core#2636 tool-chain persistence)", () => {
expect(screen.getByText("1 tool used")).toBeTruthy();
});
});
describe("ToolTraceChips formatTool shapes", () => {
it("renders a reconstructed entry (no input) as-is, no doubled parens", () => {
const { container } = render(
<ToolTraceChips trace={[{ tool: "mcp__platform__create_request(…)" }]} />,
);
fireEvent.click(container.querySelector("button")!);
const li = container.querySelector("li")!;
expect(li.textContent).toBe("🛠 mcp__platform__create_request(…)");
// No doubled parens from the formatter appending its own "(…)".
expect(li.textContent).not.toContain("(…)(…)");
});
it("renders a column-source entry as tool(input)", () => {
const { container } = render(
<ToolTraceChips trace={[{ tool: "Read", input: "/tmp/foo" }]} />,
);
fireEvent.click(container.querySelector("button")!);
expect(container.querySelector("li")!.textContent).toBe("🛠 Read(/tmp/foo)");
});
});
@@ -83,6 +83,11 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt
defer rows.Close()
var messages []ChatMessage
// Turns whose agent message has no explicit tool_trace column but DID
// run for a measurable duration are candidates for reconstruction from
// the per-tool agent_log rows (the live-feed source) — see
// reconstructToolTracesFromAgentLog. Tracks (message index, window).
var reconstructTargets []agentTurnWindow
rowCount := 0
for rows.Next() {
var (
@@ -91,8 +96,9 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt
rawRequest sql.NullString
rawResponse sql.NullString
rawTrace sql.NullString
durationMs sql.NullInt64
)
if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse, &rawTrace); err != nil {
if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse, &rawTrace, &durationMs); err != nil {
// Skip malformed row, continue. The error is logged at
// the caller (handler) layer; an isolated bad row should
// not abort the whole page.
@@ -110,12 +116,38 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt
if rawTrace.Valid && rawTrace.String != "" && rawTrace.String != "null" {
toolTrace = json.RawMessage(rawTrace.String)
}
before := len(messages)
messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, toolTrace, IsInternalSelfMessage)...)
// If the row produced an agent message with NO explicit tool_trace
// but ran long enough to have invoked tools, mark it for
// agent_log reconstruction. duration_ms bounds the window
// [end-duration, end] the per-tool rows fall in.
if toolTrace == nil && durationMs.Valid && durationMs.Int64 > 0 {
for i := before; i < len(messages); i++ {
if messages[i].Role == "agent" {
reconstructTargets = append(reconstructTargets, agentTurnWindow{
msgIndex: i,
start: createdAt.Add(-time.Duration(durationMs.Int64) * time.Millisecond),
end: createdAt,
})
}
}
}
}
if err := rows.Err(); err != nil {
return nil, false, err
}
// Reconstruct tool chains for claude-code-style turns (tool steps are
// emitted as per-tool agent_log rows, not into the response metadata's
// tool_trace) so the canvas re-renders them after a reload — the
// platform agent + every claude-code workspace take this path
// (core#2636 follow-up). Best-effort: a query error leaves those turns
// without a chain rather than failing the whole page.
if len(reconstructTargets) > 0 {
s.reconstructToolTracesFromAgentLog(ctx, workspaceID, messages, reconstructTargets)
}
// Wire order: oldest-first within the page so canvas (and any
// future client) can render chronologically without per-pair
// reordering. The SQL is `ORDER BY created_at DESC LIMIT N` for
@@ -135,6 +167,101 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt
return messages, reachedEnd, nil
}
// agentTurnWindow ties one agent ChatMessage (by slice index) to the
// [start, end] wall-clock window of its turn, so per-tool agent_log rows
// in that window can be reattached as the turn's tool chain.
type agentTurnWindow struct {
msgIndex int
start time.Time
end time.Time
}
// reconstructToolTracesFromAgentLog fills ToolTrace for agent turns that
// had none on the a2a_receive row, by reading the per-tool agent_log
// rows (summary "🛠 <tool>(…)", source_id = the workspace itself) that
// fall inside each turn's window. ONE batched query bounded by the
// overall page span; each row is bucketed into the turn whose window
// contains it. Mutates messages in place. Best-effort — logs and
// returns on error.
func (s *PostgresMessageStore) reconstructToolTracesFromAgentLog(
ctx context.Context, workspaceID string, messages []ChatMessage, targets []agentTurnWindow,
) {
minStart, maxEnd := targets[0].start, targets[0].end
for _, t := range targets {
if t.start.Before(minStart) {
minStart = t.start
}
if t.end.After(maxEnd) {
maxEnd = t.end
}
}
rows, err := s.db.QueryContext(ctx, `
SELECT created_at, summary
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'agent_log'
AND source_id = $1
AND summary LIKE $2
AND created_at >= $3
AND created_at <= $4
ORDER BY created_at ASC
LIMIT 2000
`, workspaceID, toolSummaryPrefix+"%", minStart, maxEnd)
if err != nil {
log.Printf("messagestore: agent_log tool-trace reconstruction query failed for %s: %v (chains omitted)", workspaceID, err)
return
}
defer rows.Close()
type logRow struct {
at time.Time
summary string
}
var logs []logRow
for rows.Next() {
var at time.Time
var summary sql.NullString
if rows.Scan(&at, &summary) == nil && summary.Valid && summary.String != "" {
logs = append(logs, logRow{at: at, summary: summary.String})
}
}
for _, t := range targets {
var entries []toolTraceEntry
for _, lr := range logs {
if (lr.at.Equal(t.start) || lr.at.After(t.start)) && (lr.at.Equal(t.end) || lr.at.Before(t.end)) {
entries = append(entries, toolTraceEntry{Tool: toolNameFromSummary(lr.summary)})
}
}
if len(entries) == 0 {
continue
}
if raw, mErr := json.Marshal(entries); mErr == nil {
messages[t.msgIndex].ToolTrace = raw
}
}
}
// toolTraceEntry mirrors the canvas ToolTraceEntry / the tool_trace array
// element shape ({tool, input}). Reconstructed rows carry only the tool
// (the agent_log summary), no input.
type toolTraceEntry struct {
Tool string `json:"tool"`
Input string `json:"input,omitempty"`
}
// toolSummaryPrefix marks a tool-use agent_log summary (the executor's
// _report_tool_use writes "🛠 <tool>(…)"). The reconstruction query
// filters to this prefix so NON-tool agent_log summaries that happen to
// fall in the turn window are never rehydrated as fake tools (CR2).
const toolSummaryPrefix = "🛠 "
// toolNameFromSummary strips the leading "🛠 " marker from a tool-use
// agent_log summary, leaving e.g. "mcp__platform__create_request(…)".
func toolNameFromSummary(summary string) string {
return strings.TrimPrefix(strings.TrimSpace(summary), toolSummaryPrefix)
}
// reverseRowChunks groups msgs by adjacent same-Timestamp runs and
// reverses the run order, preserving within-run order. Pairs of
// (user, agent) emitted by activityRowToChatMessages share a
@@ -171,7 +298,7 @@ func reverseRowChunks(msgs []ChatMessage) []ChatMessage {
func (s *PostgresMessageStore) queryActivityRows(ctx context.Context, workspaceID string, opts ListOptions) (*sql.Rows, error) {
if opts.HasBefore {
return s.db.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
@@ -182,7 +309,7 @@ func (s *PostgresMessageStore) queryActivityRows(ctx context.Context, workspaceI
`, workspaceID, opts.BeforeTS, opts.Limit)
}
return s.db.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
@@ -309,16 +309,16 @@ func TestList_WireOrderIsOldestFirstAcrossPagedRows(t *testing.T) {
// Server's SQL is ORDER BY created_at DESC. Build mock rows in
// THAT order so the row-aware reversal has work to do.
rows := sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace"}).
rows := sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}).
AddRow(mustParseTime(t, "2026-05-05T00:03:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u3"}]}}}`,
`{"result":"a3"}`, nil).
`{"result":"a3"}`, nil, nil).
AddRow(mustParseTime(t, "2026-05-05T00:02:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u2"}]}}}`,
`{"result":"a2"}`, nil).
`{"result":"a2"}`, nil, nil).
AddRow(mustParseTime(t, "2026-05-05T00:01:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u1"}]}}}`,
`{"result":"a1"}`, nil)
`{"result":"a1"}`, nil, nil)
mock.ExpectQuery(`SELECT created_at, status, request_body::text, response_body::text`).
WillReturnRows(rows)
@@ -583,3 +583,123 @@ func TestActivityRow_AgentMessageCarriesToolTrace(t *testing.T) {
t.Errorf("agent message must carry the tool_trace; got %q", string(msgs[1].ToolTrace))
}
}
// TestList_ReconstructsToolTraceFromAgentLog (core#2636 follow-up): a
// claude-code turn stores tool steps as per-tool agent_log rows, not in
// the response tool_trace column. List must reconstruct the chain from
// those rows so the chat reload re-renders it.
func TestList_ReconstructsToolTraceFromAgentLog(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()
turnEnd := mustParseTime(t, "2026-06-12T00:00:10Z")
// a2a_receive turn: 10s duration, NULL tool_trace.
mock.ExpectQuery(`SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms`).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}).
AddRow(turnEnd, "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"do it"}]}}}`,
`{"result":"done"}`, nil, int64(10000)))
// Reconstruction query: two tool steps inside [end-10s, end].
mock.ExpectQuery(`SELECT created_at, summary\s+FROM activity_logs\s+WHERE workspace_id = \$1\s+AND activity_type = 'agent_log'`).
WithArgs("ws-1", "🛠 %", turnEnd.Add(-10000*1e6), turnEnd).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "summary"}).
AddRow(mustParseTime(t, "2026-06-12T00:00:03Z"), "🛠 mcp__platform__create_approval(…)").
AddRow(mustParseTime(t, "2026-06-12T00:00:05Z"), "🛠 mcp__platform__create_request(…)"))
store := NewPostgresMessageStore(db)
msgs, _, err := store.List(context.Background(), "ws-1", ListOptions{Limit: 10})
if err != nil {
t.Fatalf("List: %v", err)
}
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
}
}
if agent == nil {
t.Fatalf("no agent message; got %+v", msgs)
}
if len(agent.ToolTrace) == 0 {
t.Fatalf("agent message has no reconstructed tool_trace")
}
var entries []toolTraceEntry
if err := json.Unmarshal(agent.ToolTrace, &entries); err != nil {
t.Fatalf("tool_trace not valid JSON: %v", err)
}
if len(entries) != 2 {
t.Fatalf("want 2 reconstructed tools, got %d: %+v", len(entries), entries)
}
if entries[0].Tool != "mcp__platform__create_approval(…)" {
t.Errorf("first tool = %q, want the 🛠-stripped summary", entries[0].Tool)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestReconstruction_FiltersToToolMarker (CR2 on #2639): the SQL filters
// agent_log summaries to the "🛠 " prefix so non-tool live-feed lines in
// the same turn window are never rehydrated as fake tools. We assert the
// query carries the marker LIKE arg, and that the only rows the DB is
// asked to return (and thus bucket) are tool rows.
func TestReconstruction_FiltersToToolMarker(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()
turnEnd := mustParseTime(t, "2026-06-12T00:00:10Z")
mock.ExpectQuery(`SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms`).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}).
AddRow(turnEnd, "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"go"}]}}}`,
`{"result":"done"}`, nil, int64(10000)))
// The reconstruction query MUST pass the "🛠 %" LIKE arg — that is the
// DB-level guard that excludes non-tool agent_log summaries. sqlmock's
// WithArgs makes a missing/changed marker fail the expectation.
mock.ExpectQuery(`activity_type = 'agent_log'.*summary LIKE`).
WithArgs("ws-1", "🛠 %", turnEnd.Add(-10000*1e6), turnEnd).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "summary"}).
AddRow(mustParseTime(t, "2026-06-12T00:00:04Z"), "🛠 Read(/tmp/x)"))
store := NewPostgresMessageStore(db)
msgs, _, err := store.List(context.Background(), "ws-1", ListOptions{Limit: 10})
if err != nil {
t.Fatalf("List: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet sqlmock expectations (marker LIKE arg missing?): %v", err)
}
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
}
}
var entries []toolTraceEntry
if agent != nil && len(agent.ToolTrace) > 0 {
_ = json.Unmarshal(agent.ToolTrace, &entries)
}
if len(entries) != 1 || entries[0].Tool != "Read(/tmp/x)" {
t.Fatalf("want exactly the one marker tool, got %+v", entries)
}
}
// TestToolNameFromSummary_NonMarkerUnchanged: defensive — a summary
// without the marker is returned as-is (the SQL filter is the primary
// guard; this proves no accidental mangling if one slips through).
func TestToolNameFromSummary_NonMarkerUnchanged(t *testing.T) {
if got := toolNameFromSummary("plain status line"); got != "plain status line" {
t.Errorf("got %q, want unchanged", got)
}
if got := toolNameFromSummary("🛠 Bash(ls)"); got != "Bash(ls)" {
t.Errorf("marker not stripped: %q", got)
}
}