molecule-core/local-e2e/cp_sim/cp_sim.py
claude-ceo-assistant 59d699b61c
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 7s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 24s
E2E API Smoke Test / detect-changes (pull_request) Successful in 14s
E2E Chat / detect-changes (pull_request) Successful in 11s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 9s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 5s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 6s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 12s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 9s
gate-check-v3 / gate-check (pull_request) Successful in 7s
qa-review / approved (pull_request) Failing after 7s
security-review / approved (pull_request) Failing after 6s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 5s
sop-checklist / review-refire (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request) Successful in 5s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m3s
CI / Platform (Go) (pull_request) Successful in 5m45s
CI / Python Lint & Test (pull_request) Successful in 7m0s
CI / Canvas (Next.js) (pull_request) Successful in 7m34s
CI / all-required (pull_request) Successful in 7m14s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 5s
E2E Chat / E2E Chat (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
feat(local-e2e): session-continuity canary harness (task #342, RFC#600 gate)
Adds a self-contained docker-compose harness in local-e2e/ that gates
RFC#600-class template changes BEFORE customer canary. Implements the 4
canonical canaries:

  1. 2-turn name continuity:     SessionStore key derivation
  2. File-only message:          caption-less upload must not regress to an empty prompt
  3. File + prompt (multimodal): multimodal happy path
  4. Cross-session memory:       explicit memory tool, distinct context_ids
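A minimal sketch of the assertion shape behind canaries 1 and 4, run against a toy in-process runtime instead of a container. Everything here is hypothetical and exists only for illustration: `ToySessionRuntime`, its text commands, and the canary function names. The real harness drives the runtime over A2A via cp_sim. The real SessionStore key derivation lives in workspace/a2a_executor.py.

```python
from __future__ import annotations

from collections import defaultdict


class ToySessionRuntime:
    """Hypothetical runtime: chat history keyed by contextId, plus a
    process-wide dict standing in for the explicit memory tool."""

    def __init__(self) -> None:
        self.sessions: dict[str, list[str]] = defaultdict(list)
        self.memory: dict[str, str] = {}

    def handle(self, context_id: str, text: str) -> str:
        history = self.sessions[context_id]
        history.append(text)
        if text.startswith("remember "):
            key, _, value = text.removeprefix("remember ").partition("=")
            self.memory[key] = value
            return "ok"
        if text.startswith("recall "):
            return self.memory.get(text.removeprefix("recall "), "missing")
        if text == "What is my name?":
            # Only earlier turns in THIS context are visible.
            for turn in reversed(history[:-1]):
                if turn.startswith("My name is "):
                    return turn.removeprefix("My name is ").rstrip(".")
            return "unknown"
        return "noted"


def canary_two_turn_continuity(rt: ToySessionRuntime) -> None:
    # Canary 1: same contextId on both turns, so turn 2 must see turn 1.
    rt.handle("ctx-a", "My name is Ada.")
    assert rt.handle("ctx-a", "What is my name?") == "Ada"


def canary_cross_session_memory(rt: ToySessionRuntime) -> None:
    # Canary 4: distinct contextIds must not share chat history, but a
    # value stored through the memory tool must survive across them.
    rt.handle("ctx-b", "My name is Grace.")
    rt.handle("ctx-b", "remember favourite_colour=teal")
    assert rt.handle("ctx-c", "What is my name?") == "unknown"
    assert rt.handle("ctx-c", "recall favourite_colour") == "teal"
```

The two checks pull in opposite directions on purpose. Canary 1 fails if the session key changes between turns. Canary 4 fails if history leaks across contexts or if explicit memory does not.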

Architecture is deliberately lean per CTO "separate CI as possible":

  local-e2e/
    docker-compose.yml       # runtime + cp_sim ONLY (no platform Go, no pg)
    cp_sim/                  # ~250 LoC Python A2A wire-shape emitter
    cp_sim/canary/           # 4 canary scenarios + layer-isolation probes
    scripts/run-canary.sh    # one-shot orchestration (target <3 min)
    scripts/onboard-template.sh  # gitops helper for cascade
    templates/session-continuity-e2e.yml  # canonical workflow shim

Rationale for a Python tenant-CP simulator (not the real workspace-server):
SessionStore behaviour is fully owned by workspace/a2a_executor.py +
executor_helpers.py — the Go platform service doesn't touch session
continuity. Excising it gets the harness to <3 min cold-boot on
docker-host runners and keeps the surface small enough to debug fast.

The simulator emits the byte-identical JSON-RPC message/send envelope
that workspace-server POSTs (cross-checked against
tests/e2e/test_chat_attachments_e2e.sh and
workspace/a2a_executor.py:_core_execute).
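For reference, here is the text-only envelope that `send_text` in cp_sim.py builds, pulled out into a standalone function. The name `build_message_send` is introduced here for illustration only. The sketch reproduces the fields cp_sim sends. It does not verify the commit's claim that the result is byte-identical to workspace-server's output.

```python
from __future__ import annotations

import json
import uuid
from typing import Any


def build_message_send(
    text: str, *, context_id: str, task_id: str | None = None
) -> dict[str, Any]:
    """Text-only JSON-RPC 2.0 `message/send` envelope, fields as in cp_sim.py."""
    msg_id = f"canary-{uuid.uuid4().hex[:12]}"
    return {
        "jsonrpc": "2.0",
        "id": msg_id,  # the JSON-RPC request id doubles as the A2A messageId
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "messageId": msg_id,
                "kind": "message",
                "contextId": context_id,
                "taskId": task_id,  # serialised as JSON null when unset
                "parts": [{"kind": "text", "text": text}],
            },
            "configuration": {
                "acceptedOutputModes": ["text/plain"],
                "blocking": True,
            },
        },
    }


print(json.dumps(build_message_send("hello", context_id="ctx-1"), indent=2))
```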

Per feedback_no_single_source_of_truth: the harness IS the canonical
session-continuity validator across templates. Per-template unit tests
keep covering their own guard logic.

Per feedback_image_promote_is_not_user_live +
feedback_verify_actual_endstate_not_ack_follow_sop: every canary
asserts at the running-container layer; artifacts dump SessionStore
state + runtime logs on failure for post-mortem.

Rollout (deliberate sequencing, per task #342):
  1. THIS PR — lands harness in molecule-core. NOT yet wired to any
     template repo.
  2. Companion PR in molecule-ai-workspace-template-hermes — adds
     .gitea/workflows/session-continuity-e2e.yml. NOT required yet.
  3. Bake on hermes for ≥5 business days.
  4. Cascade to remaining 6 templates via onboard-template.sh.
  5. Per-template BP flip — add "session-continuity-e2e (pull_request)"
     to status_check_contexts on each repo, hermes first.
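Step 5 is a read-modify-write on each repo's branch protection. The sketch below assumes the target is Gitea's branch-protection edit endpoint (`PATCH /api/v1/repos/{owner}/{repo}/branch_protections/{name}`), whose body accepts `enable_status_check` and `status_check_contexts`. It only builds that body. Fetching the current list and sending the PATCH with a token are left out. `with_required_context` is a hypothetical helper name.

```python
from __future__ import annotations

REQUIRED_CONTEXT = "session-continuity-e2e (pull_request)"


def with_required_context(existing: list[str], ctx: str = REQUIRED_CONTEXT) -> dict:
    """Build a branch-protection edit body that adds ctx exactly once,
    preserving the order of contexts that are already required."""
    contexts = list(existing)  # copy: never mutate the caller's list
    if ctx not in contexts:
        contexts.append(ctx)
    return {"enable_status_check": True, "status_check_contexts": contexts}
```

The helper is idempotent, so re-running the flip during the cascade (hermes first, then the other templates) is safe.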

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 02:39:30 -07:00

215 lines
7.5 KiB
Python

"""Tenant control-plane simulator.

Emits the byte-identical JSON-RPC `message/send` wire shape that the
production `workspace-server` POSTs to the runtime's :8000. See
``workspace-server/internal/handlers/a2a.go`` and the canonical sample
in ``tests/e2e/test_chat_attachments_e2e.sh``.

This file is purposefully small (~250 LoC). It is NOT a re-implementation
of `workspace-server`; it is just the minimum surface required to drive
the 4 session-continuity canaries.

If the runtime asserts on a header / envelope field that the production
platform sets but this simulator omits, FIX THE SIMULATOR; never weaken
the runtime to accept divergent wire shapes. The simulator is the
canonical contract emitter for canary purposes
(``feedback_no_single_source_of_truth``).
"""
from __future__ import annotations

import base64
import json
import os
import uuid
from dataclasses import dataclass
from typing import Any

import httpx


@dataclass
class CPSimConfig:
    runtime_url: str
    """Base URL of the runtime under test (e.g. http://runtime:8000)."""

    request_timeout_s: float = 60.0
    """Per-A2A-call timeout. Generous: canary-mode replies are fast,
    but a real Provider-backed runtime under a cold cache can take 30+ s."""


class CPSim:
    """Thin client matching workspace-server's wire shape."""

    def __init__(self, cfg: CPSimConfig | None = None) -> None:
        self.cfg = cfg or CPSimConfig(
            runtime_url=os.environ.get("RUNTIME_URL", "http://localhost:18000"),
        )
        self._client = httpx.Client(timeout=self.cfg.request_timeout_s)

    # ------------------------------------------------------------------ A2A

    def send_text(
        self,
        text: str,
        *,
        context_id: str,
        task_id: str | None = None,
    ) -> dict[str, Any]:
        """POST a text-only A2A message. Returns the JSON-RPC envelope."""
        msg_id = f"canary-{uuid.uuid4().hex[:12]}"
        payload = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "method": "message/send",
            "params": {
                "message": {
                    "role": "user",
                    "messageId": msg_id,
                    "kind": "message",
                    "contextId": context_id,
                    "taskId": task_id,
                    "parts": [{"kind": "text", "text": text}],
                },
                "configuration": {
                    "acceptedOutputModes": ["text/plain"],
                    "blocking": True,
                },
            },
        }
        return self._post(payload)

    def send_with_file(
        self,
        *,
        context_id: str,
        text: str | None,
        file_name: str,
        file_bytes: bytes,
        mime_type: str = "text/plain",
        task_id: str | None = None,
    ) -> dict[str, Any]:
        """POST an A2A message with an inline file part.

        Uses the inline `bytes` form of A2A file parts (RFC#600: the
        no-URI variant, added precisely so canary tests don't need a
        `/chat/uploads` round-trip). Each runtime's executor calls
        ``extract_attached_files``, which handles both forms (verified
        in ``workspace/executor_helpers.py:903``).
        """
        msg_id = f"canary-{uuid.uuid4().hex[:12]}"
        parts: list[dict[str, Any]] = []
        # No caption means no text part at all (the file-only message shape).
        if text:
            parts.append({"kind": "text", "text": text})
        parts.append(
            {
                "kind": "file",
                "file": {
                    "name": file_name,
                    "mimeType": mime_type,
                    "bytes": base64.b64encode(file_bytes).decode("ascii"),
                },
            }
        )
        payload = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "method": "message/send",
            "params": {
                "message": {
                    "role": "user",
                    "messageId": msg_id,
                    "kind": "message",
                    "contextId": context_id,
                    "taskId": task_id,
                    "parts": parts,
                },
                "configuration": {
                    "acceptedOutputModes": ["text/plain"],
                    "blocking": True,
                },
            },
        }
        return self._post(payload)

    # ------------------------------------------------------------ helpers

    def _post(self, payload: dict[str, Any]) -> dict[str, Any]:
        url = f"{self.cfg.runtime_url}/a2a"
        try:
            r = self._client.post(url, json=payload)
        except httpx.HTTPError as e:
            raise CPSimError(f"A2A POST failed: {e}") from e
        if r.status_code != 200:
            raise CPSimError(
                f"A2A non-200: status={r.status_code} body={r.text[:500]}"
            )
        try:
            return r.json()
        except json.JSONDecodeError as e:
            raise CPSimError(f"A2A body not JSON: {r.text[:500]}") from e

    @staticmethod
    def extract_text_parts(envelope: dict[str, Any]) -> str:
        """Return concatenated text from all text parts of a reply.

        Handles both top-level `result.parts` (the canonical shape) and
        `result.artifacts[*].parts` (which some runtimes emit when the
        reply was streamed as artifact chunks). Matches the extractor in
        ``tests/e2e/test_chat_attachments_e2e.sh``.
        """
        result = envelope.get("result") or {}
        chunks: list[str] = []
        for p in result.get("parts", []) or []:
            if p.get("kind") == "text":
                chunks.append(p.get("text", ""))
        for art in result.get("artifacts", []) or []:
            for p in art.get("parts", []) or []:
                if p.get("kind") == "text":
                    chunks.append(p.get("text", ""))
        # Some runtimes return a status.message instead of/in addition to parts.
        status = result.get("status") or {}
        status_msg = status.get("message") or {}
        for p in status_msg.get("parts", []) or []:
            if p.get("kind") == "text":
                chunks.append(p.get("text", ""))
        return "\n".join(chunks).strip()

    # ----------------------------------------------------- memory probe

    def probe_memory(self, key: str) -> str | None:
        """Read a memory value via the runtime's MCP memory tool.

        Uses the same MCP transport the canvas uses
        (``POST /workspaces/:id/mcp``-shaped JSON-RPC over /mcp). Returns
        the recalled string, or None if the key is missing.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": f"canary-mem-{uuid.uuid4().hex[:8]}",
            "method": "tools/call",
            "params": {"name": "recall_memory", "arguments": {"key": key}},
        }
        try:
            r = self._client.post(f"{self.cfg.runtime_url}/mcp", json=payload)
        except httpx.HTTPError as e:
            raise CPSimError(f"MCP POST failed: {e}") from e
        if r.status_code != 200:
            return None
        try:
            body = r.json()
        except json.JSONDecodeError as e:
            # Envelope failure, same as in _post: surface it as CPSimError.
            raise CPSimError(f"MCP body not JSON: {r.text[:500]}") from e
        result = body.get("result") or {}
        # MCP `tools/call` results wrap the tool output in
        # result.content[*] items of the form {"type": "text", "text": ...}.
        for c in result.get("content", []) or []:
            if c.get("type") == "text":
                return c.get("text")
        return None


class CPSimError(RuntimeError):
    """Raised on transport / envelope failures (NOT canary assertion failures).

    Kept distinct from AssertionError so a transport-layer fault, which
    should be debugged differently from a real session-continuity
    regression, is easy to tell apart in the report: the traceback names
    CPSimError rather than a failed assertion. Note that by default pytest
    still reports the test as FAILED when this is raised in the test body;
    it reports ERROR only when the exception comes from fixture
    setup/teardown.
    """