Files
molecule-core/tests/e2e/stub-runtime/server.py
core-devops 097a5a9613
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 7s
CI / Python Lint & Test (pull_request) Successful in 8s
E2E API Smoke Test / detect-changes (pull_request) Successful in 8s
E2E Chat / detect-changes (pull_request) Successful in 7s
CI / Detect changes (pull_request) Successful in 12s
Harness Replays / detect-changes (pull_request) Successful in 7s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 5s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 5s
Check migration collisions / Migration version collision check (pull_request) Successful in 24s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 16s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 15s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 17s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 15s
E2E Chat / E2E Chat (pull_request) Successful in 31s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 30s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 13s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m0s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Failing after 1m0s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 57s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m3s
Harness Replays / Harness Replays (pull_request) Successful in 3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 9s
qa-review / approved (pull_request_target) Failing after 11s
security-review / approved (pull_request_target) Failing after 10s
gate-check-v3 / gate-check (pull_request_target) Successful in 14s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / review-refire (pull_request_target) Has been skipped
sop-checklist / all-items-acked (pull_request_target) Successful in 8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 2s
sop-tier-check / tier-check (pull_request_target) Failing after 6s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Failing after 1m40s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (pull_request) Successful in 2m13s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Has been skipped
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m6s
Local Provision Lifecycle E2E / Local Provision Lifecycle E2E (stub) (pull_request) Failing after 2m9s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 2m14s
CI / Platform (Go) (pull_request) Successful in 6m57s
Local Provision Lifecycle E2E / Local Provision Lifecycle E2E (real image, advisory) (pull_request) Failing after 6m58s
CI / Canvas (Next.js) (pull_request) Successful in 7m41s
CI / Canvas Deploy Status (pull_request) Successful in 2s
CI / all-required (pull_request) Successful in 2s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Has been cancelled
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Has been cancelled
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been cancelled
E2E Staging SaaS (full lifecycle) / E2E Staging Platform Boot (pull_request) Has been cancelled
test(e2e): mandatory local Docker-provisioner lifecycle e2e (provision/online/restart-survive/proxy) + stub runtime
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 03:50:57 -07:00

308 lines
13 KiB
Python

