From 853734aa4e776072e06fd5561a3ad0e653bec64b Mon Sep 17 00:00:00 2001 From: airenostars Date: Wed, 15 Apr 2026 14:29:43 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20GET=20/workspaces/:id/transcript=20?= =?UTF-8?q?=E2=80=94=20live=20agent=20session=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #N (issue to be filed) Lets canvas / operators see live tool calls + AI thinking instead of waiting for the high-level activity log to flush. Right now the only way to "look over an agent's shoulder" is `docker exec ws-XXX cat /home/agent/.claude/projects/.../<session>.jsonl`, which: - doesn't work for remote workspaces (Phase 30 / Fly Machines) - requires shell access on the host - has no pagination This PR adds: 1. `BaseAdapter.transcript_lines(since, limit)` — async hook returning `{runtime, supported, lines, cursor, more, source}`. Default returns `supported: false` so non-claude-code runtimes pass through gracefully. 2. `ClaudeCodeAdapter.transcript_lines` override — reads the most-recently-modified `.jsonl` in `~/.claude/projects/<project-dir>/`. Resolves cwd the same way `ClaudeSDKExecutor._resolve_cwd()` does so the project dir name matches what Claude Code actually writes to. Limit capped at 1000 to prevent OOM. 3. Workspace HTTP route `GET /transcript` — Starlette handler added alongside the A2A app. Trusts the internal Docker network (same model as POST / for A2A); Phase 30 remote-workspace auth is a follow-up. 4. Platform proxy `GET /workspaces/:id/transcript` — looks up the workspace's URL, forwards GET, caps response at 1MB. Gated by existing `WorkspaceAuth` middleware (same as /traces, /memories, /delegations). Tests: 6 Python unit tests cover unsupported default adapter / empty dir / pagination / multi-session / malformed lines / limit cap, plus 4 Go tests cover 404 / proxy forwarding / query-string propagation / unreachable-workspace 502. Verified end-to-end on a live workspace — returns real claude-code session entries through the platform proxy. 
## Follow-ups - WebSocket variant for live streaming (instead of polling) - Canvas UI tab "Transcript" between Activity and Traces - LangGraph / DeepAgents / OpenClaw transcript adapters - Phase 30 remote-workspace auth on /transcript --- platform/internal/handlers/transcript.go | 91 +++++++++++ platform/internal/handlers/transcript_test.go | 123 +++++++++++++++ platform/internal/router/router.go | 7 + workspace-template/adapters/base.py | 30 ++++ .../adapters/claude_code/adapter.py | 91 +++++++++++ workspace-template/main.py | 27 +++- .../tests/test_transcript_lines.py | 147 ++++++++++++++++++ 7 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 platform/internal/handlers/transcript.go create mode 100644 platform/internal/handlers/transcript_test.go create mode 100644 workspace-template/tests/test_transcript_lines.py diff --git a/platform/internal/handlers/transcript.go b/platform/internal/handlers/transcript.go new file mode 100644 index 00000000..d7ac3e6b --- /dev/null +++ b/platform/internal/handlers/transcript.go @@ -0,0 +1,91 @@ +// Package handlers — transcript proxy. +// +// GET /workspaces/:id/transcript proxies to the workspace's own +// /transcript endpoint, which surfaces the live agent session log +// (claude-code reads ~/.claude/projects/<project-dir>/<session>.jsonl). Other +// runtimes return supported:false. +// +// Why this lives in the platform: docker exec works for local dev but +// not for remote (Phase 30) workspaces on Fly Machines. The platform's +// network proxy is the only path that scales to both. +package handlers + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +// TranscriptHandler proxies /workspaces/:id/transcript to the workspace agent. 
+type TranscriptHandler struct { + httpClient *http.Client +} + +func NewTranscriptHandler() *TranscriptHandler { + return &TranscriptHandler{ + httpClient: &http.Client{Timeout: 15 * time.Second}, + } +} + +// Get handles GET /workspaces/:id/transcript?since=N&limit=N. +// +// Looks up the workspace's URL, mints a workspace-scoped bearer token, +// forwards the GET, and streams the response back. Caps payload at 1MB +// to keep a runaway transcript from saturating canvas. +func (h *TranscriptHandler) Get(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + var workspaceURL string + if err := db.DB.QueryRowContext(ctx, + `SELECT agent_card->>'url' FROM workspaces WHERE id = $1`, + workspaceID, + ).Scan(&workspaceURL); err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return + } + if workspaceURL == "" { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not registered (no URL on file)"}) + return + } + + // No bearer minting needed — workspace /transcript trusts the internal + // Docker network (same model as POST / for A2A). Phase 30 remote work- + // spaces will need an auth story; tracked as follow-up. + target, err := url.Parse(workspaceURL) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "invalid workspace URL"}) + return + } + target.Path = "/transcript" + target.RawQuery = c.Request.URL.RawQuery + + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, "GET", target.String(), nil) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build request"}) + return + } + + resp, err := h.httpClient.Do(req) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("workspace unreachable: %v", err)}) + return + } + defer resp.Body.Close() + + // Cap at 1 MB so a giant transcript doesn't melt the canvas. 
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read workspace response"}) + return + } + c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), body) +} diff --git a/platform/internal/handlers/transcript_test.go b/platform/internal/handlers/transcript_test.go new file mode 100644 index 00000000..feba35ff --- /dev/null +++ b/platform/internal/handlers/transcript_test.go @@ -0,0 +1,123 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +// helper: register a workspace row + return its ID +func seedWorkspace(t *testing.T, agentURL string) string { + t.Helper() + id := "11111111-2222-3333-4444-555555555555" + _, err := db.DB.Exec( + `INSERT INTO workspaces (id, name, agent_card, status) VALUES ($1, 'transcript-test', $2, 'online') + ON CONFLICT (id) DO UPDATE SET agent_card = EXCLUDED.agent_card`, + id, []byte(`{"url":"`+agentURL+`"}`), + ) + if err != nil { + t.Fatalf("seed workspace: %v", err) + } + return id +} + +// ==================== GET /workspaces/:id/transcript ==================== + +func TestTranscript_WorkspaceNotFound(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewTranscriptHandler() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000000"}} + c.Request = httptest.NewRequest("GET", "/workspaces/00000000-0000-0000-0000-000000000000/transcript", nil) + h.Get(c) + if w.Code != http.StatusNotFound { + t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestTranscript_ProxyForwardsAndReturnsBody(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewTranscriptHandler() + + // Spin up a fake "workspace" agent that returns a canned transcript + gotPath := "" + stub := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotPath = r.URL.Path + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"runtime":"claude-code","supported":true,"lines":[{"type":"user"}],"cursor":1,"more":false}`)) + })) + defer stub.Close() + + wsID := seedWorkspace(t, stub.URL) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript?since=5&limit=20", nil) + h.Get(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if gotPath != "/transcript" { + t.Errorf("expected proxy to hit /transcript, got %q", gotPath) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response not JSON: %v", err) + } + if resp["runtime"] != "claude-code" { + t.Errorf("expected runtime=claude-code, got %v", resp["runtime"]) + } + if lines, ok := resp["lines"].([]interface{}); !ok || len(lines) != 1 { + t.Errorf("expected 1 line, got %v", resp["lines"]) + } +} + +func TestTranscript_ProxyPropagatesQueryString(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewTranscriptHandler() + + gotQuery := "" + stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotQuery = r.URL.RawQuery + w.Write([]byte(`{}`)) + })) + defer stub.Close() + + wsID := seedWorkspace(t, stub.URL) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript?since=42&limit=7", nil) + h.Get(c) + if gotQuery != "since=42&limit=7" { + t.Errorf("expected query forwarded, got %q", gotQuery) + } +} + +func TestTranscript_UnreachableWorkspaceReturns502(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewTranscriptHandler() + + wsID 
:= seedWorkspace(t, "http://127.0.0.1:1") // refused + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript", nil) + h.Get(c) + if w.Code != http.StatusBadGateway { + t.Errorf("expected 502, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go index 99e226c5..fe6dbb7a 100644 --- a/platform/internal/router/router.go +++ b/platform/internal/router/router.go @@ -163,6 +163,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi trh := handlers.NewTracesHandler() wsAuth.GET("/traces", trh.List) + // Live agent transcript proxy — surfaces the runtime-specific session + // log (claude-code reads ~/.claude/projects//.jsonl). + // Lets canvas / operators see live tool calls + AI thinking instead + // of waiting for the high-level activity log to flush. + trsh := handlers.NewTranscriptHandler() + wsAuth.GET("/transcript", trsh.Get) + // Agent Memories (HMA) memsh := handlers.NewMemoriesHandler() wsAuth.POST("/memories", memsh.Commit) diff --git a/workspace-template/adapters/base.py b/workspace-template/adapters/base.py index 2b8060e3..a1820e74 100644 --- a/workspace-template/adapters/base.py +++ b/workspace-template/adapters/base.py @@ -102,6 +102,36 @@ class BaseAdapter(ABC): """ return None + async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict: + """Return live transcript entries for the most-recent agent session. + + Default implementation returns ``supported: False`` for runtimes + that don't expose a per-session log on disk. Override in subclasses + that DO (Claude Code reads ``~/.claude/projects//.jsonl``). + + This is the "look over the agent's shoulder" feature — lets canvas / + operators see live tool calls + AI thinking instead of waiting for + the high-level activity log to flush. 
+ + Args: + since: line offset to skip — caller's last cursor (0 = from start) + limit: max lines to return (caller-side cap, default 100, max 1000) + + Returns: + ``{runtime, supported, lines, cursor, more, source}`` where + ``cursor`` is the new offset to pass on the next poll, ``more`` + is True if additional lines remain past ``limit``, and ``source`` + is the file path lines were read from (useful for debugging). + """ + return { + "runtime": self.name(), + "supported": False, + "lines": [], + "cursor": since, + "more": False, + "source": None, + } + def register_subagent_hook(self, name: str, spec: dict) -> None: """Default no-op. DeepAgents overrides to register a sub-agent.""" return None diff --git a/workspace-template/adapters/claude_code/adapter.py b/workspace-template/adapters/claude_code/adapter.py index 03827b1a..96ecd050 100644 --- a/workspace-template/adapters/claude_code/adapter.py +++ b/workspace-template/adapters/claude_code/adapter.py @@ -1,13 +1,19 @@ """Claude Code adapter — wraps the Claude Code CLI as an agent runtime.""" +import json import os import logging +from pathlib import Path from adapters.base import BaseAdapter, AdapterConfig from a2a.server.agent_execution import AgentExecutor logger = logging.getLogger(__name__) +# Cap one transcript response at 1000 lines so a paranoid client can't OOM +# the workspace by polling /transcript?limit=999999. +_TRANSCRIPT_MAX_LIMIT = 1000 + class ClaudeCodeAdapter(BaseAdapter): @@ -74,3 +80,88 @@ class ClaudeCodeAdapter(BaseAdapter): heartbeat=config.heartbeat, model=model, ) + + async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict: + """Read the live Claude Code session transcript. + + Claude Code writes every session to + ``$HOME/.claude/projects//.jsonl`` — + every line is a JSON event (user/assistant/tool_use/attachment/etc). + We pick the most-recently-modified .jsonl in the projects dir for + the agent's working directory, then return ``[since:since+limit]``. 
+ + Returns ``supported: True`` even if no .jsonl exists yet (empty + ``lines`` + ``cursor=0``) so the canvas can show "agent hasn't + produced output yet" instead of "feature unavailable". + """ + limit = max(1, min(limit, _TRANSCRIPT_MAX_LIMIT)) + since = max(0, since) + + # Resolve the projects-dir name. Claude Code maps cwd → dirname by + # replacing "/" with "-" (so "/configs" → "-configs"). The exact + # rule lives inside the CLI binary, but the leading-dash + path- + # without-trailing-slash pattern is stable across versions. + # + # Match ClaudeSDKExecutor._resolve_cwd: prefer /workspace if populated, + # else /configs. Override via CLAUDE_PROJECT_CWD for tests. + WORKSPACE_MOUNT = "/workspace" + CONFIG_MOUNT = "/configs" + cwd_override = os.environ.get("CLAUDE_PROJECT_CWD") + if cwd_override: + cwd = cwd_override + elif os.path.isdir(WORKSPACE_MOUNT) and os.listdir(WORKSPACE_MOUNT): + cwd = WORKSPACE_MOUNT + else: + cwd = CONFIG_MOUNT + + # Normalize: strip trailing slash, replace path separators with "-" + cwd_norm = cwd.rstrip("/") or "/" + projdir_name = cwd_norm.replace("/", "-") # "/configs" → "-configs" + + home = Path(os.environ.get("HOME", "/home/agent")) + projdir = home / ".claude" / "projects" / projdir_name + result_base = { + "runtime": self.name(), + "supported": True, + "lines": [], + "cursor": since, + "more": False, + "source": str(projdir), + } + + if not projdir.is_dir(): + return result_base + + # Pick most-recently-modified .jsonl + candidates = sorted(projdir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True) + if not candidates: + return result_base + target = candidates[0] + result_base["source"] = str(target) + + lines = [] + more = False + try: + with target.open("r") as f: + for i, raw in enumerate(f): + if i < since: + continue + if len(lines) >= limit: + more = True + break + raw = raw.strip() + if not raw: + continue + try: + lines.append(json.loads(raw)) + except json.JSONDecodeError: + # Skip malformed lines but 
keep cursor advancing + lines.append({"_parse_error": True, "_raw": raw[:200]}) + except OSError as exc: + logger.warning("transcript_lines: read failed for %s: %s", target, exc) + return result_base + + result_base["lines"] = lines + result_base["cursor"] = since + len(lines) + result_base["more"] = more + return result_base diff --git a/workspace-template/main.py b/workspace-template/main.py index f9b7e459..2fb3adac 100644 --- a/workspace-template/main.py +++ b/workspace-template/main.py @@ -281,7 +281,32 @@ async def main(): # pragma: no cover print(f"Workspace {workspace_id} starting on port {port}") # Wrap the ASGI app with W3C TraceContext extraction middleware so incoming # A2A HTTP requests propagate their trace context into _incoming_trace_context. - built_app = make_trace_middleware(app.build()) + starlette_app = app.build() + + # Add /transcript route — exposes the most-recent agent session log + # (claude-code reads ~/.claude/projects//.jsonl). Other + # runtimes return supported:false. + from starlette.responses import JSONResponse + from starlette.routing import Route + + async def _transcript_handler(request): + # No bearer check here — same model as POST / (A2A): the workspace's + # HTTP server only listens on the internal Docker network, and the + # platform's TranscriptHandler is the only intended caller. Phase 30 + # remote workspaces will need a proper auth story (TODO #N) — likely + # the existing wsauth bearer, but with a callback to the platform to + # validate (since the workspace doesn't see all live tokens). 
+ try: + since = int(request.query_params.get("since", "0")) + limit = int(request.query_params.get("limit", "100")) + except (TypeError, ValueError): + return JSONResponse({"error": "since and limit must be integers"}, status_code=400) + result = await adapter.transcript_lines(since=since, limit=limit) + return JSONResponse(result) + + starlette_app.add_route("/transcript", _transcript_handler, methods=["GET"]) + + built_app = make_trace_middleware(starlette_app) server_config = uvicorn.Config( built_app, diff --git a/workspace-template/tests/test_transcript_lines.py b/workspace-template/tests/test_transcript_lines.py new file mode 100644 index 00000000..552af6ee --- /dev/null +++ b/workspace-template/tests/test_transcript_lines.py @@ -0,0 +1,147 @@ +"""Tests for the new BaseAdapter.transcript_lines() method + claude-code override.""" + +import asyncio +import json +import os +import tempfile +from pathlib import Path + +import pytest + + +# ── Default (BaseAdapter) ─────────────────────────────────────────────────── + + +def test_base_adapter_returns_unsupported(): + """Adapters that don't override return supported:False.""" + from adapters.langgraph.adapter import LangGraphAdapter + a = LangGraphAdapter() + r = asyncio.run(a.transcript_lines()) + assert r["supported"] is False + assert r["lines"] == [] + assert r["cursor"] == 0 + assert r["runtime"] == "langgraph" + assert r["more"] is False + + +# ── Claude Code override ──────────────────────────────────────────────────── + + +def _write_jsonl(path: Path, entries: list[dict]) -> None: + with path.open("w") as f: + for e in entries: + f.write(json.dumps(e) + "\n") + + +def test_claude_code_no_projects_dir(): + """Returns supported:True with empty lines when projects dir missing.""" + from adapters.claude_code.adapter import ClaudeCodeAdapter + with tempfile.TemporaryDirectory() as tmp: + os.environ["HOME"] = tmp + os.environ["CLAUDE_PROJECT_CWD"] = "/configs" + try: + r = 
asyncio.run(ClaudeCodeAdapter().transcript_lines()) + assert r["supported"] is True + assert r["lines"] == [] + assert r["cursor"] == 0 + assert "-configs" in r["source"] + finally: + del os.environ["CLAUDE_PROJECT_CWD"] + + +def test_claude_code_reads_jsonl_with_pagination(): + from adapters.claude_code.adapter import ClaudeCodeAdapter + with tempfile.TemporaryDirectory() as tmp: + os.environ["HOME"] = tmp + os.environ["CLAUDE_PROJECT_CWD"] = "/configs" + try: + projdir = Path(tmp) / ".claude" / "projects" / "-configs" + projdir.mkdir(parents=True) + _write_jsonl(projdir / "abc.jsonl", [ + {"type": "user", "n": 1}, + {"type": "assistant", "n": 2}, + {"type": "user", "n": 3}, + {"type": "assistant", "n": 4}, + {"type": "user", "n": 5}, + ]) + a = ClaudeCodeAdapter() + # First page (limit=2) + r1 = asyncio.run(a.transcript_lines(since=0, limit=2)) + assert r1["supported"] is True + assert [l["n"] for l in r1["lines"]] == [1, 2] + assert r1["cursor"] == 2 + assert r1["more"] is True + # Second page (since=2, limit=2) + r2 = asyncio.run(a.transcript_lines(since=2, limit=2)) + assert [l["n"] for l in r2["lines"]] == [3, 4] + assert r2["cursor"] == 4 + assert r2["more"] is True + # Third page exhausts + r3 = asyncio.run(a.transcript_lines(since=4, limit=2)) + assert [l["n"] for l in r3["lines"]] == [5] + assert r3["cursor"] == 5 + assert r3["more"] is False + finally: + del os.environ["CLAUDE_PROJECT_CWD"] + + +def test_claude_code_picks_most_recent_jsonl(): + """When multiple .jsonl files exist, picks the most-recently-modified.""" + from adapters.claude_code.adapter import ClaudeCodeAdapter + with tempfile.TemporaryDirectory() as tmp: + os.environ["HOME"] = tmp + os.environ["CLAUDE_PROJECT_CWD"] = "/configs" + try: + projdir = Path(tmp) / ".claude" / "projects" / "-configs" + projdir.mkdir(parents=True) + old = projdir / "old.jsonl" + new = projdir / "new.jsonl" + _write_jsonl(old, [{"src": "old"}]) + _write_jsonl(new, [{"src": "new"}]) + # Force new to be more recent 
+ os.utime(old, (1000, 1000)) + os.utime(new, (2000, 2000)) + r = asyncio.run(ClaudeCodeAdapter().transcript_lines()) + assert r["lines"] == [{"src": "new"}] + assert r["source"].endswith("new.jsonl") + finally: + del os.environ["CLAUDE_PROJECT_CWD"] + + +def test_claude_code_skips_malformed_lines(): + """Bad JSON lines surface as ``_parse_error: True`` rather than 500'ing.""" + from adapters.claude_code.adapter import ClaudeCodeAdapter + with tempfile.TemporaryDirectory() as tmp: + os.environ["HOME"] = tmp + os.environ["CLAUDE_PROJECT_CWD"] = "/configs" + try: + projdir = Path(tmp) / ".claude" / "projects" / "-configs" + projdir.mkdir(parents=True) + with (projdir / "x.jsonl").open("w") as f: + f.write('{"good": 1}\n') + f.write("not-json garbage\n") + f.write('{"good": 2}\n') + r = asyncio.run(ClaudeCodeAdapter().transcript_lines()) + assert r["lines"][0] == {"good": 1} + assert r["lines"][1].get("_parse_error") is True + assert r["lines"][2] == {"good": 2} + finally: + del os.environ["CLAUDE_PROJECT_CWD"] + + +def test_claude_code_caps_limit(): + """Limit is capped at 1000 to prevent OOM via paranoid client.""" + from adapters.claude_code.adapter import ClaudeCodeAdapter + with tempfile.TemporaryDirectory() as tmp: + os.environ["HOME"] = tmp + os.environ["CLAUDE_PROJECT_CWD"] = "/configs" + try: + projdir = Path(tmp) / ".claude" / "projects" / "-configs" + projdir.mkdir(parents=True) + _write_jsonl(projdir / "x.jsonl", [{"i": i} for i in range(1500)]) + r = asyncio.run(ClaudeCodeAdapter().transcript_lines(limit=999999)) + assert len(r["lines"]) == 1000 # capped + assert r["more"] is True + assert r["cursor"] == 1000 + finally: + del os.environ["CLAUDE_PROJECT_CWD"]