forked from molecule-ai/molecule-core
fix: review-driven hardening of wedge detector + idle timeout + progress feed
Bundle review of pieces 1/2/3 surfaced two critical issues plus a handful of required + optional fixes. All addressed. Critical: 1. Migration 043 was missing 'paused' and 'hibernated' from the workspace_status enum. Both are real production statuses written by workspace_restart.go (lines 283 and 406), introduced by migration 029_workspace_hibernation. The original `USING status::workspace_status` cast would have errored mid-transaction on any production DB containing those values. Added both. Also added `SET LOCAL lock_timeout = '5s'` so the migration aborts instead of stalling the workspace fleet behind a slow SELECT. 2. The chat activity-feed window kept only 8 lines, and a single multi-tool turn (Read 5 files + Grep + Bash + Edit + delegate) easily flushed older context before the user could read it. Extracted appendActivityLine to chat/activityLog.ts with a 20-line window AND consecutive-duplicate collapse (same tool on the same target twice in a row is noise, not new progress). 5 unit tests pin the behavior. Required: 3. The SDK wedge flag was sticky-only — a single transient Control-request-timeout from a flaky network blip locked the workspace into degraded for the whole process lifetime, even when the next query() would have succeeded. Added _clear_sdk_wedge_on_success(), called from _run_query's success path. The next heartbeat after a working query reports runtime_state empty and the platform recovers the workspace to online without a manual restart. New regression test. 4. _report_tool_use now sets target_id = WORKSPACE_ID for self- actions, matching the convention other self-logged activity rows use. DB consumers joining on target_id see a well-defined value instead of NULL. Optional taken: 5. Tightened _WEDGE_ERROR_PATTERNS from "control request timeout" to "control request timeout: initialize" — suffix-anchored so a future SDK error on an in-flight tool-call control message doesn't get misclassified as the unrecoverable post-init wedge. 6. Dropped the redundant "context canceled" substring fallback in isUpstreamBusyError. errors.Is(err, context.Canceled) is the typed check; the substring would also match healthy client-side aborts, which we don't want classified as upstream-busy. Verified: 1010 canvas tests + 64 Python tests + full Go suite pass; migration applies cleanly on dev DB with all 8 enum values; reverse migration restores TEXT. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bf1dc6b6a5
commit
892de784b3
@ -12,6 +12,7 @@ import { uploadChatFiles, downloadChatFile } from "./chat/uploads";
|
||||
import { AttachmentChip, PendingAttachmentPill } from "./chat/AttachmentViews";
|
||||
import { extractResponseText, extractRequestText, extractFilesFromTask } from "./chat/message-parser";
|
||||
import { AgentCommsPanel } from "./chat/AgentCommsPanel";
|
||||
import { appendActivityLine } from "./chat/activityLog";
|
||||
import { runtimeDisplayName } from "@/lib/runtime-names";
|
||||
import { ConfirmDialog } from "@/components/ConfirmDialog";
|
||||
|
||||
@ -428,12 +429,12 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
}
|
||||
|
||||
if (line) {
|
||||
setActivityLog((prev) => [...prev.slice(-8), line]);
|
||||
setActivityLog((prev) => appendActivityLine(prev, line));
|
||||
}
|
||||
} else if (msg.event === "TASK_UPDATED" && msg.workspace_id === workspaceId) {
|
||||
const task = (msg.payload?.current_task as string) || "";
|
||||
if (task) {
|
||||
setActivityLog((prev) => [...prev.slice(-8), `⟳ ${task}`]);
|
||||
setActivityLog((prev) => appendActivityLine(prev, `⟳ ${task}`));
|
||||
}
|
||||
}
|
||||
// A2A_RESPONSE is already consumed by the store and its text is
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { ACTIVITY_LOG_WINDOW, appendActivityLine } from "../activityLog";
|
||||
|
||||
describe("appendActivityLine", () => {
|
||||
it("appends a fresh line", () => {
|
||||
expect(appendActivityLine([], "📄 Read /a")).toEqual(["📄 Read /a"]);
|
||||
});
|
||||
|
||||
it("collapses an immediate duplicate", () => {
|
||||
const prev = ["📄 Read /a"];
|
||||
// Same exact string twice in a row is noise — the helper should
|
||||
// return the original array reference, not a new one.
|
||||
expect(appendActivityLine(prev, "📄 Read /a")).toBe(prev);
|
||||
});
|
||||
|
||||
it("keeps non-adjacent duplicates", () => {
|
||||
const prev = ["📄 Read /a", "⚡ Bash: ls"];
|
||||
expect(appendActivityLine(prev, "📄 Read /a")).toEqual([
|
||||
"📄 Read /a",
|
||||
"⚡ Bash: ls",
|
||||
"📄 Read /a",
|
||||
]);
|
||||
});
|
||||
|
||||
it("rolls off the oldest line when the window fills", () => {
|
||||
const seed = Array.from({ length: ACTIVITY_LOG_WINDOW }, (_, i) => `line-${i}`);
|
||||
const next = appendActivityLine(seed, "newest");
|
||||
expect(next.length).toBe(ACTIVITY_LOG_WINDOW);
|
||||
expect(next[next.length - 1]).toBe("newest");
|
||||
// Oldest entry is dropped — line-0 is gone.
|
||||
expect(next[0]).toBe("line-1");
|
||||
});
|
||||
|
||||
it("keeps the original array reference when below the window cap", () => {
|
||||
const prev = ["a", "b"];
|
||||
const next = appendActivityLine(prev, "c");
|
||||
// Returned a new array (we appended); must NOT mutate prev.
|
||||
expect(prev).toEqual(["a", "b"]);
|
||||
expect(next).toEqual(["a", "b", "c"]);
|
||||
});
|
||||
});
|
||||
23
canvas/src/components/tabs/chat/activityLog.ts
Normal file
23
canvas/src/components/tabs/chat/activityLog.ts
Normal file
@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Sliding-window log for the in-chat activity feed (the live progress
|
||||
* lines under the spinner while a chat reply is in flight).
|
||||
*
|
||||
* Sized to fit the spinner area without forcing a scroll; per-tool-use
|
||||
* rows from the workspace's _report_tool_use can fire dozens per turn
|
||||
* (Read 5 files + Grep + Bash + Edits + delegations), so a too-small
|
||||
* window flushes useful early context before the user can read it.
|
||||
*
|
||||
* Consecutive identical lines collapse to a single entry — the same
|
||||
* tool repeated on the same target (e.g. Read of the same file twice
|
||||
* within a turn) is noise, not new progress.
|
||||
*/
|
||||
export const ACTIVITY_LOG_WINDOW = 20;
|
||||
|
||||
export function appendActivityLine(prev: string[], line: string): string[] {
|
||||
if (prev[prev.length - 1] === line) return prev; // collapse duplicates
|
||||
const next =
|
||||
prev.length >= ACTIVITY_LOG_WINDOW
|
||||
? prev.slice(-(ACTIVITY_LOG_WINDOW - 1))
|
||||
: prev;
|
||||
return [...next, line];
|
||||
}
|
||||
@ -124,11 +124,11 @@ func isUpstreamBusyError(err error) bool {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
// applyIdleTimeout cancels the request ctx via context.WithCancel
|
||||
// when the broadcaster silence window elapses — that surfaces as
|
||||
// context.Canceled (not DeadlineExceeded). Treat it as the same
|
||||
// "upstream busy" class so the caller produces a 503 + Retry-After
|
||||
// instead of a generic 500.
|
||||
// applyIdleTimeout cancels via context.WithCancel when the
|
||||
// broadcaster silence window elapses — context.Canceled
|
||||
// propagates cleanly through errors.Is, no substring fallback
|
||||
// needed (and a substring on "context canceled" would also match
|
||||
// healthy client-side aborts which we don't want to label busy).
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return true
|
||||
}
|
||||
@ -137,10 +137,10 @@ func isUpstreamBusyError(err error) bool {
|
||||
}
|
||||
// url.Error wraps "read tcp … EOF" and "Post …: context deadline
|
||||
// exceeded" strings from the stdlib HTTP client without typing the
|
||||
// inner cause. Fall back to substring match for those.
|
||||
// inner cause. Fall back to substring match for those specific
|
||||
// shapes only.
|
||||
msg := err.Error()
|
||||
return strings.Contains(msg, "context deadline exceeded") ||
|
||||
strings.Contains(msg, "context canceled") ||
|
||||
strings.Contains(msg, "EOF") ||
|
||||
strings.Contains(msg, "connection reset")
|
||||
}
|
||||
|
||||
@ -23,21 +23,40 @@
|
||||
-- itself failed via bundle import / runtime crash
|
||||
-- removed — soft-delete tombstone; the row stays so foreign-
|
||||
-- key references survive but no operations target it
|
||||
-- paused — operator-initiated suspend via workspace_restart's
|
||||
-- pause path (workspace_restart.go:406)
|
||||
-- hibernated — auto-suspended after idle threshold; container
|
||||
-- stopped but row preserved (workspace_restart.go:283,
|
||||
-- introduced by migration 029_workspace_hibernation)
|
||||
--
|
||||
-- Verified before writing this migration that production code in
|
||||
-- workspace-server/internal/{handlers,registry,bundle} writes only
|
||||
-- values from this list (test fixtures may write others; tests run
|
||||
-- against an isolated fixture DB so the cast doesn't affect them).
|
||||
-- Sweep of every `UPDATE workspaces SET status = 'X'` in the
|
||||
-- workspace-server/internal/ tree (excluding tests) verified the
|
||||
-- value set. Adding a new state in the future requires both updating
|
||||
-- this enum (a separate `ALTER TYPE … ADD VALUE` migration) AND any
|
||||
-- writers — the enum will reject unknown strings at insert/update
|
||||
-- time, which is the exact failure mode this migration is meant to
|
||||
-- give us.
|
||||
--
|
||||
-- Deployment: `ALTER TABLE … ALTER COLUMN TYPE` takes ACCESS
|
||||
-- EXCLUSIVE on workspaces. A long-running SELECT against the table
|
||||
-- will block the migration; the migration will then block every
|
||||
-- writer behind it. `SET lock_timeout` aborts the migration in 5s
|
||||
-- if it can't acquire the lock — preferable to stalling the whole
|
||||
-- workspace fleet behind one slow query.
|
||||
|
||||
BEGIN;
|
||||
|
||||
SET LOCAL lock_timeout = '5s';
|
||||
|
||||
CREATE TYPE workspace_status AS ENUM (
|
||||
'provisioning',
|
||||
'online',
|
||||
'offline',
|
||||
'degraded',
|
||||
'failed',
|
||||
'removed'
|
||||
'removed',
|
||||
'paused',
|
||||
'hibernated'
|
||||
);
|
||||
|
||||
-- The two-step ALTER (DROP DEFAULT then change type then SET DEFAULT)
|
||||
|
||||
@ -127,13 +127,29 @@ def _mark_sdk_wedged(reason: str) -> None:
|
||||
global _sdk_wedged_reason
|
||||
if _sdk_wedged_reason is None:
|
||||
_sdk_wedged_reason = reason
|
||||
logger.error("SDK wedge detected: %s — workspace will report degraded until restart", reason)
|
||||
logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason)
|
||||
|
||||
|
||||
def _clear_sdk_wedge_on_success() -> None:
|
||||
"""Auto-recovery — called from _run_query after a successful
|
||||
completion. The original wedge could be transient (a single network
|
||||
blip during the SDK's first-message handshake), and a sticky-only
|
||||
flag would lock the workspace into degraded forever even after the
|
||||
SDK started working again. Clearing on observed success means the
|
||||
next heartbeat after a working query reports `runtime_state` empty
|
||||
and the platform flips status back to online.
|
||||
|
||||
No-op when not wedged (the common case)."""
|
||||
global _sdk_wedged_reason
|
||||
if _sdk_wedged_reason is not None:
|
||||
logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat")
|
||||
_sdk_wedged_reason = None
|
||||
|
||||
|
||||
def _reset_sdk_wedge_for_test() -> None:
|
||||
"""Test-only escape hatch. Production code never resets the flag —
|
||||
wedge clears via process restart, which naturally re-imports this
|
||||
module with the flag at None."""
|
||||
"""Test-only escape hatch. Production code clears the wedge via
|
||||
`_clear_sdk_wedge_on_success` when a query succeeds; this helper
|
||||
is for unit tests that need to reset between cases."""
|
||||
global _sdk_wedged_reason
|
||||
_sdk_wedged_reason = None
|
||||
|
||||
@ -213,6 +229,12 @@ async def _report_tool_use(block: Any) -> None:
|
||||
json={
|
||||
"activity_type": "agent_log",
|
||||
"source_id": WORKSPACE_ID,
|
||||
# target_id == source for self-actions. Matches the
|
||||
# convention other self-logged activity rows use
|
||||
# (a2a_receive when the workspace logs its own
|
||||
# outbound reply) so DB consumers joining on
|
||||
# target_id see a well-defined value.
|
||||
"target_id": WORKSPACE_ID,
|
||||
"summary": summary,
|
||||
"status": "ok",
|
||||
"method": tool_name,
|
||||
@ -228,11 +250,15 @@ async def _report_tool_use(block: Any) -> None:
|
||||
# claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient
|
||||
# subprocess crash, etc.). Match is case-insensitive on the formatted
|
||||
# error string. Adding a new pattern here MUST come with a test in
|
||||
# tests/test_claude_sdk_executor.py — the flag is sticky and false-
|
||||
# positives lock the workspace into degraded for the whole process
|
||||
# lifetime.
|
||||
# tests/test_claude_sdk_executor.py — false-positives lock the
|
||||
# workspace into degraded until the next successful query clears it.
|
||||
#
|
||||
# `:initialize` suffix-anchored — the SDK can theoretically time out
|
||||
# on later control messages (in-flight tool callbacks), but those
|
||||
# don't leave the SDK in the unrecoverable post-init state we're
|
||||
# trying to detect. Limit the pattern to the specific wedge.
|
||||
_WEDGE_ERROR_PATTERNS = (
|
||||
"control request timeout",
|
||||
"control request timeout: initialize",
|
||||
)
|
||||
|
||||
|
||||
@ -510,6 +536,12 @@ class ClaudeSDKExecutor(AgentExecutor):
|
||||
finally:
|
||||
self._active_stream = None
|
||||
text = result_text if result_text is not None else "".join(assistant_chunks)
|
||||
# Auto-recover the wedge flag — if a previous query() left this
|
||||
# process in `_sdk_wedged` and THIS query just completed
|
||||
# cleanly, the SDK clearly works again. Clear so the next
|
||||
# heartbeat reports runtime_state empty and the platform flips
|
||||
# status degraded → online without a manual restart.
|
||||
_clear_sdk_wedge_on_success()
|
||||
return QueryResult(text=text, session_id=session_id)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@ -1319,3 +1319,38 @@ async def test_execute_does_not_mark_wedge_on_unrelated_error():
|
||||
assert _executor_mod.is_wedged() is False, "non-wedge error must not flip the flag"
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_clears_wedge_on_successful_query():
|
||||
"""Auto-recovery: a process that previously hit a wedge should be
|
||||
able to recover when the SDK starts working again. _run_query
|
||||
calls _clear_sdk_wedge_on_success at the end of a clean
|
||||
completion; the flag flips back to None and the next heartbeat
|
||||
reports runtime_state empty so the platform recovers status →
|
||||
online without forcing the user to restart the workspace."""
|
||||
# Pre-set the wedge as if a prior call had tripped it.
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
_executor_mod._mark_sdk_wedged("transient: Control request timeout: initialize")
|
||||
assert _executor_mod.is_wedged() is True
|
||||
|
||||
e = _make_executor()
|
||||
ctx = _make_context(["test prompt"])
|
||||
eq = _make_event_queue()
|
||||
|
||||
async def good_query(prompt, options):
|
||||
# Working SDK — yield one normal assistant message + result.
|
||||
yield _FakeAssistantMessage([_FakeTextBlock("hello back")])
|
||||
yield _FakeResultMessage(session_id="recovered-sess")
|
||||
|
||||
with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \
|
||||
patch("claude_sdk_executor.read_delegation_results", return_value=""), \
|
||||
patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \
|
||||
patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \
|
||||
patch("claude_agent_sdk.query", new=good_query):
|
||||
try:
|
||||
await e.execute(ctx, eq)
|
||||
assert _executor_mod.is_wedged() is False, "wedge flag must clear after a successful query"
|
||||
assert _executor_mod.wedge_reason() == ""
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user