#!/usr/bin/env python3
"""Minimal stub runtime for the local Docker-provisioner lifecycle e2e.
This is NOT a real agent — it carries no LLM, no claude-code SDK, no plugin
host. Its only job is to satisfy the platform's runtime<->platform contract so
the `test_local_provision_lifecycle_e2e.sh` harness can prove the LOCAL Docker
provisioner can provision a workspace, bring it online, SURVIVE A RESTART
(reusing the config volume), and route an A2A `message/send` through the
platform proxy — all WITHOUT building/booting the 2.5GB real claude-code image.
Contract it replicates (discovered from workspace-server):
* registration is done BY the runtime container on boot (NOT the provisioner).
The provisioner only sets status=provisioning + pre-stores the host URL; the
container must POST /registry/register itself, and the heartbeat loop is what
transitions provisioning -> online (registry.go evaluateStatus, #1784).
* env vars the real entrypoint reads, injected by buildContainerEnv():
WORKSPACE_ID - this workspace's UUID
PLATFORM_URL - canonical platform base URL (e.g. http://platform:8080)
We read exactly those (with WORKSPACE_CONFIG_PATH for the config.yaml probe).
* POST {PLATFORM_URL}/registry/register
body: {"id", "url", "agent_card":{"name","skills":[]}}
- url MUST be push-routable. The provisioner runs the platform inside
Docker, so it rewrites a stored http://127.0.0.1:<port> URL to the
container-DNS form http://ws-<id[:12]>:8000 before proxying
(a2a_proxy.go resolveAgentURL). We register our OWN container-DNS URL
(http://<hostname>:8000) so SSRF validation passes in SaaS mode AND the
proxy can reach us; in self-hosted (non-saas) mode RFC-1918 is blocked,
so we fall back to registering by the ws-<id> alias hostname which
resolves on molecule-core-net.
- first register returns {"auth_token": ...}; we keep it for heartbeats.
* POST {PLATFORM_URL}/registry/heartbeat (every ~10s)
header: Authorization: Bearer <auth_token>
body: {"workspace_id","error_rate","sample_error","active_tasks",
"uptime_seconds","current_task"}
This is what lifts the workspace provisioning -> online and keeps the
Redis liveness TTL fresh (so the restart re-online assertion can pass).
* listen on :8000 and answer the A2A JSON-RPC the proxy forwards:
POST / {"jsonrpc","id","method":"message/send","params":{...}}
-> 200 {"jsonrpc":"2.0","id":<echoed>,
"result":{"kind":"message","role":"agent",
"parts":[{"kind":"text","text":"STUB OK"}],
"messageId":<uuid>}}
The result envelope matches what test_a2a_e2e.sh asserts on
(result.parts[0].text, role=agent, kind=text). A health path (/health and
GET /) returns 200 so any probe sees the container as alive.
"""
import json
import os
import sys
import threading
import time
import urllib.request
import urllib.error
import uuid
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
PORT = 8000
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "").strip()
PLATFORM_URL = (os.environ.get("PLATFORM_URL") or os.environ.get("MOLECULE_URL") or "").rstrip("/")
HOSTNAME = os.environ.get("HOSTNAME", "").strip() # docker sets this to the container id; ws-<id> alias also resolves
# URL we register with. Two hard constraints, discovered from workspace-server:
#
# * validateAgentURL (registry.go) blocks RFC-1918 ranges in NON-saas mode
# (this dev stack sets neither MOLECULE_DEPLOY_MODE=saas nor MOLECULE_ORG_ID
# -> strict mode). The molecule-core-net bridge is 172.18.0.0/16, INSIDE the
# blocked 172.16/12 — so registering our own ws-<id>:8000 DNS name (which
# resolves to a 172.18.x bridge IP) would be REJECTED and we'd never get an
# auth_token. "localhost" is explicitly allowed BY NAME (no DNS lookup).
#
# * the proxy doesn't use the URL we register anyway: the provisioner
# pre-stores http://127.0.0.1:<host-port>, the register upsert PRESERVES any
# existing 127.0.0.1 URL (CASE WHEN url LIKE 'http://127.0.0.1%'), and when
# the platform runs in Docker resolveAgentURL rewrites that to the container
# -DNS form http://ws-<id[:12]>:8000 before forwarding. So our listen
# address (0.0.0.0:8000, reachable as ws-<id>:8000 on the bridge) is what
# the proxy actually hits — independent of the URL string we register.
#
# Net: register a name-form localhost URL purely to satisfy push-mode's
# "url required + must pass SSRF check" and to get our auth_token. Routing is
# handled by the provisioner-stored 127.0.0.1 URL + the proxy rewrite.
_short = WORKSPACE_ID[:12] if len(WORKSPACE_ID) > 12 else WORKSPACE_ID
SELF_URL = os.environ.get("STUB_REGISTER_URL", f"http://localhost:{PORT}")
CONFIG_PATH = (os.environ.get("WORKSPACE_CONFIG_PATH") or "/configs").rstrip("/")
AUTH_TOKEN_FILE = f"{CONFIG_PATH}/.auth_token"
AUTH_TOKEN = None
_started = time.time()
def _log(msg):
print(f"[stub-runtime {_short}] {msg}", flush=True)
def read_volume_token():
"""The provisioner pre-writes the CURRENT workspace bearer to
/configs/.auth_token before every container start (issueAndInjectToken,
#1877), and ROTATES it on every (re)provision (RevokeAllForWorkspace +
IssueToken). So the volume file — NOT the register-response token — is the
authoritative, rotation-proof bearer. Reading it on each heartbeat means a
provision-time token rotation never wedges our heartbeat at 401 (which is
what kept the workspace stuck in 'provisioning' instead of flipping online).
"""
try:
with open(AUTH_TOKEN_FILE, "r") as f:
tok = f.read().strip()
return tok or None
except Exception:
return None
def _post_json(path, payload, token=None):
url = f"{PLATFORM_URL}{path}"
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
if token:
req.add_header("Authorization", f"Bearer {token}")
with urllib.request.urlopen(req, timeout=15) as resp:
body = resp.read().decode()
return resp.status, body
def register():
"""POST /registry/register. Returns the issued auth_token (first register).
C18 hijack guard: once the workspace has ANY live token on file (the
provisioner mints+injects one into /configs/.auth_token before start), a
register MUST carry that workspace's bearer or it 401s. So we send the
volume token (if present). First-ever boot has no live token yet → bootstrap
register (no bearer) is allowed and returns the freshly-issued auth_token.
"""
global AUTH_TOKEN
payload = {
"id": WORKSPACE_ID,
"url": SELF_URL,
"delivery_mode": "push",
"agent_card": {
"name": WORKSPACE_ID,
"description": "stub runtime (e2e lifecycle)",
"skills": [],
},
}
status, body = _post_json("/registry/register", payload, token=read_volume_token())
_log(f"register -> {status} {body[:200]}")
try:
parsed = json.loads(body)
except Exception:
parsed = {}
tok = parsed.get("auth_token")
if tok:
AUTH_TOKEN = tok
_log("captured auth_token from register response")
return status
def current_token():
# Volume file is authoritative (rotation-proof); fall back to the token we
# captured from the register response if the file isn't there yet.
return read_volume_token() or AUTH_TOKEN
def heartbeat():
payload = {
"workspace_id": WORKSPACE_ID,
"error_rate": 0.0,
"sample_error": "",
"active_tasks": 0,
"uptime_seconds": int(time.time() - _started),
"current_task": "",
}
status, body = _post_json("/registry/heartbeat", payload, token=current_token())
return status, body
def register_with_retry():
# The platform may still be wiring the row when we boot; retry a few times.
# Register is best-effort for the e2e (heartbeat drives online); a sticky
# 401 just means the workspace already has a live token and our volume token
# is momentarily stale — the heartbeat path re-reads the volume each beat.
for attempt in range(1, 11):
try:
status = register()
if status == 200:
return True
_log(f"register attempt {attempt}: HTTP {status}, retrying")
except urllib.error.HTTPError as e:
_log(f"register attempt {attempt}: HTTPError {e.code} {e.read().decode()[:200]}")
except Exception as e:
_log(f"register attempt {attempt}: {e}")
time.sleep(2)
return False
def heartbeat_loop():
# Fire the FIRST heartbeat immediately (no initial 5s wait) — the
# provisioning->online transition is driven by the heartbeat handler
# (registry.go evaluateStatus, #1784), so an eager first beat minimises the
# provision->online latency the e2e polls on.
while True:
try:
status, body = heartbeat()
if status != 200:
_log(f"heartbeat -> {status} {body[:160]}")
# A 401 means our token was rotated (every provision rotates the
# workspace token, issueAndInjectToken -> RevokeAllForWorkspace).
# Re-register to mint a fresh one. This is what lets the SAME
# container process survive a platform-side token rotation.
if status == 401:
_log("heartbeat 401 — re-registering to refresh token")
register_with_retry()
except urllib.error.HTTPError as e:
if e.code == 401:
_log("heartbeat 401 (HTTPError) — re-registering")
register_with_retry()
else:
_log(f"heartbeat HTTPError {e.code}")
except Exception as e:
_log(f"heartbeat error: {e}")
time.sleep(5)
class Handler(BaseHTTPRequestHandler):
def log_message(self, *args): # silence default access logging
pass
def _send(self, code, obj):
body = json.dumps(obj).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
# Health: any GET returns 200 so probes see us as alive.
self._send(200, {"status": "ok", "stub": True, "workspace_id": WORKSPACE_ID})
def do_POST(self):
length = int(self.headers.get("Content-Length", "0") or "0")
raw = self.rfile.read(length) if length else b"{}"
try:
req = json.loads(raw or b"{}")
except Exception:
req = {}
method = req.get("method", "")
req_id = req.get("id", str(uuid.uuid4()))
if method and method != "message/send":
# Match the proxy's -32601 method-not-found contract for unknowns.
self._send(200, {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32601, "message": f"method not found: {method}"},
})
return
# Canned A2A reply — exact envelope the canvas/proxy + test_a2a_e2e.sh
# assert on: result.role=agent, result.parts[0].kind=text/text.
self._send(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"kind": "message",
"role": "agent",
"parts": [{"kind": "text", "text": "STUB OK"}],
"messageId": str(uuid.uuid4()),
},
})
def main():
if not WORKSPACE_ID or not PLATFORM_URL:
_log(f"FATAL: WORKSPACE_ID={WORKSPACE_ID!r} PLATFORM_URL={PLATFORM_URL!r} — both required")
sys.exit(1)
_log(f"booting: platform={PLATFORM_URL} self_url={SELF_URL} hostname={HOSTNAME}")
# Start the HTTP server FIRST so the platform can reach us the instant we
# register (avoids a race where the proxy forwards before we're listening).
server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()
_log(f"listening on :{PORT}")
# Try to register, but do NOT make heartbeating contingent on it. The
# provisioning->online transition is driven by the HEARTBEAT handler
# (registry.go evaluateStatus, #1784), and heartbeats authenticate with the
# volume token (rotation-proof). If register transiently 401s (e.g. a token
# rotation mid-boot), we must still heartbeat so the workspace can come
# online — blocking the heartbeat loop on register success is exactly what
# kept the workspace stuck in 'provisioning'. register_with_retry runs in a
# background thread; the foreground heartbeat loop starts immediately.
threading.Thread(target=register_with_retry, daemon=True).start()
heartbeat_loop()
if __name__ == "__main__":
main()