feat: GET /workspaces/:id/transcript — live agent session log

Closes #N (issue to be filed)

Lets canvas / operators see live tool calls + AI thinking instead of
waiting for the high-level activity log to flush. Right now the only
way to "look over an agent's shoulder" is `docker exec ws-XXX cat
/home/agent/.claude/projects/.../<session>.jsonl`, which:
  - doesn't work for remote workspaces (Phase 30 / Fly Machines)
  - requires shell access on the host
  - has no pagination

This PR adds:

1. `BaseAdapter.transcript_lines(since, limit)` — async hook returning
   `{runtime, supported, lines, cursor, more, source}`. Default returns
   `supported: false` so non-claude-code runtimes pass through gracefully.

2. `ClaudeCodeAdapter.transcript_lines` override — reads the most-
   recently-modified `.jsonl` in `~/.claude/projects/<cwd>/`. Resolves
   cwd the same way `ClaudeSDKExecutor._resolve_cwd()` does so the
   project dir name matches what Claude Code actually writes to. Limit
   capped at 1000 to prevent OOM.

3. Workspace HTTP route `GET /transcript` — Starlette handler added
   alongside the A2A app. Trusts the internal Docker network (same
   model as POST / for A2A); Phase 30 remote-workspace auth is a
   follow-up.

4. Platform proxy `GET /workspaces/:id/transcript` — looks up the
   workspace's URL, forwards GET, caps response at 1MB. Gated by
   existing `WorkspaceAuth` middleware (same as /traces, /memories,
   /delegations).

Tests: 6 Python unit tests cover empty dir / pagination / multi-session
/ malformed lines / limit cap, plus 4 Go tests cover 404 / proxy
forwarding / query-string propagation / unreachable-workspace 502.

Verified end-to-end on a live workspace — returns real claude-code
session entries through the platform proxy.

## Follow-ups
- WebSocket variant for live streaming (instead of polling)
- Canvas UI tab "Transcript" between Activity and Traces
- LangGraph / DeepAgents / OpenClaw transcript adapters
- Phase 30 remote-workspace auth on /transcript
This commit is contained in:
airenostars 2026-04-15 14:29:43 -07:00
parent 12db566b00
commit 853734aa4e
7 changed files with 515 additions and 1 deletions

View File

@ -0,0 +1,91 @@
// Package handlers — transcript proxy.
//
// GET /workspaces/:id/transcript proxies to the workspace's own
// /transcript endpoint, which surfaces the live agent session log
// (claude-code reads ~/.claude/projects/<cwd>/<session>.jsonl). Other
// runtimes return supported:false.
//
// Why this lives in the platform: docker exec works for local dev but
// not for remote (Phase 30) workspaces on Fly Machines. The platform's
// network proxy is the only path that scales to both.
package handlers
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// TranscriptHandler serves the platform-side transcript endpoint by relaying
// requests to the workspace agent over HTTP.
type TranscriptHandler struct {
	// httpClient is the outbound client used for every upstream call.
	httpClient *http.Client
}

