Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| eb21a02b6d | |||
| 498ce4e287 | |||
| 7081a8e900 | |||
| da4b86a159 | |||
| 81d864f4bc | |||
| c9795a6c4d | |||
| f5dc55f1d1 | |||
| fd92df486c | |||
| fc7498fef0 | |||
| 51dcca592d | |||
| 27c1e18e98 | |||
| 4f85ef5209 |
@@ -44,6 +44,8 @@ name: E2E Peer Visibility (literal MCP list_peers)
|
||||
# - No cross-repo `uses:` (feedback_gitea_cross_repo_uses_blocked). The
|
||||
# actions/checkout SHA is the one e2e-staging-canvas.yml already uses
|
||||
# successfully (a mirrored SHA — see #1277/PR#1292 root-cause).
|
||||
# - 2026-05-21 retrigger: verify fresh platform-tenant image after the
|
||||
# publish Buildx DOCKER_CONFIG fix restored staging-latest image updates.
|
||||
# - Per-SHA concurrency, not global (feedback_concurrency_group_per_sha).
|
||||
# - Workflow-level GITHUB_SERVER_URL pinned
|
||||
# (feedback_act_runner_github_server_url).
|
||||
|
||||
@@ -25,8 +25,11 @@ name: publish-workspace-server-image
|
||||
# staging-<sha>. Set repo variable or secret PROD_AUTO_DEPLOY_DISABLED=true
|
||||
# to stop production rollout while keeping image publishing enabled.
|
||||
#
|
||||
# ECR target: 153263036946.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/*
|
||||
# Primary ECR target: 153263036946.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/*
|
||||
# Optional staging tenant mirror target:
|
||||
# 004947743811.dkr.ecr.us-east-2.amazonaws.com/molecule-ai/platform-tenant
|
||||
# Required secrets: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AUTO_SYNC_TOKEN
|
||||
# Optional secrets: AWS_STAGING_ECR_ACCESS_KEY_ID, AWS_STAGING_ECR_SECRET_ACCESS_KEY
|
||||
#
|
||||
# mc#711: Docker daemon not accessible on ubuntu-latest runner (molecule-canonical-1
|
||||
# shows client-only in `docker info` — daemon not running). DinD mount is present but
|
||||
@@ -65,6 +68,7 @@ env:
|
||||
# use below in this repo's staging-verify.yml.
|
||||
IMAGE_NAME: ${{ vars.ECR_REGISTRY || '153263036946.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform
|
||||
TENANT_IMAGE_NAME: ${{ vars.ECR_REGISTRY || '153263036946.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform-tenant
|
||||
STAGING_TENANT_IMAGE_NAME: ${{ vars.STAGING_ECR_REGISTRY || '004947743811.dkr.ecr.us-east-2.amazonaws.com' }}/molecule-ai/platform-tenant
|
||||
|
||||
jobs:
|
||||
build-and-push:
|
||||
@@ -135,6 +139,18 @@ jobs:
|
||||
run: |
|
||||
echo "sha=${GITHUB_SHA::7}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
# Keep Buildx state inside the job temp dir. The publish runner's
|
||||
# inherited DOCKER_CONFIG can point at a host-owned ECR config path
|
||||
# (/home/hongming/.docker-ecr), which caused setup-buildx-action to
|
||||
# fail before image build with EACCES creating buildx/certs.
|
||||
- name: Prepare writable Docker config
|
||||
run: |
|
||||
set -euo pipefail
|
||||
export DOCKER_CONFIG="$RUNNER_TEMP/docker-config"
|
||||
mkdir -p "$DOCKER_CONFIG/buildx/certs"
|
||||
echo "DOCKER_CONFIG=$DOCKER_CONFIG" >> "$GITHUB_ENV"
|
||||
docker buildx version
|
||||
|
||||
# Build + push platform image (inline ECR auth — mirrors the operator-host
|
||||
# approach; credentials come from GITHUB_SECRET_AWS_ACCESS_KEY_ID /
|
||||
# GITHUB_SECRET_AWS_SECRET_ACCESS_KEY in Gitea Actions).
|
||||
@@ -170,21 +186,46 @@ jobs:
|
||||
--push .
|
||||
|
||||
# Build + push tenant image (Go platform + Next.js canvas in one image).
|
||||
# When staging ECR publisher credentials are configured, push the same
|
||||
# build to the staging account too so fresh staging/E2E tenants can pull
|
||||
# without cross-account ECR permissions.
|
||||
- name: Build & push tenant image to ECR (staging-<sha> + staging-latest)
|
||||
env:
|
||||
TENANT_IMAGE_NAME: ${{ env.TENANT_IMAGE_NAME }}
|
||||
STAGING_TENANT_IMAGE_NAME: ${{ env.STAGING_TENANT_IMAGE_NAME }}
|
||||
TAG_SHA: staging-${{ steps.tags.outputs.sha }}
|
||||
TAG_LATEST: staging-latest
|
||||
GIT_SHA: ${{ github.sha }}
|
||||
REPO: ${{ github.repository }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_STAGING_ECR_ACCESS_KEY_ID: ${{ secrets.AWS_STAGING_ECR_ACCESS_KEY_ID }}
|
||||
AWS_STAGING_ECR_SECRET_ACCESS_KEY: ${{ secrets.AWS_STAGING_ECR_SECRET_ACCESS_KEY }}
|
||||
AWS_DEFAULT_REGION: us-east-2
|
||||
run: |
|
||||
set -euo pipefail
|
||||
ECR_REGISTRY="${TENANT_IMAGE_NAME%%/*}"
|
||||
aws ecr get-login-password --region us-east-2 | \
|
||||
docker login --username AWS --password-stdin "${ECR_REGISTRY}"
|
||||
|
||||
build_tags=(
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_SHA}"
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_LATEST}"
|
||||
)
|
||||
if [ -n "${AWS_STAGING_ECR_ACCESS_KEY_ID:-}" ] && [ -n "${AWS_STAGING_ECR_SECRET_ACCESS_KEY:-}" ]; then
|
||||
STAGING_ECR_REGISTRY="${STAGING_TENANT_IMAGE_NAME%%/*}"
|
||||
AWS_ACCESS_KEY_ID="${AWS_STAGING_ECR_ACCESS_KEY_ID}" \
|
||||
AWS_SECRET_ACCESS_KEY="${AWS_STAGING_ECR_SECRET_ACCESS_KEY}" \
|
||||
aws ecr get-login-password --region us-east-2 | \
|
||||
docker login --username AWS --password-stdin "${STAGING_ECR_REGISTRY}"
|
||||
build_tags+=(
|
||||
--tag "${STAGING_TENANT_IMAGE_NAME}:${TAG_SHA}"
|
||||
--tag "${STAGING_TENANT_IMAGE_NAME}:${TAG_LATEST}"
|
||||
)
|
||||
else
|
||||
echo "::notice::Skipping staging ECR tenant push; AWS_STAGING_ECR_ACCESS_KEY_ID/AWS_STAGING_ECR_SECRET_ACCESS_KEY are not configured."
|
||||
fi
|
||||
|
||||
docker buildx build \
|
||||
--file ./workspace-server/Dockerfile.tenant \
|
||||
--build-arg NEXT_PUBLIC_PLATFORM_URL= \
|
||||
@@ -193,8 +234,7 @@ jobs:
|
||||
--label "org.opencontainers.image.revision=${GIT_SHA}" \
|
||||
--label "org.opencontainers.image.created=$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
|
||||
--label "molecule.workflow.run_id=${GITHUB_RUN_ID}" \
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_SHA}" \
|
||||
--tag "${TENANT_IMAGE_NAME}:${TAG_LATEST}" \
|
||||
"${build_tags[@]}" \
|
||||
--push .
|
||||
|
||||
# bp-exempt: production deploy side-effect; merge is gated by CI / all-required and this job waits for push CI before acting.
|
||||
|
||||
+1
-1
@@ -36,7 +36,7 @@ e2e_mint_test_token() {
|
||||
local admin_bearer="${MOLECULE_ADMIN_TOKEN:-${ADMIN_TOKEN:-}}"
|
||||
local admin_auth=()
|
||||
[ -n "$admin_bearer" ] && admin_auth=(-H "Authorization: Bearer $admin_bearer")
|
||||
body=$(curl -s -w "\n%{http_code}" "$BASE/admin/workspaces/$wid/test-token" "${admin_auth[@]}")
|
||||
body=$(curl -s -w "\n%{http_code}" "$BASE/admin/workspaces/$wid/test-token" ${admin_auth[@]+"${admin_auth[@]}"})
|
||||
local code
|
||||
code=$(printf '%s' "$body" | tail -n1)
|
||||
local json
|
||||
|
||||
@@ -71,7 +71,7 @@ pv_assert_runtime() {
|
||||
set +e
|
||||
resp=$(curl -sS -X POST "$base_url/workspaces/$wid/mcp" \
|
||||
-H "Authorization: Bearer $wtok" \
|
||||
"${org_header[@]}" \
|
||||
${org_header[@]+"${org_header[@]}"} \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$PV_RPC_BODY" \
|
||||
-o /tmp/pv_mcp_body.json -w "%{http_code}" 2>/dev/null)
|
||||
|
||||
@@ -24,7 +24,8 @@
|
||||
#
|
||||
# Only PROVISIONING differs from staging:
|
||||
# - staging: POST /cp/admin/orgs (cold EC2 tenant) + per-tenant admin
|
||||
# token + each workspace's auth_token from the POST /workspaces resp.
|
||||
# token + each workspace's MCP bearer from create response or an admin
|
||||
# token-mint fallback.
|
||||
# - local: POST /workspaces directly against the local stack
|
||||
# (BASE, default http://localhost:8080), MCP bearer minted via
|
||||
# GET /admin/workspaces/:id/test-token (e2e_mint_test_token —
|
||||
@@ -103,7 +104,7 @@ teardown() {
|
||||
log "[teardown] deleting ${#CREATED_WSIDS[@]} workspace(s) this run created (scoped)"
|
||||
for wid in ${CREATED_WSIDS[@]+"${CREATED_WSIDS[@]}"}; do
|
||||
[ -n "$wid" ] || continue
|
||||
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" "${ADMIN_AUTH[@]}" >/dev/null 2>&1 || true
|
||||
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} >/dev/null 2>&1 || true
|
||||
done
|
||||
exit $rc
|
||||
}
|
||||
@@ -112,7 +113,7 @@ trap teardown EXIT INT TERM
|
||||
# Pre-sweep workspaces a prior crashed run of THIS script left behind
|
||||
# (name prefix match only — never a blanket delete). The trap fires on
|
||||
# normal exit, but a kill -9 / SIGPIPE can bypass it.
|
||||
PRIOR=$(curl -s "$BASE/workspaces" "${ADMIN_AUTH[@]}" | python3 -c '
|
||||
PRIOR=$(curl -s "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} | python3 -c '
|
||||
import json, sys
|
||||
try:
|
||||
print(" ".join(w["id"] for w in json.load(sys.stdin) if w.get("name","").startswith("PV-Local-")))
|
||||
@@ -121,7 +122,7 @@ except Exception:
|
||||
' 2>/dev/null)
|
||||
for _wid in $PRIOR; do
|
||||
log "Pre-sweeping prior PV-Local workspace: $_wid"
|
||||
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" "${ADMIN_AUTH[@]}" >/dev/null 2>&1 || true
|
||||
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} >/dev/null 2>&1 || true
|
||||
done
|
||||
|
||||
# ─── Local-stack preflight ─────────────────────────────────────────────
|
||||
@@ -132,10 +133,10 @@ if ! curl -fsS "$BASE/health" -m 5 >/dev/null 2>&1; then
|
||||
fi
|
||||
# admin/test-token is the local MCP-bearer mint path; it 404s in
|
||||
# production. If it is off, this gate cannot drive the literal call.
|
||||
if ! curl -fsS "$BASE/admin/workspaces/preflight-probe/test-token" "${ADMIN_AUTH[@]}" -m 5 >/dev/null 2>&1; then
|
||||
if ! curl -fsS "$BASE/admin/workspaces/preflight-probe/test-token" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -m 5 >/dev/null 2>&1; then
|
||||
# A 404 here is EITHER "no such ws" (fine — endpoint is enabled) OR the
|
||||
# endpoint is disabled (MOLECULE_ENV=production). Distinguish by body.
|
||||
PROBE=$(curl -s "$BASE/admin/workspaces/preflight-probe/test-token" "${ADMIN_AUTH[@]}" -m 5 2>/dev/null)
|
||||
PROBE=$(curl -s "$BASE/admin/workspaces/preflight-probe/test-token" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -m 5 2>/dev/null)
|
||||
if echo "$PROBE" | grep -qi 'production\|disabled\|not found.*endpoint'; then
|
||||
echo "::error::GET /admin/workspaces/:id/test-token disabled (MOLECULE_ENV=production?). Cannot mint a local MCP bearer." >&2
|
||||
exit 1
|
||||
@@ -240,7 +241,7 @@ else
|
||||
fi
|
||||
log "1/5 provisioning parent ($PARENT_RUNTIME, mode=$PV_LOCAL_PROVISION_MODE) + one sibling per runtime under test..."
|
||||
|
||||
P_RESP=$(curl -s -X POST "$BASE/workspaces" "${ADMIN_AUTH[@]}" -H "Content-Type: application/json" \
|
||||
P_RESP=$(curl -s -X POST "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -H "Content-Type: application/json" \
|
||||
-d "{\"name\":\"${NAME_PREFIX}-parent\",\"runtime\":\"$PARENT_RUNTIME\",\"tier\":3$PARENT_EXTRA,\"secrets\":$PARENT_SECRETS}")
|
||||
PARENT_ID=$(echo "$P_RESP" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("id",""))' 2>/dev/null)
|
||||
if [ -z "$PARENT_ID" ]; then
|
||||
@@ -290,7 +291,7 @@ for rt in $PV_RUNTIMES; do
|
||||
CREATE_RUNTIME="$rt"
|
||||
CREATE_EXTRA=""
|
||||
fi
|
||||
R=$(curl -s -X POST "$BASE/workspaces" "${ADMIN_AUTH[@]}" -H "Content-Type: application/json" \
|
||||
R=$(curl -s -X POST "$BASE/workspaces" ${ADMIN_AUTH[@]+"${ADMIN_AUTH[@]}"} -H "Content-Type: application/json" \
|
||||
-d "{\"name\":\"${NAME_PREFIX}-$rt\",\"runtime\":\"$CREATE_RUNTIME\",\"tier\":2,\"parent_id\":\"$PARENT_ID\"$CREATE_EXTRA,\"secrets\":$SEC}")
|
||||
WID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("id",""))' 2>/dev/null)
|
||||
if [ -z "$WID" ]; then
|
||||
|
||||
@@ -40,8 +40,10 @@
|
||||
# drives: POST /cp/admin/orgs (provision), GET
|
||||
# /cp/admin/orgs/:slug/admin-token (per-tenant token), DELETE
|
||||
# /cp/admin/tenants/:slug (teardown). The per-tenant admin token drives
|
||||
# tenant workspace creation; each workspace's OWN auth_token (returned by
|
||||
# POST /workspaces) drives its MCP call.
|
||||
# tenant workspace creation; each workspace's OWN auth_token drives its
|
||||
# MCP call. External-like runtimes may return the token in POST
|
||||
# /workspaces; managed container runtimes usually require the admin token
|
||||
# mint fallback below.
|
||||
#
|
||||
# Required env:
|
||||
# MOLECULE_ADMIN_TOKEN CP admin bearer — Railway staging CP_ADMIN_API_TOKEN
|
||||
@@ -104,6 +106,46 @@ tenant_call() {
|
||||
-H "Content-Type: application/json" "$@"
|
||||
}
|
||||
|
||||
tenant_call_capture() {
|
||||
local method="$1" path="$2" out="$3"; shift 3
|
||||
curl -sS -o "$out" -w "%{http_code}" -X "$method" "$TENANT_URL$path" \
|
||||
-H "Authorization: Bearer $TENANT_TOKEN" \
|
||||
-H "X-Molecule-Org-Id: $ORG_ID" \
|
||||
-H "Content-Type: application/json" "$@"
|
||||
}
|
||||
|
||||
redact_token_body() {
|
||||
python3 -c '
|
||||
import json, re, sys
|
||||
raw = sys.stdin.read()
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
print(re.sub(r"(?i)([a-z0-9_]*token)=([^&\\s]+)", r"\1=<redacted>", raw)[:500])
|
||||
raise SystemExit(0)
|
||||
|
||||
def scrub(v):
|
||||
if isinstance(v, dict):
|
||||
return {k: ("<redacted>" if "token" in k.lower() else scrub(val)) for k, val in v.items()}
|
||||
if isinstance(v, list):
|
||||
return [scrub(x) for x in v]
|
||||
return v
|
||||
|
||||
print(json.dumps(scrub(data), separators=(",", ":"))[:500])
|
||||
'
|
||||
}
|
||||
|
||||
extract_auth_token() {
|
||||
python3 -c "
|
||||
import sys, json
|
||||
try:
|
||||
d = json.load(sys.stdin)
|
||||
except Exception:
|
||||
print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or d.get('connection', {}).get('auth_token') or '')
|
||||
" 2>/dev/null
|
||||
}
|
||||
|
||||
# ─── Scoped teardown ───────────────────────────────────────────────────
|
||||
# Deletes ONLY the org this run created (DELETE /cp/admin/tenants/$SLUG
|
||||
# with the {"confirm":$SLUG} fat-finger guard). Never a cluster-wide
|
||||
@@ -218,34 +260,31 @@ for rt in $PV_RUNTIMES; do
|
||||
R=$(tenant_call POST /workspaces \
|
||||
-d "{\"name\":\"pv-$rt\",\"runtime\":\"$rt\",\"tier\":2,\"parent_id\":\"$PARENT_ID\",\"secrets\":$SECRETS_JSON}")
|
||||
WID=$(echo "$R" | python3 -c "import sys,json; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
|
||||
# auth_token is top-level for container runtimes; external-like nest it
|
||||
# under connection.auth_token (verified vs staging response shape).
|
||||
WTOK=$(echo "$R" | python3 -c "
|
||||
import sys, json
|
||||
try: d = json.load(sys.stdin)
|
||||
except Exception: print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or d.get('connection', {}).get('auth_token') or '')
|
||||
" 2>/dev/null)
|
||||
# External-like runtimes may return connection.auth_token on create.
|
||||
# Managed container runtimes usually return only id/status here, then
|
||||
# receive their bearer through registry/bootstrap; for this literal MCP
|
||||
# driver we mint an admin test token below.
|
||||
WTOK=$(echo "$R" | extract_auth_token)
|
||||
[ -n "$WID" ] || fail "$rt workspace create failed: $(echo "$R" | head -c 300)"
|
||||
TOKEN_DIAG=""
|
||||
if [ -z "$WTOK" ]; then
|
||||
TTOK_RESP=$(tenant_call POST "/admin/workspaces/$WID/tokens" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | python3 -c "
|
||||
import sys, json
|
||||
try: d = json.load(sys.stdin)
|
||||
except Exception: print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or '')
|
||||
" 2>/dev/null)
|
||||
TTOK_FILE=$(mktemp)
|
||||
TTOK_CODE=$(tenant_call_capture POST "/admin/workspaces/$WID/tokens" "$TTOK_FILE" 2>/dev/null || echo "curl_error")
|
||||
TTOK_RESP=$(cat "$TTOK_FILE" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | extract_auth_token)
|
||||
TOKEN_DIAG="POST /admin/workspaces/$WID/tokens -> HTTP $TTOK_CODE body: $(echo "$TTOK_RESP" | redact_token_body)"
|
||||
rm -f "$TTOK_FILE"
|
||||
fi
|
||||
if [ -z "$WTOK" ]; then
|
||||
TTOK_RESP=$(tenant_call GET "/admin/workspaces/$WID/test-token" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | python3 -c "
|
||||
import sys, json
|
||||
try: d = json.load(sys.stdin)
|
||||
except Exception: print(''); sys.exit(0)
|
||||
print(d.get('auth_token') or '')
|
||||
" 2>/dev/null)
|
||||
TTOK_FILE=$(mktemp)
|
||||
TTOK_CODE=$(tenant_call_capture GET "/admin/workspaces/$WID/test-token" "$TTOK_FILE" 2>/dev/null || echo "curl_error")
|
||||
TTOK_RESP=$(cat "$TTOK_FILE" 2>/dev/null || true)
|
||||
WTOK=$(echo "$TTOK_RESP" | extract_auth_token)
|
||||
TOKEN_DIAG="${TOKEN_DIAG}
|
||||
GET /admin/workspaces/$WID/test-token -> HTTP $TTOK_CODE body: $(echo "$TTOK_RESP" | redact_token_body)"
|
||||
rm -f "$TTOK_FILE"
|
||||
fi
|
||||
[ -n "$WTOK" ] || fail "$rt workspace did not return or mint an auth_token — cannot drive its MCP call (resp: $(echo "$R" | head -c 300))"
|
||||
[ -n "$WTOK" ] || fail "$rt workspace did not return or mint an auth_token — cannot drive its MCP call (create_resp: $(echo "$R" | redact_token_body); token_fallbacks: $TOKEN_DIAG)"
|
||||
WS_IDS[$rt]="$WID"
|
||||
WS_TOKENS[$rt]="$WTOK"
|
||||
ALL_WS_IDS="$ALL_WS_IDS $WID"
|
||||
|
||||
@@ -84,6 +84,7 @@ type mcpTool struct {
|
||||
type MCPHandler struct {
|
||||
database *sql.DB
|
||||
broadcaster *events.Broadcaster
|
||||
a2aProxy func(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error)
|
||||
|
||||
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
|
||||
// every v2 tool calls memoryV2Available() first and returns a
|
||||
@@ -98,6 +99,14 @@ func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandle
|
||||
return &MCPHandler{database: database, broadcaster: broadcaster}
|
||||
}
|
||||
|
||||
func (h *MCPHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
|
||||
if h.a2aProxy != nil {
|
||||
return h.a2aProxy(ctx, workspaceID, body, callerID, logActivity)
|
||||
}
|
||||
wh := NewWorkspaceHandler(h.broadcaster, nil, "", "")
|
||||
return wh.ProxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Tool definitions (mirrors workspace/a2a_mcp_server.py TOOLS list)
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -53,6 +53,15 @@ func mcpPost(t *testing.T, h *MCPHandler, workspaceID string, body interface{})
|
||||
return w
|
||||
}
|
||||
|
||||
func expectCanCommunicateSiblings(mock sqlmock.Sqlmock, callerID, targetID, parentID string) {
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(callerID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(callerID, parentID))
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(targetID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(targetID, parentID))
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// initialize
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -178,6 +187,98 @@ func TestMCPHandler_ToolsList_ContainsExpectedTools(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMCPHandler_DelegateTask_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
var gotTarget, gotCaller string
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
|
||||
gotTarget = workspaceID
|
||||
gotCaller = callerID
|
||||
if !logActivity {
|
||||
t.Fatal("delegate_task should log through platform A2A proxy")
|
||||
}
|
||||
if !strings.Contains(string(body), "do work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "do work",
|
||||
}, mcpCallTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task returned error: %v", err)
|
||||
}
|
||||
if out != "done" {
|
||||
t.Fatalf("delegate_task response = %q, want done", out)
|
||||
}
|
||||
if gotTarget != targetID || gotCaller != callerID {
|
||||
t.Fatalf("proxy called with target=%q caller=%q, want target=%q caller=%q", gotTarget, gotCaller, targetID, callerID)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
called := make(chan struct{}, 1)
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
if !strings.Contains(string(body), "async work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
called <- struct{}{}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "async work",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case <-called:
|
||||
default:
|
||||
t.Fatal("async delegate did not call platform A2A proxy")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// notifications/initialized
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -7,24 +7,19 @@ package handlers
|
||||
// and A2A response parsing helpers.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// insertMCPDelegationRow writes a delegation activity row so the canvas
|
||||
// Agent Comms tab can show the task text for MCP-initiated delegations.
|
||||
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
|
||||
@@ -190,15 +185,6 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
// Non-fatal: still make the A2A call even if activity log write fails.
|
||||
}
|
||||
|
||||
agentURL, err := mcpResolveURL(ctx, h.database, targetID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// SSRF defence: reject private/metadata URLs before making outbound call.
|
||||
if err := isSafeURL(agentURL); err != nil {
|
||||
return "", fmt.Errorf("invalid workspace URL: %w", err)
|
||||
}
|
||||
|
||||
a2aBody, err := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": uuid.New().String(),
|
||||
@@ -218,36 +204,17 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
reqCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(reqCtx, "POST", agentURL+"/a2a", bytes.NewReader(a2aBody))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
// X-Workspace-ID identifies this caller to the A2A proxy. The /workspaces/:id/a2a
|
||||
// endpoint is intentionally outside WorkspaceAuth (agents do not hold bearer tokens
|
||||
// to peer workspaces). Access control is enforced by CanCommunicate above, which
|
||||
// already validated callerID → targetID before this request is constructed.
|
||||
// callerID was authenticated by WorkspaceAuth on the MCP bridge entry point,
|
||||
// so this header reflects a verified caller identity, not a spoofable value.
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
status, body, err := h.proxyA2ARequest(reqCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", err.Error())
|
||||
return "", fmt.Errorf("A2A call failed: %w", err)
|
||||
return "", fmt.Errorf("A2A proxy failed: %w", err)
|
||||
}
|
||||
if status < 200 || status >= 300 {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "failed", fmt.Sprintf("A2A proxy returned status %d", status))
|
||||
return "", fmt.Errorf("A2A proxy returned status %d", status)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// A 200/500 from the peer still means the call was dispatched — only
|
||||
// network errors are truly "failed". Status 'dispatched' is correct for
|
||||
// any HTTP response (peer's A2A layer handles the actual processing).
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "dispatched", "")
|
||||
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
return extractA2AText(body), nil
|
||||
}
|
||||
|
||||
@@ -278,24 +245,13 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
|
||||
// Fire and forget in a detached goroutine. Use a background context so
|
||||
// the call is not cancelled when the HTTP request completes.
|
||||
// RFC internal#524 Layer 1: globalGoAsync — the detached call reads
|
||||
// db.DB (mcpResolveURL + updateMCPDelegationStatus) and must be
|
||||
// drained by drainTestAsync before any t.Cleanup-driven db.DB swap.
|
||||
// RFC internal#524 Layer 1: globalGoAsync — the detached call reads db.DB
|
||||
// through the platform A2A proxy and must be drained by drainTestAsync
|
||||
// before any t.Cleanup-driven db.DB swap.
|
||||
globalGoAsync(func() {
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
agentURL, err := mcpResolveURL(bgCtx, h.database, targetID)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: resolve URL for %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
// SSRF defence: reject private/metadata URLs before making outbound call.
|
||||
if err := isSafeURL(agentURL); err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: unsafe URL for %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": delegationID,
|
||||
@@ -309,22 +265,15 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
},
|
||||
})
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(bgCtx, "POST", agentURL+"/a2a", bytes.NewReader(a2aBody))
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: create request: %v", err)
|
||||
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil || status < 200 || status >= 300 {
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s: %v", targetID, err)
|
||||
} else {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s returned status %d", targetID, status)
|
||||
}
|
||||
return
|
||||
}
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("X-Workspace-ID", callerID)
|
||||
|
||||
resp, err := http.DefaultClient.Do(httpReq)
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A call to %s: %v", targetID, err)
|
||||
return
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
// Drain response so the connection can be reused.
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
})
|
||||
|
||||
return fmt.Sprintf(`{"task_id":%q,"status":"dispatched","target_id":%q}`, delegationID, targetID), nil
|
||||
@@ -405,7 +354,6 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
return "Message sent.", nil
|
||||
}
|
||||
|
||||
|
||||
func (h *MCPHandler) toolCommitMemory(ctx context.Context, workspaceID string, args map[string]interface{}) (string, error) {
|
||||
// PR-6 (RFC #2728) compat shim: when the v2 plugin is wired
|
||||
// (MEMORY_PLUGIN_URL set), translate legacy scope→namespace and
|
||||
@@ -534,56 +482,6 @@ func (h *MCPHandler) toolRecallMemory(ctx context.Context, workspaceID string, a
|
||||
// Helpers
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// mcpResolveURL returns a routable URL for a workspace's A2A server.
|
||||
//
|
||||
// Resolution order:
|
||||
// 1. Docker-internal URL cache (set by provisioner; correct when platform is in Docker)
|
||||
// 2. Redis URL cache
|
||||
// 3. DB `url` column fallback, with 127.0.0.1→Docker bridge rewrite when in Docker
|
||||
//
|
||||
// SECURITY (F1083 / #1130): all three paths run the returned URL through
|
||||
// validateAgentURL to block SSRF targets (private IPs, loopback, cloud metadata).
|
||||
func mcpResolveURL(ctx context.Context, database *sql.DB, workspaceID string) (string, error) {
|
||||
if platformInDocker {
|
||||
if url, err := db.GetCachedInternalURL(ctx, workspaceID); err == nil && url != "" {
|
||||
if err := validateAgentURL(url); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from internal cache: %w", workspaceID, err)
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
}
|
||||
if url, err := db.GetCachedURL(ctx, workspaceID); err == nil && url != "" {
|
||||
if platformInDocker && strings.HasPrefix(url, "http://127.0.0.1:") {
|
||||
return provisioner.InternalURL(workspaceID), nil
|
||||
}
|
||||
if err := validateAgentURL(url); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from Redis cache: %w", workspaceID, err)
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
|
||||
var urlStr sql.NullString
|
||||
var status string
|
||||
if err := database.QueryRowContext(ctx,
|
||||
`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&urlStr, &status); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", fmt.Errorf("workspace %s not found", workspaceID)
|
||||
}
|
||||
return "", fmt.Errorf("workspace lookup failed: %w", err)
|
||||
}
|
||||
if !urlStr.Valid || urlStr.String == "" {
|
||||
return "", fmt.Errorf("workspace %s has no URL (status: %s)", workspaceID, status)
|
||||
}
|
||||
if platformInDocker && strings.HasPrefix(urlStr.String, "http://127.0.0.1:") {
|
||||
return provisioner.InternalURL(workspaceID), nil
|
||||
}
|
||||
if err := validateAgentURL(urlStr.String); err != nil {
|
||||
return "", fmt.Errorf("workspace %s: forbidden URL from DB: %w", workspaceID, err)
|
||||
}
|
||||
return urlStr.String, nil
|
||||
}
|
||||
|
||||
// extractA2AText extracts human-readable text from an A2A JSON-RPC response body.
|
||||
// Falls back to the raw JSON when no text part can be found.
|
||||
func extractA2AText(body []byte) string {
|
||||
@@ -632,4 +530,3 @@ func extractA2AText(body []byte) string {
|
||||
b, _ := json.Marshal(result)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
|
||||
// Go's net.ParseIP.To4() before Contains() runs, so the IPv4 rules above
|
||||
// catch those without a separate entry.
|
||||
//
|
||||
// F1083/#1130 (SSRF on mcpResolveURL / a2a_proxy resolveAgentURL): in
|
||||
// F1083/#1130 (SSRF on direct A2A URL resolution): in
|
||||
// addition to blocking IP literals, DNS names are now resolved and each
|
||||
// returned IP is checked against the blocklist. This closes the gap where
|
||||
// an attacker could register agent.example.com pointing to 169.254.169.254.
|
||||
|
||||
Reference in New Issue
Block a user