chore: address follow-up code review — named enum, singleButton, tests

Post-review fixes on top of the quality-pass-2 branch.

1. delegation.go: replaced insertDelegationRow's (bool, bool) return
   with a typed insertDelegationOutcome enum (insertOK /
   insertHandledByIdempotent / insertTrackingUnavailable). Eliminates
   the positional-boolean decoding the caller had to do. Internal, no
   behavior change.

2. ConfirmDialog.tsx: added singleButton prop. When true, hides the
   Cancel button for single-action info toasts (Esc still dismisses
   via onCancel). TemplatePalette's import notice uses it.

3. ErrorBoundary.tsx: fixed the floating clipboard promise. Added
   .catch(() => {}) so a rejected writeText (permission denied,
   insecure context) doesn't surface as unhandled rejection.

4. a2a_proxy_test.go: added 5 direct unit tests for
   normalizeA2APayload (invalid JSON, wraps-bare, preserves-existing-
   id, preserves-existing-messageId, missing-method). Fills the unit-
   test gap for the helper extracted in the last pass.

Verification:
- go test -race ./internal/handlers/... passes (incl. 5 new tests)
- go build ./... clean
- canvas npm run build clean
- canvas npm test -- --run -> 352/352

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-13 14:45:05 -07:00
parent 789f568bef
commit 232766d0da
6 changed files with 125 additions and 202 deletions

177
AGENTS.md
View File