// NewTranscriptHandler returns a TranscriptHandler whose outbound HTTP client
// gives up on a request after 15 seconds.
func NewTranscriptHandler() *TranscriptHandler {
	client := &http.Client{Timeout: 15 * time.Second}
	return &TranscriptHandler{httpClient: client}
}
// Get handles GET /workspaces/:id/transcript?since=N&limit=N.
//
// It looks up the workspace's registered agent URL, forwards the request
// (query string included) to that agent's /transcript endpoint, and relays
// the upstream status code, Content-Type and body back to the caller.
//
// No bearer token is attached to the upstream call: the workspace's
// /transcript endpoint trusts the internal Docker network (same model as
// POST / for A2A). Phase 30 remote workspaces will need an auth story;
// tracked as a follow-up.
//
// Responses produced by the proxy itself:
//   - 404: the workspace row could not be loaded
//   - 503: the workspace has no agent URL on file (NULL or empty)
//   - 500: the stored URL does not parse, the upstream request cannot be
//     built, or the upstream body cannot be read
//   - 502: the agent is unreachable, or its response is larger than 1 MiB
func (h *TranscriptHandler) Get(c *gin.Context) {
	// Upper bound on the relayed body, so a runaway transcript cannot
	// saturate the canvas.
	const maxBodyBytes = 1 << 20

	workspaceID := c.Param("id")
	ctx := c.Request.Context()

	// agent_card->>'url' is NULL when the card carries no "url" key. Scanning
	// into a *string lets NULL come back as nil instead of a Scan error, so an
	// unregistered workspace reaches the 503 below rather than a misleading 404.
	var workspaceURL *string
	if err := db.DB.QueryRowContext(ctx,
		`SELECT agent_card->>'url' FROM workspaces WHERE id = $1`,
		workspaceID,
	).Scan(&workspaceURL); err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
		return
	}
	if workspaceURL == nil || *workspaceURL == "" {
		c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not registered (no URL on file)"})
		return
	}

	target, err := url.Parse(*workspaceURL)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "invalid workspace URL"})
		return
	}
	// Only scheme and host are taken from the stored URL; path and query are
	// always overwritten so the proxy can reach nothing but /transcript.
	target.Path = "/transcript"
	target.RawQuery = c.Request.URL.RawQuery

	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, target.String(), nil)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build request"})
		return
	}

	resp, err := h.httpClient.Do(req)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("workspace unreachable: %v", err)})
		return
	}
	defer resp.Body.Close()

	// Read one byte past the cap: that is the only way to tell "exactly at the
	// limit" from "over the limit". Relaying a silently truncated body would
	// hand the caller unparseable JSON together with the upstream's success
	// status, so an oversized response is reported as an explicit error.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxBodyBytes+1))
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read workspace response"})
		return
	}
	if len(body) > maxBodyBytes {
		c.JSON(http.StatusBadGateway, gin.H{"error": "workspace transcript response exceeds 1 MiB; retry with a smaller limit"})
		return
	}

	c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), body)
}

View File

@ -0,0 +1,123 @@
package handlers
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// seedWorkspace upserts a workspace row with a fixed ID whose agent_card
// contains only the given agent URL, and returns that ID. If the row already
// exists its agent_card is replaced. The test is aborted when the agent card
// cannot be encoded or the statement fails.
func seedWorkspace(t *testing.T, agentURL string) string {
	t.Helper()

	const id = "11111111-2222-3333-4444-555555555555"

	// Encode through encoding/json rather than splicing the URL into a JSON
	// literal, so a URL containing quotes or backslashes still yields a valid
	// document.
	card, err := json.Marshal(map[string]string{"url": agentURL})
	if err != nil {
		t.Fatalf("encode agent card: %v", err)
	}

	if _, err := db.DB.Exec(
		`INSERT INTO workspaces (id, name, agent_card, status) VALUES ($1, 'transcript-test', $2, 'online')
		ON CONFLICT (id) DO UPDATE SET agent_card = EXCLUDED.agent_card`,
		id, card,
	); err != nil {
		t.Fatalf("seed workspace: %v", err)
	}
	return id
}
// ==================== GET /workspaces/:id/transcript ====================
// TestTranscript_WorkspaceNotFound requests the transcript of an ID this test
// never seeds and expects the handler to answer 404.
func TestTranscript_WorkspaceNotFound(t *testing.T) {
	setupTestDB(t)
	setupTestRedis(t)
	handler := NewTranscriptHandler()

	const missingID = "00000000-0000-0000-0000-000000000000"

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: missingID}}
	ginCtx.Request = httptest.NewRequest("GET", "/workspaces/"+missingID+"/transcript", nil)

	handler.Get(ginCtx)

	if got := rec.Code; got != http.StatusNotFound {
		t.Errorf("expected 404, got %d: %s", got, rec.Body.String())
	}
}
// TestTranscript_ProxyForwardsAndReturnsBody points a seeded workspace at a
// stub agent and checks that the handler calls the stub's /transcript path
// and hands the stub's JSON body back with a 200.
func TestTranscript_ProxyForwardsAndReturnsBody(t *testing.T) {
	setupTestDB(t)
	setupTestRedis(t)
	handler := NewTranscriptHandler()

	// Stand-in for the workspace agent: records the path it was called on and
	// replies with a fixed transcript document.
	const canned = `{"runtime":"claude-code","supported":true,"lines":[{"type":"user"}],"cursor":1,"more":false}`
	var seenPath string
	agent := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		seenPath = req.URL.Path
		rw.Header().Set("Content-Type", "application/json")
		_, _ = rw.Write([]byte(canned))
	}))
	defer agent.Close()

	wsID := seedWorkspace(t, agent.URL)

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: wsID}}
	ginCtx.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript?since=5&limit=20", nil)

	handler.Get(ginCtx)

	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String())
	}
	if seenPath != "/transcript" {
		t.Errorf("expected proxy to hit /transcript, got %q", seenPath)
	}

	var payload map[string]interface{}
	if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
		t.Fatalf("response not JSON: %v", err)
	}
	if payload["runtime"] != "claude-code" {
		t.Errorf("expected runtime=claude-code, got %v", payload["runtime"])
	}
	lines, isList := payload["lines"].([]interface{})
	if !isList || len(lines) != 1 {
		t.Errorf("expected 1 line, got %v", payload["lines"])
	}
}
// TestTranscript_ProxyPropagatesQueryString checks that the raw query string
// of the incoming request arrives unchanged at the stub agent.
func TestTranscript_ProxyPropagatesQueryString(t *testing.T) {
	setupTestDB(t)
	setupTestRedis(t)
	handler := NewTranscriptHandler()

	// The stub only records the query it received; the body is irrelevant.
	var seenQuery string
	agent := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		seenQuery = req.URL.RawQuery
		rw.Write([]byte(`{}`))
	}))
	defer agent.Close()

	wsID := seedWorkspace(t, agent.URL)

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: wsID}}
	ginCtx.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript?since=42&limit=7", nil)

	handler.Get(ginCtx)

	if want := "since=42&limit=7"; seenQuery != want {
		t.Errorf("expected query forwarded, got %q", seenQuery)
	}
}
// TestTranscript_UnreachableWorkspaceReturns502 registers a workspace whose
// agent URL is expected to refuse connections and checks that the handler
// answers 502.
func TestTranscript_UnreachableWorkspaceReturns502(t *testing.T) {
	setupTestDB(t)
	setupTestRedis(t)
	handler := NewTranscriptHandler()

	// Port 1 on loopback: the connection attempt should be refused.
	const deadAgent = "http://127.0.0.1:1"
	wsID := seedWorkspace(t, deadAgent)

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: wsID}}
	ginCtx.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/transcript", nil)

	handler.Get(ginCtx)

	if got := rec.Code; got != http.StatusBadGateway {
		t.Errorf("expected 502, got %d: %s", got, rec.Body.String())
	}
}

