The boot-smoke gate (molecule-core#2275) invokes adapters with stub
creds and no network so it can exercise executor.execute() lazy
imports. OpenClaw's setup() spawns a real `openclaw gateway --dev`
subprocess that needs valid creds + network — under smoke env it
exits immediately, raising RuntimeError("OpenClaw gateway process
exited") and failing the publish-image workflow.
Add a 1-line opt-out at the top of setup() so the runtime can reach
its smoke short-circuit. Real production boots are unaffected.
Closes the openclaw failure surfaced when running publish-image
across all 8 workspace templates with the new boot-smoke step.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
257 lines
11 KiB
Python
257 lines
11 KiB
Python
"""OpenClaw adapter — bridges OpenClaw's Node.js gateway with our A2A protocol.
|
|
|
|
OpenClaw is a Node.js agent runtime with its own gateway (port 18789).
|
|
This adapter:
|
|
1. Installs OpenClaw CLI (npm) and missing deps in the container
|
|
2. Runs non-interactive onboard with the configured model provider
|
|
3. Copies workspace files (SOUL.md, BOOTSTRAP.md, etc.) to OpenClaw's workspace dir
|
|
4. Starts the OpenClaw gateway as a background process
|
|
5. Proxies A2A messages via `openclaw agent --json` CLI subprocess
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
|
|
from molecule_runtime.adapters.base import BaseAdapter, AdapterConfig
|
|
from molecule_runtime.adapters.shared_runtime import brief_task, extract_message_text, set_current_task
|
|
from a2a.server.agent_execution import AgentExecutor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
OPENCLAW_WORKSPACE = os.path.expanduser("~/.openclaw/workspace-dev/main")
|
|
OPENCLAW_PORT = 18789
|
|
|
|
# Known missing optional deps in OpenClaw's npm package
|
|
OPENCLAW_MISSING_DEPS = ["@buape/carbon", "@larksuiteoapi/node-sdk", "@slack/web-api", "grammy"]
|
|
|
|
|
|
class OpenClawAdapter(BaseAdapter):
|
|
|
|
def __init__(self):
|
|
self._gateway_process = None
|
|
|
|
@staticmethod
|
|
def name() -> str:
|
|
return "openclaw"
|
|
|
|
@staticmethod
|
|
def display_name() -> str:
|
|
return "OpenClaw"
|
|
|
|
@staticmethod
|
|
def description() -> str:
|
|
return "OpenClaw agent runtime — Node.js gateway with SOUL/BOOTSTRAP/AGENTS workspace convention"
|
|
|
|
@staticmethod
|
|
def get_config_schema() -> dict:
|
|
return {
|
|
"model": {"type": "string", "description": "Model ID (e.g. google/gemini-2.5-flash)"},
|
|
"provider_url": {"type": "string", "description": "LLM provider base URL", "default": "https://openrouter.ai/api/v1"},
|
|
"gateway_port": {"type": "integer", "description": "OpenClaw gateway port", "default": 18789},
|
|
}
|
|
|
|
async def setup(self, config: AdapterConfig) -> None: # pragma: no cover
|
|
"""Install OpenClaw, run onboard, copy workspace files, start gateway."""
|
|
# Boot-smoke contract (molecule-core#2275): the publish-image gate
|
|
# invokes us with stub creds + no network so it can exercise lazy
|
|
# imports inside execute(). Real gateway spawn would fail here
|
|
# (no valid api_key, no `openclaw` binary on PATH yet), so skip
|
|
# the heavy setup path entirely. The runtime's smoke_mode short-
|
|
# circuit fires immediately after create_executor() returns.
|
|
if os.environ.get("MOLECULE_SMOKE_MODE") == "1":
|
|
logger.info("MOLECULE_SMOKE_MODE=1 — skipping OpenClaw gateway spawn")
|
|
return
|
|
|
|
npm_prefix = os.path.expanduser("~/.local")
|
|
os.environ["PATH"] = f"{npm_prefix}/bin:{os.environ.get('PATH', '')}"
|
|
|
|
# 1. Install OpenClaw CLI if not present
|
|
if not shutil.which("openclaw"):
|
|
logger.info("Installing OpenClaw CLI...")
|
|
result = subprocess.run(
|
|
["npm", "install", "--prefix", npm_prefix, "-g", "openclaw"],
|
|
capture_output=True, text=True, timeout=300,
|
|
env={**os.environ, "npm_config_prefix": npm_prefix}
|
|
)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"Failed to install OpenClaw: {result.stderr[:500]}")
|
|
|
|
# Install known missing optional deps
|
|
oc_dir = os.path.join(npm_prefix, "lib/node_modules/openclaw")
|
|
if os.path.exists(oc_dir):
|
|
logger.info("Installing OpenClaw optional deps...")
|
|
subprocess.run(
|
|
["npm", "install"] + OPENCLAW_MISSING_DEPS,
|
|
capture_output=True, text=True, timeout=120, cwd=oc_dir
|
|
)
|
|
logger.info("OpenClaw CLI installed")
|
|
|
|
# 2. Resolve API key and model
|
|
prefix = config.model.split(":")[0] if ":" in config.model else "openai"
|
|
if prefix == "qianfan":
|
|
api_key = os.environ.get("QIANFAN_API_KEY", os.environ.get("AISTUDIO_API_KEY", ""))
|
|
else:
|
|
api_key = os.environ.get("OPENAI_API_KEY", os.environ.get("GROQ_API_KEY", os.environ.get("OPENROUTER_API_KEY", "")))
|
|
# Determine provider URL from model prefix
|
|
provider_urls = {
|
|
"openai": "https://api.openai.com/v1",
|
|
"groq": "https://api.groq.com/openai/v1",
|
|
"openrouter": "https://openrouter.ai/api/v1",
|
|
"qianfan": "https://qianfan.baidubce.com/v2",
|
|
}
|
|
provider_url = config.runtime_config.get("provider_url", provider_urls.get(prefix, "https://api.openai.com/v1"))
|
|
model = config.model
|
|
if ":" in model:
|
|
_, model = model.split(":", 1)
|
|
|
|
# 3. Run non-interactive onboard
|
|
if not os.path.exists(os.path.expanduser("~/.openclaw/openclaw.json")):
|
|
logger.info(f"Running OpenClaw onboard (model: {model})...")
|
|
subprocess.run(
|
|
["openclaw", "onboard", "--non-interactive",
|
|
"--auth-choice", "custom-api-key",
|
|
"--custom-base-url", provider_url,
|
|
"--custom-model-id", model,
|
|
"--custom-api-key", api_key,
|
|
"--custom-compatibility", "openai",
|
|
"--secret-input-mode", "plaintext",
|
|
"--accept-risk", "--skip-health"],
|
|
capture_output=True, text=True, timeout=60,
|
|
env={**os.environ, "NODE_NO_WARNINGS": "1"}
|
|
)
|
|
logger.info("OpenClaw onboard complete")
|
|
|
|
# 3b. Fix context window (OpenClaw defaults to 16K, but modern models have much more)
|
|
oc_config_path = os.path.expanduser("~/.openclaw/openclaw.json")
|
|
if os.path.exists(oc_config_path):
|
|
try:
|
|
import json as json_mod
|
|
oc_cfg = json_mod.load(open(oc_config_path))
|
|
provider_name = "custom-" + provider_url.split("//")[1].split("/")[0].replace(".", "-")
|
|
providers = oc_cfg.get("models", {}).get("providers", {})
|
|
if provider_name in providers:
|
|
for m in providers[provider_name].get("models", []):
|
|
m["contextWindow"] = 1000000 # 1M tokens for modern models
|
|
m["maxTokens"] = 16384
|
|
json_mod.dump(oc_cfg, open(oc_config_path, "w"), indent=2)
|
|
logger.info(f"Fixed context window for {provider_name}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to fix context window: {e}")
|
|
|
|
# 3c. Always write auth-profiles.json
|
|
# (key may have been set via secrets API after first boot)
|
|
if api_key:
|
|
auth_dir = os.path.expanduser("~/.openclaw/agents/main/agent")
|
|
os.makedirs(auth_dir, exist_ok=True)
|
|
auth_file = os.path.join(auth_dir, "auth-profiles.json")
|
|
import json as json_mod
|
|
provider_name = "custom-" + provider_url.split("//")[1].split("/")[0].replace(".", "-")
|
|
auth_data = {provider_name: {"type": "api-key", "key": api_key}}
|
|
with open(auth_file, "w") as f:
|
|
json_mod.dump(auth_data, f, indent=2)
|
|
logger.info(f"Wrote auth-profiles.json for {provider_name}")
|
|
|
|
# 4. Copy workspace files from /configs to OpenClaw's workspace dir
|
|
os.makedirs(OPENCLAW_WORKSPACE, exist_ok=True)
|
|
for fname in os.listdir(config.config_path):
|
|
src = os.path.join(config.config_path, fname)
|
|
if os.path.isfile(src) and fname.endswith(".md"):
|
|
shutil.copy2(src, os.path.join(OPENCLAW_WORKSPACE, fname))
|
|
logger.debug(f"Copied {fname} to OpenClaw workspace")
|
|
|
|
# 5. Start the gateway as a background process
|
|
gateway_port = config.runtime_config.get("gateway_port", OPENCLAW_PORT)
|
|
logger.info(f"Starting OpenClaw gateway on port {gateway_port}...")
|
|
env = os.environ.copy()
|
|
env["NODE_NO_WARNINGS"] = "1"
|
|
self._gateway_process = subprocess.Popen(
|
|
["openclaw", "gateway", "--dev", "--port", str(gateway_port), "--bind", "loopback"],
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
env=env,
|
|
)
|
|
# Wait for gateway to become healthy (max 30s)
|
|
for attempt in range(15):
|
|
await asyncio.sleep(2)
|
|
if self._gateway_process.poll() is not None:
|
|
raise RuntimeError("OpenClaw gateway process exited")
|
|
try:
|
|
health = subprocess.run(
|
|
["openclaw", "gateway", "health"],
|
|
capture_output=True, text=True, timeout=10,
|
|
env=os.environ.copy()
|
|
)
|
|
if health.returncode == 0:
|
|
logger.info(f"OpenClaw gateway healthy (PID: {self._gateway_process.pid})")
|
|
break
|
|
except subprocess.TimeoutExpired:
|
|
logger.debug(f"Gateway health check timeout (attempt {attempt+1}/15)")
|
|
else:
|
|
raise RuntimeError("OpenClaw gateway did not become healthy within 30s")
|
|
|
|
async def create_executor(self, config: AdapterConfig) -> AgentExecutor:
|
|
return OpenClawA2AExecutor(heartbeat=config.heartbeat)
|
|
|
|
|
|
class OpenClawA2AExecutor(AgentExecutor):
|
|
"""Proxies A2A messages to OpenClaw via `openclaw agent` CLI subprocess."""
|
|
|
|
def __init__(self, heartbeat=None):
|
|
self._heartbeat = heartbeat
|
|
|
|
async def execute(self, context, event_queue):
|
|
from a2a.helpers import new_text_message
|
|
|
|
user_message = extract_message_text(context)
|
|
|
|
if not user_message:
|
|
await event_queue.enqueue_event(new_text_message("No message provided"))
|
|
return
|
|
|
|
await set_current_task(self._heartbeat, brief_task(user_message))
|
|
|
|
# Call OpenClaw agent via CLI
|
|
try:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"openclaw", "agent",
|
|
"--session-id", context.task_id or "default",
|
|
"--message", user_message,
|
|
"--json", "--timeout", "120",
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env={**os.environ, "PATH": f"{os.path.expanduser('~/.local/bin')}:{os.environ.get('PATH', '')}"}
|
|
)
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=130)
|
|
output = stdout.decode().strip()
|
|
|
|
if proc.returncode == 0 and output:
|
|
try:
|
|
data = json.loads(output)
|
|
payloads = data.get("result", {}).get("payloads", [])
|
|
if payloads:
|
|
reply = payloads[0].get("text", "")
|
|
else:
|
|
reply = str(data)
|
|
except json.JSONDecodeError:
|
|
reply = output
|
|
else:
|
|
reply = f"OpenClaw error: {stderr.decode()[:300]}" if stderr else f"OpenClaw returned code {proc.returncode}"
|
|
|
|
except asyncio.TimeoutError:
|
|
reply = "OpenClaw timed out after 120s"
|
|
except Exception as e:
|
|
reply = f"OpenClaw error: {e}"
|
|
finally:
|
|
await set_current_task(self._heartbeat, "")
|
|
|
|
await event_queue.enqueue_event(new_text_message(reply))
|
|
|
|
async def cancel(self, context, event_queue): # pragma: no cover
|
|
pass
|
|
|
|
|
|
Adapter = OpenClawAdapter
|