Compare commits

...

6 Commits

Author SHA1 Message Date
core-fe 43661d89dc fix(canvas): set role field on USER_MESSAGE entries so bubbles render correctly
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
gate-check-v3 / gate-check (pull_request) Successful in 5s
security-review / approved (pull_request) Successful in 5s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-tier-check / tier-check (pull_request) Successful in 6s
qa-review / approved (pull_request) Successful in 11s
sop-checklist / all-items-acked (pull_request) Successful in 10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 56s
audit-force-merge / audit (pull_request) Successful in 5s
USER_MESSAGE events fanned to other sessions were inserting into
agentMessages without a role field. consumeAgentMessages returns raw
objects, and both useChatSocket.ts and MobileChat.tsx unconditionally
passed role="agent" to createMessage(). ChatTab.tsx uses
msg.role === "user" for right-side alignment and user-toned styling,
so user messages were rendering as agent bubbles — a double-bubble UX
bug.

Fix:
- Add role?: "user"|"agent" to the agentMessages stored type
- Set role:"user" in the USER_MESSAGE handler
- Use m.role ?? "agent" in useChatSocket.ts and MobileChat.tsx
- Assert role:"user" in existing USER_MESSAGE test cases

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 15:56:36 +00:00
fullstack-engineer 8862a8ef06 fix(both): fan user's own message to all conversation sessions (#1440)
CI / Detect changes (pull_request) Successful in 7s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 14s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 13s
E2E API Smoke Test / detect-changes (pull_request) Successful in 15s
E2E Chat / detect-changes (pull_request) Successful in 11s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 5s
Harness Replays / detect-changes (pull_request) Successful in 7s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 11s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
gate-check-v3 / gate-check (pull_request) Successful in 10s
qa-review / approved (pull_request) Successful in 7s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 41s
security-review / approved (pull_request) Successful in 7s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 5s
sop-tier-check / tier-check (pull_request) Successful in 8s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 24s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 5s
Harness Replays / Harness Replays (pull_request) Successful in 10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 23s
CI / Canvas (Next.js) (pull_request) Successful in 4m16s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Failing after 4m25s
CI / all-required (pull_request) Failing after 4m27s
E2E Chat / E2E Chat (pull_request) Failing after 5m2s
CI / Python Lint & Test (pull_request) Successful in 6m41s
Cross-session fan-out of the user's own outbound message so other
web sessions viewing the same conversation see it in real time — not
just the agent reply.

Backend (Go):
- Add EventUserMessage ("USER_MESSAGE") to events/types.go taxonomy
- Add extractUserMessagePayload() helper in a2a_proxy_helpers.go:
  parses JSON-RPC message/send body, extracts text from parts[] and
  file attachments (kind:file), returns nil for non-user-role or
  non-message/send methods (e.g. heartbeat pings with role:agent)
- logA2ASuccess now also calls BroadcastOnly(workspaceID, USER_MESSAGE)
  when callerID=="" (canvas-initiated) and the request succeeded.
- 8 new unit tests for extractUserMessagePayload.

Canvas (TypeScript):
- Create canvas/src/lib/ws-events.ts as the canonical canvas-side mirror
  of the Go event taxonomy. All 32 constants + union type.
- canvas-events.ts: add USER_MESSAGE case → appends to agentMessages
  for the workspace (mirrors AGENT_MESSAGE/A2A_RESPONSE path; ChatTab
  renders via the existing appendMessageDeduped dedup mechanism so the
  originating session collapses its optimistic copy — no double bubble).