@ -1,177 +0,0 @@
# AGENTS.md
This file provides guidance to Codex (Codex.ai/code) when working with code in this repository.
## Project Overview
Molecule AI is a platform for orchestrating AI agent workspaces that form an organizational hierarchy. Workspaces register with a central platform, communicate via A2A protocol, and are visualized on a drag-and-drop canvas.
## Architecture
```
Canvas (Next.js :3000) ←WebSocket→ Platform (Go :8080) ←HTTP→ Postgres + Redis
Workspace A ←──A2A──→ Workspace B
(pluggable runtimes)
↑ register/heartbeat ↑
└───── Platform ─────┘
```
Three main components:
- **Platform** (`platform/`): Go/Gin control plane — workspace CRUD, registry, discovery, WebSocket hub, liveness monitoring
- **Canvas** (`canvas/`): Next.js 15 + React Flow (@xyflow/react v12) + Zustand + Tailwind — visual workspace graph
- **Workspace Runtime** (`workspace-template/`): A2A runtime layer with pluggable adapters — LangGraph, DeepAgents, Claude Code, CrewAI, AutoGen, OpenClaw — registers with platform and sends heartbeats
## Build & Run Commands
### Infrastructure
```bash
./infra/scripts/setup.sh # Start Postgres, Redis, Langfuse; run migrations
./infra/scripts/nuke.sh # Tear down everything, remove volumes
```
### Platform (Go)
```bash
cd platform
go build ./cmd/server # Build
go run ./cmd/server # Run (requires Postgres + Redis running)
```
Must run from `platform/` directory (not repo root). Env vars: `DATABASE_URL`, `REDIS_URL`, `PORT` (defaults: postgres://dev:dev@localhost:5432/molecule?sslmode=prefer, redis://localhost:6379, 8080).
### Canvas (Next.js)
```bash
cd canvas
npm install
npm run dev # Dev server on :3000
npm run build && npm start # Production
```
Env vars: `NEXT_PUBLIC_PLATFORM_URL` (default http://localhost:8080), `NEXT_PUBLIC_WS_URL` (default ws://localhost:8080/ws).
### Integration Tests
```bash
bash test_api.sh # Runs 34 API tests against localhost:8080
```
Requires platform running. Tests full CRUD, registry, heartbeat, discovery, peers, access control, events, degraded/recovery lifecycle.
### Docker Compose
```bash
docker compose -f docker-compose.infra.yml up -d # Infra only
docker compose up # Full stack
```
## Key Architectural Patterns
### Import Cycle Prevention
The platform uses function injection to avoid Go import cycles between ws, registry, and events packages:
- `ws.NewHub(canCommunicate AccessChecker)` — Hub accepts `registry.CanCommunicate` as a function
- `registry.StartLivenessMonitor(ctx, onOffline OfflineHandler)` — Liveness accepts broadcaster callback
- Wiring happens in `platform/cmd/server/main.go`
### Communication Rules (`registry/access.go`)
`CanCommunicate(callerID, targetID)` determines if two workspaces can talk:
- Same workspace → allowed
- Siblings (same parent_id) → allowed
- Root-level siblings (both parent_id IS NULL) → allowed
- Parent ↔ child → allowed
- Everything else → denied
### JSONB Gotcha
When inserting Go `[]byte` (from `json.Marshal`) into Postgres JSONB columns, you must:
1. Convert to `string()` first
2. Use `::jsonb` cast in SQL
lib/pq treats `[]byte` as `bytea`, not JSONB.
### WebSocket Events Flow
1. Action occurs (register, heartbeat, etc.)
2. `broadcaster.RecordAndBroadcast()` inserts into `structure_events` table + publishes to Redis pub/sub
3. Redis subscriber relays to WebSocket hub
4. Hub broadcasts to canvas clients (all events) and workspace clients (filtered by CanCommunicate)
### Canvas State Management
- Initial load: HTTP fetch from `GET /workspaces` → Zustand hydrate
- Real-time updates: WebSocket events → `applyEvent()` in Zustand store
- Position persistence: `onNodeDragStop``PATCH /workspaces/:id` with `{x, y}`
### Workspace Lifecycle
`provisioning``online` (on register) → `degraded` (error_rate > 0.5) → `online` (recovered) → `offline` (Redis TTL expired) → `removed` (deleted)
## Platform API Routes
| Method | Path | Handler |
|--------|------|---------|
| GET | /health | inline |
| POST/GET/PATCH/DELETE | /workspaces[/:id] | workspace.go |
| POST | /registry/register | registry.go |
| POST | /registry/heartbeat | registry.go |
| POST | /registry/update-card | registry.go |
| GET | /registry/discover/:id | discovery.go |
| GET | /registry/:id/peers | discovery.go |
| POST | /registry/check-access | discovery.go |
| GET | /events[/:workspaceId] | events.go |
| GET | /ws | socket.go |
## Database
5 migration files in `platform/migrations/`. Key tables: `workspaces` (core entity with status, agent_card JSONB, heartbeat columns), `canvas_layouts` (x/y position), `structure_events` (append-only event log), `agents`, `workspace_secrets`.
The platform auto-discovers and runs migrations on startup from several candidate paths.
<!-- AWARENESS_RULES_START -->
# Awareness Memory Integration
MANDATORY agent policy - follow for every task.
## Awareness Memory Integration (MANDATORY)
awareness_* = cross-session persistent memory (past decisions, knowledge, tasks).
Other tools = current codebase navigation (file search, code index).
Use BOTH - they serve different purposes.
STEP 1 - SESSION START:
Call awareness_init(source="codex") -> get session_id, review context.
If active_skills[] is returned: skill = reusable procedure done 2+ times;
summary = injectable instruction, methods = steps. Apply matching skills to tasks.
STEP 2 - RECALL BEFORE WORK (progressive disclosure):
1. awareness_recall(semantic_query=..., keyword_query=..., detail='summary') → lightweight index.
2. Review summaries/scores, pick relevant IDs.
3. awareness_recall(detail='full', ids=[...]) → expand only what you need.
STEP 3 - RECORD EVERY CHANGE:
After EVERY code edit, decision, or bug fix:
awareness_record(content=<detailed natural language description>,
insights={knowledge_cards:[...], action_items:[...], risks:[...]})
Content should be RICH and DETAILED — include reasoning, key code snippets,
user quotes, alternatives considered, and files changed. Do NOT compress into
a single-line summary. The content IS the memory — more detail = better recall.
Include insights to create searchable knowledge in ONE step (recommended).
Skipping = permanent data loss.
STEP 4 - CATEGORY GUIDE (for insights.knowledge_cards):
- decision = choice made between alternatives.
- problem_solution = bug/problem plus the fix that resolved it.
- workflow = process, setup, or configuration steps only.
- pitfall = blocker, warning, or limitation without a fix yet.
- insight = reusable pattern or general learning.
- skill = reusable procedure done 2+ times; summary = injectable instruction, methods = steps.
- key_point = important technical fact when nothing else fits.
Never default everything to workflow.
STEP 5 - SESSION END:
awareness_record(content=[step1, step2, ...], insights={...}) with final summary.
BACKFILL (if applicable):
If MCP connected late: awareness_record(content=<transcript>)
RULES VERSION: Pass rules_version="2" to awareness_init so the server knows you have these rules.
If the server returns _setup_action, the rules have been updated — follow the instruction to re-sync.
NOTE: memory_id from X-Awareness-Memory-Id header. source/actor/event_type auto-inferred.
## Codex-Specific Notes
- Call awareness_init at task start before reading any files.
- After each code patch, call awareness_record with the change description.
<!-- AWARENESS_RULES_END -->

View File

@ -11,6 +11,9 @@ interface Props {
confirmVariant?: "danger" | "primary" | "warning";
onConfirm: () => void;
onCancel: () => void;
// Hide the Cancel button for single-action info toasts.
// onCancel is still invoked on Esc / backdrop-click.
singleButton?: boolean;
}
export function ConfirmDialog({
@ -21,6 +24,7 @@ export function ConfirmDialog({
confirmVariant = "primary",
onConfirm,
onCancel,
singleButton = false,
}: Props) {
const dialogRef = useRef<HTMLDivElement>(null);
const [mounted, setMounted] = useState(false);
@ -71,12 +75,14 @@ export function ConfirmDialog({
</div>
<div className="flex items-center justify-end gap-2 px-5 py-3 border-t border-zinc-800 bg-zinc-950/50">
<button
onClick={onCancel}
className="px-3.5 py-1.5 text-[13px] text-zinc-400 hover:text-zinc-200 bg-zinc-800 hover:bg-zinc-700 border border-zinc-700 rounded-lg transition-colors"
>
Cancel
</button>
{!singleButton && (
<button
onClick={onCancel}
className="px-3.5 py-1.5 text-[13px] text-zinc-400 hover:text-zinc-200 bg-zinc-800 hover:bg-zinc-700 border border-zinc-700 rounded-lg transition-colors"
>
Cancel
</button>
)}
<button
onClick={onConfirm}
className={`px-3.5 py-1.5 text-[13px] rounded-lg transition-colors ${confirmColors}`}

View File

@ -44,7 +44,7 @@ export class ErrorBoundary extends React.Component<
// Copy error info to clipboard for manual reporting (button click is its
// own affordance — no native alert needed). On clipboard failure the
// console.error above still surfaces the report.
navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2));
void navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2)).catch(() => {});
};
render() {

View File

@ -213,6 +213,7 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
message={notice ?? ""}
confirmLabel="OK"
confirmVariant="primary"
singleButton
onConfirm={() => setNotice(null)}
onCancel={() => setNotice(null)}
/>

View File

@ -733,3 +733,83 @@ func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
t.Errorf("expected 401, got %d", w.Code)
}
}
// --- Direct unit tests for normalizeA2APayload (extracted from proxyA2ARequest) ---
func TestNormalizeA2APayload_InvalidJSON(t *testing.T) {
_, _, perr := normalizeA2APayload([]byte("not json"))
if perr == nil {
t.Fatal("expected error for invalid JSON, got nil")
}
if perr.Status != http.StatusBadRequest {
t.Errorf("expected 400, got %d", perr.Status)
}
}
func TestNormalizeA2APayload_WrapsBareMessage(t *testing.T) {
raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","parts":[{"type":"text","text":"hi"}]}}}`)
out, method, perr := normalizeA2APayload(raw)
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
if method != "message/send" {
t.Errorf("expected method=message/send, got %q", method)
}
var parsed map[string]interface{}
if err := json.Unmarshal(out, &parsed); err != nil {
t.Fatalf("output is not valid JSON: %v", err)
}
if parsed["jsonrpc"] != "2.0" {
t.Errorf("expected jsonrpc=2.0 wrapper, got %v", parsed["jsonrpc"])
}
if parsed["id"] == nil || parsed["id"] == "" {
t.Error("expected generated id, got empty")
}
params := parsed["params"].(map[string]interface{})
msg := params["message"].(map[string]interface{})
if msg["messageId"] == nil || msg["messageId"] == "" {
t.Error("expected messageId injected, got empty")
}
}
func TestNormalizeA2APayload_PreservesExistingJSONRPC(t *testing.T) {
raw := []byte(`{"jsonrpc":"2.0","id":"custom-id","method":"tasks/list","params":{}}`)
out, method, perr := normalizeA2APayload(raw)
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
if method != "tasks/list" {
t.Errorf("expected method=tasks/list, got %q", method)
}
var parsed map[string]interface{}
_ = json.Unmarshal(out, &parsed)
if parsed["id"] != "custom-id" {
t.Errorf("existing id overwritten: got %v", parsed["id"])
}
}
func TestNormalizeA2APayload_PreservesExistingMessageId(t *testing.T) {
raw := []byte(`{"method":"message/send","params":{"message":{"messageId":"fixed-mid","role":"user","parts":[]}}}`)
out, _, perr := normalizeA2APayload(raw)
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
var parsed map[string]interface{}
_ = json.Unmarshal(out, &parsed)
params := parsed["params"].(map[string]interface{})
msg := params["message"].(map[string]interface{})
if msg["messageId"] != "fixed-mid" {
t.Errorf("existing messageId overwritten: got %v", msg["messageId"])
}
}
func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) {
raw := []byte(`{"params":{"message":{"role":"user"}}}`)
_, method, perr := normalizeA2APayload(raw)
if perr != nil {
t.Fatalf("unexpected error: %+v", perr)
}
if method != "" {
t.Errorf("expected empty method, got %q", method)
}
}

