59d699b61c
Adds a self-contained docker-compose harness in local-e2e/ that gates
RFC#600-class template changes BEFORE customer canary. Implements the 4
canonical canaries:
1. 2-turn name continuity: SessionStore key derivation
2. File-only message: no regression where a missing caption drops to an empty prompt
3. File + prompt (multimodal): multimodal happy path
4. Cross-session memory: explicit memory tool, distinct context_ids
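As a rough illustration of what canary 1 asserts, here is a minimal sketch against an in-memory stand-in. `FakeRuntime`, its reply strings, and the two check functions are hypothetical names made up for this example; the real canaries live in cp_sim/canary/ and drive a running container, which this sketch does not attempt to reproduce.

```python
from __future__ import annotations

import re
import uuid


class FakeRuntime:
    """Hypothetical in-memory stand-in for a runtime (NOT the real SessionStore)."""

    def __init__(self) -> None:
        # Assumption for illustration: session state is keyed by context_id.
        self._names: dict[str, str] = {}

    def reply(self, text: str, *, context_id: str) -> str:
        match = re.search(r"my name is (\w+)", text, re.IGNORECASE)
        if match:
            self._names[context_id] = match.group(1)
            return f"Nice to meet you, {match.group(1)}."
        if "what is my name" in text.lower():
            name = self._names.get(context_id)
            return f"Your name is {name}." if name else "I don't know your name."
        return "OK."


def two_turn_name_continuity(runtime: FakeRuntime) -> bool:
    """Turn 2 must recall the name given in turn 1 under the SAME context_id."""
    ctx = f"ctx-{uuid.uuid4().hex[:8]}"
    runtime.reply("My name is Ada", context_id=ctx)
    return "Ada" in runtime.reply("What is my name?", context_id=ctx)


def fresh_context_is_isolated(runtime: FakeRuntime) -> bool:
    """A different context_id must NOT see the earlier session's name."""
    runtime.reply("My name is Ada", context_id="ctx-a")
    return "Ada" not in runtime.reply("What is my name?", context_id="ctx-b")
```

Pairing the positive check with the isolation check guards against a store that "passes" continuity only because every request lands in one shared session.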
Architecture is deliberately lean per CTO "separate CI as possible":
  local-e2e/
    docker-compose.yml                    # runtime + cp_sim ONLY (no platform Go, no pg)
    cp_sim/                               # ~250 LoC Python A2A wire-shape emitter
    cp_sim/canary/                        # 4 canary scenarios + layer-isolation probes
    scripts/run-canary.sh                 # one-shot orchestration (target <3 min)
    scripts/onboard-template.sh           # gitops helper for cascade
    templates/session-continuity-e2e.yml  # canonical workflow shim
Rationale for a Python tenant-CP simulator (not the real workspace-server):
SessionStore behaviour is fully owned by workspace/a2a_executor.py +
executor_helpers.py — the Go platform service doesn't touch session
continuity. Excising it gets the harness to <3 min cold-boot on
docker-host runners and keeps the surface small enough to debug fast.
The simulator emits the byte-identical JSON-RPC message/send envelope
that workspace-server POSTs (cross-checked against
tests/e2e/test_chat_attachments_e2e.sh and
workspace/a2a_executor.py:_core_execute).
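For orientation, the envelope shape can be sketched as a standalone builder. The field names mirror the payload that cp_sim constructs in this commit; the function name `build_message_send` and the sample values are hypothetical, and nothing here verifies byte-for-byte parity with workspace-server.

```python
from __future__ import annotations

import base64
import json
import uuid
from typing import Any


def build_message_send(
    context_id: str,
    *,
    text: str | None = None,
    file_name: str | None = None,
    file_bytes: bytes | None = None,
    mime_type: str = "text/plain",
    task_id: str | None = None,
) -> dict[str, Any]:
    """Build a JSON-RPC message/send envelope (illustrative helper)."""
    msg_id = f"canary-{uuid.uuid4().hex[:12]}"
    parts: list[dict[str, Any]] = []
    if text:
        parts.append({"kind": "text", "text": text})
    if file_name is not None and file_bytes is not None:
        # Inline file part: the payload travels base64-encoded in `bytes`.
        parts.append(
            {
                "kind": "file",
                "file": {
                    "name": file_name,
                    "mimeType": mime_type,
                    "bytes": base64.b64encode(file_bytes).decode("ascii"),
                },
            }
        )
    return {
        "jsonrpc": "2.0",
        "id": msg_id,
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "messageId": msg_id,
                "kind": "message",
                "contextId": context_id,
                "taskId": task_id,
                "parts": parts,
            },
            "configuration": {
                "acceptedOutputModes": ["text/plain"],
                "blocking": True,
            },
        },
    }


# File-only message (canary 2 shape): no text part, one inline file part.
envelope = build_message_send("ctx-demo", file_name="note.txt", file_bytes=b"hello")
wire = json.dumps(envelope)
file_part = json.loads(wire)["params"]["message"]["parts"][0]["file"]
decoded = base64.b64decode(file_part["bytes"])
```

Round-tripping through `json.dumps` / `json.loads` confirms the inline file survives serialization, which is the property the file-only canary depends on.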
Per feedback_no_single_source_of_truth: the harness IS the canonical
session-continuity validator across templates. Per-template unit tests
keep covering their own guard logic.
Per feedback_image_promote_is_not_user_live +
feedback_verify_actual_endstate_not_ack_follow_sop: every canary asserts
at the running-container layer; artifacts dump SessionStore state +
runtime logs on failure for post-mortem.
Rollout (deliberate sequencing, per task #342):
1. THIS PR — lands harness in molecule-core. NOT yet wired to any
template repo.
2. Companion PR in molecule-ai-workspace-template-hermes — adds
.gitea/workflows/session-continuity-e2e.yml. NOT required yet.
3. Bake on hermes for ≥5 business days.
4. Cascade to remaining 6 templates via onboard-template.sh.
5. Per-template BP flip — add "session-continuity-e2e (pull_request)"
to status_check_contexts on each repo, hermes first.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
215 lines
7.5 KiB
Python
"""Tenant control-plane simulator.

Emits the byte-identical JSON-RPC `message/send` wire shape that the
production `workspace-server` POSTs to the runtime's :8000; see
``workspace-server/internal/handlers/a2a.go`` and the canonical sample
in ``tests/e2e/test_chat_attachments_e2e.sh``.

This file is purposefully small (~250 LoC). It is NOT a re-implementation
of `workspace-server`; it is just the minimum surface required to drive
the 4 session-continuity canaries.

If the runtime asserts on a header / envelope field that the production
platform sets but this simulator omits, FIX THE SIMULATOR; never weaken
the runtime to accept divergent wire shapes. The simulator is the
canonical contract emitter for canary purposes
(``feedback_no_single_source_of_truth``).
"""

from __future__ import annotations

import base64
import json
import os
import uuid
from dataclasses import dataclass
from typing import Any

import httpx


@dataclass
class CPSimConfig:
    runtime_url: str
    """Base URL of the runtime under test (e.g. http://runtime:8000)."""
    request_timeout_s: float = 60.0
    """Per-A2A-call timeout. Generous: canary mode replies are fast,
    but a real Provider-backed runtime under cold cache can take 30+s."""


class CPSim:
    """Thin client matching workspace-server's wire shape."""

    def __init__(self, cfg: CPSimConfig | None = None) -> None:
        self.cfg = cfg or CPSimConfig(
            runtime_url=os.environ.get("RUNTIME_URL", "http://localhost:18000"),
        )
        self._client = httpx.Client(timeout=self.cfg.request_timeout_s)

    # ------------------------------------------------------------------ A2A

    def send_text(
        self,
        text: str,
        *,
        context_id: str,
        task_id: str | None = None,
    ) -> dict[str, Any]:
        """POST a text-only A2A message. Returns the JSON-RPC envelope."""
        msg_id = f"canary-{uuid.uuid4().hex[:12]}"
        payload = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "method": "message/send",
            "params": {
                "message": {
                    "role": "user",
                    "messageId": msg_id,
                    "kind": "message",
                    "contextId": context_id,
                    "taskId": task_id,
                    "parts": [{"kind": "text", "text": text}],
                },
                "configuration": {
                    "acceptedOutputModes": ["text/plain"],
                    "blocking": True,
                },
            },
        }
        return self._post(payload)

    def send_with_file(
        self,
        *,
        context_id: str,
        text: str | None,
        file_name: str,
        file_bytes: bytes,
        mime_type: str = "text/plain",
        task_id: str | None = None,
    ) -> dict[str, Any]:
        """POST an A2A message with an inline file part.

        Uses the inline `bytes` form of A2A file parts (RFC#600: the
        no-URI variant added precisely so canary tests don't need a
        `/chat/uploads` round-trip). Each runtime's executor calls
        ``extract_attached_files`` which handles both forms, as verified
        in ``workspace/executor_helpers.py:903``.
        """
        msg_id = f"canary-{uuid.uuid4().hex[:12]}"
        parts: list[dict[str, Any]] = []
        if text:
            parts.append({"kind": "text", "text": text})
        parts.append(
            {
                "kind": "file",
                "file": {
                    "name": file_name,
                    "mimeType": mime_type,
                    "bytes": base64.b64encode(file_bytes).decode("ascii"),
                },
            }
        )
        payload = {
            "jsonrpc": "2.0",
            "id": msg_id,
            "method": "message/send",
            "params": {
                "message": {
                    "role": "user",
                    "messageId": msg_id,
                    "kind": "message",
                    "contextId": context_id,
                    "taskId": task_id,
                    "parts": parts,
                },
                "configuration": {
                    "acceptedOutputModes": ["text/plain"],
                    "blocking": True,
                },
            },
        }
        return self._post(payload)

    # ------------------------------------------------------------ helpers

    def _post(self, payload: dict[str, Any]) -> dict[str, Any]:
        url = f"{self.cfg.runtime_url}/a2a"
        try:
            r = self._client.post(url, json=payload)
        except httpx.HTTPError as e:
            raise CPSimError(f"A2A POST failed: {e}") from e
        if r.status_code != 200:
            raise CPSimError(
                f"A2A non-200: status={r.status_code} body={r.text[:500]}"
            )
        try:
            return r.json()
        except json.JSONDecodeError as e:
            raise CPSimError(f"A2A body not JSON: {r.text[:500]}") from e

    @staticmethod
    def extract_text_parts(envelope: dict[str, Any]) -> str:
        """Return concatenated text from all text parts of a reply.

        Handles both top-level `result.parts` (the canonical shape) and
        `result.artifacts[*].parts` (which some runtimes emit when the
        reply was streamed as artifact chunks). Matches the extractor in
        ``tests/e2e/test_chat_attachments_e2e.sh``.
        """
        result = envelope.get("result") or {}
        chunks: list[str] = []
        for p in result.get("parts", []) or []:
            if p.get("kind") == "text":
                chunks.append(p.get("text", ""))
        for art in result.get("artifacts", []) or []:
            for p in art.get("parts", []) or []:
                if p.get("kind") == "text":
                    chunks.append(p.get("text", ""))
        # Some runtimes return a status.message instead of/in addition to parts.
        status = result.get("status") or {}
        status_msg = status.get("message") or {}
        for p in status_msg.get("parts", []) or []:
            if p.get("kind") == "text":
                chunks.append(p.get("text", ""))
        return "\n".join(chunks).strip()

    # ----------------------------------------------------- memory probe

    def probe_memory(self, key: str) -> str | None:
        """Read a memory value via the runtime's MCP memory tool.

        Uses the same MCP transport the canvas uses
        (``POST /workspaces/:id/mcp``-shaped JSON-RPC over /mcp). Returns
        the recalled string or None if the key is missing.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": f"canary-mem-{uuid.uuid4().hex[:8]}",
            "method": "tools/call",
            "params": {"name": "recall_memory", "arguments": {"key": key}},
        }
        try:
            r = self._client.post(f"{self.cfg.runtime_url}/mcp", json=payload)
        except httpx.HTTPError as e:
            raise CPSimError(f"MCP POST failed: {e}") from e
        if r.status_code != 200:
            return None
        body = r.json()
        result = body.get("result") or {}
        # MCP responses wrap the tool output in result.content[*].text per
        # the JSON-RPC tools/call contract.
        for c in result.get("content", []) or []:
            if c.get("type") == "text":
                return c.get("text")
        return None


class CPSimError(RuntimeError):
    """Raised on transport / envelope failures (NOT canary assertion failures).

    Distinct from AssertionError so that a transport-layer fault can be
    told apart from a real session-continuity regression and debugged
    differently. Note that pytest reports any exception raised in a test
    body as FAILED; it reports ERROR only when the exception is raised
    during fixture setup or teardown.
    """
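The three reply shapes that `extract_text_parts` handles can be illustrated with a standalone sketch. `collect_text` and the sample envelopes below are hypothetical and written for this example only; the logic follows the extractor in this file but is not a drop-in replacement for it.

```python
from __future__ import annotations

from typing import Any


def _texts(parts: list[dict[str, Any]] | None) -> list[str]:
    """Return the text of every part whose kind is "text"."""
    return [p.get("text", "") for p in parts or [] if p.get("kind") == "text"]


def collect_text(envelope: dict[str, Any]) -> str:
    """Gather text from result.parts, result.artifacts[*].parts, and status.message."""
    result = envelope.get("result") or {}
    chunks = _texts(result.get("parts"))
    for art in result.get("artifacts") or []:
        chunks += _texts(art.get("parts"))
    status_msg = (result.get("status") or {}).get("message") or {}
    chunks += _texts(status_msg.get("parts"))
    return "\n".join(chunks).strip()


# Sample replies, one per shape (made-up data).
top_level = {"result": {"parts": [{"kind": "text", "text": "hello"}]}}
artifacts = {
    "result": {
        "artifacts": [
            {"parts": [{"kind": "text", "text": "a"}]},
            {"parts": [{"kind": "text", "text": "b"}, {"kind": "file", "file": {}}]},
        ]
    }
}
status_only = {
    "result": {"status": {"message": {"parts": [{"kind": "text", "text": "done"}]}}}
}
error_reply = {"error": {"code": -32600, "message": "bad request"}}
```

An envelope with no `result` (such as a JSON-RPC error reply) yields an empty string, so a canary that asserts on reply content fails loudly instead of raising a KeyError.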