Merge pull request #2103 from Molecule-AI/runtime/cd-chain
feat: runtime CD chain + queued/drain classification + reload-safe agent messages
commit 61c16fe657
`.github/workflows/auto-tag-runtime.yml` — new file (vendored), 113 lines:
```yaml
name: auto-tag-runtime

# Auto-tag runtime releases on every merge to main that touches workspace/.
# This is the entry point of the runtime CD chain:
#
#   merge PR → auto-tag-runtime (this) → publish-runtime → cascade → template
#   image rebuilds → repull on hosts.
#
# Default bump is patch. Override via PR label `release:minor` or
# `release:major` BEFORE merging — the label is read off the merged PR
# associated with the push commit.
#
# Skips when:
#   - The push isn't to main (other branches don't auto-release).
#   - The merge commit message contains `[skip-release]` (escape hatch
#     for cleanup PRs that touch workspace/ but shouldn't ship).

on:
  push:
    branches: [main]
    paths:
      - "workspace/**"
      - "scripts/build_runtime_package.py"
      - ".github/workflows/auto-tag-runtime.yml"
      - ".github/workflows/publish-runtime.yml"

permissions:
  contents: write      # to push the new tag
  pull-requests: read  # to read labels off the merged PR

concurrency:
  # Serialize tag bumps so two near-simultaneous merges can't both think
  # they're 0.1.6 and race to push the same tag.
  group: auto-tag-runtime
  cancel-in-progress: false

jobs:
  tag:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # need full tag history for `git describe` / sort

      - name: Skip when commit asks
        id: skip
        run: |
          MSG=$(git log -1 --format=%B "${{ github.sha }}")
          if echo "$MSG" | grep -qiE '\[skip-release\]|\[no-release\]'; then
            echo "skip=true" >> "$GITHUB_OUTPUT"
            echo "Commit message contains [skip-release] — no tag will be created."
          else
            echo "skip=false" >> "$GITHUB_OUTPUT"
          fi

      - name: Determine bump kind from PR label
        id: bump
        if: steps.skip.outputs.skip != 'true'
        env:
          GH_TOKEN: ${{ github.token }}
        run: |
          # The merged PR for this push commit. `gh pr list --search` finds
          # closed PRs whose merge commit matches; we take the first.
          PR=$(gh pr list --state merged --search "${{ github.sha }}" --json number,labels --jq '.[0]' 2>/dev/null || echo "")
          if [ -z "$PR" ] || [ "$PR" = "null" ]; then
            echo "No merged PR found for ${{ github.sha }} — defaulting to patch bump."
            echo "kind=patch" >> "$GITHUB_OUTPUT"
            exit 0
          fi
          LABELS=$(echo "$PR" | jq -r '.labels[].name')
          if echo "$LABELS" | grep -qx 'release:major'; then
            echo "kind=major" >> "$GITHUB_OUTPUT"
          elif echo "$LABELS" | grep -qx 'release:minor'; then
            echo "kind=minor" >> "$GITHUB_OUTPUT"
          else
            echo "kind=patch" >> "$GITHUB_OUTPUT"
          fi

      - name: Compute next version from latest runtime-v* tag
        id: version
        if: steps.skip.outputs.skip != 'true'
        run: |
          # Find the highest runtime-vX.Y.Z tag. `sort -V` handles semver
          # ordering; `grep` filters to the right tag prefix.
          LATEST=$(git tag --list 'runtime-v*' | sort -V | tail -1)
          if [ -z "$LATEST" ]; then
            # No prior tag — seed at 0.0.0, so the default patch bump
            # makes the first release 0.0.1.
            CURRENT="0.0.0"
          else
            CURRENT="${LATEST#runtime-v}"
          fi
          MAJOR=$(echo "$CURRENT" | cut -d. -f1)
          MINOR=$(echo "$CURRENT" | cut -d. -f2)
          PATCH=$(echo "$CURRENT" | cut -d. -f3)
          case "${{ steps.bump.outputs.kind }}" in
            major) MAJOR=$((MAJOR+1)); MINOR=0; PATCH=0;;
            minor) MINOR=$((MINOR+1)); PATCH=0;;
            patch) PATCH=$((PATCH+1));;
          esac
          NEW="$MAJOR.$MINOR.$PATCH"
          echo "current=$CURRENT" >> "$GITHUB_OUTPUT"
          echo "new=$NEW" >> "$GITHUB_OUTPUT"
          echo "Bumping runtime $CURRENT → $NEW (${{ steps.bump.outputs.kind }})"

      - name: Push new tag
        if: steps.skip.outputs.skip != 'true'
        run: |
          NEW_TAG="runtime-v${{ steps.version.outputs.new }}"
          git config user.name "github-actions[bot]"
          git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git tag -a "$NEW_TAG" -m "runtime $NEW_TAG (auto-bump from ${{ steps.bump.outputs.kind }})"
          git push origin "$NEW_TAG"
          echo "Pushed $NEW_TAG — publish-runtime workflow will fire on the tag."
```
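The bump override has to be in place before the merge commit lands. A minimal sketch using the GitHub CLI, assuming `gh` is authenticated against the repo — the PR number is illustrative:

```bash
# Label the PR before merging; auto-tag-runtime reads the label off the
# merged PR associated with the push commit.
gh pr edit 2103 --add-label release:minor
```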
`.github/workflows/publish-runtime.yml` — new file (vendored), 161 lines:
```yaml
name: publish-runtime

# Publishes molecule-ai-workspace-runtime to PyPI from monorepo workspace/.
# Monorepo workspace/ is the only source-of-truth for runtime code; this
# workflow is the bridge from monorepo edits to the PyPI artifact that
# the 8 workspace-template-* repos depend on.
#
# Triggered by:
#   - Pushing a tag matching `runtime-vX.Y.Z` (the version is derived from
#     the tag — `runtime-v0.1.6` publishes `0.1.6`).
#   - Manual workflow_dispatch with an explicit `version` input (useful for
#     dev/test releases without tagging the repo).
#
# The workflow:
#   1. Runs scripts/build_runtime_package.py to copy workspace/ →
#      build/molecule_runtime/ with imports rewritten (`a2a_client` →
#      `molecule_runtime.a2a_client`).
#   2. Builds wheel + sdist with `python -m build`.
#   3. Publishes to PyPI via twine + repo secret PYPI_TOKEN.
#
# After publish: the 8 template repos pick up the new version on their
# next image rebuild (their requirements.txt pins
# `molecule-ai-workspace-runtime>=0.1.0`, so any new release is eligible).
# To force-pull immediately, bump the pin in each template repo's
# requirements.txt and merge — that triggers their own publish-image.yml.

on:
  push:
    tags:
      - "runtime-v*"
  workflow_dispatch:
    inputs:
      version:
        description: "Version to publish (e.g. 0.1.6). Required for manual dispatch."
        required: true
        type: string

permissions:
  contents: read

jobs:
  publish:
    runs-on: ubuntu-latest
    environment: pypi-publish
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: pip

      - name: Derive version from tag or input
        id: version
        run: |
          if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
            VERSION="${{ inputs.version }}"
          else
            # Tag is `runtime-vX.Y.Z` — strip the prefix.
            VERSION="${GITHUB_REF_NAME#runtime-v}"
          fi
          if ! echo "$VERSION" | grep -qE '^[0-9]+\.[0-9]+\.[0-9]+(\.dev[0-9]+|rc[0-9]+|a[0-9]+|b[0-9]+|\.post[0-9]+)?$'; then
            echo "::error::version $VERSION does not match PEP 440"
            exit 1
          fi
          echo "version=$VERSION" >> "$GITHUB_OUTPUT"
          echo "Publishing molecule-ai-workspace-runtime $VERSION"

      - name: Install build tooling
        run: pip install build twine

      - name: Build package from workspace/
        run: |
          python scripts/build_runtime_package.py \
            --version "${{ steps.version.outputs.version }}" \
            --out "${{ runner.temp }}/runtime-build"

      - name: Build wheel + sdist
        working-directory: ${{ runner.temp }}/runtime-build
        run: python -m build

      - name: Verify package contents (sanity)
        working-directory: ${{ runner.temp }}/runtime-build
        run: |
          python -m twine check dist/*
          # Smoke-import the built wheel to catch import-rewrite mistakes
          # before they hit PyPI. The package depends on a2a-sdk + httpx
          # via pyproject; install those so the smoke import resolves.
          python -m venv /tmp/smoke
          /tmp/smoke/bin/pip install --quiet dist/*.whl
          WORKSPACE_ID=00000000-0000-0000-0000-000000000000 \
          PLATFORM_URL=http://localhost:8080 \
          /tmp/smoke/bin/python -c "
          from molecule_runtime import a2a_client, a2a_tools
          from molecule_runtime.builtin_tools import memory
          from molecule_runtime.adapters import get_adapter, BaseAdapter, AdapterConfig
          assert a2a_client._A2A_QUEUED_PREFIX, 'queued prefix missing — chat-leak fix not in build'
          print('✓ smoke import passed')
          "

      - name: Publish to PyPI
        working-directory: ${{ runner.temp }}/runtime-build
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
        run: python -m twine upload dist/*

  cascade:
    # After PyPI accepts the upload, fan out a repository_dispatch to each
    # template repo so they rebuild their image against the new runtime.
    # Each template's `runtime-published.yml` receiver picks up the event,
    # pulls the new PyPI version (their requirements.txt pin is `>=`), and
    # republishes ghcr.io/molecule-ai/workspace-template-<runtime>:latest.
    #
    # Soft-fail per repo: if one template's dispatch fails (perms missing,
    # repo archived, etc.) we still try the others and surface the failures
    # in the workflow summary instead of aborting the whole cascade.
    needs: publish
    runs-on: ubuntu-latest
    steps:
      - name: Fan out repository_dispatch
        env:
          # Fine-grained PAT with `actions:write` on the 8 template repos.
          # GITHUB_TOKEN can't fire dispatches across repos — an explicit
          # token is needed. Stored as a repo secret; rotate per the
          # standard schedule.
          DISPATCH_TOKEN: ${{ secrets.TEMPLATE_DISPATCH_TOKEN }}
          # Empty unless the publish job declares a matching job-level
          # output; the run block below re-derives the version from the
          # tag as a fallback.
          RUNTIME_VERSION: ${{ needs.publish.outputs.version }}
        run: |
          set +e  # don't abort on a single repo failure — collect them all
          if [ -z "$DISPATCH_TOKEN" ]; then
            echo "::warning::TEMPLATE_DISPATCH_TOKEN secret not set — skipping cascade. PyPI was published; templates will pick up the new version on their own next rebuild."
            exit 0
          fi
          # Re-derive the version from the tag (RUNTIME_VERSION above may
          # be empty when the publish job exposes no output).
          VERSION="${GITHUB_REF_NAME#runtime-v}"
          if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
            VERSION="${{ inputs.version }}"
          fi
          TEMPLATES="claude-code langgraph crewai autogen deepagents hermes gemini-cli openclaw"
          FAILED=""
          for tpl in $TEMPLATES; do
            REPO="Molecule-AI/molecule-ai-workspace-template-$tpl"
            STATUS=$(curl -sS -o /tmp/dispatch.out -w "%{http_code}" \
              -X POST "https://api.github.com/repos/$REPO/dispatches" \
              -H "Authorization: Bearer $DISPATCH_TOKEN" \
              -H "Accept: application/vnd.github+json" \
              -H "X-GitHub-Api-Version: 2022-11-28" \
              -d "{\"event_type\":\"runtime-published\",\"client_payload\":{\"runtime_version\":\"$VERSION\"}}")
            if [ "$STATUS" = "204" ]; then
              echo "✓ dispatched $tpl ($VERSION)"
            else
              echo "::warning::✗ failed to dispatch $tpl: HTTP $STATUS — $(cat /tmp/dispatch.out)"
              FAILED="$FAILED $tpl"
            fi
          done
          if [ -n "$FAILED" ]; then
            echo "::warning::Cascade incomplete. Failed templates:$FAILED"
            # Don't fail the whole job — the PyPI publish already succeeded;
            # operators can retry the failed templates manually.
          fi
```
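For a dev/test publish without tagging, the `workflow_dispatch` path above can be driven from the CLI. A sketch, assuming an authenticated `gh`; the version string is illustrative and must be PEP 440:

```bash
# Fires publish-runtime with an explicit version input.
gh workflow run publish-runtime.yml -f version=0.1.6.dev1
```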
```diff
@@ -126,6 +126,13 @@ services:
       REDIS_URL: redis://redis:6379
       PORT: "${PLATFORM_PORT:-8080}"
       PLATFORM_URL: "http://platform:${PLATFORM_PORT:-8080}"
+      # Default MOLECULE_ENV=development so the WorkspaceAuth / AdminAuth
+      # middleware fail-open path activates when ADMIN_TOKEN is unset —
+      # otherwise the canvas (which runs without a bearer in pure local
+      # dev) gets 401 "missing workspace auth token" on every request.
+      # Override to "production" for SaaS/staged deploys; in those modes
+      # ADMIN_TOKEN must also be set or every request rejects.
+      MOLECULE_ENV: "${MOLECULE_ENV:-development}"
       CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:${CANVAS_PUBLISH_PORT:-3000},http://127.0.0.1:${CANVAS_PUBLISH_PORT:-3000},http://localhost:3001}
       RATE_LIMIT: "${RATE_LIMIT:-1000}"
       CONFIGS_DIR: /configs
@@ -153,6 +160,24 @@ services:
       HIBERNATION_IDLE_MINUTES: "${HIBERNATION_IDLE_MINUTES:-}"
       # Plugin supply chain hardening (issue #768 / PR #775). Never set in production.
       PLUGIN_ALLOW_UNPINNED: "${PLUGIN_ALLOW_UNPINNED:-}"
+      # Force ImagePull/ContainerCreate to request linux/amd64 manifests
+      # for the workspace-template-* images. The templates ship single-arch
+      # amd64 today; without this override, an arm64 host (Apple Silicon)
+      # asks the daemon for linux/arm64/v8, which doesn't match the manifest
+      # and the pull fails with "no matching manifest". Apple Silicon will
+      # run the amd64 image under Rosetta — slower (~2-3×) but functional.
+      # Override to "" or another platform when the templates start shipping
+      # multi-arch (then this hardcoded amd64 becomes unnecessary).
+      MOLECULE_IMAGE_PLATFORM: "${MOLECULE_IMAGE_PLATFORM:-linux/amd64}"
+      # GHCR auth for the workspace-images refresh endpoint
+      # (POST /admin/workspace-images/refresh). When set, the platform's
+      # Docker SDK ImagePull on private workspace-template-* images
+      # succeeds without per-host `docker login`. GHCR_USER is the GitHub
+      # username; GHCR_TOKEN is a fine-grained PAT with `read:packages`
+      # on the Molecule-AI org. Both unset → endpoint can only pull
+      # public images (current state for all 8 templates).
+      GHCR_USER: "${GHCR_USER:-}"
+      GHCR_TOKEN: "${GHCR_TOKEN:-}"
     volumes:
       - ./workspace-configs-templates:/configs
       - ./org-templates:/org-templates:ro
```
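As a concrete illustration of the production override those comments describe, a staged/SaaS deploy might carry something like this in its `.env` — the variable names come from the compose file above; the values are placeholders:

```bash
# .env sketch — illustrative values only
MOLECULE_ENV=production                  # disables the dev fail-open auth path
ADMIN_TOKEN=replace-with-a-real-token    # required once MOLECULE_ENV=production
MOLECULE_IMAGE_PLATFORM=linux/amd64
```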
````diff
@@ -2,29 +2,67 @@

 ## Overview

-The shared workspace runtime infrastructure lives in two places:
+The shared workspace runtime infrastructure has **one editable source** and
+**one published artifact**:

-1. **Source of truth (monorepo):** `workspace/` — this is where all development happens
-2. **Published package:** [`molecule-ai-workspace-runtime`](https://pypi.org/project/molecule-ai-workspace-runtime/) on PyPI
+1. **Source of truth (monorepo, editable):** `workspace/` — every runtime
+   change lands here. Edit it like any other monorepo code.
+2. **Published artifact (PyPI, generated):** [`molecule-ai-workspace-runtime`](https://pypi.org/project/molecule-ai-workspace-runtime/)
+   — produced by `.github/workflows/publish-runtime.yml` on every
+   `runtime-vX.Y.Z` tag push. Do NOT edit this independently — it gets
+   overwritten on every publish.
+
+The legacy sibling repo `molecule-ai-workspace-runtime` (the GitHub repo, as
+distinct from the PyPI package) is no longer the source-of-truth and should
+be treated as a publish artifact only. It can be archived or used as a
+read-only mirror.
+
+## Why this shape
+
+The 8 workspace template repos (claude-code, langgraph, hermes, etc.) each
+build their own Docker image and `pip install molecule-ai-workspace-runtime`
+from PyPI. PyPI is the right distribution channel — semver, reproducible
+builds, no submodule dance per-repo. But the runtime ALSO needs to evolve
+in lock-step with the platform's wire protocol (queue shape, A2A metadata,
+event payloads). Shipping cross-cutting protocol changes as separate
+runtime + platform PRs in two repos creates ordering pain and broken
+intermediate states.
+
+The monorepo + auto-publish split gives both: edit cross-cutting changes
+in one PR, publish the runtime artifact via a tag.

 ## What's in the package

-Everything in `workspace/` except adapter-specific code:
+Everything in `workspace/*.py` plus the `adapters/`, `builtin_tools/`,
+`plugins_registry/`, `policies/`, `skill_loader/` subpackages. Build
+artifacts (`Dockerfile`, `*.sh`, `pytest.ini`, `requirements.txt`) are
+excluded.

 - `molecule_runtime/` — all shared `.py` files (main.py, config.py, heartbeat.py, etc.)
 - `molecule_runtime/adapters/` — `BaseAdapter`, `AdapterConfig`, `SetupResult`, `shared_runtime`
 - `molecule_runtime/builtin_tools/` — delegation, memory, approvals, sandbox, telemetry
 - `molecule_runtime/skill_loader/` — skill loading + hot-reload
 - `molecule_runtime/plugins_registry/` — plugin discovery and install pipeline
 - `molecule_runtime/policies/` — namespace routing policies
 - Console script: `molecule-runtime` → `molecule_runtime.main:main_sync`
+
+The build script rewrites bare imports so the published package is a
+proper Python namespace:
+
+```
+# In monorepo workspace/:
+from a2a_client import discover_peer
+from builtin_tools.memory import store
+
+# In published molecule_runtime/ (auto-rewritten at publish time):
+from molecule_runtime.a2a_client import discover_peer
+from molecule_runtime.builtin_tools.memory import store
+```
+
+The closed allowlist of rewritten module names lives in
+`scripts/build_runtime_package.py` (`TOP_LEVEL_MODULES` + `SUBPACKAGES`).
+Add a new top-level module to workspace/? Add it to the allowlist in the
+same PR.

 ## Adapter repos

-Each of the 8 adapter repos now contains:
+Each of the 8 adapter template repos contains:
 - `adapter.py` — runtime-specific `Adapter` class
-- `requirements.txt` — `molecule-ai-workspace-runtime>=0.1.0` + adapter deps
-- `Dockerfile` — standalone image (no longer extends workspace-template:base)
+- `requirements.txt` — `molecule-ai-workspace-runtime>=0.1.X` + adapter deps
+- `Dockerfile` — standalone image with `ENV ADAPTER_MODULE=adapter` and
+  `ENTRYPOINT ["molecule-runtime"]`

 | Adapter | Repo |
 |---------|------|
@@ -39,8 +77,8 @@ Each of the 8 adapter repos now contains:

 ## Adapter discovery (ADAPTER_MODULE)

-Standalone adapter repos set `ENV ADAPTER_MODULE=adapter` in their Dockerfile.
-The runtime's `get_adapter()` checks this env var first:
+Standalone adapter repos set `ENV ADAPTER_MODULE=adapter` in their
+Dockerfile. The runtime's `get_adapter()` checks this env var first:

 ```python
 # In molecule_runtime/adapters/__init__.py
@@ -49,25 +87,104 @@ def get_adapter(runtime: str) -> type[BaseAdapter]:
     if adapter_module:
         mod = importlib.import_module(adapter_module)
         return getattr(mod, "Adapter")
     # Fall back to built-in subdirectory scan (monorepo local dev)
     ...
     raise KeyError(...)
 ```

 ## Publishing a new version

 ```bash
-cd workspace-template
-# 1. Bump version in pyproject.toml
-# 2. Sync to molecule-ai-workspace-runtime repo
-# 3. Tag and push — CI publishes to PyPI via PYPI_TOKEN secret
+# From any local checkout of the monorepo, after merging your runtime change:
+git tag runtime-v0.1.6
+git push origin runtime-v0.1.6
 ```

-Or manually:
-```bash
-cd workspace-template
-python -m build
-python -m twine upload dist/*
+The `publish-runtime` workflow takes over — checks out the tag, runs
+`scripts/build_runtime_package.py --version 0.1.6`, builds wheel + sdist,
+runs a smoke import to catch broken rewrites, and uploads to PyPI via
+the `PYPI_TOKEN` repo secret.
+
+For dev/test releases without tagging, dispatch the workflow manually
+with an explicit version (e.g. `0.1.6.dev1` — PEP 440 dev/rc/post forms
+are accepted).
+
+After publish, the 8 template repos pick up the new version on their
+next `:latest` rebuild. To force-pull immediately, bump the pin in each
+template's `requirements.txt`.
+
+## End-to-end CD chain
+
+The full chain from monorepo merge → workspace containers running new code:
+
+```
+1. Merge PR with workspace/ changes to main
+   ↓
+2. .github/workflows/auto-tag-runtime.yml fires
+   ↓ reads PR labels (release:major/minor) or defaults to patch
+   ↓ pushes runtime-vX.Y.Z tag
+   ↓
+3. .github/workflows/publish-runtime.yml fires (on the tag)
+   ↓ builds wheel via scripts/build_runtime_package.py
+   ↓ smoke-imports the wheel
+   ↓ uploads to PyPI
+   ↓ cascade job fires repository_dispatch (event-type: runtime-published)
+   ↓   to all 8 workspace-template-* repos
+   ↓
+4. Each template's publish-image.yml fires (on repository_dispatch)
+   ↓ rebuilds Dockerfile (which pip-installs the new PyPI version)
+   ↓ pushes ghcr.io/molecule-ai/workspace-template-<runtime>:latest
+   ↓
+5. Production hosts run scripts/refresh-workspace-images.sh
+   OR an operator hits POST /admin/workspace-images/refresh on the platform
+   ↓ docker pull all 8 :latest tags
+   ↓ remove + force-recreate any running ws-* containers using a refreshed image
+   ↓ canvas re-provisions the workspaces on next interaction
+```
+
+Steps 1-4 are fully automated. Step 5 is one-click: a single curl or shell
+command. SaaS deployments typically wire step 5 into their normal deploy
+pipeline (every release pulls fresh images on every host); local dev fires
+it manually after a runtime release lands.
+
+### Required secrets
+
+| Secret | Where | Why |
+|---|---|---|
+| `PYPI_TOKEN` | molecule-core repo | Twine upload auth (PyPI) |
+| `TEMPLATE_DISPATCH_TOKEN` | molecule-core repo | Fine-grained PAT with `actions:write` on the 8 template repos. Without it the `cascade` job warns and exits clean — PyPI still publishes; templates just don't auto-rebuild. |
+
+### Step 5 specifics
+
+**Local dev (compose stack):**
+```bash
+bash scripts/refresh-workspace-images.sh                         # all runtimes
+bash scripts/refresh-workspace-images.sh --runtime claude-code
+bash scripts/refresh-workspace-images.sh --no-recreate           # pull only, leave containers
+```
+
+**Via platform admin endpoint (any deploy):**
+```bash
+curl -X POST "$PLATFORM/admin/workspace-images/refresh"
+curl -X POST "$PLATFORM/admin/workspace-images/refresh?runtime=claude-code"
+curl -X POST "$PLATFORM/admin/workspace-images/refresh?recreate=false"
+```
+
+The endpoint pulls + recreates from inside the platform container, so it
+needs Docker socket access (the compose stack mounts
+`/var/run/docker.sock` already) AND GHCR auth on the host's docker config
+(`docker login ghcr.io` once per host). On a fresh host without GHCR auth,
+the pull step warns per runtime and the response surfaces the failures.
+
+## Local dev (build the package without publishing)
+
+```bash
+python3 scripts/build_runtime_package.py --version 0.1.0+local --out /tmp/runtime-build
+cd /tmp/runtime-build
+python -m build               # produces dist/*.whl + dist/*.tar.gz
+pip install dist/*.whl        # install into a venv to test locally
+```
+
+This is the same pipeline CI runs. Use it to validate import-rewrite
+correctness before pushing a `runtime-v*` tag.

 ## Writing a new adapter

@@ -75,5 +192,18 @@ python -m twine upload dist/*
 2. Copy `adapter.py` pattern from any existing adapter repo
 3. Change imports: `from molecule_runtime.adapters.base import BaseAdapter, AdapterConfig`
 4. Create `requirements.txt` with `molecule-ai-workspace-runtime>=0.1.0` + your deps
-5. Create `Dockerfile` with `ENV ADAPTER_MODULE=adapter` and `ENTRYPOINT ["molecule-runtime"]`
+5. Create `Dockerfile` with `ENV ADAPTER_MODULE=adapter` and
+   `ENTRYPOINT ["molecule-runtime"]`
+6. Register the runtime name in the platform's known runtimes list
+
+## Migration note
+
+Prior to this workflow, the runtime was duplicated across monorepo
+`workspace/` AND a sibling repo `molecule-ai-workspace-runtime`, with no
+sync mechanism. That caused 30+ files to drift between the two trees, and
+the chat-leak / queued-classification fixes in this PR existed only in the
+monorepo copy until manually ported.
+
+If you have an old local checkout of `molecule-ai-workspace-runtime`, treat
+it as outdated. The monorepo `workspace/` is now authoritative; the PyPI
+artifact is rebuilt from it on every `runtime-v*` tag.
````
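One way to exercise the allowlist-driven rewrite the doc above describes before pushing a tag: build into a throwaway directory, then grep the output for bare imports the allowlist missed. A sketch — the version string, output path, and the handful of module names in the pattern are illustrative:

```bash
python3 scripts/build_runtime_package.py --version 0.0.0.dev0 --out /tmp/rt-check
# A hit here suggests a module is missing from TOP_LEVEL_MODULES/SUBPACKAGES:
grep -rnE '^(from|import) (a2a_client|builtin_tools|adapters)\b' /tmp/rt-check/molecule_runtime/ \
  && echo "unrewritten bare imports found" || echo "rewrite looks clean"
```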
`scripts/build_runtime_package.py` — new executable file, 298 lines:
```python
#!/usr/bin/env python3
"""Build the molecule-ai-workspace-runtime PyPI package from monorepo workspace/.

Monorepo workspace/ is the single source-of-truth for runtime code. The PyPI
package is a publish-time mirror produced by this script, NOT a parallel
editable copy. Anyone editing the runtime should edit workspace/, never the
sibling molecule-ai-workspace-runtime repo.

What this does
--------------
1. Copies workspace/ source into build/molecule_runtime/ (note the rename:
   bare modules become a real Python package).
2. Rewrites top-level imports so e.g. `from a2a_client import X` becomes
   `from molecule_runtime.a2a_client import X`. The rewrite is regex-based
   on a closed allowlist of modules — third-party imports like `from a2a.X`
   (the a2a-sdk package) are left alone because the regex is anchored on
   exact module names.
3. Writes a pyproject.toml with the requested version + the README + the
   py.typed marker.
4. Leaves the build dir ready for `python -m build` to produce a wheel/sdist.

Usage
-----
    scripts/build_runtime_package.py --version 0.1.6 --out /tmp/runtime-build
    cd /tmp/runtime-build && python -m build
    python -m twine upload dist/*

The publish workflow (.github/workflows/publish-runtime.yml) drives this
on every `runtime-v*` tag push.
"""

from __future__ import annotations

import argparse
import re
import shutil
import sys
from pathlib import Path

# Top-level Python modules in workspace/ that become molecule_runtime.X.
# Anything imported as `from <name> import` or `import <name>` (where <name>
# matches one of these) gets rewritten to use the package prefix.
#
# Closed list (not "every .py we copy") because a typo in workspace/ would
# otherwise leak into a wrong rewrite. Update this when adding a new
# top-level module to workspace/.
TOP_LEVEL_MODULES = {
    "a2a_cli",
    "a2a_client",
    "a2a_executor",
    "a2a_mcp_server",
    "a2a_tools",
    "adapter_base",
    "agent",
    "agents_md",
    "claude_sdk_executor",
    "cli_executor",
    "config",
    "consolidation",
    "coordinator",
    "events",
    "executor_helpers",
    "heartbeat",
    "hermes_executor",
    "initial_prompt",
    "main",
    "molecule_ai_status",
    "platform_auth",
    "plugins",
    "preflight",
    "prompt",
    "shared_runtime",
}

# Subdirectory packages — these are already real packages (they have or will
# have __init__.py) so the rewrite is `from <pkg>` → `from molecule_runtime.<pkg>`.
SUBPACKAGES = {
    "adapters",
    "builtin_tools",
    "plugins_registry",
    "policies",
    "skill_loader",
}

# Files in workspace/ NOT included in the published package. These are
# build artifacts, dev scripts, or monorepo-only scaffolding.
EXCLUDE_FILES = {
    "Dockerfile",
    "build-all.sh",
    "rebuild-runtime-images.sh",
    "entrypoint.sh",
    "pytest.ini",
    "requirements.txt",
    # Note: adapter_base.py, agents_md.py, hermes_executor.py, shared_runtime.py
    # are kept (referenced by adapters/__init__.py and other modules); they get
    # their imports rewritten via TOP_LEVEL_MODULES. Excluding them broke the
    # smoke-test install with `ModuleNotFoundError: adapter_base`.
}

EXCLUDE_DIRS = {
    "__pycache__",
    "tests",
    "lib",
    "molecule_audit",
    "scripts",
}


def build_import_rewriter() -> re.Pattern:
    """Compile a single regex matching all import statements that need
    rewriting. The match groups capture the keyword + module name so the
    replacement preserves whitespace and trailing punctuation.

    Modules included: TOP_LEVEL_MODULES ∪ SUBPACKAGES.

    The `rest` group only admits whitespace, a dot, or a comma right after
    the module name, so a longer identifier that merely starts with an
    allowlisted name (e.g. `a2a_client_extra`) is never rewritten, and
    third-party modules outside the allowlist (e.g. bare `a2a` from the
    a2a-sdk) never match at all.
    """
    names = sorted(TOP_LEVEL_MODULES | SUBPACKAGES)
    alt = "|".join(re.escape(n) for n in names)
    # Matches:
    #   from <name>(\.|\s|import)
    #   import <name>(\s|$|,)
    # And captures the keyword + name so we can re-emit with prefix.
    pattern = (
        r"(?m)^(?P<indent>\s*)"    # leading whitespace (preserved)
        r"(?P<kw>from|import)\s+"  # 'from' or 'import'
        r"(?P<mod>" + alt + r")"   # the module name
        r"(?P<rest>[\s.,]|$)"      # what follows: '.subpath', ' import …', ',', whitespace, EOL
    )
    return re.compile(pattern)


def rewrite_imports(text: str, regex: re.Pattern) -> str:
    """Replace bare imports with package-prefixed ones.

        `import X`            → `import molecule_runtime.X as X` (preserve binding)
        `from X import Y`     → `from molecule_runtime.X import Y`
        `from X.sub import Y` → `from molecule_runtime.X.sub import Y`
    """
    def repl(m: re.Match) -> str:
        indent, kw, mod, rest = m.group("indent"), m.group("kw"), m.group("mod"), m.group("rest")
        if kw == "from":
            # `from X` or `from X.sub` — always safe to prefix.
            return f"{indent}from molecule_runtime.{mod}{rest}"
        # `import X` — preserve the binding name `X` (callers do `X.foo`)
        # by aliasing. `import X.sub` is uncommon for our modules and would
        # need a different binding form, but isn't used in workspace/ today.
        if rest.startswith("."):
            # `import X.sub` — rewrite as `import molecule_runtime.X.sub` and
            # leave the trailing dot pattern intact for the rest of the line.
            return f"{indent}import molecule_runtime.{mod}{rest}"
        # Plain `import X` — alias preserves the local name.
        return f"{indent}import molecule_runtime.{mod} as {mod}{rest}"

    return regex.sub(repl, text)


def copy_tree_filtered(src: Path, dst: Path) -> list[Path]:
    """Copy src/ → dst/ skipping EXCLUDE_FILES + EXCLUDE_DIRS. Returns the
    list of .py files copied so the caller can run the import rewrite over
    them in one pass."""
    py_files: list[Path] = []
    if dst.exists():
        shutil.rmtree(dst)
    dst.mkdir(parents=True)
    for entry in src.iterdir():
        if entry.is_dir():
            if entry.name in EXCLUDE_DIRS:
                continue
            sub_py = copy_tree_filtered(entry, dst / entry.name)
            py_files.extend(sub_py)
        else:
            if entry.name in EXCLUDE_FILES:
                continue
            shutil.copy2(entry, dst / entry.name)
            if entry.suffix == ".py":
                py_files.append(dst / entry.name)
    return py_files


PYPROJECT_TEMPLATE = """\
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "molecule-ai-workspace-runtime"
version = "{version}"
description = "Molecule AI workspace runtime — shared infrastructure for all agent adapters"
requires-python = ">=3.11"
license = {{text = "BSL-1.1"}}
readme = "README.md"
dependencies = [
    "a2a-sdk[http-server]>=1.0.0,<2.0",
    "httpx>=0.27.0",
    "uvicorn>=0.30.0",
    "starlette>=0.38.0",
    "websockets>=12.0",
    "pyyaml>=6.0",
    "langchain-core>=0.3.0",
    "opentelemetry-api>=1.24.0",
    "opentelemetry-sdk>=1.24.0",
    "opentelemetry-exporter-otlp-proto-http>=1.24.0",
    "temporalio>=1.7.0",
]

[project.scripts]
molecule-runtime = "molecule_runtime.main:main_sync"

[tool.setuptools.packages.find]
where = ["."]
include = ["molecule_runtime*"]

[tool.setuptools.package-data]
"molecule_runtime" = ["py.typed"]
"""


README_TEMPLATE = """\
# molecule-ai-workspace-runtime

Shared workspace runtime for [Molecule AI](https://github.com/Molecule-AI/molecule-core)
agent adapters. Installed by every workspace template image
(`workspace-template-claude-code`, `-langgraph`, `-hermes`, etc.) to provide
A2A delegation, heartbeat, memory, plugin loading, and skill management.

This package is **published from the molecule-core monorepo `workspace/`
directory** by the `publish-runtime` GitHub Actions workflow on every
`runtime-v*` tag push. **Do not edit this package directly** — edit
`workspace/` in the monorepo.

See [`docs/workspace-runtime-package.md`](https://github.com/Molecule-AI/molecule-core/blob/main/docs/workspace-runtime-package.md)
for the publish flow and architecture.
"""


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--version", required=True, help="Package version, e.g. 0.1.6")
    parser.add_argument("--out", required=True, type=Path, help="Build output directory (will be wiped)")
    parser.add_argument("--source", type=Path,
                        default=Path(__file__).resolve().parent.parent / "workspace",
                        help="Path to monorepo workspace/ directory (default: ../workspace from this script)")
    args = parser.parse_args()

    src = args.source.resolve()
    out = args.out.resolve()
    if not src.is_dir():
        print(f"error: source not a directory: {src}", file=sys.stderr)
        return 2

    pkg_dir = out / "molecule_runtime"
    print(f"[build] source:  {src}")
    print(f"[build] output:  {out}")
    print(f"[build] package: {pkg_dir}")

    if out.exists():
        shutil.rmtree(out)
    out.mkdir(parents=True)

    py_files = copy_tree_filtered(src, pkg_dir)
    print(f"[build] copied {len(py_files)} .py files")

    # Ensure the top-level package marker exists. workspace/ doesn't have one
    # (it's not a package in the monorepo), but the published artifact must.
    init = pkg_dir / "__init__.py"
    if not init.exists():
        init.write_text('"""Molecule AI workspace runtime."""\n')

    # Touch py.typed so type-checkers in adapter consumers see the package
    # as typed. An empty file is the convention.
    (pkg_dir / "py.typed").touch()

    # Rewrite imports in every .py file we copied + the new __init__.py.
    regex = build_import_rewriter()
    rewrites = 0
    for f in [*py_files, init]:
        original = f.read_text()
        rewritten = rewrite_imports(original, regex)
        if rewritten != original:
            f.write_text(rewritten)
            rewrites += 1
    print(f"[build] rewrote imports in {rewrites} files")

    # Emit pyproject.toml + README at the build root.
    (out / "pyproject.toml").write_text(PYPROJECT_TEMPLATE.format(version=args.version))
    (out / "README.md").write_text(README_TEMPLATE)

    print("[build] done. To publish:")
    print(f"  cd {out}")
    print("  python -m build")
    print("  python -m twine upload dist/*")
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
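The three rewrite forms listed in `rewrite_imports()`'s docstring can be checked directly against the compiled regex. A sketch, run from the monorepo root — the imported names in the sample source (`Settings`, etc.) are illustrative:

```bash
python3 - <<'EOF'
import sys
sys.path.insert(0, "scripts")  # make scripts/build_runtime_package.py importable
from build_runtime_package import build_import_rewriter, rewrite_imports

src = (
    "import a2a_client\n"
    "from config import Settings\n"
    "from builtin_tools.memory import store\n"
)
print(rewrite_imports(src, build_import_rewriter()), end="")
# Expected:
#   import molecule_runtime.a2a_client as a2a_client
#   from molecule_runtime.config import Settings
#   from molecule_runtime.builtin_tools.memory import store
EOF
```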
`scripts/refresh-workspace-images.sh` — new executable file, 95 lines:
```bash
#!/usr/bin/env bash
# refresh-workspace-images.sh — pull the latest workspace template images
# from GHCR and recreate any running ws-* containers against the new digest.
#
# This is the local-dev / single-host equivalent of step 5 of the runtime CD
# chain (see docs/workspace-runtime-package.md). On a SaaS deployment the
# host's deploy pipeline does the pull on every release; this script is
# what to run on a local docker-compose host after a runtime release lands.
#
# Usage:
#   bash scripts/refresh-workspace-images.sh                         # pull all 8 + recreate running ws-*
#   bash scripts/refresh-workspace-images.sh --runtime claude-code   # pull just one template
#   bash scripts/refresh-workspace-images.sh --no-recreate           # pull only, leave containers
#
# Behavior:
#   - Always pulls fresh; docker is a no-op if local matches remote, so
#     repeated runs are cheap.
#   - Recreate is "kill + remove + let the next canvas interaction re-
#     provision" — simpler than `docker stop / docker run` because the
#     platform owns the run flags. Workspaces re-register on next probe.
#   - If a container is mid-conversation, the kill cancels in-flight work.
#     Run during a quiet window OR add --no-recreate and recreate manually
#     via canvas Restart buttons.

set -euo pipefail

GREEN='\033[0;32m'
YELLOW='\033[0;33m'
RED='\033[0;31m'
NC='\033[0m'
log()  { echo -e "${GREEN}[refresh]${NC} $1" >&2; }
warn() { echo -e "${YELLOW}[refresh]${NC} $1" >&2; }
err()  { echo -e "${RED}[refresh]${NC} $1" >&2; }

ALL_RUNTIMES=(claude-code langgraph crewai autogen deepagents hermes gemini-cli openclaw)
RUNTIMES=("${ALL_RUNTIMES[@]}")
RECREATE=true

while [ $# -gt 0 ]; do
  case "$1" in
    --runtime) RUNTIMES=("$2"); shift 2;;
    --no-recreate) RECREATE=false; shift;;
    -h|--help) sed -n '2,30p' "$0"; exit 0;;
    *) err "unknown arg: $1"; exit 2;;
  esac
done

# 1. Pull fresh tags. Soft-fail per runtime — one missing image (e.g., a
#    template that hasn't been published yet) shouldn't abort the others.
log "pulling latest images for: ${RUNTIMES[*]}"
PULLED=()
FAILED=()
for rt in "${RUNTIMES[@]}"; do
  IMG="ghcr.io/molecule-ai/workspace-template-$rt:latest"
  if docker pull "$IMG" >/dev/null 2>&1; then
    log "  ✓ $rt"
    PULLED+=("$rt")
  else
    warn "  ✗ $rt (pull failed — image may not exist or auth missing)"
    FAILED+=("$rt")
  fi
done

if [ "$RECREATE" = "false" ]; then
  log "skip-recreate set — leaving containers untouched"
  log "done. pulled=${#PULLED[@]} failed=${#FAILED[@]}"
  exit 0
fi

# 2. Find ws-* containers whose image is one of the runtimes we pulled.
#    `docker inspect` exposes the image tag/digest each was created from.
log "scanning ws-* containers for stale images..."
TO_RECREATE=()
for cn in $(docker ps -a --filter "name=ws-" --format "{{.Names}}"); do
  IMG=$(docker inspect "$cn" --format '{{.Config.Image}}' 2>/dev/null || echo "")
  for rt in "${PULLED[@]}"; do
    if [[ "$IMG" == *"workspace-template-$rt"* ]]; then
      TO_RECREATE+=("$cn")
      break
    fi
  done
done

if [ "${#TO_RECREATE[@]}" -eq 0 ]; then
  log "no running ws-* containers using a refreshed image — nothing to recreate"
  exit 0
fi

# 3. Kill + remove. Canvas next-interaction will re-provision.
log "recreating ${#TO_RECREATE[@]} containers (canvas will re-provision on next interaction)"
for cn in "${TO_RECREATE[@]}"; do
  docker rm -f "$cn" >/dev/null 2>&1 && log "  removed $cn" || warn "  failed to remove $cn"
done

log "done. open the canvas and the workspaces will re-provision against the new image."
```
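The "auth missing" branch above matters once any workspace-template-* image goes private: this script's plain `docker pull` reads the host's docker config, while the admin refresh endpoint reads the `GHCR_USER`/`GHCR_TOKEN` env vars from the compose file instead. Host-side login is one command — the PAT needs `read:packages`:

```bash
# One-time per host:
echo "$GHCR_TOKEN" | docker login ghcr.io -u "$GHCR_USER" --password-stdin
```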
```diff
@@ -288,7 +288,7 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
     }
     // logActivity=false: the original EnqueueA2A callsite already logged
     // the dispatch attempt; re-logging here would double-count events.
-    status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
+    status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)

     // 202 Accepted = the dispatch was itself queued again (target still busy).
     // That's not a failure — the queued item just stays queued naturally on
@@ -321,4 +321,89 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
     MarkQueueItemCompleted(ctx, item.ID)
     log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
         item.ID, workspaceID, item.Attempts)
+
+    // Stitch the response back to the originating delegation row, if this
+    // queue item was a delegation. Without this, check_task_status would
+    // see status='queued' (set by the executeDelegation queued-branch) and
+    // the LLM would think the work was never done. We embed delegation_id
+    // in params.message.metadata at Delegate-handler time; pull it out
+    // here and UPDATE the delegate_result row so the original caller can
+    // observe the real reply.
+    if delegationID := extractDelegationIDFromBody(item.Body); delegationID != "" {
+        h.stitchDrainResponseToDelegation(ctx, callerID, item.WorkspaceID, delegationID, respBody)
+    }
 }
+
+// extractDelegationIDFromBody pulls params.message.metadata.delegation_id
+// out of an A2A JSON-RPC body. Empty string when absent — drain treats
+// that as "this queue item didn't originate from /workspaces/:id/delegate"
+// and skips the stitch, so non-delegation queue uses (cross-workspace
+// peer-direct A2A) aren't affected.
+func extractDelegationIDFromBody(body []byte) string {
+    var envelope struct {
+        Params struct {
+            Message struct {
+                Metadata struct {
+                    DelegationID string `json:"delegation_id"`
+                } `json:"metadata"`
+            } `json:"message"`
+        } `json:"params"`
+    }
+    if err := json.Unmarshal(body, &envelope); err != nil {
+        return ""
+    }
+    return envelope.Params.Message.Metadata.DelegationID
+}
+
+// stitchDrainResponseToDelegation writes the drained response into the
+// delegation's existing delegate_result row (created with status='queued'
+// by executeDelegation when the proxy first returned queued). This is the
+// other half of the loop that closes "queued → completed" so the LLM's
+// check_task_status reflects ground truth.
+//
+// Errors are logged-only — drain is fire-and-forget from Heartbeat, and a
+// stitch failure shouldn't block other queued items. The delegation will
+// just remain stuck at 'queued' in this case, which is the pre-fix
+// behaviour (no regression vs. shipping nothing).
+func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context, sourceID, targetID, delegationID string, respBody []byte) {
+    if sourceID == "" || delegationID == "" {
+        return
+    }
+    responseText := extractResponseText(respBody)
+    respJSON, _ := json.Marshal(map[string]interface{}{
+        "text":          responseText,
+        "delegation_id": delegationID,
+    })
+    res, err := db.DB.ExecContext(ctx, `
+        UPDATE activity_logs
+           SET status = 'completed',
+               summary = $1,
+               response_body = $2::jsonb
+         WHERE workspace_id = $3
+           AND method = 'delegate_result'
+           AND target_id = $4
+           AND response_body->>'delegation_id' = $5
+    `, "Delegation completed ("+truncate(responseText, 80)+")", string(respJSON),
+        sourceID, targetID, delegationID)
+    if err != nil {
+        log.Printf("A2AQueue drain stitch: update failed for delegation %s: %v", delegationID, err)
+        return
+    }
+    if rows, _ := res.RowsAffected(); rows == 0 {
+        log.Printf("A2AQueue drain stitch: no delegate_result row for delegation %s (queued-row may not exist yet)", delegationID)
+        return
+    }
+    log.Printf("A2AQueue drain stitch: delegation %s queued → completed (%d chars)", delegationID, len(responseText))
+
+    // Broadcast DELEGATION_COMPLETE so the canvas chat feed flips the
+    // "⏸ queued" line to "✓ completed" in real time. Without this the
+    // transition only surfaces after the user reloads or polls activity.
+    if h.broadcaster != nil {
+        h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_COMPLETE", sourceID, map[string]interface{}{
+            "delegation_id":    delegationID,
+            "target_id":        targetID,
+            "response_preview": truncate(responseText, 200),
+            "via":              "queue_drain",
+        })
+    }
+}
```
```diff
@@ -80,6 +80,39 @@ func TestExtractIdempotencyKey_emptyOnMissing(t *testing.T) {
     }
 }

+func TestExtractDelegationIDFromBody(t *testing.T) {
+    cases := []struct {
+        name string
+        body string
+        want string
+    }{
+        {
+            name: "delegation body — metadata.delegation_id present",
+            body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"abc-123","metadata":{"delegation_id":"abc-123"},"parts":[{"type":"text","text":"hi"}]}}}`,
+            want: "abc-123",
+        },
+        {
+            name: "non-delegation body — no metadata (peer-direct A2A)",
+            body: `{"method":"message/send","params":{"message":{"role":"user","messageId":"m-1","parts":[{"type":"text","text":"hi"}]}}}`,
+            want: "",
+        },
+        {
+            name: "metadata present but no delegation_id key",
+            body: `{"params":{"message":{"metadata":{"trace_id":"t-1"}}}}`,
+            want: "",
+        },
+        {name: "malformed JSON", body: `not json`, want: ""},
+        {name: "empty body", body: ``, want: ""},
+    }
+    for _, tc := range cases {
+        t.Run(tc.name, func(t *testing.T) {
+            if got := extractDelegationIDFromBody([]byte(tc.body)); got != tc.want {
+                t.Errorf("extractDelegationIDFromBody = %q, want %q", got, tc.want)
+            }
+        })
+    }
+}
+
 // ──────────────────────────────────────────────────────────────────────────────
 // DrainQueueForWorkspace — nil-safe error extraction regression tests
 //
```
```diff
@@ -286,6 +286,37 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
         "name": wsName,
     })

+    // Persist to activity_logs so the chat history loader restores this
+    // message after a page reload. Pre-fix, send_message_to_user pushes
+    // were broadcast-only — they lived for the WebSocket session but
+    // vanished when the user refreshed because nothing wrote them to the DB.
+    //
+    // Shape chosen to match the existing loader query
+    // (`type=a2a_receive&source=canvas`):
+    //   - activity_type='a2a_receive' so it joins the same query path
+    //   - source_id=NULL so the canvas-source filter accepts it
+    //   - method='notify' to distinguish from real A2A receives in audits
+    //   - request_body=NULL so the loader doesn't append a duplicate
+    //     "user message" bubble for it
+    //   - response_body={"result": "<text>"} matches extractResponseText's
+    //     simplest branch ({result: string} → take verbatim)
+    //
+    // Errors are logged-only — the broadcast already succeeded and the
+    // user sees the message; a persistence failure just means the message
+    // won't survive reload (pre-fix behavior). Don't fail the whole
+    // notify on a DB hiccup.
+    respJSON, _ := json.Marshal(map[string]interface{}{"result": body.Message})
+    preview := body.Message
+    if len(preview) > 80 {
+        preview = preview[:80] + "…"
+    }
+    if _, err := db.DB.ExecContext(c.Request.Context(), `
+        INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
+        VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
+    `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
+        log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err)
+    }
+
     c.JSON(http.StatusOK, gin.H{"status": "sent"})
 }
```
```diff
@@ -217,6 +217,86 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) {
     }
 }

+func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
+    // Regression guard for the "responses gone on reload" bug. send_message_to_user
+    // pushes (which route through Notify) used to be broadcast-only — they
+    // rendered in the canvas but vanished on page reload because nothing
+    // wrote them to activity_logs. The chat history loader queries
+    // `type=a2a_receive&source=canvas`, so the persisted row must:
+    //   - Use activity_type='a2a_receive' (loader's filter)
+    //   - Have source_id NULL (canvas-source filter)
+    //   - Carry the message text in response_body so extractResponseText
+    //     can reconstruct the agent reply on reload
+    mockDB, mock, _ := sqlmock.New()
+    defer mockDB.Close()
+    db.DB = mockDB
+
+    // Workspace existence check
+    mock.ExpectQuery(`SELECT name FROM workspaces`).
+        WithArgs("ws-notify").
+        WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+
+    // Persistence INSERT — verify shape
+    mock.ExpectExec(`INSERT INTO activity_logs`).
+        WithArgs(
+            "ws-notify",
+            sqlmock.AnyArg(), // summary
+            sqlmock.AnyArg(), // response_body JSON
+        ).
+        WillReturnResult(sqlmock.NewResult(1, 1))
+
+    broadcaster := newTestBroadcaster()
+    handler := NewActivityHandler(broadcaster)
+
+    gin.SetMode(gin.TestMode)
+    w := httptest.NewRecorder()
+    c, _ := gin.CreateTestContext(w)
+    c.Params = gin.Params{{Key: "id", Value: "ws-notify"}}
+    body := `{"message":"agent reply that arrived after the sync POST timed out"}`
+    c.Request = httptest.NewRequest("POST", "/workspaces/ws-notify/notify", strings.NewReader(body))
+    c.Request.Header.Set("Content-Type", "application/json")
+    handler.Notify(c)
+
+    if w.Code != http.StatusOK {
+        t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+    }
+    if err := mock.ExpectationsWereMet(); err != nil {
+        t.Errorf("DB expectations not met: %v", err)
+    }
+}
+
+func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
+    // Persistence is best-effort — a DB hiccup must NOT block the
+    // WebSocket push (which the user is already seeing in their open
+    // canvas). Pre-fix the WS push always succeeded; we don't want
+    // the new persistence step to regress that path.
+    mockDB, mock, _ := sqlmock.New()
+    defer mockDB.Close()
+    db.DB = mockDB
+
+    mock.ExpectQuery(`SELECT name FROM workspaces`).
+        WithArgs("ws-x").
+        WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+    mock.ExpectExec(`INSERT INTO activity_logs`).
+        WillReturnError(fmt.Errorf("simulated db hiccup"))
+
+    broadcaster := newTestBroadcaster()
+    handler := NewActivityHandler(broadcaster)
+
+    gin.SetMode(gin.TestMode)
+    w := httptest.NewRecorder()
+    c, _ := gin.CreateTestContext(w)
+    c.Params = gin.Params{{Key: "id", Value: "ws-x"}}
+    body := `{"message":"hi"}`
+    c.Request = httptest.NewRequest("POST", "/workspaces/ws-x/notify", strings.NewReader(body))
+    c.Request.Header.Set("Content-Type", "application/json")
+    handler.Notify(c)
+
+    if w.Code != http.StatusOK {
+        t.Errorf("DB failure must not break the response; got %d", w.Code)
+    }
+}
+
 // ==================== Direct unit tests for SessionSearch helpers ====================

 // --- parseSessionSearchParams ---
```
`workspace-server/internal/handlers/admin_workspace_images.go` — new file, 227 lines:
```go
package handlers

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "os"
    "strings"
    "time"

    "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/filters"
    dockerimage "github.com/docker/docker/api/types/image"
    dockerclient "github.com/docker/docker/client"
    "github.com/gin-gonic/gin"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
)

// AdminWorkspaceImagesHandler serves POST /admin/workspace-images/refresh — the
// production-side end of the runtime CD chain. Operators (or post-publish
// automation) hit this to (1) pull the latest workspace template images from
// GHCR via the Docker SDK and (2) recreate any running ws-* containers so
// they adopt the new image. Without this, a freshly-published runtime sits
// in the registry but containers keep running the old image until the next
// manual restart.
//
// On a SaaS deployment the deploy pipeline already pulls on every release,
// so the pull step is a no-op there; the recreate step is still the way to
// make running workspaces adopt the new image without a full host restart.
//
// POST /admin/workspace-images/refresh
//
//	?runtime=claude-code   (optional; default = all 8 templates)
//	&recreate=true|false   (default true; false = pull only)
//
// Returns JSON {pulled: [...], failed: [...], recreated: [...]}
type AdminWorkspaceImagesHandler struct {
    docker *dockerclient.Client
}

func NewAdminWorkspaceImagesHandler(docker *dockerclient.Client) *AdminWorkspaceImagesHandler {
    return &AdminWorkspaceImagesHandler{docker: docker}
}

// allRuntimes is the canonical list mirroring docs/workspace-runtime-package.md.
// Update both when a new template is added.
var allRuntimes = []string{
    "claude-code", "langgraph", "crewai", "autogen",
    "deepagents", "hermes", "gemini-cli", "openclaw",
}

type refreshResult struct {
    Pulled    []string `json:"pulled"`
    Failed    []string `json:"failed"`
    Recreated []string `json:"recreated"`
}

// ghcrAuthHeader returns the base64-encoded JSON auth payload Docker's
// ImagePull expects in PullOptions.RegistryAuth, or the empty string when no
// GHCR_USER/GHCR_TOKEN env is set (lets public images pull through).
//
// The Docker SDK doesn't read ~/.docker/config.json — every authenticated
// pull needs an explicit RegistryAuth string. Format per the Docker
// engine API: {"username":"…","password":"…","serveraddress":"ghcr.io"}
// → base64-encoded JSON with no trailing padding stripped (the engine
// handles either form).
func ghcrAuthHeader() string {
    user := strings.TrimSpace(os.Getenv("GHCR_USER"))
    token := strings.TrimSpace(os.Getenv("GHCR_TOKEN"))
    if user == "" || token == "" {
        return ""
    }
    payload := map[string]string{
        "username":      user,
        "password":      token,
        "serveraddress": "ghcr.io",
    }
    js, err := json.Marshal(payload)
    if err != nil {
        // Should be unreachable for a static map[string]string. Log so a
        // future contributor adding a non-marshallable field notices.
        log.Printf("workspace-images: failed to marshal GHCR auth: %v", err)
        return ""
    }
    return base64.URLEncoding.EncodeToString(js)
}

func (h *AdminWorkspaceImagesHandler) Refresh(c *gin.Context) {
    runtimes := allRuntimes
    if r := c.Query("runtime"); r != "" {
        // Accept a single runtime; reject anything not in the canonical list
        // so a typo doesn't silently no-op.
        found := false
        for _, known := range allRuntimes {
            if known == r {
                found = true
                break
            }
        }
        if !found {
            c.JSON(http.StatusBadRequest, gin.H{
                "error":          fmt.Sprintf("unknown runtime: %s", r),
                "known_runtimes": allRuntimes,
            })
            return
        }
        runtimes = []string{r}
    }
    recreate := c.DefaultQuery("recreate", "true") == "true"

    res := refreshResult{Pulled: []string{}, Failed: []string{}, Recreated: []string{}}
    auth := ghcrAuthHeader()

    // 1. Pull each template image via the Docker SDK. Soft-fail per-runtime
    //    so one missing image (e.g. an unpublished template) doesn't abort
    //    the others. Each pull's progress stream is drained to completion
    //    — the engine treats early-close as "abandon", leaving partial
    //    layers around with no reference.
    pullCtx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute)
    defer cancel()
    for _, rt := range runtimes {
        image := fmt.Sprintf("ghcr.io/molecule-ai/workspace-template-%s:latest", rt)
        opts := dockerimage.PullOptions{Platform: provisioner.DefaultImagePlatform()}
        if auth != "" {
            opts.RegistryAuth = auth
        }
        rc, err := h.docker.ImagePull(pullCtx, image, opts)
        if err != nil {
            log.Printf("workspace-images/refresh: pull %s failed: %v", rt, err)
            res.Failed = append(res.Failed, rt)
            continue
        }
        // Drain to completion. We discard the progress payload because no
        // caller renders it; the platform log already records pulled/failed
        // per runtime. If a future caller wants live progress, decode the
        // JSON-line stream into events here.
        if _, err := io.Copy(io.Discard, rc); err != nil {
            rc.Close()
            log.Printf("workspace-images/refresh: drain %s failed: %v", rt, err)
            res.Failed = append(res.Failed, rt)
            continue
        }
        rc.Close()
        res.Pulled = append(res.Pulled, rt)
    }

    if !recreate {
        c.JSON(http.StatusOK, res)
        return
    }

    // 2. Find ws-* containers running an image we just pulled. Recreate
    //    them — kill+remove and let the platform's normal provisioning
    //    flow re-create on next canvas interaction.
    listCtx, listCancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
    defer listCancel()
    containers, err := h.docker.ContainerList(listCtx, container.ListOptions{
        All:     true,
        Filters: filters.NewArgs(filters.Arg("name", "ws-")),
    })
    if err != nil {
        log.Printf("workspace-images/refresh: container list failed: %v", err)
        c.JSON(http.StatusInternalServerError, gin.H{"error": "container list failed", "partial_result": res})
```
|
||||
return
|
||||
}
|
||||
|
||||
pulledSet := map[string]struct{}{}
|
||||
for _, rt := range res.Pulled {
|
||||
pulledSet[rt] = struct{}{}
|
||||
}
|
||||
for _, ctr := range containers {
|
||||
// ContainerList's ctr.Image is the *resolved digest* (sha256:…),
|
||||
// not the human-readable tag. Use ContainerInspect to get the
|
||||
// original Config.Image (e.g. "ghcr.io/molecule-ai/workspace-
|
||||
// template-claude-code:latest") so we can match against the
|
||||
// pulled-runtime set. The cost is one extra round-trip per
|
||||
// ws-* container — there are at most 8 typically, so this is
|
||||
// well below any UX threshold.
|
||||
inspectCtx, inspectCancel := context.WithTimeout(c.Request.Context(), 10*time.Second)
|
||||
full, err := h.docker.ContainerInspect(inspectCtx, ctr.ID)
|
||||
inspectCancel()
|
||||
if err != nil {
|
||||
log.Printf("workspace-images/refresh: inspect %s failed: %v", ctr.ID[:12], err)
|
||||
continue
|
||||
}
|
||||
imageRef := ""
|
||||
if full.Config != nil {
|
||||
imageRef = full.Config.Image
|
||||
}
|
||||
matched := ""
|
||||
for rt := range pulledSet {
|
||||
if strings.Contains(imageRef, "workspace-template-"+rt) {
|
||||
matched = rt
|
||||
break
|
||||
}
|
||||
}
|
||||
if matched == "" {
|
||||
continue
|
||||
}
|
||||
name := strings.TrimPrefix(ctr.Names[0], "/")
|
||||
// Remove with force — the workspace will re-provision on the next
|
||||
// canvas interaction. This drops in-flight conversations on the
|
||||
// removed container; document via the response so callers can
|
||||
// schedule the refresh during a quiet window.
|
||||
rmCtx, rmCancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
|
||||
err = h.docker.ContainerRemove(rmCtx, ctr.ID, container.RemoveOptions{Force: true})
|
||||
rmCancel()
|
||||
if err != nil {
|
||||
log.Printf("workspace-images/refresh: remove %s failed: %v", name, err)
|
||||
continue
|
||||
}
|
||||
res.Recreated = append(res.Recreated, name)
|
||||
}
|
||||
|
||||
authStatus := "no GHCR auth (public images only)"
|
||||
if auth != "" {
|
||||
authStatus = "GHCR_USER/GHCR_TOKEN auth"
|
||||
}
|
||||
log.Printf("workspace-images/refresh: pulled=%d failed=%d recreated=%d (%s)",
|
||||
len(res.Pulled), len(res.Failed), len(res.Recreated), authStatus)
|
||||
c.JSON(http.StatusOK, res)
|
||||
}
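
[Editor's example] For operators wiring this endpoint into post-publish automation, a minimal sketch of the call follows. The path, query parameters, and response shape come from the handler above; the PLATFORM_URL and ADMIN_TOKEN env names, the standalone main, and the assumption that middleware.AdminAuth accepts a bearer header are all illustrative, not names or behavior this repo defines.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
)

// Hypothetical post-publish hook: POST the refresh endpoint and print the
// {pulled, failed, recreated} summary. PLATFORM_URL and ADMIN_TOKEN are
// assumed env names, not ones defined by this repo.
func main() {
	// The handler defaults to all templates with recreate=true; add
	// ?runtime=<name> to scope the refresh to a single template.
	url := os.Getenv("PLATFORM_URL") + "/admin/workspace-images/refresh?recreate=true"
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Authorization", "Bearer "+os.Getenv("ADMIN_TOKEN"))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%s\n", resp.StatusCode, body)
}
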
@ -0,0 +1,73 @@
package handlers

import (
	"encoding/base64"
	"encoding/json"
	"testing"
)

func TestGHCRAuthHeader_NoEnvReturnsEmpty(t *testing.T) {
	t.Setenv("GHCR_USER", "")
	t.Setenv("GHCR_TOKEN", "")
	if got := ghcrAuthHeader(); got != "" {
		t.Errorf("expected empty (no auth → public-only), got %q", got)
	}
}

func TestGHCRAuthHeader_PartialEnvReturnsEmpty(t *testing.T) {
	// Both must be set — defensive against half-configured env.
	t.Setenv("GHCR_USER", "alice")
	t.Setenv("GHCR_TOKEN", "")
	if got := ghcrAuthHeader(); got != "" {
		t.Errorf("user-only env should disable auth, got %q", got)
	}
	t.Setenv("GHCR_USER", "")
	t.Setenv("GHCR_TOKEN", "fake-tok-xxx")
	if got := ghcrAuthHeader(); got != "" {
		t.Errorf("token-only env should disable auth, got %q", got)
	}
}

func TestGHCRAuthHeader_EncodesDockerEnginePayload(t *testing.T) {
	t.Setenv("GHCR_USER", "alice")
	t.Setenv("GHCR_TOKEN", "fake-tok-value")
	got := ghcrAuthHeader()
	if got == "" {
		t.Fatal("expected non-empty auth header")
	}
	raw, err := base64.URLEncoding.DecodeString(got)
	if err != nil {
		t.Fatalf("auth header is not valid base64-url: %v", err)
	}
	var payload map[string]string
	if err := json.Unmarshal(raw, &payload); err != nil {
		t.Fatalf("decoded auth is not valid JSON: %v (raw=%s)", err, raw)
	}
	if payload["username"] != "alice" {
		t.Errorf("username: got %q, want alice", payload["username"])
	}
	if payload["password"] != "fake-tok-value" {
		t.Errorf("password: got %q, want fake-tok-value", payload["password"])
	}
	if payload["serveraddress"] != "ghcr.io" {
		t.Errorf("serveraddress: got %q, want ghcr.io", payload["serveraddress"])
	}
}

func TestGHCRAuthHeader_TrimsWhitespace(t *testing.T) {
	// .env lines often have trailing newlines or accidental spaces. Without
	// trimming, a stray space would produce an auth payload the engine
	// rejects with a confusing 401.
	t.Setenv("GHCR_USER", " alice ")
	t.Setenv("GHCR_TOKEN", "\tfake-tok-value\n")
	got := ghcrAuthHeader()
	raw, _ := base64.URLEncoding.DecodeString(got)
	var payload map[string]string
	_ = json.Unmarshal(raw, &payload)
	if payload["username"] != "alice" {
		t.Errorf("username not trimmed: got %q", payload["username"])
	}
	if payload["password"] != "fake-tok-value" {
		t.Errorf("password not trimmed: got %q", payload["password"])
	}
}

@ -78,13 +78,21 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
 	// reason (logged); we still dispatch the A2A request and surface the
 	// warning in the response.
 
-	// Build A2A payload
+	// Build A2A payload. Embedding delegation_id in metadata gives the
+	// queue drain path a way to look up the originating delegation row
+	// when stitching the response back (issue: previously the drain
+	// dispatched successfully but discarded the response, so
+	// check_task_status returned status='queued' forever even after a
+	// real reply landed). messageId mirrors delegation_id so the
+	// platform's idempotency-key extraction also keys off the same id.
 	a2aBody, _ := json.Marshal(map[string]interface{}{
 		"method": "message/send",
 		"params": map[string]interface{}{
 			"message": map[string]interface{}{
-				"role":  "user",
-				"parts": []map[string]interface{}{{"type": "text", "text": body.Task}},
+				"role":      "user",
+				"messageId": delegationID,
+				"parts":     []map[string]interface{}{{"type": "text", "text": body.Task}},
+				"metadata":  map[string]interface{}{"delegation_id": delegationID},
 			},
 		},
 	})
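
[Editor's example] The comment above says messageId mirrors delegation_id so the platform's idempotency-key extraction keys off the same id. As a sketch only, such an extraction could walk the payload like this; the function name and the fallback order are assumptions, not the repo's actual code (encoding/json imported as in the surrounding file):

// Hypothetical helper: recover the delegation id from an incoming A2A
// message/send body, preferring message.messageId and falling back to
// message.metadata.delegation_id. Illustrative only.
func delegationIDFromA2A(body []byte) string {
	var req struct {
		Params struct {
			Message struct {
				MessageID string `json:"messageId"`
				Metadata  struct {
					DelegationID string `json:"delegation_id"`
				} `json:"metadata"`
			} `json:"message"`
		} `json:"params"`
	}
	if err := json.Unmarshal(body, &req); err != nil {
		return ""
	}
	if id := req.Params.Message.MessageID; id != "" {
		return id
	}
	return req.Params.Message.Metadata.DelegationID
}
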
@ -284,6 +292,40 @@ func (h *DelegationHandler) executeDelegation(sourceID, targetID, delegationID s
 		return
 	}
 
+	// 202 + {queued: true} means the target was busy and the proxy
+	// enqueued the request for the next drain tick — NOT a completion.
+	// Treat it as such: write a clean 'queued' activity row with no
+	// JSON-as-text leakage into the summary, broadcast a status update,
+	// and return. The eventual drain doesn't (yet) feed a result back
+	// into this delegation, so callers polling check_task_status will
+	// see status='queued' and know to retry instead of believing the
+	// queued JSON is the agent's reply. Fixes the chat-leak where the
+	// LLM echoed "Delegation completed (workspace agent busy ...)" to
+	// the user.
+	if status == http.StatusAccepted && isQueuedProxyResponse(respBody) {
+		log.Printf("Delegation %s: target %s busy — queued for drain", delegationID, targetID)
+		h.updateDelegationStatus(sourceID, delegationID, "queued", "")
+		// Store delegation_id in response_body so DrainQueueForWorkspace's
+		// stitch step can find this row by JSON-path key after the queued
+		// dispatch eventually succeeds. Without the key, the drain finds
+		// the row by (workspace_id, target_id, method) but can't tell
+		// multiple queued delegations to the same target apart.
+		queuedJSON, _ := json.Marshal(map[string]interface{}{
+			"delegation_id": delegationID,
+			"queued":        true,
+		})
+		if _, err := db.DB.ExecContext(ctx, `
+			INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
+			VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
+		`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
+			log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
+		}
+		h.broadcaster.RecordAndBroadcast(ctx, "DELEGATION_STATUS", sourceID, map[string]interface{}{
+			"delegation_id": delegationID, "target_id": targetID, "status": "queued",
+		})
+		return
+	}
+
 	// A2A returned 200 — target received and processed the task
 	// Status: dispatched → received → completed (we don't have a separate "received" signal from the target yet)
 	responseText := extractResponseText(respBody)
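
[Editor's example] The queued branch above stores delegation_id inside response_body precisely so the drain's stitch step can find the row again by JSON-path key. A sketch of that lookup, assuming Postgres, database/sql, and an id column on activity_logs; the query and function name are illustrative, not lifted from DrainQueueForWorkspace:

// Hypothetical stitch lookup: resolve the queued activity row for a
// delegation once its deferred dispatch finally succeeds.
func findQueuedLogID(ctx context.Context, db *sql.DB, delegationID string) (int64, error) {
	var id int64
	// response_body->>'delegation_id' matches the JSON key written by the
	// queued branch; status = 'queued' narrows to rows awaiting a stitch.
	err := db.QueryRowContext(ctx, `
		SELECT id FROM activity_logs
		WHERE status = 'queued'
		  AND response_body->>'delegation_id' = $1
	`, delegationID).Scan(&id)
	return id, err
}
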
@ -517,6 +559,21 @@ func isTransientProxyError(err *proxyA2AError) bool {
 	return false
 }
 
+// isQueuedProxyResponse reports whether the proxy returned a body shaped like
+// `{"queued": true, "queue_id": ..., "queue_depth": ..., "message": ...}` —
+// the busy-target enqueue path in a2a_proxy_helpers.go. Caller checks this
+// alongside HTTP 202 to distinguish a successful agent reply from a deferred
+// dispatch; without the distinction we'd write the queued-message JSON into
+// the delegation result row and the LLM would surface it as agent output.
+func isQueuedProxyResponse(body []byte) bool {
+	var resp map[string]interface{}
+	if json.Unmarshal(body, &resp) != nil {
+		return false
+	}
+	queued, _ := resp["queued"].(bool)
+	return queued
+}
+
 func extractResponseText(body []byte) string {
 	var resp map[string]interface{}
 	if json.Unmarshal(body, &resp) != nil {

@ -376,6 +376,39 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) {
 	}
 }
 
+func TestIsQueuedProxyResponse(t *testing.T) {
+	// Regression guard for the chat-leak bug: when the proxy returns
+	// 202 with a queued-shape body, executeDelegation must classify it
+	// as "queued" — not "completed". Mis-classifying it causes the
+	// queued JSON to land in activity_logs.summary, which the LLM then
+	// echoes verbatim into the agent chat as
+	// "Delegation completed: Delegation completed (workspace agent
+	// busy — request queued, will dispatch...)".
+	cases := []struct {
+		name string
+		body string
+		want bool
+	}{
+		{
+			name: "real proxy busy-enqueue body",
+			body: `{"queued":true,"queue_id":"d0993390-5f5a-4f5d-90a2-66639e53e3c9","queue_depth":1,"message":"workspace agent busy — request queued, will dispatch when capacity available"}`,
+			want: true,
+		},
+		{"queued false explicitly", `{"queued":false}`, false},
+		{"queued field absent (real A2A reply)", `{"jsonrpc":"2.0","id":"1","result":{"kind":"message","parts":[{"kind":"text","text":"hi"}]}}`, false},
+		{"non-bool queued value (defensive)", `{"queued":"true"}`, false},
+		{"malformed JSON", `not-json`, false},
+		{"empty body", ``, false},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := isQueuedProxyResponse([]byte(tc.body)); got != tc.want {
+				t.Errorf("isQueuedProxyResponse(%q) = %v, want %v", tc.body, got, tc.want)
+			}
+		})
+	}
+}
+
 func TestDelegationRetryDelay_IsSaneWindow(t *testing.T) {
 	// Regression guard: the retry delay must be long enough for the
 	// reactive URL refresh in proxyA2ARequest to kick in (which involves

@ -127,7 +127,16 @@ var testAllowLoopback = false
 // container deployments the relaxation is off and every private range
 // stays blocked.
 func isPrivateOrMetadataIP(ip net.IP) bool {
-	saas := saasMode()
+	// MOLECULE_ENV=development is the dev-host pattern: platform and
+	// workspace containers share a docker bridge network (172.18.0.0/16,
+	// RFC-1918). Treat that the same as SaaS for private-range relaxation
+	// — both share the "trusted intra-network routing" property. Without
+	// this, every workspace registration via docker-internal hostname
+	// resolves to 172.18.x.x and gets rejected as
+	// "workspace URL is not publicly routable", breaking the entire
+	// docker-compose dev loop. Always-blocked categories (metadata link-
+	// local, TEST-NET, CGNAT) remain blocked regardless.
+	saas := saasMode() || devModeAllowsLoopback()
 
 	// IPv4 path.
 	if ip4 := ip.To4(); ip4 != nil {
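
[Editor's example] devModeAllowsLoopback is referenced but not shown in this hunk. Going only by the comments above (MOLECULE_ENV=development, plus the testAllowLoopback escape hatch in the hunk's context line), a sketch of the shape such a check could take; the body is an assumption, not the repo's actual function:

// Hypothetical shape of the dev-mode check, inferred from the comments in
// this hunk; the real implementation may differ.
func devModeAllowsLoopbackSketch() bool {
	if testAllowLoopback {
		// Package-level test escape hatch shown in the hunk header.
		return true
	}
	return strings.EqualFold(os.Getenv("MOLECULE_ENV"), "development")
}
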
@ -1082,6 +1082,13 @@ func pullImageAndDrain(ctx context.Context, cli dockerImageClient, ref, platform
 //
 // Tracked in issue #1875; remove this fallback once the template repos
 // publish multi-arch manifests.
+// DefaultImagePlatform is the exported alias used by the admin
+// workspace-images handler so its ImagePull picks the same platform as
+// the provisioner's. Avoids duplicating the Apple-Silicon-needs-amd64
+// logic and keeps both call sites in sync if Docker manifest support
+// changes (e.g., when the templates start shipping multi-arch).
+func DefaultImagePlatform() string { return defaultImagePlatform() }
+
 func defaultImagePlatform() string {
 	if v, ok := os.LookupEnv("MOLECULE_IMAGE_PLATFORM"); ok {
 		return v
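
[Editor's example] The rest of defaultImagePlatform is truncated here. The comments describe an Apple-Silicon-needs-amd64 fallback behind a MOLECULE_IMAGE_PLATFORM override; purely as an assumption about its shape, it could look like the sketch below (runtime and os imported; the empty-string default letting the engine pick its native platform is also assumed):

// Hypothetical sketch of the platform fallback, written from the
// surrounding comments; not the actual truncated function body.
func defaultImagePlatformSketch() string {
	if v, ok := os.LookupEnv("MOLECULE_IMAGE_PLATFORM"); ok {
		return v
	}
	if runtime.GOOS == "darwin" && runtime.GOARCH == "arm64" {
		// Apple Silicon hosts pull the amd64 image until the template
		// repos publish multi-arch manifests (issue #1875).
		return "linux/amd64"
	}
	return ""
}
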
@ -402,6 +402,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		r.POST("/admin/a2a-queue/drop-stale", middleware.AdminAuth(db.DB), qH.DropStale)
 	}
 
+	// Admin — workspace template image refresh. Pulls latest images from GHCR
+	// and recreates running ws-* containers so they adopt the new image.
+	// Final step of the runtime CD chain — see docs/workspace-runtime-package.md.
+	// Operators (or post-publish automation) hit this after a runtime release.
+	// Reuses the provisioner's Docker client; no-op when prov is nil
+	// (test / non-Docker deploy).
+	if prov != nil {
+		imgH := handlers.NewAdminWorkspaceImagesHandler(prov.DockerClient())
+		r.POST("/admin/workspace-images/refresh", middleware.AdminAuth(db.DB), imgH.Refresh)
+	}
+
 	// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
 	// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
 	// fresh installs use to obtain their first admin bearer. Adding AdminAuth