View File

@ -63,13 +63,13 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
delegationID := uuid.New().String()
trackingOK, handled := insertDelegationRow(ctx, c, sourceID, body, delegationID)
if handled {
outcome := insertDelegationRow(ctx, c, sourceID, body, delegationID)
if outcome == insertHandledByIdempotent {
return // idempotency-conflict response already written
}
// trackingOK==false (and !handled) means insert failed for a non-
// idempotency reason (logged); we still dispatch the A2A request and
// surface the warning in the response.
// insertTrackingUnavailable means insert failed for a non-idempotency
// reason (logged); we still dispatch the A2A request and surface the
// warning in the response.
// Build A2A payload
a2aBody, _ := json.Marshal(map[string]interface{}{
@ -97,7 +97,7 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
"status": "delegated",
"target_id": body.TargetID,
}
if !trackingOK {
if outcome == insertTrackingUnavailable {
resp["warning"] = "delegation dispatched but status tracking unavailable"
}
c.JSON(http.StatusAccepted, resp)
@ -152,15 +152,28 @@ func lookupIdempotentDelegation(ctx context.Context, c *gin.Context, sourceID, i
return true
}
// insertDelegationRow stores the pending delegation row.
// Returns (trackingOK, handled):
// - (true, false) on success — caller continues with dispatch.
// - (false, true) on a unique-constraint hit when a concurrent idempotent
// request just took the slot; the winner's JSON response is written here
// and the caller MUST return without further writes.
// - (false, false) on any other DB failure — caller continues with dispatch
// and surfaces a tracking-unavailable warning in the response.
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) (bool, bool) {
// insertDelegationOutcome captures the three distinct results of storing
// the pending delegation row, so callers never have to decode a positional
// (bool, bool) tuple.
type insertDelegationOutcome int
const (
// insertOK — row stored; caller continues with dispatch and does NOT
// surface a tracking warning.
insertOK insertDelegationOutcome = iota
// insertHandledByIdempotent — a concurrent idempotent request took the
// slot; the winner's JSON response is already written and the caller
// MUST return without further writes.
insertHandledByIdempotent
// insertTrackingUnavailable — insert failed for a non-idempotency
// reason (logged by this function); caller continues with dispatch
// and surfaces a tracking-unavailable warning in the response.
insertTrackingUnavailable
)
// insertDelegationRow stores the pending delegation row. See
// insertDelegationOutcome for the three possible return values.
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) insertDelegationOutcome {
taskJSON, _ := json.Marshal(map[string]interface{}{
"task": body.Task,
"delegation_id": delegationID,
@ -174,7 +187,7 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
if err == nil {
return true, false
return insertOK
}
// A unique-constraint hit means a concurrent request just took the
// slot — rare, but worth surfacing as the same idempotent response
@ -193,11 +206,11 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
"target_id": body.TargetID,
"idempotent_hit": true,
})
return false, true
return insertHandledByIdempotent
}
}
log.Printf("Delegation: failed to store: %v", err)
return false, false
return insertTrackingUnavailable
}
// executeDelegation runs in a goroutine — sends A2A and stores the result.