diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx index ce1f3bed..68734f11 100644 --- a/canvas/src/components/tabs/ChatTab.tsx +++ b/canvas/src/components/tabs/ChatTab.tsx @@ -12,6 +12,7 @@ import { uploadChatFiles, downloadChatFile } from "./chat/uploads"; import { AttachmentChip, PendingAttachmentPill } from "./chat/AttachmentViews"; import { extractResponseText, extractRequestText, extractFilesFromTask } from "./chat/message-parser"; import { AgentCommsPanel } from "./chat/AgentCommsPanel"; +import { appendActivityLine } from "./chat/activityLog"; import { runtimeDisplayName } from "@/lib/runtime-names"; import { ConfirmDialog } from "@/components/ConfirmDialog"; @@ -428,12 +429,12 @@ function MyChatPanel({ workspaceId, data }: Props) { } if (line) { - setActivityLog((prev) => [...prev.slice(-8), line]); + setActivityLog((prev) => appendActivityLine(prev, line)); } } else if (msg.event === "TASK_UPDATED" && msg.workspace_id === workspaceId) { const task = (msg.payload?.current_task as string) || ""; if (task) { - setActivityLog((prev) => [...prev.slice(-8), `⟳ ${task}`]); + setActivityLog((prev) => appendActivityLine(prev, `⟳ ${task}`)); } } // A2A_RESPONSE is already consumed by the store and its text is diff --git a/canvas/src/components/tabs/chat/__tests__/activityLog.test.ts b/canvas/src/components/tabs/chat/__tests__/activityLog.test.ts new file mode 100644 index 00000000..c66aa949 --- /dev/null +++ b/canvas/src/components/tabs/chat/__tests__/activityLog.test.ts @@ -0,0 +1,41 @@ +import { describe, it, expect } from "vitest"; +import { ACTIVITY_LOG_WINDOW, appendActivityLine } from "../activityLog"; + +describe("appendActivityLine", () => { + it("appends a fresh line", () => { + expect(appendActivityLine([], "📄 Read /a")).toEqual(["📄 Read /a"]); + }); + + it("collapses an immediate duplicate", () => { + const prev = ["📄 Read /a"]; + // Same exact string twice in a row is noise — the helper should + // return the original array reference, not a new one. + expect(appendActivityLine(prev, "📄 Read /a")).toBe(prev); + }); + + it("keeps non-adjacent duplicates", () => { + const prev = ["📄 Read /a", "⚡ Bash: ls"]; + expect(appendActivityLine(prev, "📄 Read /a")).toEqual([ + "📄 Read /a", + "⚡ Bash: ls", + "📄 Read /a", + ]); + }); + + it("rolls off the oldest line when the window fills", () => { + const seed = Array.from({ length: ACTIVITY_LOG_WINDOW }, (_, i) => `line-${i}`); + const next = appendActivityLine(seed, "newest"); + expect(next.length).toBe(ACTIVITY_LOG_WINDOW); + expect(next[next.length - 1]).toBe("newest"); + // Oldest entry is dropped — line-0 is gone. + expect(next[0]).toBe("line-1"); + }); + + it("keeps the original array reference when below the window cap", () => { + const prev = ["a", "b"]; + const next = appendActivityLine(prev, "c"); + // Returned a new array (we appended); must NOT mutate prev. + expect(prev).toEqual(["a", "b"]); + expect(next).toEqual(["a", "b", "c"]); + }); +}); diff --git a/canvas/src/components/tabs/chat/activityLog.ts b/canvas/src/components/tabs/chat/activityLog.ts new file mode 100644 index 00000000..57efa31d --- /dev/null +++ b/canvas/src/components/tabs/chat/activityLog.ts @@ -0,0 +1,23 @@ +/** + * Sliding-window log for the in-chat activity feed (the live progress + * lines under the spinner while a chat reply is in flight). + * + * Sized to fit the spinner area without forcing a scroll; per-tool-use + * rows from the workspace's _report_tool_use can fire dozens per turn + * (Read 5 files + Grep + Bash + Edits + delegations), so a too-small + * window flushes useful early context before the user can read it. + * + * Consecutive identical lines collapse to a single entry — the same + * tool repeated on the same target (e.g. Read of the same file twice + * within a turn) is noise, not new progress. + */ +export const ACTIVITY_LOG_WINDOW = 20; + +export function appendActivityLine(prev: string[], line: string): string[] { + if (prev[prev.length - 1] === line) return prev; // collapse duplicates + const next = + prev.length >= ACTIVITY_LOG_WINDOW + ? prev.slice(-(ACTIVITY_LOG_WINDOW - 1)) + : prev; + return [...next, line]; +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index f4ec335e..f74790e4 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -124,11 +124,11 @@ func isUpstreamBusyError(err error) bool { if errors.Is(err, context.DeadlineExceeded) { return true } - // applyIdleTimeout cancels the request ctx via context.WithCancel - // when the broadcaster silence window elapses — that surfaces as - // context.Canceled (not DeadlineExceeded). Treat it as the same - // "upstream busy" class so the caller produces a 503 + Retry-After - // instead of a generic 500. + // applyIdleTimeout cancels via context.WithCancel when the + // broadcaster silence window elapses — context.Canceled + // propagates cleanly through errors.Is, no substring fallback + // needed (and a substring on "context canceled" would also match + // healthy client-side aborts which we don't want to label busy). if errors.Is(err, context.Canceled) { return true } @@ -137,10 +137,10 @@ func isUpstreamBusyError(err error) bool { } // url.Error wraps "read tcp … EOF" and "Post …: context deadline // exceeded" strings from the stdlib HTTP client without typing the - // inner cause. Fall back to substring match for those. + // inner cause. Fall back to substring match for those specific + // shapes only. msg := err.Error() return strings.Contains(msg, "context deadline exceeded") || - strings.Contains(msg, "context canceled") || strings.Contains(msg, "EOF") || strings.Contains(msg, "connection reset") } diff --git a/workspace-server/migrations/043_workspace_status_enum.up.sql b/workspace-server/migrations/043_workspace_status_enum.up.sql index 1d42fec3..ac8af646 100644 --- a/workspace-server/migrations/043_workspace_status_enum.up.sql +++ b/workspace-server/migrations/043_workspace_status_enum.up.sql @@ -23,21 +23,40 @@ -- itself failed via bundle import / runtime crash -- removed — soft-delete tombstone; the row stays so foreign- -- key references survive but no operations target it +-- paused — operator-initiated suspend via workspace_restart's +-- pause path (workspace_restart.go:406) +-- hibernated — auto-suspended after idle threshold; container +-- stopped but row preserved (workspace_restart.go:283, +-- introduced by migration 029_workspace_hibernation) -- --- Verified before writing this migration that production code in --- workspace-server/internal/{handlers,registry,bundle} writes only --- values from this list (test fixtures may write others; tests run --- against an isolated fixture DB so the cast doesn't affect them). +-- Sweep of every `UPDATE workspaces SET status = 'X'` in the +-- workspace-server/internal/ tree (excluding tests) verified the +-- value set. Adding a new state in the future requires both updating +-- this enum (a separate `ALTER TYPE … ADD VALUE` migration) AND any +-- writers — the enum will reject unknown strings at insert/update +-- time, which is the exact failure mode this migration is meant to +-- give us. +-- +-- Deployment: `ALTER TABLE … ALTER COLUMN TYPE` takes ACCESS +-- EXCLUSIVE on workspaces. A long-running SELECT against the table +-- will block the migration; the migration will then block every +-- writer behind it. `SET lock_timeout` aborts the migration in 5s +-- if it can't acquire the lock — preferable to stalling the whole +-- workspace fleet behind one slow query. BEGIN; +SET LOCAL lock_timeout = '5s'; + CREATE TYPE workspace_status AS ENUM ( 'provisioning', 'online', 'offline', 'degraded', 'failed', - 'removed' + 'removed', + 'paused', + 'hibernated' ); -- The two-step ALTER (DROP DEFAULT then change type then SET DEFAULT) diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index d0b53b07..af3abc71 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -127,13 +127,29 @@ def _mark_sdk_wedged(reason: str) -> None: global _sdk_wedged_reason if _sdk_wedged_reason is None: _sdk_wedged_reason = reason - logger.error("SDK wedge detected: %s — workspace will report degraded until restart", reason) + logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason) + + +def _clear_sdk_wedge_on_success() -> None: + """Auto-recovery — called from _run_query after a successful + completion. The original wedge could be transient (a single network + blip during the SDK's first-message handshake), and a sticky-only + flag would lock the workspace into degraded forever even after the + SDK started working again. Clearing on observed success means the + next heartbeat after a working query reports `runtime_state` empty + and the platform flips status back to online. + + No-op when not wedged (the common case).""" + global _sdk_wedged_reason + if _sdk_wedged_reason is not None: + logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat") + _sdk_wedged_reason = None def _reset_sdk_wedge_for_test() -> None: - """Test-only escape hatch. Production code never resets the flag — - wedge clears via process restart, which naturally re-imports this - module with the flag at None.""" + """Test-only escape hatch. Production code clears the wedge via + `_clear_sdk_wedge_on_success` when a query succeeds; this helper + is for unit tests that need to reset between cases.""" global _sdk_wedged_reason _sdk_wedged_reason = None @@ -213,6 +229,12 @@ async def _report_tool_use(block: Any) -> None: json={ "activity_type": "agent_log", "source_id": WORKSPACE_ID, + # target_id == source for self-actions. Matches the + # convention other self-logged activity rows use + # (a2a_receive when the workspace logs its own + # outbound reply) so DB consumers joining on + # target_id see a well-defined value. + "target_id": WORKSPACE_ID, "summary": summary, "status": "ok", "method": tool_name, @@ -228,11 +250,15 @@ async def _report_tool_use(block: Any) -> None: # claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient # subprocess crash, etc.). Match is case-insensitive on the formatted # error string. Adding a new pattern here MUST come with a test in -# tests/test_claude_sdk_executor.py — the flag is sticky and false- -# positives lock the workspace into degraded for the whole process -# lifetime. +# tests/test_claude_sdk_executor.py — false-positives lock the +# workspace into degraded until the next successful query clears it. +# +# `:initialize` suffix-anchored — the SDK can theoretically time out +# on later control messages (in-flight tool callbacks), but those +# don't leave the SDK in the unrecoverable post-init state we're +# trying to detect. Limit the pattern to the specific wedge. _WEDGE_ERROR_PATTERNS = ( - "control request timeout", + "control request timeout: initialize", ) @@ -510,6 +536,12 @@ class ClaudeSDKExecutor(AgentExecutor): finally: self._active_stream = None text = result_text if result_text is not None else "".join(assistant_chunks) + # Auto-recover the wedge flag — if a previous query() left this + # process in `_sdk_wedged` and THIS query just completed + # cleanly, the SDK clearly works again. Clear so the next + # heartbeat reports runtime_state empty and the platform flips + # status degraded → online without a manual restart. + _clear_sdk_wedge_on_success() return QueryResult(text=text, session_id=session_id) # ------------------------------------------------------------------ diff --git a/workspace/tests/test_claude_sdk_executor.py b/workspace/tests/test_claude_sdk_executor.py index 5b50a950..aff8d264 100644 --- a/workspace/tests/test_claude_sdk_executor.py +++ b/workspace/tests/test_claude_sdk_executor.py @@ -1319,3 +1319,38 @@ async def test_execute_does_not_mark_wedge_on_unrelated_error(): assert _executor_mod.is_wedged() is False, "non-wedge error must not flip the flag" finally: _executor_mod._reset_sdk_wedge_for_test() + + +@pytest.mark.asyncio +async def test_execute_clears_wedge_on_successful_query(): + """Auto-recovery: a process that previously hit a wedge should be + able to recover when the SDK starts working again. _run_query + calls _clear_sdk_wedge_on_success at the end of a clean + completion; the flag flips back to None and the next heartbeat + reports runtime_state empty so the platform recovers status → + online without forcing the user to restart the workspace.""" + # Pre-set the wedge as if a prior call had tripped it. + _executor_mod._reset_sdk_wedge_for_test() + _executor_mod._mark_sdk_wedged("transient: Control request timeout: initialize") + assert _executor_mod.is_wedged() is True + + e = _make_executor() + ctx = _make_context(["test prompt"]) + eq = _make_event_queue() + + async def good_query(prompt, options): + # Working SDK — yield one normal assistant message + result. + yield _FakeAssistantMessage([_FakeTextBlock("hello back")]) + yield _FakeResultMessage(session_id="recovered-sess") + + with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ + patch("claude_sdk_executor.read_delegation_results", return_value=""), \ + patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ + patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ + patch("claude_agent_sdk.query", new=good_query): + try: + await e.execute(ctx, eq) + assert _executor_mod.is_wedged() is False, "wedge flag must clear after a successful query" + assert _executor_mod.wedge_reason() == "" + finally: + _executor_mod._reset_sdk_wedge_for_test()