- 7 new unit tests for the USER_MESSAGE case in canvas-events.test.ts.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 15:28:45 +00:00
fullstack-engineer e84bf3a4c6 test(handlers+canvas): BroadcastHandler sqlmock suite + extractAgentText tests (#1475)
Block internal-flavored paths / Block forbidden paths (push) Successful in 6s
CI / Detect changes (push) Successful in 12s
E2E API Smoke Test / detect-changes (push) Successful in 10s
CI / Shellcheck (E2E scripts) (push) Successful in 10s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
E2E Chat / detect-changes (push) Successful in 7s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 5s
Harness Replays / Harness Replays (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Failing after 32s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Failing after 40s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 1m18s
CI / Platform (Go) (push) Successful in 3m9s
CI / Canvas (Next.js) (push) Successful in 4m37s
CI / Canvas Deploy Reminder (push) Successful in 1s
E2E Chat / E2E Chat (push) Failing after 5m1s
CI / Python Lint & Test (push) Successful in 6m51s
CI / all-required (push) Successful in 6m51s
Co-authored-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
Co-committed-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
2026-05-18 07:30:33 +00:00
core-qa 376f78278d fix(ci): increase Go test timeouts for cold runner performance (#1175)
CI / Canvas Deploy Reminder (push) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
E2E Chat / E2E Chat (push) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
CI / Detect changes (push) Failing after 1s
Block internal-flavored paths / Block forbidden paths (push) Successful in 5s
CI / all-required (push) Failing after 2s
CI / Platform (Go) (push) Has been cancelled
CI / Shellcheck (E2E scripts) (push) Has been cancelled
CI / Canvas (Next.js) (push) Has been cancelled
CI / Python Lint & Test (push) Has been cancelled
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
E2E API Smoke Test / detect-changes (push) Has been cancelled
Runtime PR-Built Compatibility / detect-changes (push) Has been cancelled
Handlers Postgres Integration / detect-changes (push) Has been cancelled
Secret scan / Scan diff for credential-shaped strings (push) Has been cancelled
E2E Chat / detect-changes (push) Has been cancelled
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 49s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m5s
Co-authored-by: Molecule AI Core-QA <core-qa@agents.moleculesai.app>
Co-committed-by: Molecule AI Core-QA <core-qa@agents.moleculesai.app>
2026-05-18 07:30:24 +00:00
fullstack-engineer 3d0d9b1818 test(handlers): add Uninstall 503 coverage for plugins_install.go (closes #1377) (#1378)
Block internal-flavored paths / Block forbidden paths (push) Successful in 5s
CI / Detect changes (push) Successful in 12s
CI / Shellcheck (E2E scripts) (push) Successful in 15s
E2E API Smoke Test / detect-changes (push) Successful in 11s
Harness Replays / detect-changes (push) Successful in 7s
E2E Chat / detect-changes (push) Successful in 10s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 5s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 7s
Handlers Postgres Integration / detect-changes (push) Successful in 14s
Harness Replays / Harness Replays (push) Successful in 2s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Failing after 1m22s
E2E API Smoke Test / E2E API Smoke Test (push) Failing after 2m9s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 2m40s
CI / Platform (Go) (push) Successful in 3m45s
CI / Canvas (Next.js) (push) Successful in 5m23s
CI / Canvas Deploy Reminder (push) Successful in 2s
E2E Chat / E2E Chat (push) Failing after 6m14s
CI / Python Lint & Test (push) Successful in 7m7s
CI / all-required (push) Successful in 7m11s
Co-authored-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
Co-committed-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
2026-05-18 06:51:21 +00:00
fullstack-engineer 1c61db9042 test: PatchAbilities handler + resolveWorkspaceName coverage (#1481)
CI / Shellcheck (E2E scripts) (push) Waiting to run
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Waiting to run
CI / all-required (push) Waiting to run
E2E API Smoke Test / detect-changes (push) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
Runtime PR-Built Compatibility / detect-changes (push) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (push) Waiting to run
CI / Platform (Go) (push) Has been cancelled
Block internal-flavored paths / Block forbidden paths (push) Has been cancelled
CI / Canvas (Next.js) (push) Has been cancelled
Handlers Postgres Integration / detect-changes (push) Has been cancelled
CI / Detect changes (push) Has been cancelled
Harness Replays / detect-changes (push) Successful in 7s
E2E Chat / detect-changes (push) Successful in 11s
Harness Replays / Harness Replays (push) Successful in 2s
E2E Chat / E2E Chat (push) Failing after 6m10s
Co-authored-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
Co-committed-by: Molecule AI Fullstack Engineer <fullstack-engineer@agents.moleculesai.app>
2026-05-18 06:51:20 +00:00
17 changed files with 1194 additions and 54 deletions
+12 -10
View File
@@ -145,10 +145,10 @@ jobs:
# the diagnostic step with its own continue-on-error: true (line 203).
# Flip confirmed by CI / Platform (Go) status = success on main HEAD 363905d3.
continue-on-error: false
# Job-level ceiling. The go test step below runs with a per-step 10m timeout;
# this cap catches any step that leaks past that. Set well above 10m so
# Job-level ceiling. The go test step below runs with a per-step 30m timeout;
# this cap catches any step that leaks past that. Set well above 30m so
# the per-step timeout is the active constraint.
timeout-minutes: 15
timeout-minutes: 35
defaults:
run:
working-directory: workspace-server
@@ -176,12 +176,14 @@ jobs:
name: Run golangci-lint
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
- if: always()
name: Diagnostic — per-package verbose 60s
name: Diagnostic — per-package verbose (300s timeout)
run: |
set +e
go test -race -v -timeout 60s ./internal/handlers/... 2>&1 | tee /tmp/test-handlers.log
# 300s allows handlers + pendinguploads packages to complete on cold
# runners with -race instrumentation (~60-120s each vs ~14s non-race).
go test -race -v -timeout 300s ./internal/handlers/... 2>&1 | tee /tmp/test-handlers.log
handlers_exit=$?
go test -race -v -timeout 60s ./internal/pendinguploads/... 2>&1 | tee /tmp/test-pu.log
go test -race -v -timeout 300s ./internal/pendinguploads/... 2>&1 | tee /tmp/test-pu.log
pu_exit=$?
echo "::group::handlers exit=$handlers_exit (last 100 lines)"
tail -100 /tmp/test-handlers.log
@@ -194,10 +196,10 @@ jobs:
- if: always()
name: Run tests with race detection and coverage
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
# full ./... suite with race detection + coverage. A 10m per-step timeout
# lets the suite complete on cold cache (~5-7m) while failing cleanly
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
# full ./... suite with race detection + coverage. A 30m per-step timeout
# lets the suite complete on cold cache (~13-25m) while failing cleanly
# instead of OOM-killing. The job-level timeout (35m) is a backstop.
run: go test -race -timeout 30m -coverprofile=coverage.out ./...
- if: always()
name: Per-file coverage report
+1 -1
View File
@@ -271,7 +271,7 @@ export function MobileChat({
const msgs = consume(agentId);
for (const m of msgs) {
appendMessageDeduped(
createMessage("agent", m.content, m.attachments),
createMessage(m.role ?? "agent", m.content, m.attachments),
);
}
}, [historyLoading, agentId, appendMessageDeduped]);
@@ -248,6 +248,88 @@ describe("extractResponseText", () => {
});
});
describe("extractAgentText", () => {
it("extracts text from top-level parts", () => {
const task = {
parts: [{ kind: "text", text: "Agent said hello" }],
};
expect(extractAgentText(task)).toBe("Agent said hello");
});
it("extracts from artifacts[0].parts when top-level parts absent", () => {
const task = {
artifacts: [
{ parts: [{ kind: "text", text: "From artifact block" }] },
],
};
expect(extractAgentText(task)).toBe("From artifact block");
});
it("extracts from status.message.parts as fallback", () => {
const task = {
status: {
message: { parts: [{ kind: "text", text: "Status text" }] },
},
};
expect(extractAgentText(task)).toBe("Status text");
});
it("prefers top-level parts over artifacts", () => {
const task = {
parts: [{ kind: "text", text: "top-level wins" }],
artifacts: [
{ parts: [{ kind: "text", text: "artifact text" }] },
],
};
expect(extractAgentText(task)).toBe("top-level wins");
});
it("prefers top-level parts over status.message", () => {
const task = {
parts: [{ kind: "text", text: "parts wins" }],
status: {
message: { parts: [{ kind: "text", text: "status text" }] },
},
};
expect(extractAgentText(task)).toBe("parts wins");
});
it("returns string identity when task itself is a string", () => {
expect(extractAgentText("plain string task" as unknown as Record<string, unknown>)).toBe(
"plain string task",
);
});
it("returns fallback when task is an empty object", () => {
expect(extractAgentText({})).toBe("(Could not extract response text)");
});
it("returns fallback when task has no extractable text", () => {
expect(
extractAgentText({ status: "running", other: "fields" }),
).toBe("(Could not extract response text)");
});
it("tolerates malformed nested shapes without throwing", () => {
const task = {
parts: null,
artifacts: "not an array",
status: { message: 42 },
};
expect(extractAgentText(task)).toBe("(Could not extract response text)");
});
it("joins multiple text parts with newline", () => {
const task = {
parts: [
{ kind: "text", text: "Line one" },
{ kind: "text", text: "Line two" },
],
};
expect(extractAgentText(task)).toBe("Line one\nLine two");
});
});
describe("extractTextsFromParts", () => {
it("extracts text parts with kind=text", () => {
const parts = [
@@ -0,0 +1,102 @@
import { describe, it, expect, beforeEach } from "vitest";
import { useCanvasStore } from "@/store/canvas";
import { resolveWorkspaceName } from "../hooks/resolveWorkspaceName";
beforeEach(() => {
// Reset store to a clean slate between tests so node lookup is deterministic.
useCanvasStore.setState({ nodes: [] });
});
describe("resolveWorkspaceName", () => {
it("returns the workspace name when a node with that ID exists", () => {
useCanvasStore.setState({
nodes: [
{
id: "ws-alpha-001",
type: "workspace",
data: { name: "Alpha Agent" },
position: { x: 0, y: 0 },
},
],
});
expect(resolveWorkspaceName("ws-alpha-001")).toBe("Alpha Agent");
});
it("falls back to the first 8 chars of the ID when no matching node exists", () => {
expect(resolveWorkspaceName("ws-zzz-not-found")).toBe("ws-zzz-n");
});
it("falls back to the first 8 chars when the node exists but has no name", () => {
useCanvasStore.setState({
nodes: [
{
id: "ws-no-name",
type: "workspace",
// data.name is deliberately absent
data: {},
position: { x: 0, y: 0 },
},
],
});
expect(resolveWorkspaceName("ws-no-name")).toBe("ws-no-na");
});
it("returns the first 8 chars for a very short ID", () => {
expect(resolveWorkspaceName("ab")).toBe("ab");
});
it("returns the first 8 chars when the ID is exactly 8 characters", () => {
// slice(0,8) of an 8-char string is the full string
const id = "12345678";
expect(resolveWorkspaceName(id)).toBe(id);
});
it("picks the right node when multiple workspaces share a prefix", () => {
useCanvasStore.setState({
nodes: [
{
id: "00000000-0000-0000-0000-000000000001",
type: "workspace",
data: { name: "Backend Agent" },
position: { x: 0, y: 0 },
},
{
id: "00000000-0000-0000-0000-000000000002",
type: "workspace",
data: { name: "Frontend Agent" },
position: { x: 100, y: 0 },
},
],
});
expect(resolveWorkspaceName("00000000-0000-0000-0000-000000000002")).toBe(
"Frontend Agent"
);
expect(resolveWorkspaceName("00000000-0000-0000-0000-000000000001")).toBe(
"Backend Agent"
);
});
it("does not mutate store state between calls", () => {
useCanvasStore.setState({
nodes: [
{
id: "stable-id",
type: "workspace",
data: { name: "Stable Workspace" },
position: { x: 0, y: 0 },
},
],
});
resolveWorkspaceName("stable-id");
resolveWorkspaceName("unknown-id");
// Store nodes must be unchanged — resolveWorkspaceName is read-only.
const nodes = useCanvasStore.getState().nodes;
expect(nodes).toHaveLength(1);
expect((nodes[0] as { id: string }).id).toBe("stable-id");
});
});
@@ -27,7 +27,7 @@ export function useChatSocket(
const msgs = consume(workspaceId);
for (const m of msgs) {
callbacksRef.current.onAgentMessage?.(
createMessage("agent", m.content, m.attachments),
createMessage(m.role ?? "agent", m.content, m.attachments),
);
}
if (msgs.length > 0) {
+130
View File
@@ -0,0 +1,130 @@
/**
* ws-events.ts — canonical WebSocket event taxonomy shared between
* the Go platform and the TypeScript canvas.
*
* Source of truth: `workspace-server/internal/events/types.go` (Go side).
* This file is the canvas mirror — every constant in that file MUST appear
* here. The go vet / build pipeline does NOT enforce this today; the
* discipline is manual: when adding a new EventType in Go, mirror it here.
*
* Consumer usage:
* import { WS_EVENTS } from "@/lib/ws-events";
* switch (msg.event) {
* case WS_EVENTS.AGENT_MESSAGE: ...
* case WS_EVENTS.USER_MESSAGE: ...
* }
*
* Wire format: the string literal is sent over the WebSocket as
* `WSMessage.Event`. Do NOT change these values.
*/
// Chat / agent messaging.
export const WS_AGENT_MESSAGE = "AGENT_MESSAGE";
export const WS_A2A_RESPONSE = "A2A_RESPONSE";
export const WS_USER_MESSAGE = "USER_MESSAGE";
export const WS_ACTIVITY_LOGGED = "ACTIVITY_LOGGED";
export const WS_CHANNEL_MESSAGE = "CHANNEL_MESSAGE";
// Workspace lifecycle.
export const WS_WORKSPACE_PROVISIONING = "WORKSPACE_PROVISIONING";
export const WS_WORKSPACE_PROVISION_FAILED = "WORKSPACE_PROVISION_FAILED";
export const WS_WORKSPACE_ONLINE = "WORKSPACE_ONLINE";
export const WS_WORKSPACE_OFFLINE = "WORKSPACE_OFFLINE";
export const WS_WORKSPACE_DEGRADED = "WORKSPACE_DEGRADED";
export const WS_WORKSPACE_HIBERNATED = "WORKSPACE_HIBERNATED";
export const WS_WORKSPACE_PAUSED = "WORKSPACE_PAUSED";
export const WS_WORKSPACE_REMOVED = "WORKSPACE_REMOVED";
export const WS_WORKSPACE_AWAITING_AGENT = "WORKSPACE_AWAITING_AGENT";
export const WS_WORKSPACE_HEARTBEAT = "WORKSPACE_HEARTBEAT";
// Agent assignment + identity.
export const WS_AGENT_ASSIGNED = "AGENT_ASSIGNED";
export const WS_AGENT_REPLACED = "AGENT_REPLACED";
export const WS_AGENT_REMOVED = "AGENT_REMOVED";
export const WS_AGENT_MOVED = "AGENT_MOVED";
export const WS_AGENT_CARD_UPDATED = "AGENT_CARD_UPDATED";
// Delegation lifecycle.
export const WS_DELEGATION_SENT = "DELEGATION_SENT";
export const WS_DELEGATION_STATUS = "DELEGATION_STATUS";
export const WS_DELEGATION_COMPLETE = "DELEGATION_COMPLETE";
export const WS_DELEGATION_FAILED = "DELEGATION_FAILED";
// Task progression + scheduler.
export const WS_TASK_UPDATED = "TASK_UPDATED";
export const WS_CRON_EXECUTED = "CRON_EXECUTED";
export const WS_CRON_SKIPPED = "CRON_SKIPPED";
// Approvals.
export const WS_APPROVAL_REQUESTED = "APPROVAL_REQUESTED";
export const WS_APPROVAL_ESCALATED = "APPROVAL_ESCALATED";
// Auth / credentials.
export const WS_EXTERNAL_CREDENTIALS_ROTATED = "EXTERNAL_CREDENTIALS_ROTATED";
/** Union type of all known event names. Used to type WSMessage.Event. */
export type WSEventName =
| typeof WS_AGENT_MESSAGE
| typeof WS_A2A_RESPONSE
| typeof WS_USER_MESSAGE
| typeof WS_ACTIVITY_LOGGED
| typeof WS_CHANNEL_MESSAGE
| typeof WS_WORKSPACE_PROVISIONING
| typeof WS_WORKSPACE_PROVISION_FAILED
| typeof WS_WORKSPACE_ONLINE
| typeof WS_WORKSPACE_OFFLINE
| typeof WS_WORKSPACE_DEGRADED
| typeof WS_WORKSPACE_HIBERNATED
| typeof WS_WORKSPACE_PAUSED
| typeof WS_WORKSPACE_REMOVED
| typeof WS_WORKSPACE_AWAITING_AGENT
| typeof WS_WORKSPACE_HEARTBEAT
| typeof WS_AGENT_ASSIGNED
| typeof WS_AGENT_REPLACED
| typeof WS_AGENT_REMOVED
| typeof WS_AGENT_MOVED
| typeof WS_AGENT_CARD_UPDATED
| typeof WS_DELEGATION_SENT
| typeof WS_DELEGATION_STATUS
| typeof WS_DELEGATION_COMPLETE
| typeof WS_DELEGATION_FAILED
| typeof WS_TASK_UPDATED
| typeof WS_CRON_EXECUTED
| typeof WS_CRON_SKIPPED
| typeof WS_APPROVAL_REQUESTED
| typeof WS_APPROVAL_ESCALATED
| typeof WS_EXTERNAL_CREDENTIALS_ROTATED;
/** All event name constants, for exhaustive-switch linting. */
export const WS_EVENTS = {
AGENT_MESSAGE: WS_AGENT_MESSAGE,
A2A_RESPONSE: WS_A2A_RESPONSE,
USER_MESSAGE: WS_USER_MESSAGE,
ACTIVITY_LOGGED: WS_ACTIVITY_LOGGED,
CHANNEL_MESSAGE: WS_CHANNEL_MESSAGE,
WORKSPACE_PROVISIONING: WS_WORKSPACE_PROVISIONING,
WORKSPACE_PROVISION_FAILED: WS_WORKSPACE_PROVISION_FAILED,
WORKSPACE_ONLINE: WS_WORKSPACE_ONLINE,
WORKSPACE_OFFLINE: WS_WORKSPACE_OFFLINE,
WORKSPACE_DEGRADED: WS_WORKSPACE_DEGRADED,
WORKSPACE_HIBERNATED: WS_WORKSPACE_HIBERNATED,
WORKSPACE_PAUSED: WS_WORKSPACE_PAUSED,
WORKSPACE_REMOVED: WS_WORKSPACE_REMOVED,
WORKSPACE_AWAITING_AGENT: WS_WORKSPACE_AWAITING_AGENT,
WORKSPACE_HEARTBEAT: WS_WORKSPACE_HEARTBEAT,
AGENT_ASSIGNED: WS_AGENT_ASSIGNED,
AGENT_REPLACED: WS_AGENT_REPLACED,
AGENT_REMOVED: WS_AGENT_REMOVED,
AGENT_MOVED: WS_AGENT_MOVED,
AGENT_CARD_UPDATED: WS_AGENT_CARD_UPDATED,
DELEGATION_SENT: WS_DELEGATION_SENT,
DELEGATION_STATUS: WS_DELEGATION_STATUS,
DELEGATION_COMPLETE: WS_DELEGATION_COMPLETE,
DELEGATION_FAILED: WS_DELEGATION_FAILED,
TASK_UPDATED: WS_TASK_UPDATED,
CRON_EXECUTED: WS_CRON_EXECUTED,
CRON_SKIPPED: WS_CRON_SKIPPED,
APPROVAL_REQUESTED: WS_APPROVAL_REQUESTED,
APPROVAL_ESCALATED: WS_APPROVAL_ESCALATED,
EXTERNAL_CREDENTIALS_ROTATED: WS_EXTERNAL_CREDENTIALS_ROTATED,
} as const;
@@ -808,6 +808,210 @@ describe("handleCanvasEvent A2A_RESPONSE", () => {
});
});
// ---------------------------------------------------------------------------
// USER_MESSAGE (cross-session fan-out of user's own outbound message)
// ---------------------------------------------------------------------------
// #1440: user's own message is optimistically inserted in the originating
// session by useChatSend; other sessions need this event to render it.
describe("handleCanvasEvent USER_MESSAGE", () => {
it("appends a user message to agentMessages for the workspace", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-abc",
content: "Hello, agent!",
},
}),
get,
set
);
expect(set).toHaveBeenCalledOnce();
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].id).toBe("msg-abc");
expect(agentMessages["ws-1"][0].content).toBe("Hello, agent!");
expect(agentMessages["ws-1"][0].role).toBe("user");
expect(typeof agentMessages["ws-1"][0].timestamp).toBe("string");
});
it("appends to existing messages rather than replacing them", () => {
const node = makeNode("ws-1");
const existing = [{ id: "old", content: "prior msg", timestamp: "2024-01-01T00:00:00Z" }];
const { get, set } = makeStore([node], [], null, { "ws-1": existing });
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-xyz",
content: "second user message",
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; content: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(2);
expect(agentMessages["ws-1"][0].content).toBe("prior msg");
expect(agentMessages["ws-1"][1].content).toBe("second user message");
expect(agentMessages["ws-1"][1].id).toBe("msg-xyz");
});
it("is a no-op when both content and files are absent", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node]);
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {},
}),
get,
set
);
expect(set).not.toHaveBeenCalled();
});
it("is a no-op when content is empty string", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node]);
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { content: "" },
}),
get,
set
);
expect(set).not.toHaveBeenCalled();
});
it("passes through valid file attachments", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
const att = {
uri: "workspace:/uploads/doc.pdf",
name: "doc.pdf",
mimeType: "application/pdf",
size: 98765,
};
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-with-file",
content: "see attached",
files: [att],
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; attachments?: Array<{ uri: string; name: string; mimeType?: string; size?: number }> }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].id).toBe("msg-with-file");
expect(agentMessages["ws-1"][0].role).toBe("user");
expect(agentMessages["ws-1"][0].attachments).toEqual([att]);
});
it("drops file entries with missing or empty uri/name", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
// Two bad entries followed by one good one
const bad = [{ uri: "" }, { name: "" }, { uri: "x", name: "y" }];
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-bad-files",
content: "check files",
files: bad as Array<{ uri?: unknown; name?: unknown }>,
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ attachments?: Array<{ uri: string }> }>>;
};
// Only the valid entry should survive the filter.
expect(agentMessages["ws-1"][0].attachments).toHaveLength(1);
expect(agentMessages["ws-1"][0].attachments![0].uri).toBe("x");
});
it("uses crypto.randomUUID() when messageId is absent", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { content: "no id field" },
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string }>>;
};
expect(agentMessages["ws-1"][0].id).toBeDefined();
expect(typeof agentMessages["ws-1"][0].id).toBe("string");
});
it("renders a files-only message (no text) when content is absent but files present", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "file-only",
files: [{ uri: "workspace:/x.pdf", name: "x.pdf" }],
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ content: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].content).toBe("");
});
});
// ---------------------------------------------------------------------------
// Unknown event
// ---------------------------------------------------------------------------
+61 -1
View File
@@ -71,7 +71,7 @@ export function handleCanvasEvent(
nodes: Node<WorkspaceNodeData>[];
edges: Edge[];
selectedNodeId: string | null;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
},
set: (partial: Record<string, unknown>) => void,
): void {
@@ -515,6 +515,66 @@ export function handleCanvasEvent(
break;
}
// #1440 USER_MESSAGE: the canvas optimistically inserts the user's
// own message into the originating session's store before the request
// fires (useChatSend → onUserMessage). Other sessions never saw it —
// the agent reply was broadcast but not the user's own text. This event
// fans the user's message to every OTHER session so they render the
// bubble without requiring a manual refresh. The originating session
// collapses its optimistic copy via the dedup mechanism in ChatTab
// (role+content+timestamp window) — no double bubble.
//
// Payload shape mirrors EventUserMessage in Go
// a2a_proxy_helpers.go: userMessagePayload:
// messageId: string
// content: string
// files: ParsedFilePart[] (name, uri, mimeType, size)
case "USER_MESSAGE": {
const payload = msg.payload as {
messageId?: string;
content?: string;
files?: Array<{ name?: unknown; uri?: unknown; mimeType?: unknown; size?: unknown }>;
};
const content = typeof payload?.content === "string" ? payload.content : "";
const files: Array<{ name: string; uri: string; mimeType?: string; size?: number }> = [];
if (Array.isArray(payload?.files)) {
for (const f of payload.files) {
if (typeof f?.uri === "string" && typeof f?.name === "string") {
files.push({
name: f.name,
uri: f.uri,
mimeType: typeof f.mimeType === "string" ? f.mimeType : undefined,
size: typeof f.size === "number" ? f.size : undefined,
});
}
}
}
// Render only when there's something visible.
if (content || files.length > 0) {
// Insert into agentMessages for rendering as a user-bubble.
// ChatTab uses msg.role === "user" for right-side alignment and
// user-toned styling, so we must set role:"user" explicitly.
const { agentMessages } = get();
const existing = agentMessages[msg.workspace_id] || [];
set({
agentMessages: {
...agentMessages,
[msg.workspace_id]: [
...existing,
{
id: payload?.messageId ?? crypto.randomUUID(),
content,
role: "user",
timestamp: new Date().toISOString(),
...(files.length > 0 ? { attachments: files } : {}),
},
],
},
});
}
break;
}
default:
break;
}
+2 -2
View File
@@ -224,8 +224,8 @@ interface CanvasState {
batchPause: () => Promise<void>;
batchDelete: () => Promise<void>;
/** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
/** WebSocket connection status — drives the live indicator in the Toolbar. */
wsStatus: "connected" | "connecting" | "disconnected";
setWsStatus: (status: "connected" | "connecting" | "disconnected") => void;
+4 -2
View File
@@ -41,8 +41,9 @@ type EventType string
// scan-friendly as it grows.
const (
// Chat / agent messaging — surfaces in canvas chat panels.
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventUserMessage EventType = "USER_MESSAGE"
EventActivityLogged EventType = "ACTIVITY_LOGGED"
EventChannelMessage EventType = "CHANNEL_MESSAGE"
@@ -104,6 +105,7 @@ var AllEventTypes = []EventType{
EventApprovalEscalated,
EventApprovalRequested,
EventChannelMessage,
EventUserMessage,
EventCronExecuted,
EventCronSkipped,
EventDelegationComplete,
@@ -41,6 +41,7 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
"DELEGATION_STATUS",
"EXTERNAL_CREDENTIALS_ROTATED",
"TASK_UPDATED",
"USER_MESSAGE",
"WORKSPACE_AWAITING_AGENT",
"WORKSPACE_DEGRADED",
"WORKSPACE_HEARTBEAT",
@@ -344,6 +344,106 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
"duration_ms": durationMs,
})
}
// #1440: fan user's own outbound message to all sessions of this workspace.
// When a canvas user sends a message, the originating session renders it
// optimistically (useChatSend inserts it into the local store immediately).
// Other sessions never saw it — only the agent reply was broadcast. The fix:
// parse the JSON-RPC body and broadcast a USER_MESSAGE event so every
// other session renders the user's bubble without a manual refresh.
// Skipped for non-2xx responses (no message was delivered) and non-canvas
// callers (workspace-to-workspace calls have their own fan-out via the
// channel system).
if callerID == "" && statusCode < 400 {
if payload := extractUserMessagePayload(body, a2aMethod); payload != nil {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), payload)
}
}
}
// userMessagePayload carries the fields the canvas needs to render a user
// bubble from a cross-session fan-out event.
type userMessagePayload struct {
MessageID string `json:"messageId,omitempty"`
Content string `json:"content,omitempty"`
Files []userMessageFile `json:"files,omitempty"`
}
// userMessageFile mirrors the shape canvas/src/components/tabs/chat/message-parser.ts
// ParsedFilePart so the canvas can render the same download chip.
type userMessageFile struct {
Name string `json:"name"`
URI string `json:"uri"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
}
// extractUserMessagePayload parses a JSON-RPC message/send body and extracts the
// user-visible fields (text content + file attachments) for fan-out broadcasting.
// Returns nil when the body is not a canvas message/send request — either the
// method is not message/send, or the message does not have role=user (e.g. a
// heartbeat ping sent by the canvas for keepalive, which carries role:agent).
// Safe to call on any body — malformed JSON returns nil with no side effects.
func extractUserMessagePayload(body []byte, method string) map[string]interface{} {
if method != "message/send" {
return nil
}
var rpc struct {
ID string `json:"id,omitempty"`
Params struct {
Message struct {
Role string `json:"role,omitempty"`
MessageID string `json:"messageId,omitempty"`
Parts []struct {
Kind string `json:"kind,omitempty"`
Text string `json:"text,omitempty"`
File *struct {
Name string `json:"name,omitempty"`
URI string `json:"uri,omitempty"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
} `json:"file,omitempty"`
} `json:"parts,omitempty"`
} `json:"message,omitempty"`
} `json:"params,omitempty"`
}
if err := json.Unmarshal(body, &rpc); err != nil {
return nil
}
// Only broadcast user-role messages. The canvas also sends internal
// heartbeat pings via message/send (role:agent) that should not appear
// as user bubbles.
if rpc.Params.Message.Role != "user" {
return nil
}
var content string
var files []userMessageFile
for _, part := range rpc.Params.Message.Parts {
if part.Kind == "text" && part.Text != "" {
if content != "" {
content += "\n"
}
content += part.Text
} else if part.Kind == "file" && part.File != nil {
files = append(files, userMessageFile{
Name: part.File.Name,
URI: part.File.URI,
MimeType: part.File.MimeType,
Size: part.File.Size,
})
}
}
// Skip pure-noise events (e.g. empty send with no text and no files).
// The canvas optimistically inserts the bubble before the request; an
// empty broadcast would create an extra blank bubble in other sessions.
if content == "" && len(files) == 0 {
return nil
}
return map[string]interface{}{
"messageId": rpc.Params.Message.MessageID,
"content": content,
"files": files,
}
}
func nilIfEmpty(s string) *string {
@@ -2024,6 +2024,101 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
time.Sleep(80 * time.Millisecond)
}
// ──────────────────────────────────────────────────────────────────────────────
// extractUserMessagePayload fan-out of user's own outbound message (#1440)
// ──────────────────────────────────────────────────────────────────────────────
func TestExtractUserMessagePayload_BasicText(t *testing.T) {
body := []byte(`{"id":"mid-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"hello world"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["messageId"] != "msg-1" {
t.Errorf("messageId: got %v", got["messageId"])
}
if got["content"] != "hello world" {
t.Errorf("content: got %v", got["content"])
}
}
func TestExtractUserMessagePayload_MultipleTextParts(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"line one"},{"kind":"text","text":"line two"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["content"] != "line one\nline two" {
t.Errorf("content: got %v", got["content"])
}
}
func TestExtractUserMessagePayload_WithFile(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"check this"},{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf","mimeType":"application/pdf","size":12345}}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["content"] != "check this" {
t.Errorf("content: got %v", got["content"])
}
files, ok := got["files"].([]userMessageFile)
if !ok {
t.Fatalf("files type: got %T", got["files"])
}
if len(files) != 1 {
t.Errorf("files count: got %d", len(files))
}
if files[0].Name != "report.pdf" {
t.Errorf("file name: got %s", files[0].Name)
}
if files[0].URI != "workspace:/uploads/report.pdf" {
t.Errorf("file uri: got %s", files[0].URI)
}
}
func TestExtractUserMessagePayload_EmptyParts(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for empty parts, got %v", got)
}
}
func TestExtractUserMessagePayload_NonMessageSendMethod(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"hello"}]}}}`)
got := extractUserMessagePayload(body, "initialize")
if got != nil {
t.Errorf("expected nil for non-message/send, got %v", got)
}
}
func TestExtractUserMessagePayload_NonUserRole(t *testing.T) {
// Canvas also sends heartbeat pings via message/send with role:agent;
// these must NOT appear as user bubbles.
body := []byte(`{"params":{"message":{"role":"agent","parts":[{"kind":"text","text":"ping"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for role=agent, got %v", got)
}
}
func TestExtractUserMessagePayload_MalformedJSON(t *testing.T) {
body := []byte(`not json at all`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for malformed JSON, got %v", got)
}
}
func TestExtractUserMessagePayload_MissingPartsField(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user"}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for missing parts, got %v", got)
}
}
// ──────────────────────────────────────────────────────────────────────────────
// A2A auto-wake: hibernated workspace (#711)
// ──────────────────────────────────────────────────────────────────────────────
@@ -0,0 +1,53 @@
package handlers
// plugins_install_test.go — additional coverage for plugins_install.go.
//
// Gaps filled vs. existing test files:
// - plugins_install_external_test.go: Install + Uninstall 422 (external runtime) ✓ covered
// - plugins_test.go: Install 400 (missing source, invalid body, etc.) ✓ covered
// Uninstall 400 (invalid plugin name, empty name) ✓ covered
// Download auth gate ✓ covered
// - org_import_helpers_test.go: countWorkspaces, envRequirementKey, sanitizeEnvMembers,
// flattenAndSortRequirements, collectOrgEnv ✓ covered
//
// New test added here:
// - Uninstall 503: container not running, no SaaS dispatch.
//
// NOTE: validateWorkspaceID is not called inside the Install/Uninstall handlers.
// UUID validation is the responsibility of the WorkspaceAuth middleware, so no
// 400 test is needed here for UUID format.
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
// TestPluginUninstall_ContainerNotRunning_Returns503 exercises the 503 path
// where neither a local Docker container nor a SaaS instance-id dispatch
// resolves. The handler must return "workspace container not running" — NOT a
// generic 500 or a misleading 422 (external-runtime) message.
func TestPluginUninstall_ContainerNotRunning_Returns503(t *testing.T) {
// No docker client + no instance-id lookup → falls through to 503.
h := NewPluginsHandler(t.TempDir(), nil, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"},
{Key: "name", Value: "some-plugin"},
}
c.Request = httptest.NewRequest("DELETE",
"/workspaces/550e8400-e29b-41d4-a716-446655440000/plugins/some-plugin", nil)
h.Uninstall(c)
require.Equal(t, http.StatusServiceUnavailable, w.Code)
var body map[string]string
json.Unmarshal(w.Body.Bytes(), &body)
require.Equal(t, "workspace container not running", body["error"])
}
@@ -0,0 +1,193 @@
package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// patchReq builds a gin context for a PATCH request to /workspaces/:id/abilities.
func patchReq(id, body string) (*http.Request, *httptest.ResponseRecorder, *gin.Context) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: id}}
c.Request = httptest.NewRequest("PATCH", "/workspaces/"+id+"/abilities", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
return c.Request, w, c
}
func TestPatchAbilities_InvalidWorkspaceID(t *testing.T) {
setupTestDB(t)
// "not-a-uuid" fails validateWorkspaceID
_, w, c := patchReq("not-a-uuid", `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_EmptyBody(t *testing.T) {
setupTestDB(t)
id := "00000000-0000-0000-0000-000000000001"
// Empty JSON object — no ability fields present
_, w, c := patchReq(id, `{}`)
PatchAbilities(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]string
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["error"] != "at least one ability field required" {
t.Errorf("expected 'at least one ability field required', got %v", resp["error"])
}
}
func TestPatchAbilities_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000002"
// SELECT EXISTS returns false (workspace does not exist)
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_SetBroadcastEnabledTrue(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000003"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled = true
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]string
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "updated" {
t.Errorf("expected status=updated, got %v", resp["status"])
}
}
func TestPatchAbilities_SetTalkToUserEnabledFalse(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000004"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE talk_to_user_enabled = false
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"talk_to_user_enabled":false}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_BothFields(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000005"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled = false
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnResult(sqlmock.NewResult(0, 1))
// UPDATE talk_to_user_enabled = true
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnResult(sqlmock.NewResult(0, 1))
_, w, c := patchReq(id, `{"broadcast_enabled":false,"talk_to_user_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_BroadcastUpdateFails(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000006"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE fails
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, true).
WillReturnError(sql.ErrConnDone)
_, w, c := patchReq(id, `{"broadcast_enabled":true}`)
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
}
func TestPatchAbilities_TalkToUserUpdateFails(t *testing.T) {
mock := setupTestDB(t)
id := "00000000-0000-0000-0000-000000000007"
// SELECT EXISTS → true
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// UPDATE broadcast_enabled skipped (not in payload)
// UPDATE talk_to_user_enabled fails
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(id, false).
WillReturnError(sql.ErrConnDone)
_, w, c := patchReq(id, `{"talk_to_user_enabled":false}`)
PatchAbilities(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
}
@@ -34,11 +34,13 @@ import (
// BroadcastHandler is constructed once and shared across requests.
type BroadcastHandler struct {
broadcaster *events.Broadcaster
broadcaster events.EventEmitter
}
// NewBroadcastHandler creates a BroadcastHandler.
func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
// The emitter is any EventEmitter — the concrete *Broadcaster in production,
// or a test double in unit tests.
func NewBroadcastHandler(b events.EventEmitter) *BroadcastHandler {
return &BroadcastHandler{broadcaster: b}
}
@@ -67,7 +67,6 @@ func TestBroadcast_OrgScopedRecipients(t *testing.T) {
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
@@ -206,7 +205,7 @@ func TestBroadcast_Disabled(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000003"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
@@ -237,7 +236,7 @@ func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
senderID := "00000000-0000-0000-0000-000000000004" // org root, only workspace in org
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
@@ -297,33 +296,12 @@ func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
// finding the org root errors, the handler returns 500 instead of proceeding
// with an un-scoped query that would broadcast to all orgs.
func TestBroadcast_OrgRootLookupFails(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000005"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
@@ -353,16 +331,13 @@ func TestBroadcast_OrgRootLookupFails(t *testing.T) {
}
}
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
// from a workspace does not send a broadcast_receive to the sender itself
// (the sender logs broadcast_sent, not broadcast_receive).
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
peerID := "00000000-0000-0000-0000-000000000002"
senderID := "00000000-0000-0000-0000-000000000006"
peerID := "00000000-0000-0000-0000-000000000007"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
@@ -399,10 +374,145 @@ func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
}
}
// TestBroadcast_RecipientActivityLogFails_SkipsAndContinues: if one recipient's
// activity_log insert fails, the handler logs the error and continues to the
// next recipient rather than aborting the whole broadcast.
func TestBroadcast_RecipientActivityLogFails_SkipsAndContinues(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000008"
peerA := "00000000-0000-0000-0000-000000000009"
peerB := "00000000-0000-0000-0000-00000000000a"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Resilient Agent", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerA).AddRow(peerB))
// Peer A fails — handler logs and continues
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerA, senderID, sqlmock.AnyArg()).
WillReturnError(context.DeadlineExceeded)
// Peer B succeeds
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerB, senderID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Sender log succeeds
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"partial delivery"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
// Only peerB was delivered
if int(resp["delivered"].(float64)) != 1 {
t.Errorf("expected delivered=1, got %v", resp["delivered"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_SenderActivityLogFails_StillReturns200: if the sender's own
// broadcast_sent activity_log insert fails, the handler still returns 200
// so the caller doesn't retry a broadcast that already partially delivered.
func TestBroadcast_SenderActivityLogFails_StillReturns200(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-00000000000b"
peerA := "00000000-0000-0000-0000-00000000000c"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Log-Fail Agent", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerA))
// Peer log succeeds
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerA, senderID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Sender log FAILS
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).
WillReturnError(context.DeadlineExceeded)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"log fail test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200 even on sender log failure, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-00000000000d"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-00000000000d/broadcast", bytes.NewBufferString("{}"))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingBody(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-00000000000e"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-00000000000e/broadcast", nil)
// no Content-Type and no body
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…".
func TestBroadcast_Truncate(t *testing.T) {
cases := []struct {
msg string
@@ -410,14 +520,18 @@ func TestBroadcast_Truncate(t *testing.T) {
expect string
}{
{"short", 120, "short"}, // under max — no truncation
// exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
// exactly 120 chars → unchanged
{"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
// "this is a longer mes" = 20 runes; + "…" = 21 chars
// 21 runes at max=20 → 20 + "…" = 21 chars
{"this is a longer message that needs truncating", 20, "this is a longer mes…"},
// at-max boundary: 20 chars at max=20 → no truncation
{"exactly twenty chars", 20, "exactly twenty chars"},
// over max: 11 chars at max=10 → 10 + "…" = 11
{"hello world!", 10, "hello worl…"},
// Unicode: 3-rune string at max=3 → unchanged
{"日本語", 3, "日本語"},
// Empty string → unchanged
{"", 120, ""},
}
for _, tc := range cases {
result := broadcastTruncate(tc.msg, tc.max)