View File

@ -163,6 +163,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
trh := handlers.NewTracesHandler()
wsAuth.GET("/traces", trh.List)
// Live agent transcript proxy — surfaces the runtime-specific session
// log (claude-code reads ~/.claude/projects/<cwd>/<session>.jsonl).
// Lets canvas / operators see live tool calls + AI thinking instead
// of waiting for the high-level activity log to flush.
trsh := handlers.NewTranscriptHandler()
wsAuth.GET("/transcript", trsh.Get)
// Agent Memories (HMA)
memsh := handlers.NewMemoriesHandler()
wsAuth.POST("/memories", memsh.Commit)

View File

@ -102,6 +102,36 @@ class BaseAdapter(ABC):
"""
return None
async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict:
    """Return live transcript entries for the most-recent agent session.

    Default implementation returns ``supported: False`` for runtimes
    that don't expose a per-session log on disk. Override in subclasses
    that DO (Claude Code reads ``~/.claude/projects/<cwd>/<session>.jsonl``).

    This is the "look over the agent's shoulder" feature: it lets canvas /
    operators see live tool calls + AI thinking instead of waiting for
    the high-level activity log to flush.

    Args:
        since: line offset at which to resume, i.e. the ``cursor`` returned
            by the caller's previous poll (0 = from start). Negative values
            are treated as 0.
        limit: max lines to return (default 100). Unused by this default
            implementation, which never returns any lines.

    Returns:
        ``{runtime, supported, lines, cursor, more, source}`` where
        ``cursor`` is the offset to pass as ``since`` on the next poll,
        ``more`` is True if additional lines remain past ``limit``, and
        ``source`` is the file path lines were read from (useful for
        debugging). Here ``lines`` is always empty, ``more`` is False and
        ``source`` is None.
    """
    # Echo a non-negative cursor so this default agrees with the Claude Code
    # override, which also clamps ``since`` to >= 0 before reporting it.
    cursor = max(0, since)
    return {
        "runtime": self.name(),
        "supported": False,
        "lines": [],
        "cursor": cursor,
        "more": False,
        "source": None,
    }
def register_subagent_hook(self, name: str, spec: dict) -> None:
    """Accept a sub-agent registration and ignore it.

    This base implementation does nothing with ``name`` or ``spec`` and
    returns None. Per the original note, the DeepAgents adapter overrides
    it to actually register a sub-agent.
    """

View File

@ -1,13 +1,19 @@
"""Claude Code adapter — wraps the Claude Code CLI as an agent runtime."""
import json
import os
import logging
from pathlib import Path
from adapters.base import BaseAdapter, AdapterConfig
from a2a.server.agent_execution import AgentExecutor
logger = logging.getLogger(__name__)
# Cap one transcript response at 1000 lines so a misbehaving client can't OOM
# the workspace by polling /transcript?limit=999999.
_TRANSCRIPT_MAX_LIMIT = 1000
class ClaudeCodeAdapter(BaseAdapter):
@ -74,3 +80,88 @@ class ClaudeCodeAdapter(BaseAdapter):
heartbeat=config.heartbeat,
model=model,
)
async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict:
    """Read one page of the live Claude Code session transcript.

    Claude Code writes every session to
    ``$HOME/.claude/projects/<cwd-as-dirname>/<session-uuid>.jsonl``;
    every line is a JSON event (user/assistant/tool_use/attachment/etc).
    We pick the most-recently-modified .jsonl in the projects dir for
    the agent's working directory and page through it by physical line.

    Returns ``supported: True`` even if no .jsonl exists yet (empty
    ``lines``, cursor unchanged) so the canvas can show "agent hasn't
    produced output yet" instead of "feature unavailable".

    Args:
        since: index of the first physical file line to consider (the
            ``cursor`` from the previous poll). Negative values become 0.
        limit: max entries to return; clamped to
            ``1.._TRANSCRIPT_MAX_LIMIT``.

    Returns:
        ``{runtime, supported, lines, cursor, more, source}``:

        * ``lines``: parsed JSON events. A complete line that is not valid
          JSON is reported as ``{"_parse_error": True, "_raw": <first 200
          chars>}``. Blank lines are dropped.
        * ``cursor``: index of the next unread physical line. It counts
          blank lines too, so passing it back as ``since`` never re-reads
          an entry.
        * ``more``: True when the page filled up before the end of file.
        * ``source``: the .jsonl that was read, or the projects dir when
          no .jsonl was found.

        If the file cannot be read, the failure is logged and an empty
        page (``cursor == since``) is returned.
    """
    limit = max(1, min(limit, _TRANSCRIPT_MAX_LIMIT))
    since = max(0, since)

    # Resolve the projects-dir name. Claude Code maps cwd → dirname by
    # replacing "/" with "-" (so "/configs" → "-configs"). The exact
    # rule lives inside the CLI binary, but the leading-dash + path-
    # without-trailing-slash pattern is stable across versions.
    #
    # Match ClaudeSDKExecutor._resolve_cwd: prefer /workspace if populated,
    # else /configs. Override via CLAUDE_PROJECT_CWD for tests.
    WORKSPACE_MOUNT = "/workspace"
    CONFIG_MOUNT = "/configs"
    cwd_override = os.environ.get("CLAUDE_PROJECT_CWD")
    if cwd_override:
        cwd = cwd_override
    elif os.path.isdir(WORKSPACE_MOUNT) and os.listdir(WORKSPACE_MOUNT):
        cwd = WORKSPACE_MOUNT
    else:
        cwd = CONFIG_MOUNT

    # Normalize: strip trailing slash, replace path separators with "-"
    cwd_norm = cwd.rstrip("/") or "/"
    projdir_name = cwd_norm.replace("/", "-")  # "/configs" → "-configs"

    home = Path(os.environ.get("HOME", "/home/agent"))
    projdir = home / ".claude" / "projects" / projdir_name

    result_base = {
        "runtime": self.name(),
        "supported": True,
        "lines": [],
        "cursor": since,
        "more": False,
        "source": str(projdir),
    }

    if not projdir.is_dir():
        return result_base

    # Pick the most-recently-modified .jsonl. A session file can disappear
    # between the glob and the stat; such a file is skipped instead of
    # failing the whole request. On equal mtimes the first one seen wins.
    target = None
    target_mtime = None
    for candidate in projdir.glob("*.jsonl"):
        try:
            mtime = candidate.stat().st_mtime
        except OSError:
            continue
        if target is None or mtime > target_mtime:
            target, target_mtime = candidate, mtime
    if target is None:
        return result_base
    result_base["source"] = str(target)

    lines = []
    more = False
    cursor = since
    try:
        # The transcript is JSON text, so decode it as UTF-8 regardless of
        # the container locale. errors="replace" keeps a multi-byte
        # character torn by an in-progress write from raising
        # UnicodeDecodeError, which the OSError handler would not catch.
        with target.open("r", encoding="utf-8", errors="replace") as f:
            for index, raw in enumerate(f):
                if index < since:
                    continue
                if len(lines) >= limit:
                    more = True
                    break
                text = raw.strip()
                if text:
                    try:
                        lines.append(json.loads(text))
                    except json.JSONDecodeError:
                        if not raw.endswith("\n"):
                            # Unterminated last line that does not parse:
                            # most likely the CLI is mid-write. Leave the
                            # cursor on it so the next poll returns the
                            # finished entry instead of losing it.
                            break
                        # A complete but malformed line is surfaced as a
                        # marker entry rather than dropped.
                        lines.append({"_parse_error": True, "_raw": text[:200]})
                # Advance past every consumed physical line, blank ones
                # included; since + len(lines) would drift and repeat data.
                cursor = index + 1
    except OSError as exc:
        logger.warning("transcript_lines: read failed for %s: %s", target, exc)
        return result_base

    result_base["lines"] = lines
    result_base["cursor"] = cursor
    result_base["more"] = more
    return result_base

View File

@ -281,7 +281,32 @@ async def main(): # pragma: no cover
print(f"Workspace {workspace_id} starting on port {port}")
# Wrap the ASGI app with W3C TraceContext extraction middleware so incoming
# A2A HTTP requests propagate their trace context into _incoming_trace_context.
built_app = make_trace_middleware(app.build())
starlette_app = app.build()
# Add /transcript route — exposes the most-recent agent session log
# (claude-code reads ~/.claude/projects/<cwd>/<session>.jsonl). Other
# runtimes return supported:false.
from starlette.responses import JSONResponse
from starlette.routing import Route
async def _transcript_handler(request):
    """Serve GET /transcript: parse ``since``/``limit`` and return the adapter's page as JSON."""
    # No bearer check here — same model as POST / (A2A): the workspace's
    # HTTP server only listens on the internal Docker network, and the
    # platform's TranscriptHandler is the only intended caller. Phase 30
    # remote workspaces will need a proper auth story (TODO #N) — likely
    # the existing wsauth bearer, but with a callback to the platform to
    # validate (since the workspace doesn't see all live tokens).
    params = request.query_params
    try:
        since = int(params.get("since", "0"))
        limit = int(params.get("limit", "100"))
    except (TypeError, ValueError):
        # Anything that is not an integer is the caller's mistake.
        return JSONResponse(
            {"error": "since and limit must be integers"},
            status_code=400,
        )
    page = await adapter.transcript_lines(since=since, limit=limit)
    return JSONResponse(page)
