Baseline restart 2026-04-24: every workspace came up healthy (uvicorn listening, agent-card serving) but produced zero delegations for two maintenance cycles. Tracing revealed platform's ProxyA2A POSTs to `http://ws-<id>:8000/` (no path suffix, see workspace-server/internal/provisioner.InternalURL) while the runtime's JSON-RPC routes were mounted at `/api/v1/jsonrpc/` under the a2a-sdk 1.x API migration. Result was silent — every inbound A2A returned 404 Not Found, the platform logged "Not Found" at INFO level, but no error bubbled up because the SDK's jsonrpc route factory doesn't respond to root when mounted at a subpath. Agents stayed warm, crons fired, but no work flowed. Fix: `create_jsonrpc_routes(handler, "/")` — matches platform expectation and the agent-card self-advertisement (which also shows root as the JSON-RPC URL). Agent-card route keeps its hard-coded `/.well-known/agent-card.json` path so there's no collision. Bumped to 0.1.12. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
599 lines
27 KiB
Python
599 lines
27 KiB
Python
"""Workspace runtime entry point.
|
|
|
|
Loads config -> discovers adapter -> setup -> create executor -> wrap in A2A -> register -> heartbeat.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
|
|
# When running as the installed `molecule-runtime` console script, the flat
|
|
# module names (config, heartbeat, adapters, etc.) need to resolve to this
|
|
# package's submodules. We inject the package directory onto sys.path so that
|
|
# `from config import ...` resolves to `molecule_runtime/config.py`.
|
|
# Package directory of this module; prepended to sys.path (once) so the flat
# intra-package imports below (`from config import ...` style) resolve when
# running as the installed console script rather than as a package.
_PKG_DIR = os.path.dirname(__file__)
if _PKG_DIR not in sys.path:
    sys.path.insert(0, _PKG_DIR)
|
|
|
|
import httpx
|
|
import uvicorn
|
|
from a2a.server.request_handlers import DefaultRequestHandler
|
|
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
|
|
from a2a.server.tasks import InMemoryTaskStore
|
|
from a2a.types import AgentCard, AgentCapabilities, AgentSkill, AgentInterface
|
|
from starlette.applications import Starlette
|
|
|
|
from molecule_runtime.adapters import get_adapter, AdapterConfig
|
|
from molecule_runtime.config import load_config
|
|
from molecule_runtime.heartbeat import HeartbeatLoop
|
|
from molecule_runtime.preflight import run_preflight, render_preflight_report
|
|
from builtin_tools.awareness_client import get_awareness_config
|
|
import uuid as _uuid
|
|
|
|
from builtin_tools.telemetry import setup_telemetry, make_trace_middleware
|
|
from policies.namespaces import resolve_awareness_namespace
|
|
|
|
|
|
from initial_prompt import (
|
|
mark_initial_prompt_attempted,
|
|
resolve_initial_prompt_marker,
|
|
)
|
|
from molecule_runtime.platform_auth import auth_headers
|
|
|
|
|
|
def get_machine_ip() -> str:  # pragma: no cover
    """Best-effort discovery of this machine's outbound IPv4 for A2A discovery.

    "Connects" a UDP socket to a public address — no packet is actually sent;
    connect() on a datagram socket only makes the kernel pick a route — then
    reads back the local address the kernel chose for that route.

    Returns:
        The machine's routable IPv4 as a dotted-quad string, or "127.0.0.1"
        when no route is available (e.g. offline containers).
    """
    try:
        # Context manager closes the socket even if connect()/getsockname()
        # raises — the previous try/close pattern leaked the fd on error.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"
|
|
|
|
|
|
# Re-exported from transcript_auth for the inline /transcript handler.
|
|
# Separate module keeps the security-critical gate import-light + unit-testable.
|
|
from transcript_auth import transcript_authorized as _transcript_authorized
|
|
|
|
|
|
async def main():  # pragma: no cover
    """Boot the workspace runtime end to end.

    Sequence: normalise LLM auth env -> install git/gh credential helper and
    pre-commit hook -> telemetry -> load + preflight config -> governance ->
    heartbeat -> adapter/executor -> Temporal wrapper -> build AgentCard ->
    assemble A2A Starlette app -> register with platform -> start heartbeat
    and skills watcher -> uvicorn serve, with initial-prompt and idle-loop
    background tasks cancelled in the finally block on shutdown.
    """
    workspace_id = os.environ.get("WORKSPACE_ID", "workspace-default")
    config_path = os.environ.get("WORKSPACE_CONFIG_PATH", "/configs")
    platform_url = os.environ.get("PLATFORM_URL", "http://platform:8080")
    awareness_config = get_awareness_config()

    # 0. Normalise LLM auth env vars based on token type.
    # Platform stores tokens as ANTHROPIC_AUTH_TOKEN, but the Claude SDK/CLI
    # expects different env vars per token kind (OAuth vs API key vs proxy).
    # Doing this early means every downstream adapter/executor sees a
    # consistent, correct env — no per-adapter detection needed.
    from molecule_runtime.llm_auth import normalise_llm_env
    print(normalise_llm_env().summary())

    # 0.1 GitHub credential helper installer — extracts bundled .sh scripts,
    # configures git, starts refresh daemon, primes gh CLI. Eliminates the
    # per-template wiring that caused #1933 (claude-code-default template
    # shipped without the wiring; 39 workspaces lost their tokens after the
    # ~60min installation-token TTL). Fails-soft so a missing git/gh binary
    # doesn't block runtime startup. See credential_helper.py for the full
    # rationale.
    from molecule_runtime.credential_helper import install_credential_helper
    install_credential_helper()

    # 0.2 Pre-commit hook installer — refuse commits that add internal-flavored
    # paths to the public monorepo. Runs at the agent's local git, so leaks
    # get instant feedback with the redirect command in the same response
    # cycle the agent ran `git commit` in. Hook is a no-op in non-public
    # repos (internal, plugins, templates, third-party). See precommit_hook.py
    # for the full rationale.
    from molecule_runtime.precommit_hook import install_pre_commit_hook
    install_pre_commit_hook()

    # 0.5 Initialise OpenTelemetry (no-op if packages not installed)
    setup_telemetry(service_name=workspace_id)

    # 1. Load config
    config = load_config(config_path)
    port = config.a2a.port
    preflight = run_preflight(config, config_path)
    render_preflight_report(preflight)
    if not preflight.ok:
        # Preflight failure is fatal: exit non-zero before any network setup.
        raise SystemExit(1)
    if awareness_config:
        awareness_namespace = resolve_awareness_namespace(
            workspace_id,
            awareness_config.get("namespace", ""),
        )
        print(f"Awareness enabled for namespace: {awareness_namespace}")

    # 1.5 Initialise governance adapter (no-op if disabled or package absent)
    from builtin_tools.governance import initialize_governance
    if config.governance.enabled:
        await initialize_governance(config.governance)
        print(f"Governance: Microsoft Agent Governance Toolkit enabled (mode={config.governance.policy_mode})")
    else:
        print("Governance: disabled (set governance.enabled: true in config.yaml to activate)")

    # 2. Create heartbeat (passed to adapter for task tracking)
    heartbeat = HeartbeatLoop(platform_url, workspace_id)

    # 3. Get adapter for this runtime
    runtime = config.runtime or "langgraph"
    adapter_cls = get_adapter(runtime)  # Raises KeyError if unknown — no silent fallback

    adapter = adapter_cls()
    print(f"Runtime: {runtime} ({adapter.display_name()})")

    # 4. Build adapter config
    adapter_config = AdapterConfig(
        model=config.model,
        system_prompt=None,  # Adapter builds its own prompt
        tools=config.skills,  # Skill names from config.yaml
        runtime_config=vars(config.runtime_config) if config.runtime_config else {},
        config_path=config_path,
        workspace_id=workspace_id,
        prompt_files=config.prompt_files,
        a2a_port=port,
        heartbeat=heartbeat,
    )

    # 5. Setup adapter and create executor
    # If setup fails, ensure heartbeat is stopped to prevent resource leak
    try:
        await adapter.setup(adapter_config)
        executor = await adapter.create_executor(adapter_config)
    except Exception:
        # heartbeat hasn't started yet but may have async tasks pending
        if hasattr(heartbeat, "stop"):
            try:
                await heartbeat.stop()
            except Exception:
                # Best-effort cleanup only — the original setup error is
                # what must propagate, not a secondary stop() failure.
                pass
        raise

    # 5.5. Initialise Temporal durable execution wrapper (optional)
    # Connects to TEMPORAL_HOST (default: localhost:7233) and starts a
    # co-located Temporal worker as a background asyncio task.
    # No-op with a warning log if Temporal is unreachable or temporalio
    # is not installed — all tasks fall back to direct execution transparently.
    from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
    temporal_wrapper = _create_temporal_wrapper()
    await temporal_wrapper.start()

    # Get loaded skills for agent card (adapter may have populated them)
    loaded_skills = getattr(adapter, "loaded_skills", [])

    # 6. Build Agent Card
    machine_ip = os.environ.get("HOSTNAME", get_machine_ip())
    workspace_url = f"http://{machine_ip}:{port}"

    agent_card = AgentCard(
        name=config.name,
        description=config.description or config.name,
        version=config.version,
        supported_interfaces=[
            AgentInterface(
                protocol_binding="JSONRPC",
                # Root URL — must match the JSON-RPC mount point below.
                url=f"{workspace_url}/",
                protocol_version="1.0",
            ),
        ],
        capabilities=AgentCapabilities(
            streaming=config.a2a.streaming,
            push_notifications=config.a2a.push_notifications,
        ),
        skills=[
            AgentSkill(
                id=skill.metadata.id,
                name=skill.metadata.name,
                description=skill.metadata.description,
                tags=skill.metadata.tags,
                examples=skill.metadata.examples,
            )
            for skill in loaded_skills
        ],
        default_input_modes=["text/plain", "application/json"],
        default_output_modes=["text/plain", "application/json"],
    )

    # 7. Wrap in A2A.
    #
    # Regression fix (#204): PR #198 tried to wire push_config_store +
    # push_sender to satisfy #175 (push notification capability), but
    # PushNotificationSender is an abstract base class in the a2a-sdk and
    # can't be instantiated directly. Passing it crashed main.py on startup
    # with `TypeError: Can't instantiate abstract class`. Dropped back to
    # DefaultRequestHandler's own defaults — pushNotifications capability
    # in the AgentCard below is still advertised via AgentCapabilities so
    # clients know we COULD do pushes; actually implementing them requires
    # a concrete sender subclass, tracked as a Phase-H follow-up to #175.
    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
        agent_card=agent_card,
    )

    # Build Starlette app from route factory functions (a2a-sdk 1.x API).
    # JSON-RPC mounts at root because the platform's ProxyA2A (the only
    # caller in this deployment) POSTs to `http://ws-<id>:8000/` — no
    # path suffix. See workspace-server/internal/provisioner.InternalURL
    # which returns the Docker-DNS form without a path. Mounting
    # anywhere else produces 404 on every inbound A2A and silent fleet-
    # wide productivity loss (baseline restart 2026-04-24: 0 delegations
    # for 2 cycles until this was traced to `/api/v1/jsonrpc/`).
    routes = []
    routes.extend(create_agent_card_routes(agent_card))
    routes.extend(create_jsonrpc_routes(handler, "/"))
    app = Starlette(routes=routes)

    # 8. Register with platform
    agent_card_dict = {
        "name": config.name,
        "description": config.description,
        "version": config.version,
        "url": workspace_url,
        "skills": [
            {
                "id": s.metadata.id,
                "name": s.metadata.name,
                "description": s.metadata.description,
                "tags": s.metadata.tags,
            }
            for s in loaded_skills
        ],
        "capabilities": {
            "streaming": config.a2a.streaming,
            "pushNotifications": config.a2a.push_notifications,
        },
    }

    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.post(
                f"{platform_url}/registry/register",
                json={
                    "id": workspace_id,
                    "url": workspace_url,
                    "agent_card": agent_card_dict,
                },
                headers=auth_headers(),
            )
            print(f"Registered with platform: {resp.status_code}")
            # Phase 30.1 — capture the auth token issued at first register.
            # The platform only mints one on first register per workspace,
            # so a subsequent restart gets an empty auth_token and we
            # keep using the on-disk copy from the original issuance.
            if resp.status_code == 200:
                try:
                    body = resp.json()
                    tok = body.get("auth_token")
                    if tok:
                        from platform_auth import save_token
                        save_token(tok)
                        print("Workspace auth token saved to disk.")
                except Exception as parse_exc:
                    print(f"Warning: couldn't parse register response for token: {parse_exc}")
        except Exception as e:
            # Registration is best-effort: the runtime still serves A2A even
            # if the platform is temporarily unreachable.
            print(f"Warning: failed to register with platform: {e}")

    # 9. Start heartbeat
    heartbeat.start()

    # 9b. Start skills hot-reload watcher (background task)
    # When a skill file changes the watcher reloads the skill module and calls
    # back into the adapter so the next A2A request uses the updated tools.
    if config.skills:
        try:
            from skill_loader.watcher import SkillsWatcher

            def _on_skill_reload(updated_skill):
                """Rebuild the LangGraph agent when a skill changes in-place."""
                if not hasattr(adapter, "loaded_skills"):
                    return
                # Replace the matching skill in the adapter's skill list
                adapter.loaded_skills = [
                    updated_skill if s.metadata.id == updated_skill.metadata.id else s
                    for s in adapter.loaded_skills
                ]
                # Rebuild the agent's tool list from updated skills
                if hasattr(adapter, "all_tools") and hasattr(adapter, "system_prompt"):
                    from builtin_tools.approval import request_approval
                    from builtin_tools.delegation import delegate_to_workspace
                    from builtin_tools.memory import commit_memory, search_memory
                    from builtin_tools.sandbox import run_code
                    base_tools = [delegate_to_workspace, request_approval,
                                  commit_memory, search_memory, run_code]
                    skill_tools = []
                    for sk in adapter.loaded_skills:
                        skill_tools.extend(sk.tools)
                    adapter.all_tools = base_tools + skill_tools
                    # Rebuild compiled agent so next ainvoke picks up new tools
                    try:
                        from agent import create_agent
                        new_agent = create_agent(
                            config.model, adapter.all_tools, adapter.system_prompt
                        )
                        executor.agent = new_agent
                        print(f"Skills hot-reload: '{updated_skill.metadata.id}' reloaded — "
                              f"{len(updated_skill.tools)} tool(s)")
                    except Exception as rebuild_err:
                        print(f"Skills hot-reload: agent rebuild failed: {rebuild_err}")

            skills_watcher = SkillsWatcher(
                config_path=config_path,
                skill_names=config.skills,
                on_reload=_on_skill_reload,
            )
            asyncio.create_task(skills_watcher.start())
            print(f"Skills hot-reload enabled for: {config.skills}")
        except Exception as e:
            print(f"Warning: skills watcher could not start: {e}")

    # 10. Run A2A server
    print(f"Workspace {workspace_id} starting on port {port}")
    # Wrap the ASGI app with W3C TraceContext extraction middleware so incoming
    # A2A HTTP requests propagate their trace context into _incoming_trace_context.
    starlette_app = app

    # Add /transcript route — exposes the most-recent agent session log
    # (claude-code reads ~/.claude/projects/<cwd>/<session>.jsonl). Other
    # runtimes return supported:false.
    from starlette.responses import JSONResponse
    from starlette.routing import Route
    # NOTE(review): `Route` above appears unused — add_route() is used below;
    # confirm before removing.

    async def _transcript_handler(request):
        """GET /transcript — auth-gated view of the adapter's session log."""
        # Require workspace bearer token — the same token issued at registration
        # and stored in /configs/.auth_token. Any container on molecule-monorepo-net
        # could otherwise read the full session log. Closes #287.
        #
        # #328: fail CLOSED when the token file is unavailable. get_token()
        # returns None during the bootstrap window (first register hasn't
        # completed), if /configs/.auth_token was deleted, or on OSError.
        # The old `if expected:` guard treated all three cases as "skip
        # auth" — an unauthenticated container on the same Docker network
        # could read the entire session log during that window. Deny
        # instead. The platform's TranscriptHandler acquires the token
        # during registration, so once the bootstrap completes it always
        # has a valid credential to present.
        from platform_auth import get_token
        if not _transcript_authorized(get_token(), request.headers.get("Authorization", "")):
            return JSONResponse({"error": "unauthorized"}, status_code=401)
        try:
            since = int(request.query_params.get("since", "0"))
            limit = int(request.query_params.get("limit", "100"))
        except (TypeError, ValueError):
            return JSONResponse({"error": "since and limit must be integers"}, status_code=400)
        result = await adapter.transcript_lines(since=since, limit=limit)
        return JSONResponse(result)

    starlette_app.add_route("/transcript", _transcript_handler, methods=["GET"])

    built_app = make_trace_middleware(starlette_app)

    server_config = uvicorn.Config(
        built_app,
        host="0.0.0.0",
        port=port,
        log_level="info",
    )
    server = uvicorn.Server(server_config)

    # 10b. Schedule initial_prompt self-message after server is ready.
    # Only runs on first boot — creates a marker file to prevent re-execution on restart.
    initial_prompt_task = None
    initial_prompt_marker = resolve_initial_prompt_marker(config_path)
    if config.initial_prompt and not os.path.exists(initial_prompt_marker):
        # Write the marker UP FRONT (#71): if the prompt later crashes or
        # times out, we do NOT replay on next boot — that created a
        # ProcessError cascade where every message kept crashing. Operators
        # can always re-send via chat. Log loudly if the marker write
        # fails so the situation is visible.
        if not mark_initial_prompt_attempted(initial_prompt_marker):
            print(
                f"Initial prompt: WARNING — could not write marker at "
                f"{initial_prompt_marker}; this boot may replay if it crashes.",
                flush=True,
            )

        async def _send_initial_prompt():
            """Wait for server to be ready, then send initial_prompt as self-message."""
            # Wait for the A2A server to accept connections
            ready = False
            for attempt in range(30):
                await asyncio.sleep(1)
                try:
                    async with httpx.AsyncClient(timeout=5.0) as client:
                        resp = await client.get(f"http://127.0.0.1:{port}/.well-known/agent.json")
                        if resp.status_code == 200:
                            ready = True
                            break
                except Exception:
                    continue

            if not ready:
                print("Initial prompt: server not ready after 30s, skipping", flush=True)
                return

            # Send initial prompt through the platform A2A proxy (not directly to self).
            # The proxy logs an a2a_receive with source_id=NULL (canvas-style),
            # broadcasts A2A_RESPONSE via WebSocket so the chat shows both the
            # prompt (as user message) and the response (as agent message).
            # Uses urllib in a thread to avoid asyncio/httpx streaming hangs.
            import json as _json
            import urllib.request

            def _do_send_sync():
                """Blocking POST of the initial prompt, with retry + backoff."""
                import time as _time
                payload = _json.dumps({
                    "method": "message/send",
                    "params": {
                        "message": {
                            "role": "user",
                            "messageId": f"initial-{_uuid.uuid4().hex[:8]}",
                            "parts": [{"kind": "text", "text": config.initial_prompt}],
                        },
                    },
                }).encode()

                # #220: include platform bearer token so the request isn't
                # silently rejected once any workspace has a live token on
                # file. Without this, initial_prompt 401s in multi-tenant
                # mode exactly like /registry/register did in #215.
                headers = {"Content-Type": "application/json", **auth_headers()}

                # Retry with backoff — the platform proxy may not be able to
                # reach us yet (container networking takes a moment to settle).
                max_retries = 5
                for attempt in range(max_retries):
                    try:
                        req = urllib.request.Request(
                            f"{platform_url}/workspaces/{workspace_id}/a2a",
                            data=payload,
                            headers=headers,
                        )
                        with urllib.request.urlopen(req, timeout=600) as resp:
                            resp.read()
                            print(f"Initial prompt: completed (status={resp.status})", flush=True)
                        break
                    except Exception as e:
                        if attempt < max_retries - 1:
                            delay = 2 ** attempt  # 1, 2, 4, 8 seconds
                            print(f"Initial prompt: attempt {attempt + 1} failed ({e}), retrying in {delay}s...", flush=True)
                            _time.sleep(delay)
                        else:
                            print(f"Initial prompt: failed after {max_retries} attempts — {e}", flush=True)
                            return

                # Marker was already written up front (#71). Nothing to do here.

            print("Initial prompt: sending via platform proxy...", flush=True)
            loop = asyncio.get_event_loop()
            loop.run_in_executor(None, _do_send_sync)

        initial_prompt_task = asyncio.create_task(_send_initial_prompt())

    # 10c. Idle loop — reflection-on-completion / backlog-pull pattern.
    # Fires config.idle_prompt every config.idle_interval_seconds while the
    # workspace has no active task. This turns every role from "waits for cron"
    # into "self-wakes when idle" — the Hermes/Letta shape from today's
    # multi-framework survey (see docs/ecosystem-watch.md). Cost collapses to
    # event-driven in practice: the idle check is local (no LLM call, just
    # heartbeat.active_tasks==0), and the prompt only fires when there's
    # actually nothing to do. Gated on idle_prompt being non-empty so existing
    # workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or
    # per-workspace to enable.
    idle_loop_task = None
    if config.idle_prompt:
        # Idle-fire HTTP timeout. Kept tight relative to the fire cadence so a
        # hung platform doesn't accumulate dangling requests — a fire that
        # takes longer than the idle interval itself is almost certainly stuck.
        IDLE_FIRE_TIMEOUT_SECONDS = max(60, min(300, config.idle_interval_seconds))
        # Initial settle delay — never longer than 60s so cold-start races
        # don't stall the first fire, and never shorter than the configured
        # interval (short intervals shouldn't fire instantly on boot either).
        IDLE_INITIAL_SETTLE_SECONDS = min(config.idle_interval_seconds, 60)

        async def _run_idle_loop():
            """Self-sends config.idle_prompt periodically when the workspace is idle."""
            await asyncio.sleep(IDLE_INITIAL_SETTLE_SECONDS)

            import json as _json
            from urllib import request as _urlreq, error as _urlerr

            while True:
                try:
                    await asyncio.sleep(config.idle_interval_seconds)
                except asyncio.CancelledError:
                    # Task cancelled at shutdown — exit the loop cleanly.
                    return

                # Local idle check — no platform API call, no LLM call.
                # heartbeat.active_tasks == 0 means no in-flight work.
                if heartbeat.active_tasks > 0:
                    continue

                # Self-post the idle prompt via the platform A2A proxy (same
                # path as initial_prompt). The agent's own concurrency control
                # rejects if the workspace becomes busy between this check and
                # the post — that's the expected safety valve.
                payload = _json.dumps({
                    "method": "message/send",
                    "params": {
                        "message": {
                            "role": "user",
                            "messageId": f"idle-{_uuid.uuid4().hex[:8]}",
                            "parts": [{"kind": "text", "text": config.idle_prompt}],
                        },
                    },
                }).encode()

                def _post_sync():
                    # Returns (status_code, error_type) so the caller logs the
                    # actual outcome instead of a bare "post failed" line.
                    # #220: include auth_headers() on every idle fire. Without
                    # this, the idle loop 401s in multi-tenant mode.
                    headers = {"Content-Type": "application/json", **auth_headers()}
                    try:
                        req = _urlreq.Request(
                            f"{platform_url}/workspaces/{workspace_id}/a2a",
                            data=payload,
                            headers=headers,
                        )
                        with _urlreq.urlopen(req, timeout=IDLE_FIRE_TIMEOUT_SECONDS) as resp:
                            resp.read()
                            return resp.status, None
                    except _urlerr.HTTPError as e:
                        return e.code, type(e).__name__
                    except _urlerr.URLError as e:
                        return None, f"URLError: {e.reason}"
                    except Exception as e:  # pragma: no cover — catch-all safety net
                        return None, type(e).__name__

                print(
                    f"Idle loop: firing (active_tasks=0, interval={config.idle_interval_seconds}s, "
                    f"timeout={IDLE_FIRE_TIMEOUT_SECONDS}s)",
                    flush=True,
                )
                loop_ref = asyncio.get_running_loop()

                def _log_result(future):
                    # Done-callback on the executor future: log outcome without
                    # blocking the idle loop on the HTTP round-trip.
                    try:
                        status, err = future.result()
                        if err:
                            print(
                                f"Idle loop: post failed — status={status} err={err}",
                                flush=True,
                            )
                        else:
                            print(f"Idle loop: post ok status={status}", flush=True)
                    except Exception as e:  # pragma: no cover
                        print(f"Idle loop: executor callback crashed — {e}", flush=True)

                fut = loop_ref.run_in_executor(None, _post_sync)
                fut.add_done_callback(_log_result)

        idle_loop_task = asyncio.create_task(_run_idle_loop())

    try:
        await server.serve()
    finally:
        # Cancel initial prompt if still running
        if initial_prompt_task and not initial_prompt_task.done():
            initial_prompt_task.cancel()
        # Cancel idle loop if running
        if idle_loop_task and not idle_loop_task.done():
            idle_loop_task.cancel()
        # Gracefully stop the Temporal worker background task on shutdown
        await temporal_wrapper.stop()
|
|
|
|
|
|
def main_sync():  # pragma: no cover
    """Synchronous entry point for the molecule-runtime console script.

    Thin blocking wrapper: builds the ``main()`` coroutine and drives it to
    completion on a fresh event loop via :func:`asyncio.run`.
    """
    runtime_coro = main()
    asyncio.run(runtime_coro)
|
|
|
|
|
|
# Allow `python main.py` to behave the same as the installed console script.
if __name__ == "__main__":  # pragma: no cover
    main_sync()
|