starlette_app.add_route("/transcript", _transcript_handler, methods=["GET"])
built_app = make_trace_middleware(starlette_app)
server_config = uvicorn.Config(
built_app,

View File

@ -0,0 +1,147 @@
"""Tests for the new BaseAdapter.transcript_lines() method + claude-code override."""
import asyncio
import json
import os
import tempfile
from pathlib import Path
import pytest
# ── Default (BaseAdapter) ───────────────────────────────────────────────────
def test_base_adapter_returns_unsupported():
    """An adapter relying on the inherited default reports ``supported: False``.

    LangGraphAdapter is used as the example runtime; the test assumes it
    does not define its own ``transcript_lines``.
    """
    from adapters.langgraph.adapter import LangGraphAdapter

    result = asyncio.run(LangGraphAdapter().transcript_lines())

    assert result["supported"] is False
    assert result["lines"] == []
    assert result["cursor"] == 0
    assert result["runtime"] == "langgraph"
    assert result["more"] is False
# ── Claude Code override ────────────────────────────────────────────────────
def _write_jsonl(path: Path, entries: list[dict]) -> None:
with path.open("w") as f:
for e in entries:
f.write(json.dumps(e) + "\n")
def test_claude_code_no_projects_dir(monkeypatch, tmp_path):
    """Returns supported:True with empty lines when projects dir missing."""
    from adapters.claude_code.adapter import ClaudeCodeAdapter

    # monkeypatch restores both variables at teardown, so a redirected HOME
    # (pointing at a temp dir that is about to be deleted) cannot leak into
    # later tests and a pre-existing CLAUDE_PROJECT_CWD is not clobbered.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("CLAUDE_PROJECT_CWD", "/configs")

    r = asyncio.run(ClaudeCodeAdapter().transcript_lines())

    assert r["supported"] is True
    assert r["lines"] == []
    assert r["cursor"] == 0
    assert "-configs" in r["source"]
def test_claude_code_reads_jsonl_with_pagination(monkeypatch, tmp_path):
    """Three polls with limit=2 walk a five-entry session file exactly once."""
    from adapters.claude_code.adapter import ClaudeCodeAdapter

    # monkeypatch restores both variables at teardown, so a redirected HOME
    # cannot leak into later tests and a pre-existing CLAUDE_PROJECT_CWD is
    # not clobbered.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("CLAUDE_PROJECT_CWD", "/configs")

    projdir = tmp_path / ".claude" / "projects" / "-configs"
    projdir.mkdir(parents=True)
    _write_jsonl(projdir / "abc.jsonl", [
        {"type": "user", "n": 1},
        {"type": "assistant", "n": 2},
        {"type": "user", "n": 3},
        {"type": "assistant", "n": 4},
        {"type": "user", "n": 5},
    ])
    a = ClaudeCodeAdapter()

    # First page (limit=2)
    r1 = asyncio.run(a.transcript_lines(since=0, limit=2))
    assert r1["supported"] is True
    assert [entry["n"] for entry in r1["lines"]] == [1, 2]
    assert r1["cursor"] == 2
    assert r1["more"] is True

    # Second page (since=2, limit=2)
    r2 = asyncio.run(a.transcript_lines(since=2, limit=2))
    assert [entry["n"] for entry in r2["lines"]] == [3, 4]
    assert r2["cursor"] == 4
    assert r2["more"] is True

    # Third page exhausts
    r3 = asyncio.run(a.transcript_lines(since=4, limit=2))
    assert [entry["n"] for entry in r3["lines"]] == [5]
    assert r3["cursor"] == 5
    assert r3["more"] is False
def test_claude_code_picks_most_recent_jsonl(monkeypatch, tmp_path):
    """When multiple .jsonl files exist, picks the most-recently-modified."""
    from adapters.claude_code.adapter import ClaudeCodeAdapter

    # monkeypatch restores both variables at teardown, so a redirected HOME
    # cannot leak into later tests and a pre-existing CLAUDE_PROJECT_CWD is
    # not clobbered.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("CLAUDE_PROJECT_CWD", "/configs")

    projdir = tmp_path / ".claude" / "projects" / "-configs"
    projdir.mkdir(parents=True)
    old = projdir / "old.jsonl"
    new = projdir / "new.jsonl"
    _write_jsonl(old, [{"src": "old"}])
    _write_jsonl(new, [{"src": "new"}])
    # Force new to be more recent
    os.utime(old, (1000, 1000))
    os.utime(new, (2000, 2000))

    r = asyncio.run(ClaudeCodeAdapter().transcript_lines())

    assert r["lines"] == [{"src": "new"}]
    assert r["source"].endswith("new.jsonl")
def test_claude_code_skips_malformed_lines(monkeypatch, tmp_path):
    """Bad JSON lines surface as ``_parse_error: True`` rather than 500'ing."""
    from adapters.claude_code.adapter import ClaudeCodeAdapter

    # monkeypatch restores both variables at teardown, so a redirected HOME
    # cannot leak into later tests and a pre-existing CLAUDE_PROJECT_CWD is
    # not clobbered.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("CLAUDE_PROJECT_CWD", "/configs")

    projdir = tmp_path / ".claude" / "projects" / "-configs"
    projdir.mkdir(parents=True)
    with (projdir / "x.jsonl").open("w") as f:
        f.write('{"good": 1}\n')
        f.write("not-json garbage\n")
        f.write('{"good": 2}\n')

    r = asyncio.run(ClaudeCodeAdapter().transcript_lines())

    assert r["lines"][0] == {"good": 1}
    assert r["lines"][1].get("_parse_error") is True
    assert r["lines"][2] == {"good": 2}
def test_claude_code_caps_limit(monkeypatch, tmp_path):
    """Limit is capped at 1000 so an oversized ``limit`` cannot OOM the workspace."""
    from adapters.claude_code.adapter import ClaudeCodeAdapter

    # monkeypatch restores both variables at teardown, so a redirected HOME
    # cannot leak into later tests and a pre-existing CLAUDE_PROJECT_CWD is
    # not clobbered.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("CLAUDE_PROJECT_CWD", "/configs")

    projdir = tmp_path / ".claude" / "projects" / "-configs"
    projdir.mkdir(parents=True)
    _write_jsonl(projdir / "x.jsonl", [{"i": i} for i in range(1500)])

    r = asyncio.run(ClaudeCodeAdapter().transcript_lines(limit=999999))

    assert len(r["lines"]) == 1000  # capped
    assert r["more"] is True
    assert r["cursor"] == 1000