Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 8a5c6cf771 | | |
| | c8efa8f82a | | |
| | f04c80b606 | | |
| | 7f3a4491bb | | |
| | 7c6986a96b | | |
| | 19aa126c18 | | |
| | 5f99c29de3 | | |
| | d9ff9d036a | | |
| | 91c9893ad4 | | |
| | d8ff0b2503 | | |
| | 6447edd2fd | | |
| | d0ab3d7c4b | | |
@@ -109,6 +109,9 @@ jobs:
|
||||
E2E_ANTHROPIC_API_KEY: ${{ secrets.MOLECULE_STAGING_ANTHROPIC_API_KEY }}
|
||||
E2E_OPENAI_API_KEY: ${{ secrets.MOLECULE_STAGING_OPENAI_API_KEY }}
|
||||
E2E_RUNTIME: claude-code
|
||||
# Platform-managed create path (moonshot/kimi-k2.6, no tenant key) — the
|
||||
# combo proven to create cleanly; this test only needs the ws online.
|
||||
E2E_LLM_PATH: platform
|
||||
E2E_MODEL_SLUG: MiniMax-M2
|
||||
E2E_RUN_ID: "${{ github.run_id }}-${{ github.run_attempt }}"
|
||||
E2E_KEEP_ORG: ${{ github.event.inputs.keep_org && '1' || '0' }}
|
||||
|
||||
@@ -172,7 +172,16 @@ jobs:
|
||||
# and defeats the cost saving. Operators can override via the
|
||||
# workflow_dispatch flow (no input wired here yet — runtime
|
||||
# override is enough for ad-hoc).
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.runtime == 'hermes' && 'openai/gpt-4o' || github.event.inputs.runtime == 'codex' && 'openai/gpt-4o' || github.event.inputs.runtime == 'google-adk' && 'google_genai:gemini-2.5-pro' || 'MiniMax-M2' }}
|
||||
#
|
||||
# #2263 deploy-skew: the claude-code default is the COLON-namespaced BYOK
|
||||
# id `minimax:MiniMax-M2.7`, NOT bare `MiniMax-M2`. The deployed staging
|
||||
# ws-server's compiled registry can lag source; validateRegisteredModelForRuntime
|
||||
# 400s the bare form on an older image (the sibling Platform Boot job, on
|
||||
# the SAME image, succeeds with namespaced `moonshot/kimi-k2.6`). The colon
|
||||
# form stays in the BYOK `minimax` arm (providers.yaml:851) so it resolves
|
||||
# provider=minimax (BYOK) and the #1994 byok-not-platform guard still
|
||||
# passes — the slash/platform form `minimax/MiniMax-M2.7` would not.
|
||||
E2E_MODEL_SLUG: ${{ github.event.inputs.runtime == 'hermes' && 'openai/gpt-4o' || github.event.inputs.runtime == 'codex' && 'openai/gpt-4o' || github.event.inputs.runtime == 'google-adk' && 'google_genai:gemini-2.5-pro' || 'minimax:MiniMax-M2.7' }}
|
||||
E2E_RUN_ID: "${{ github.run_id }}-${{ github.run_attempt }}"
|
||||
E2E_KEEP_ORG: ${{ github.event.inputs.keep_org && '1' || '0' }}
|
||||
|
||||
|
||||
@@ -11,7 +11,10 @@
|
||||
# default + 401, see PR #1714.)
|
||||
#
|
||||
# claude-code → auth-aware:
|
||||
# E2E_MINIMAX_API_KEY → "MiniMax-M2"
|
||||
# E2E_MINIMAX_API_KEY → "minimax:MiniMax-M2.7"
|
||||
# (colon-namespaced BYOK id; bare
|
||||
# "MiniMax-M2" 400s on a deploy-skewed
|
||||
# staging registry — #2263)
|
||||
# E2E_ANTHROPIC_API_KEY → "claude-sonnet-4-6"
|
||||
# otherwise → "sonnet"
|
||||
#
|
||||
@@ -82,7 +85,17 @@ pick_model_slug() {
|
||||
hermes) printf 'openai/gpt-4o' ;;
|
||||
claude-code)
|
||||
if [ -n "${E2E_MINIMAX_API_KEY:-}" ]; then
|
||||
printf 'MiniMax-M2'
|
||||
# Namespaced (colon) BYOK id, not bare "MiniMax-M2" (#2263 deploy-skew):
|
||||
# bare ids can lag the deployed staging ws-server's compiled registry,
|
||||
# so workspace-create's validateRegisteredModelForRuntime 400s the bare
|
||||
# form on an older image. The colon-namespaced `minimax:MiniMax-M2.7`
|
||||
# resolves the same way the proven-working sibling `moonshot/kimi-k2.6`
|
||||
# does. It stays in the BYOK `minimax` arm (providers.yaml:851), so
|
||||
# DeriveProvider -> provider_selection=minimax (BYOK) and the #1994
|
||||
# byok-not-platform guard (test_staging_full_saas.sh:1000) still passes —
|
||||
# unlike the slash/platform form `minimax/MiniMax-M2.7`, which resolves
|
||||
# to provider=platform and would trip that guard.
|
||||
printf 'minimax:MiniMax-M2.7'
|
||||
elif [ -n "${E2E_ANTHROPIC_API_KEY:-}" ]; then
|
||||
printf 'claude-sonnet-4-6'
|
||||
else
|
||||
|
||||
@@ -49,13 +49,13 @@ run_test "codex → slash-form fallback" codex
|
||||
run_test "claude-code → OAuth/default alias" claude-code "sonnet"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG E2E_ANTHROPIC_API_KEY; E2E_MINIMAX_API_KEY="mx-test" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + MiniMax key → MiniMax model" "$got" "MiniMax-M2"
|
||||
assert_eq "claude-code + MiniMax key → MiniMax model" "$got" "minimax:MiniMax-M2.7"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG E2E_MINIMAX_API_KEY; E2E_ANTHROPIC_API_KEY="sk-ant-test" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + Anthropic API key → Anthropic API model" "$got" "claude-sonnet-4-6"
|
||||
|
||||
got=$(unset E2E_MODEL_SLUG; E2E_MINIMAX_API_KEY="mx-priority" E2E_ANTHROPIC_API_KEY="sk-ant-loser" pick_model_slug claude-code)
|
||||
assert_eq "claude-code + both keys → MiniMax priority" "$got" "MiniMax-M2"
|
||||
assert_eq "claude-code + both keys → MiniMax priority" "$got" "minimax:MiniMax-M2.7"
|
||||
|
||||
# ── Fallback for unknown runtime ──
|
||||
# Picks slash-form (hermes-shaped) since hermes is the historical
|
||||
|
||||
@@ -50,7 +50,11 @@
|
||||
# Optional env (mirrors the full-saas harness where they overlap):
|
||||
# E2E_RUNTIME claude-code (default)
|
||||
# E2E_PROVISION_TIMEOUT_SECS default 900 (cold EC2 budget)
|
||||
# E2E_WORKSPACE_ONLINE_TIMEOUT_SECS default 3600 (cold-boot worst-case)
|
||||
# E2E_WORKSPACE_ONLINE_TIMEOUT_SECS default 900 (15min). A workspace that
|
||||
# cannot reach online in 15min is a staging/boot problem,
|
||||
# not slow cold-boot — fail fast so the trap tears down the
|
||||
# EC2 instead of hanging ~1h and leaking a running instance
|
||||
# (observed: run 216031 hung 32min with a live e2e-rec EC2).
|
||||
# E2E_RECONCILE_OFFLINE_TIMEOUT_SECS default 180 (PRIMARY: leave 'online'.
|
||||
# Reconciler cadence is 60s — 3 cycles +
|
||||
# AWS terminate-visibility slack.)
|
||||
@@ -82,7 +86,7 @@ CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
|
||||
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLECULE_ADMIN_TOKEN required — Railway staging CP_ADMIN_API_TOKEN}"
|
||||
RUNTIME="${E2E_RUNTIME:-claude-code}"
|
||||
PROVISION_TIMEOUT_SECS="${E2E_PROVISION_TIMEOUT_SECS:-900}"
|
||||
WORKSPACE_ONLINE_TIMEOUT_SECS="${E2E_WORKSPACE_ONLINE_TIMEOUT_SECS:-3600}"
|
||||
WORKSPACE_ONLINE_TIMEOUT_SECS="${E2E_WORKSPACE_ONLINE_TIMEOUT_SECS:-900}"
|
||||
# PRIMARY bound: the reconciler ticks every 60s; it needs one cycle to see
|
||||
# the dead instance after AWS makes the terminate visible to DescribeInstances
|
||||
# (typically seconds, but can lag). 180s = ~3 cycles + slack.
|
||||
@@ -325,7 +329,18 @@ ws_field() {
|
||||
# tolerable — but wiring the same keys keeps boot behaviour identical to the
|
||||
# sibling and avoids a config path that only this test would exercise.
|
||||
SECRETS_JSON='{}'
|
||||
if [ -n "${E2E_MINIMAX_API_KEY:-}" ]; then
|
||||
# Platform-managed path (E2E_LLM_PATH=platform, the DEFAULT for this test):
|
||||
# the workspace boots on the CP LLM proxy with NO tenant key, model
|
||||
# moonshot/kimi-k2.6 — the exact create combo test_staging_full_saas.sh uses
|
||||
# successfully. This test only needs the workspace to reach status=online so
|
||||
# it can kill the EC2 and assert the reconciler heals it; it does NOT exercise
|
||||
# a real LLM completion, so the platform path is both sufficient and the one
|
||||
# proven to create cleanly. (The BYOK key paths below 400'd at create — see
|
||||
# the create-failure capture added below — which is why platform is default.)
|
||||
if [ "${E2E_LLM_PATH:-platform}" = "platform" ]; then
|
||||
log " LLM path: PLATFORM-MANAGED (no tenant key; moonshot/kimi-k2.6 via proxy)"
|
||||
SECRETS_JSON='{}'
|
||||
elif [ -n "${E2E_MINIMAX_API_KEY:-}" ]; then
|
||||
SECRETS_JSON=$(python3 -c "import json,os; print(json.dumps({'MINIMAX_API_KEY': os.environ['E2E_MINIMAX_API_KEY']}))")
|
||||
elif [ -n "${E2E_ANTHROPIC_API_KEY:-}" ]; then
|
||||
SECRETS_JSON=$(python3 -c "import json,os; print(json.dumps({'ANTHROPIC_API_KEY': os.environ['E2E_ANTHROPIC_API_KEY']}))")
|
||||
@@ -345,21 +360,32 @@ print(json.dumps({
|
||||
")
|
||||
fi
|
||||
|
||||
MODEL_SLUG=$(pick_model_slug "$RUNTIME")
|
||||
E2E_LLM_PATH="${E2E_LLM_PATH:-platform}" MODEL_SLUG=$(E2E_LLM_PATH="${E2E_LLM_PATH:-platform}" pick_model_slug "$RUNTIME")
|
||||
log " MODEL_SLUG=$MODEL_SLUG"
|
||||
|
||||
log "4/6 Provisioning workspace (runtime=$RUNTIME)..."
|
||||
# --fail-with-body makes curl exit non-zero on a 4xx/5xx but STILL writes the
|
||||
# response body to stdout; the `|| { ... }` catches that so the body is printed
|
||||
# instead of `set -e` aborting the command-substitution silently (the old bug
|
||||
# that hid the real HTTP-400 reason). $WS_RESP holds the body either way.
|
||||
WS_RESP=$(tenant_call POST /workspaces \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"name\":\"E2E Reconciler\",\"runtime\":\"$RUNTIME\",\"tier\":2,\"model\":\"$MODEL_SLUG\",\"secrets\":$SECRETS_JSON}")
|
||||
WS_ID=$(echo "$WS_RESP" | python3 -c "import json,sys; print(json.load(sys.stdin)['id'])")
|
||||
[ -z "$WS_ID" ] && fail "Workspace create response missing 'id': $WS_RESP"
|
||||
-d "{\"name\":\"E2E Reconciler\",\"runtime\":\"$RUNTIME\",\"tier\":2,\"model\":\"$MODEL_SLUG\",\"secrets\":$SECRETS_JSON}") || {
|
||||
rc=$?
|
||||
fail "Workspace create failed (curl rc=$rc, model=$MODEL_SLUG). Response body: $WS_RESP"
|
||||
}
|
||||
WS_ID=$(echo "$WS_RESP" | python3 -c "import json,sys; print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
|
||||
[ -z "$WS_ID" ] && fail "Workspace create response missing 'id' (model=$MODEL_SLUG): $WS_RESP"
|
||||
log " WS_ID=$WS_ID"
|
||||
|
||||
# Wait for the workspace to reach status=online and capture its instance_id.
|
||||
log " Waiting for workspace to reach status=online (up to $((WORKSPACE_ONLINE_TIMEOUT_SECS/60)) min)..."
|
||||
ONLINE_DEADLINE=$(( $(date +%s) + WORKSPACE_ONLINE_TIMEOUT_SECS ))
|
||||
ORIGINAL_INSTANCE_ID=""
|
||||
ONLINE_SINCE=""
|
||||
# Grace before falling back to the AWS workspace tag when the tenant API
|
||||
# does not surface instance_id (observed on staging).
|
||||
INSTANCE_ID_GRACE_SECS="${E2E_INSTANCE_ID_GRACE_SECS:-45}"
|
||||
WS_LAST_STATUS=""
|
||||
while true; do
|
||||
if [ "$(date +%s)" -gt "$ONLINE_DEADLINE" ]; then
|
||||
@@ -372,11 +398,27 @@ while true; do
|
||||
WS_LAST_STATUS="$WS_STATUS"
|
||||
fi
|
||||
if [ "$WS_STATUS" = "online" ]; then
|
||||
[ -z "$ONLINE_SINCE" ] && ONLINE_SINCE=$(date +%s)
|
||||
ORIGINAL_INSTANCE_ID=$(ws_field "$WS_ID" "instance_id")
|
||||
if [ -n "$ORIGINAL_INSTANCE_ID" ]; then
|
||||
break
|
||||
fi
|
||||
# online but instance_id not surfaced yet — keep polling briefly.
|
||||
# The workspace is online but the tenant API does not surface instance_id
|
||||
# (observed on staging — the DB has it, the API response omits it). After a
|
||||
# short grace, fall back to the AWS workspace-instance tag so the kill step
|
||||
# can proceed. The reconciler reads instance_id from the DB and acts on the
|
||||
# real EC2 regardless of what the API surfaces, so the AWS-tag instance is
|
||||
# the correct kill target. Without this fallback the loop spins to the online
|
||||
# deadline and fails with a misleading "never reached online".
|
||||
if [ $(( $(date +%s) - ONLINE_SINCE )) -ge "$INSTANCE_ID_GRACE_SECS" ]; then
|
||||
# ws-tenant-<slug>-<wsid...> is the workspace EC2 (vs tenant-<slug>).
|
||||
ORIGINAL_INSTANCE_ID=$(e2e_ec2_instances_for_slug "$SLUG" 2>/dev/null \
|
||||
| awk '$2 ~ /^ws-tenant-/ {print $1}' | sort -u | head -1)
|
||||
if [ -n "$ORIGINAL_INSTANCE_ID" ]; then
|
||||
log " instance_id not surfaced by API after ${INSTANCE_ID_GRACE_SECS}s — using AWS workspace tag: $ORIGINAL_INSTANCE_ID"
|
||||
break
|
||||
fi
|
||||
fi
|
||||
log " $WS_ID online but instance_id not populated yet — waiting"
|
||||
fi
|
||||
# 'failed' is transient on cold boot (bootstrap-watcher deadline vs heartbeat
|
||||
|
||||
@@ -886,7 +886,7 @@ fi
|
||||
# identical on main's scheduled synthetic E2E and on PRs (so it is an
|
||||
# environmental backend regression, never PR-introduced).
|
||||
if echo "$AGENT_TEXT" | grep -qiF "message contained no text content"; then
|
||||
fail "A2A — EMPTY COMPLETION (backend regression, NOT a platform/workspace-server bug). The configured model (MODEL_SLUG=${MODEL_SLUG:-?}) returned a 2xx completion with no text part; the runtime surfaced 'message contained no text content.'. Operator action: check the staging LLM backend / proxy for the canary model (MiniMax-M2 since #2710) — empty assistant turns, not an auth/quota/boot fault. Raw: $AGENT_TEXT"
|
||||
fail "A2A — EMPTY COMPLETION (backend regression, NOT a platform/workspace-server bug). The configured model (MODEL_SLUG=${MODEL_SLUG:-?}) returned a 2xx completion with no text part; the runtime surfaced 'message contained no text content.'. Operator action: check the staging LLM backend / proxy for the canary model (the claude-code default is minimax:MiniMax-M2.7 since #2263; was bare MiniMax-M2 #2710) — empty assistant turns, not an auth/quota/boot fault. Raw: $AGENT_TEXT"
|
||||
fi
|
||||
# Generic catch-all — falls through if none of the known regressions hit.
|
||||
if echo "$AGENT_TEXT" | grep -qiE "error|exception"; then
|
||||
@@ -952,7 +952,14 @@ for KA_ATTEMPT in $(seq 1 6); do
|
||||
KA_SAFE_BODY=$(printf '%s' "$KA_RESP" | sanitize_http_body)
|
||||
# Retry ONLY on transient transport errors — never on an agent-level
|
||||
# error (those must surface and fail the gate).
|
||||
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
|
||||
# #2263: include the Cloudflare-shaped literal `error code: 502/504` token so a
|
||||
# bare edge/gateway 502 (no "Bad Gateway" body) is retried here the same way the
|
||||
# cold-start PONG probe (line ~800) and the delegation loop (line ~1234) already
|
||||
# do. Without it, a single un-retried edge 502 right after a healthy round-trip
|
||||
# fell through to break and failed the gate on the first attempt (Platform Boot
|
||||
# job, task 268859). Bounded by the existing 6-attempt / sleep-10 loop — no new
|
||||
# sleep-as-fix; this only widens the transient-match to the sibling pattern.
|
||||
if echo "$KA_CODE" | grep -Eq '^(502|503|504)$' && echo "$KA_SAFE_BODY" | grep -Eqi 'Service Unavailable|Bad Gateway|Gateway Timeout|error code: 502|error code: 504|workspace agent unreachable|connection refused|no healthy upstream|workspace agent busy|native_session'; then
|
||||
log " known-answer A2A transient $KA_CODE attempt $KA_ATTEMPT/6: $KA_SAFE_BODY"
|
||||
if [ "$KA_ATTEMPT" -lt 6 ]; then sleep 10; continue; fi
|
||||
fi
|
||||
|
||||
@@ -0,0 +1,141 @@
|
||||
package handlers
|
||||
|
||||
// a2a_outbound_envelope_test.go — outbound A2A `message/send` envelope
|
||||
// CONTRACT gate (issue #2251).
|
||||
//
|
||||
// #2251: an outbound A2A envelope shipped without `role` and with text
|
||||
// parts keyed `type` instead of the v0.3-canonical `kind`. The receiver's
|
||||
// a2a-sdk v0.3 Pydantic validator silently rejected the message
|
||||
// post-dispatch — the sender saw a happy 200/202 while the brief was
|
||||
// dropped (the same invisible-rejection failure class as the v0.2→v0.3
|
||||
// content bug pinned by a2a_corpus_test.go, but on the SEND side).
|
||||
//
|
||||
// The inbound corpus replay (a2a_corpus_test.go) proves normalizeA2APayload
|
||||
// produces `parts[].kind` + a non-empty messageId, but it does NOT assert
|
||||
// `role`, and it only covers what we RECEIVE. Nothing pins what core
|
||||
// EMITS. This file pins the emit contract at the helper that builds the
|
||||
// parts (buildA2AMessageParts, used by both delegate_task and
|
||||
// delegate_task_async) and asserts the canonical Part key is `kind`.
|
||||
//
|
||||
// Part-object schema (A2A v0.3): every Part MUST carry a `kind`
|
||||
// discriminator ("text" | "file" | "data"); there is NO `type` key. A
|
||||
// text Part is {"kind":"text","text":"..."}. Emitting `type` makes the
|
||||
// v0.3 validator drop the Part.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestBuildA2AMessageParts_TextPartUsesKindNotType pins the v0.3 Part
|
||||
// discriminator for the text part emitted on every outbound A2A
|
||||
// delegation. RED before #2251's fix (the helper emitted
|
||||
// {"type":"text",...}); the receiver's v0.3 Pydantic validator drops a
|
||||
// Part keyed `type`, silently losing the task text.
|
||||
func TestBuildA2AMessageParts_TextPartUsesKindNotType(t *testing.T) {
|
||||
parts := buildA2AMessageParts("do the work", nil)
|
||||
if len(parts) == 0 {
|
||||
t.Fatal("buildA2AMessageParts returned no parts for a non-empty task")
|
||||
}
|
||||
text := parts[0]
|
||||
|
||||
if _, hasType := text["type"]; hasType {
|
||||
t.Errorf("text part uses forbidden v0.2 key `type` %v — A2A v0.3 Parts discriminate on `kind`; `type` is dropped by the receiver's validator (#2251)", text)
|
||||
}
|
||||
kind, ok := text["kind"].(string)
|
||||
if !ok {
|
||||
t.Fatalf("text part missing string `kind` discriminator; got %v", text)
|
||||
}
|
||||
if kind != "text" {
|
||||
t.Errorf("text part kind = %q, want \"text\"", kind)
|
||||
}
|
||||
if text["text"] != "do the work" {
|
||||
t.Errorf("text part text = %v, want \"do the work\"", text["text"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildA2AMessageParts_FilePartUsesKind guards the file-attachment
|
||||
// Part the same way. The file path was already correct (it used `kind`),
|
||||
// so this is a non-regression pin — it must STAY `kind` when the text
|
||||
// path is fixed (a careless "make them consistent" edit could flip both
|
||||
// to the wrong key).
|
||||
func TestBuildA2AMessageParts_FilePartUsesKind(t *testing.T) {
|
||||
atts := []AgentMessageAttachment{
|
||||
{URI: "https://example.com/a.png", MimeType: "image/png", Name: "a.png"},
|
||||
}
|
||||
parts := buildA2AMessageParts("caption", atts)
|
||||
if len(parts) < 2 {
|
||||
t.Fatalf("expected text + file parts, got %d", len(parts))
|
||||
}
|
||||
file := parts[1]
|
||||
if _, hasType := file["type"]; hasType {
|
||||
t.Errorf("file part uses forbidden `type` key: %v", file)
|
||||
}
|
||||
if _, hasKind := file["kind"]; !hasKind {
|
||||
t.Errorf("file part missing `kind` discriminator: %v", file)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDelegationOutboundEnvelope_RoleAndKind pins the FULL outbound
|
||||
// envelope contract — role + parts[].kind — on the canonical helper.
|
||||
// A v0.3 `message` MUST carry `role` ("user" for a delegation request)
|
||||
// and `parts` whose every entry discriminates on `kind`. This is the
|
||||
// shape the receiver's MessageSendParams validator accepts; an envelope
|
||||
// missing `role` or keyed `type` is silently rejected (#2251).
|
||||
//
|
||||
// Built from the same primitives delegation.go / mcp_tools.go assemble
|
||||
// (role:"user" + buildA2AMessageParts) so the round-trip through
|
||||
// json.Marshal proves the wire bytes are v0.3-valid.
|
||||
func TestDelegationOutboundEnvelope_RoleAndKind(t *testing.T) {
|
||||
envelope := map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": "deleg-1",
|
||||
"parts": buildA2AMessageParts("do the work", nil),
|
||||
},
|
||||
},
|
||||
}
|
||||
raw, err := json.Marshal(envelope)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal envelope: %v", err)
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal envelope: %v", err)
|
||||
}
|
||||
|
||||
params, _ := parsed["params"].(map[string]interface{})
|
||||
if params == nil {
|
||||
t.Fatal("envelope missing params")
|
||||
}
|
||||
msg, _ := params["message"].(map[string]interface{})
|
||||
if msg == nil {
|
||||
t.Fatal("envelope missing params.message")
|
||||
}
|
||||
|
||||
// role is mandatory on a v0.3 message — the receiver rejects without it.
|
||||
role, hasRole := msg["role"].(string)
|
||||
if !hasRole || role == "" {
|
||||
t.Errorf("params.message missing non-empty `role` — v0.3 requires it; omitting it is the other half of #2251")
|
||||
}
|
||||
|
||||
parts, _ := msg["parts"].([]interface{})
|
||||
if len(parts) == 0 {
|
||||
t.Fatal("params.message.parts is empty")
|
||||
}
|
||||
for i, p := range parts {
|
||||
pm, _ := p.(map[string]interface{})
|
||||
if pm == nil {
|
||||
t.Errorf("part %d is not an object: %v", i, p)
|
||||
continue
|
||||
}
|
||||
if _, hasType := pm["type"]; hasType {
|
||||
t.Errorf("part %d uses forbidden `type` key (must be `kind`): %v", i, pm)
|
||||
}
|
||||
if _, hasKind := pm["kind"]; !hasKind {
|
||||
t.Errorf("part %d missing `kind` discriminator: %v", i, pm)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -179,8 +179,11 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": delegationID,
|
||||
"parts": []map[string]interface{}{{"type": "text", "text": body.Task}},
|
||||
"metadata": map[string]interface{}{"delegation_id": delegationID},
|
||||
// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) —
|
||||
// a `type`-keyed Part is dropped by the receiver's v0.3
|
||||
// validator, silently losing the delegated task.
|
||||
"parts": []map[string]interface{}{{"kind": "text", "text": body.Task}},
|
||||
"metadata": map[string]interface{}{"delegation_id": delegationID},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@@ -192,7 +192,11 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
|
||||
// follow in the order provided, with kind derived from MIME type.
|
||||
func buildA2AMessageParts(task string, attachments []AgentMessageAttachment) []map[string]interface{} {
|
||||
parts := []map[string]interface{}{
|
||||
{"type": "text", "text": task},
|
||||
// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251).
|
||||
// The receiver's v0.3 Pydantic validator drops a Part keyed
|
||||
// `type`, silently losing the task text — the file part below
|
||||
// already uses `kind`, this is the matching fix for text.
|
||||
{"kind": "text", "text": task},
|
||||
}
|
||||
for _, att := range attachments {
|
||||
kind := kindFromMimeType(att.MimeType)
|
||||
|
||||
@@ -16,7 +16,7 @@ const SchemaVersion = 1
|
||||
// Fingerprint is a stable content hash of the generated projection (schema
|
||||
// version + provider catalog + runtime native sets). It changes iff the
|
||||
// registry DATA changes (comment-only YAML edits do not churn it).
|
||||
const Fingerprint = "ec6b93409e7b9cf8"
|
||||
const Fingerprint = "e457249eb0fd77a2"
|
||||
|
||||
// GenProvider is the generated projection of one provider catalog entry —
|
||||
// the subset a downstream consumer needs to derive + display a provider.
|
||||
@@ -84,8 +84,8 @@ var Runtimes = map[string][]GenRuntimeRef{
|
||||
"claude-code": {
|
||||
{Name: "anthropic-oauth", Models: []string{"sonnet", "opus", "haiku", "anthropic:sonnet", "anthropic:opus", "anthropic:haiku"}},
|
||||
{Name: "anthropic-api", Models: []string{"claude-sonnet-4-6", "claude-opus-4-7", "claude-haiku-4-5", "claude-sonnet-4-5", "anthropic:claude-sonnet-4-6", "anthropic:claude-opus-4-7", "anthropic:claude-haiku-4-5", "anthropic:claude-sonnet-4-5"}},
|
||||
{Name: "kimi-coding", Models: []string{"kimi-for-coding", "kimi-k2.5", "kimi-k2", "moonshot:kimi-k2.6", "moonshot:kimi-k2.5"}},
|
||||
{Name: "minimax", Models: []string{"MiniMax-M2", "MiniMax-M2.7", "MiniMax-M2.7-highspeed", "MiniMax-M3", "minimax:MiniMax-M2", "minimax:MiniMax-M2.7", "minimax:MiniMax-M2.7-highspeed", "minimax:MiniMax-M3"}},
|
||||
{Name: "kimi-coding", Models: []string{"kimi-for-coding", "kimi-k2.5", "kimi-k2"}},
|
||||
{Name: "minimax", Models: []string{"MiniMax-M2", "MiniMax-M2.7", "MiniMax-M2.7-highspeed", "MiniMax-M3"}},
|
||||
{Name: "platform", Models: []string{"anthropic/claude-opus-4-7", "anthropic/claude-sonnet-4-6", "moonshot/kimi-k2.6", "moonshot/kimi-k2.5", "minimax/MiniMax-M2.7", "minimax/MiniMax-M2.7-highspeed", "minimax/MiniMax-M3"}},
|
||||
{Name: "zai", Models: []string{}},
|
||||
{Name: "deepseek", Models: []string{}},
|
||||
|
||||
@@ -827,29 +827,25 @@ runtimes:
|
||||
- anthropic:claude-sonnet-4-5
|
||||
- name: kimi-coding
|
||||
# BYOK kimi-coding gateway ids — bare form is the canonical id
|
||||
# the gateway routes; the colon form `moonshot:kimi-k2.*` is the
|
||||
# legacy BYOK selection form (already in use on the openclaw
|
||||
# native set below). claude-code's adapter accepts both
|
||||
# (internal#718 P4 PR-1).
|
||||
# the gateway routes. The colon form `moonshot:kimi-k2.*` was
|
||||
# removed because claude-code's adapter cannot strip the
|
||||
# `moonshot:` prefix — it only handles `anthropic:`/`claude:`
|
||||
# (cp#521). The bare forms already cover these models.
|
||||
models:
|
||||
- kimi-for-coding
|
||||
- kimi-k2.5
|
||||
- kimi-k2
|
||||
- moonshot:kimi-k2.6
|
||||
- moonshot:kimi-k2.5
|
||||
- name: minimax
|
||||
# BYOK MiniMax ids — bare form is the canonical id; colon form is
|
||||
# the legacy BYOK selection spelling carried in the create corpus
|
||||
# and the openclaw template (internal#718 P4 PR-1).
|
||||
# BYOK MiniMax ids — bare form is the canonical id. The colon
|
||||
# forms `minimax:MiniMax-*` were removed because claude-code's
|
||||
# adapter cannot strip the `minimax:` prefix — it only handles
|
||||
# `anthropic:`/`claude:` (cp#521). The bare forms already cover
|
||||
# these models.
|
||||
models:
|
||||
- MiniMax-M2
|
||||
- MiniMax-M2.7
|
||||
- MiniMax-M2.7-highspeed
|
||||
- MiniMax-M3
|
||||
- minimax:MiniMax-M2
|
||||
- minimax:MiniMax-M2.7
|
||||
- minimax:MiniMax-M2.7-highspeed
|
||||
- minimax:MiniMax-M3
|
||||
# Platform-managed (no tenant key; Molecule owns billing). The
|
||||
# vendor/model-namespaced ids the proxy resolves to the upstream vendor.
|
||||
# Canonical for the template's `provider: platform` model entries — the
|
||||
|
||||
@@ -324,3 +324,46 @@ func TestVertexProviderRegistered(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlatformProvider_AuthEnvIsUsageTokenOnly is the SSOT-side regression
|
||||
// gate for the platform-managed auth_env drift class (issue #2250 — the
|
||||
// codex template's `platform` provider shipped
|
||||
// auth_env: [MOLECULE_LLM_USAGE_TOKEN, ANTHROPIC_API_KEY], wrongly
|
||||
// advertising a vendor key under a platform-managed provider).
|
||||
//
|
||||
// The `platform` provider is the closed Molecule proxy arm: the platform
|
||||
// owns billing and injects MOLECULE_LLM_USAGE_TOKEN, so a tenant supplies
|
||||
// NO vendor credential. Listing ANTHROPIC_API_KEY (or any other vendor key)
|
||||
// in its auth_env makes the canvas demand a credential the platform path
|
||||
// neither needs nor uses, and lets a stray vendor key satisfy the
|
||||
// "auth present" check on a path that ignores it — exactly the wrong-bill /
|
||||
// silent-no-op failure mode the BYOK-vs-platform split exists to prevent.
|
||||
//
|
||||
// EXACT-equality (not membership): the prior template-side test only
|
||||
// asserted `"MOLECULE_LLM_USAGE_TOKEN" in auth_env`, which PASSED against
|
||||
// the buggy two-element list. Pin the WHOLE set so an extra vendor key
|
||||
// trips the gate. This is the core providers.yaml SSOT; the template
|
||||
// derives from / must byte-match this set (drift-gated by molecule-ci).
|
||||
// On core this currently PASSES (auth_env is already clean; the vendor
|
||||
// key lives in the separate auth_token_env field) — the gate locks that
|
||||
// in so a future drift onto this SSOT trips CI.
|
||||
func TestPlatformProvider_AuthEnvIsUsageTokenOnly(t *testing.T) {
|
||||
ps, err := Load()
|
||||
if err != nil {
|
||||
t.Fatalf("Load() error = %v", err)
|
||||
}
|
||||
var platform *Provider
|
||||
for i := range ps {
|
||||
if ps[i].Name == "platform" {
|
||||
platform = &ps[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if platform == nil {
|
||||
t.Fatal("platform provider missing from providers.yaml — the closed proxy arm must exist")
|
||||
}
|
||||
want := []string{"MOLECULE_LLM_USAGE_TOKEN"}
|
||||
if len(platform.AuthEnv) != len(want) || platform.AuthEnv[0] != want[0] {
|
||||
t.Errorf("platform provider auth_env = %v, want exactly %v — a vendor key under a platform-managed provider is the #2250 drift; auth_token_env (the proxy's internal projection target) is a SEPARATE field and must not leak into auth_env", platform.AuthEnv, want)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,14 +117,15 @@ func TestModelsForRuntime_ExactModelIDs(t *testing.T) {
|
||||
"anthropic:claude-haiku-4-5", "anthropic:claude-sonnet-4-5",
|
||||
// anthropic via platform proxy (namespaced)
|
||||
"anthropic/claude-opus-4-7", "anthropic/claude-sonnet-4-6",
|
||||
// kimi (kimi-coding gateway, bare + legacy colon-namespaced BYOK)
|
||||
// kimi (kimi-coding gateway, bare form only — colon-forms removed
|
||||
// because claude-code's adapter cannot strip the moonshot: prefix;
|
||||
// openclaw retains them natively, cp#521).
|
||||
"kimi-for-coding", "kimi-k2.5", "kimi-k2",
|
||||
"moonshot:kimi-k2.6", "moonshot:kimi-k2.5",
|
||||
// kimi via platform proxy
|
||||
"moonshot/kimi-k2.6", "moonshot/kimi-k2.5",
|
||||
// minimax BYOK (bare + legacy colon-namespaced)
|
||||
// minimax BYOK (bare form only — colon-forms removed because
|
||||
// claude-code's adapter cannot strip the minimax: prefix, cp#521).
|
||||
"MiniMax-M2", "MiniMax-M2.7", "MiniMax-M2.7-highspeed", "MiniMax-M3",
|
||||
"minimax:MiniMax-M2", "minimax:MiniMax-M2.7", "minimax:MiniMax-M2.7-highspeed", "minimax:MiniMax-M3",
|
||||
// minimax via platform proxy
|
||||
"minimax/MiniMax-M2.7", "minimax/MiniMax-M2.7-highspeed", "minimax/MiniMax-M3",
|
||||
},
|
||||
|
||||
@@ -29,7 +29,7 @@ import (
|
||||
// canonicalProvidersYAMLSHA256 is the sha256 of the canonical providers.yaml as
|
||||
// synced from molecule-controlplane. Bumped deliberately on each re-sync (see
|
||||
// file doc). Cross-checked live by the sync-providers-yaml CI workflow.
|
||||
const canonicalProvidersYAMLSHA256 = "846ddef11ec423ebf2e96b5da21bd89129dbc3f0a2d14ac086940e005c079387"
|
||||
const canonicalProvidersYAMLSHA256 = "9eb6f97fc37b528c91936be4a75dd87f6c7172742b4535d76b9bb2231ee18e80"
|
||||
|
||||
func TestSyncedYAMLMatchesCanonicalSHA(t *testing.T) {
|
||||
sum := sha256.Sum256(embeddedYAML)
|
||||
|
||||
@@ -36,11 +36,22 @@ package registry
|
||||
// runtime <> 'external'. Paused/hibernated/removed/provisioning/
|
||||
// awaiting_agent rows are out of scope; external rows are covered by
|
||||
// the remote-heartbeat pass.
|
||||
// - Per-cycle row cap + per-workspace timeout so one slow CP call can't
|
||||
// stall the sweep.
|
||||
// - Per-cycle row cap + per-cycle deadline + per-workspace timeout so
|
||||
// one slow CP call (or a degraded-but-not-erroring CP) can't stall
|
||||
// the sweep.
|
||||
// - TOCTOU re-confirm before any flip: IsRunning resolves instance_id
|
||||
// independently, so a row whose instance_id was cleared/NULLed (by a
|
||||
// concurrent delete, the CP-orphan-sweeper, or a reprovision) between
|
||||
// the reconciler's SELECT and the IsRunning probe yields a STALE
|
||||
// (false, nil) that does NOT prove the EC2 is dead. We re-read the
|
||||
// row's current (status, instance_id) and flip ONLY when the SAME
|
||||
// non-empty instance we asked CP about is still the workspace's
|
||||
// recorded instance AND it's still online/degraded. Mirrors the
|
||||
// guarded-write re-confirm in healthsweep.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
@@ -70,6 +81,20 @@ const CPInstanceReconcileLimit = 200
|
||||
// timeout context derived from the cycle context.
|
||||
const cpInstanceCheckTimeout = 10 * time.Second
|
||||
|
||||
// Wall-clock budgets for one reconcile pass and for the TOCTOU re-confirm
// read it performs before flipping a workspace offline.
const (
	// cpInstanceCycleDeadline caps the wall-time of a whole reconcile pass.
	// Per-workspace probes are individually bounded (10s each, up to 200
	// rows), so a control plane that is slow on every call without ever
	// erroring could otherwise stretch a single cycle to tens of minutes
	// and starve the next tick. The value sits below the 60s tick interval
	// so a stuck cycle is abandoned before the next one is due; whatever
	// was not reached is picked up by later cycles. Mirrors
	// cp_orphan_sweeper's orphanSweepDeadline.
	cpInstanceCycleDeadline = 45 * time.Second

	// cpInstanceReconfirmTimeout caps the TOCTOU re-confirm read. That
	// read is a single primary-key lookup and is expected to be fast; the
	// tight bound only ensures the re-confirm cannot itself become a
	// stall point.
	cpInstanceReconfirmTimeout = 5 * time.Second
)
|
||||
|
||||
// StartCPInstanceReconciler runs the authoritative EC2-state reconcile
|
||||
// loop until ctx is cancelled. A nil checker makes the loop a no-op
|
||||
// (matches the nil-tolerant pattern of the sibling CP sweeper).
|
||||
@@ -106,21 +131,41 @@ func StartCPInstanceReconciler(ctx context.Context, checker InstanceRunningCheck
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileRow is one candidate produced by the reconciler's SELECT: the
// workspace id together with the instance_id read in that same query.
// Keeping the two side by side lets the TOCTOU re-confirm check that a
// (false, nil) answer from CP concerns the instance the row STILL
// records, rather than one that was cleared or replaced between the
// SELECT and the IsRunning probe.
type reconcileRow struct {
	id, instanceID string
}
|
||||
|
||||
// reconcileOnce executes one reconcile pass. Defensive against db.DB
|
||||
// being nil so a misconfigured boot doesn't panic.
|
||||
//
|
||||
// Scope: online + SaaS-EC2 workspaces only. runtime='external' rows are
|
||||
// excluded (covered by the remote-heartbeat pass); paused/hibernated/
|
||||
// removed/provisioning/awaiting_agent are excluded by the status filter.
|
||||
func reconcileOnce(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler) {
|
||||
// Scope: online/degraded + SaaS-EC2 workspaces only. runtime='external'
|
||||
// rows are excluded (covered by the remote-heartbeat pass); paused/
|
||||
// hibernated/removed/provisioning/awaiting_agent are excluded by the
|
||||
// status filter. `degraded` is included because a SaaS workspace whose
|
||||
// heartbeat handler flipped it degraded then lost its EC2 falls through
|
||||
// every other sweep (matches healthsweep's `status IN ('online',
|
||||
// 'degraded')`).
|
||||
func reconcileOnce(parent context.Context, checker InstanceRunningChecker, onOffline OfflineHandler) {
|
||||
if db.DB == nil {
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id::text
|
||||
// Per-cycle deadline so a degraded-but-not-erroring CP (each IsRunning
|
||||
// slow but under the per-workspace cap) can't drag one cycle out for
|
||||
// tens of minutes and starve the next tick. Per-workspace IsRunning
|
||||
// timeouts derive from this cycle context.
|
||||
cycleCtx, cancelCycle := context.WithTimeout(parent, cpInstanceCycleDeadline)
|
||||
defer cancelCycle()
|
||||
|
||||
rows, err := db.DB.QueryContext(cycleCtx, `
|
||||
SELECT id::text, instance_id
|
||||
FROM workspaces
|
||||
WHERE status = 'online'
|
||||
WHERE status IN ('online', 'degraded')
|
||||
AND instance_id IS NOT NULL
|
||||
AND instance_id != ''
|
||||
AND COALESCE(runtime, '') <> 'external'
|
||||
@@ -133,46 +178,130 @@ func reconcileOnce(ctx context.Context, checker InstanceRunningChecker, onOfflin
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ids []string
|
||||
var candidates []reconcileRow
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if scanErr := rows.Scan(&id); scanErr != nil {
|
||||
var r reconcileRow
|
||||
if scanErr := rows.Scan(&r.id, &r.instanceID); scanErr != nil {
|
||||
log.Printf("cp-instance-reconciler: row scan failed: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
ids = append(ids, id)
|
||||
candidates = append(candidates, r)
|
||||
}
|
||||
if iterErr := rows.Err(); iterErr != nil {
|
||||
log.Printf("cp-instance-reconciler: rows iteration failed: %v", iterErr)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
processed, skipped := 0, 0
|
||||
for _, c := range candidates {
|
||||
// Abandon the cycle if we've blown the per-cycle deadline; the
|
||||
// next tick re-reads from the top (ORDER BY updated_at DESC) and
|
||||
// drains the backlog. Without this a slow CP could keep one cycle
|
||||
// running past its interval and never let a fresh one start.
|
||||
if cycleCtx.Err() != nil {
|
||||
log.Printf("cp-instance-reconciler: cycle deadline reached — processed %d, %d skipped (TOCTOU/changed), remaining deferred to next cycle", processed, skipped)
|
||||
return
|
||||
}
|
||||
processed++
|
||||
|
||||
// Per-workspace timeout so one slow CP round-trip can't stall
|
||||
// the whole sweep.
|
||||
checkCtx, cancel := context.WithTimeout(ctx, cpInstanceCheckTimeout)
|
||||
running, checkErr := checker.IsRunning(checkCtx, id)
|
||||
// the whole sweep. Derived from cycleCtx so the cycle deadline
|
||||
// always dominates.
|
||||
checkCtx, cancel := context.WithTimeout(cycleCtx, cpInstanceCheckTimeout)
|
||||
running, checkErr := checker.IsRunning(checkCtx, c.id)
|
||||
cancel()
|
||||
|
||||
if checkErr != nil {
|
||||
// FAIL-SAFE: transient DB/transport error (or a no-backend
|
||||
// signal). IsRunning returns (true, err) on these, so never
|
||||
// flip — leave the row online and retry next cycle.
|
||||
log.Printf("cp-instance-reconciler: IsRunning(%s) errored, leaving online (fail-safe): %v", id, checkErr)
|
||||
log.Printf("cp-instance-reconciler: IsRunning(%s) errored, leaving online (fail-safe): %v", c.id, checkErr)
|
||||
continue
|
||||
}
|
||||
if running {
|
||||
continue
|
||||
}
|
||||
|
||||
// CLEAN "not running" — CP authoritatively reports the EC2 is
|
||||
// terminated/stopped/absent. Feed it into the existing offline +
|
||||
// (false, nil) is NOT yet proof the EC2 is dead. IsRunning
|
||||
// resolves instance_id independently (resolveInstanceID); if the
|
||||
// row's instance_id was cleared/NULLed (concurrent delete, the
|
||||
// CP-orphan-sweeper NULLing it, a reprovision) or the row moved
|
||||
// off online/degraded between our SELECT and this probe,
|
||||
// IsRunning returns a STALE (false, nil) that reflects a missing
|
||||
// instance_id, NOT a confirmed-terminated EC2. Re-confirm against
|
||||
// the row's CURRENT state and flip ONLY when the SAME non-empty
|
||||
// instance we asked CP about is still recorded AND the row is
|
||||
// still online/degraded. Mirrors healthsweep's guarded write.
|
||||
if !reconfirmStillOfflineCandidate(cycleCtx, c) {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// CONFIRMED "not running" — CP authoritatively reports the EC2 is
|
||||
// terminated/stopped/absent AND the row still records that exact
|
||||
// instance as online/degraded. Feed it into the existing offline +
|
||||
// auto-heal machinery: onOffline flips the row offline and
|
||||
// triggers RestartByID, which reprovisions with the existing
|
||||
// volume.
|
||||
log.Printf("cp-instance-reconciler: workspace %s is status=online but its EC2 is not running (terminated/stopped) — flipping offline + triggering reprovision", id)
|
||||
log.Printf("cp-instance-reconciler: workspace %s (instance %s) is online/degraded but its EC2 is not running (terminated/stopped) — flipping offline + triggering reprovision", c.id, c.instanceID)
|
||||
if onOffline != nil {
|
||||
onOffline(ctx, id)
|
||||
onOffline(cycleCtx, c.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reconfirmStillOfflineCandidate re-reads the workspace's CURRENT
|
||||
// (status, instance_id) and reports whether it is STILL a valid offline
|
||||
// candidate for the instance we just probed. It returns true ONLY when:
|
||||
//
|
||||
// - the row still exists, AND
|
||||
// - current status IN ('online','degraded'), AND
|
||||
// - current instance_id is non-empty, AND
|
||||
// - current instance_id == the instance_id captured in the original
|
||||
// SELECT (the one whose liveness CP just answered about).
|
||||
//
|
||||
// Any other outcome (row gone, status moved off online/degraded,
|
||||
// instance_id cleared or now points at a different instance) means the
|
||||
// IsRunning (false, nil) was a stale/cleared-instance snapshot rather
|
||||
// than a confirmed-terminated EC2 — return false so the caller skips the
|
||||
// flip. A DB error during re-confirm is treated as "not confirmed"
|
||||
// (false): fail-safe toward NOT flipping a workspace we can't re-verify.
|
||||
func reconfirmStillOfflineCandidate(parent context.Context, c reconcileRow) bool {
|
||||
if db.DB == nil {
|
||||
return false
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(parent, cpInstanceReconfirmTimeout)
|
||||
defer cancel()
|
||||
|
||||
var curStatus, curInstanceID string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
SELECT status, COALESCE(instance_id, '')
|
||||
FROM workspaces
|
||||
WHERE id = $1
|
||||
`, c.id).Scan(&curStatus, &curInstanceID)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// Row deleted between SELECT and re-confirm — definitely not a
|
||||
// terminated-EC2 signal. Skip.
|
||||
log.Printf("cp-instance-reconciler: re-confirm %s: row gone — skipping flip (stale snapshot, not a dead EC2)", c.id)
|
||||
return false
|
||||
}
|
||||
// Transient DB error — fail-safe toward NOT flipping.
|
||||
log.Printf("cp-instance-reconciler: re-confirm %s errored, skipping flip (fail-safe): %v", c.id, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if curStatus != "online" && curStatus != "degraded" {
|
||||
log.Printf("cp-instance-reconciler: re-confirm %s: status moved to %q since SELECT — skipping flip", c.id, curStatus)
|
||||
return false
|
||||
}
|
||||
if curInstanceID == "" {
|
||||
log.Printf("cp-instance-reconciler: re-confirm %s: instance_id cleared since SELECT — skipping flip (CP answered about a now-detached instance)", c.id)
|
||||
return false
|
||||
}
|
||||
if curInstanceID != c.instanceID {
|
||||
log.Printf("cp-instance-reconciler: re-confirm %s: instance_id changed %s -> %s since SELECT (reprovision) — skipping flip", c.id, c.instanceID, curInstanceID)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -63,16 +63,48 @@ func (r *recordingOffline) got() []string {
|
||||
}
|
||||
|
||||
// expectReconcileQuery registers the reconciler's SELECT, pinning the
|
||||
// scope-critical predicates: status='online', instance_id present, and
|
||||
// runtime <> 'external'. A future widening that drops any of these (e.g.
|
||||
// sweeping paused rows, or external rows the heartbeat pass owns) fails
|
||||
// every test that uses this helper.
|
||||
// scope-critical predicates: status IN ('online','degraded'), instance_id
|
||||
// present (captured as a column for the TOCTOU re-confirm), and runtime
|
||||
// <> 'external'. A future widening that drops any of these (e.g. sweeping
|
||||
// paused rows, or external rows the heartbeat pass owns), or that drops
|
||||
// the instance_id column the re-confirm depends on, fails every test that
|
||||
// uses this helper.
|
||||
func expectReconcileQuery(mock sqlmock.Sqlmock, rows *sqlmock.Rows) {
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'online'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+AND COALESCE\(runtime, ''\) <> 'external'\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text, instance_id\s+FROM workspaces\s+WHERE status IN \('online', 'degraded'\)\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+AND COALESCE\(runtime, ''\) <> 'external'\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
|
||||
WithArgs(CPInstanceReconcileLimit).
|
||||
WillReturnRows(rows)
|
||||
}
|
||||
|
||||
// reconcileRows builds the two-column (id, instance_id) result the
|
||||
// reconciler's SELECT now returns. Pass id/instance_id pairs.
|
||||
func reconcileRows(pairs ...[2]string) *sqlmock.Rows {
|
||||
r := sqlmock.NewRows([]string{"id", "instance_id"})
|
||||
for _, p := range pairs {
|
||||
r.AddRow(p[0], p[1])
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// expectReconfirm registers the TOCTOU re-confirm primary-key lookup for
|
||||
// workspace id `wsID`, returning the row's CURRENT (status, instance_id).
|
||||
// This is what the reconciler re-reads after IsRunning returns (false,
|
||||
// nil), before it flips: it only flips when the SAME non-empty instance
|
||||
// is still recorded AND status is still online/degraded.
|
||||
func expectReconfirm(mock sqlmock.Sqlmock, wsID, curStatus, curInstanceID string) {
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT status, COALESCE\(instance_id, ''\)\s+FROM workspaces\s+WHERE id = \$1`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status", "instance_id"}).AddRow(curStatus, curInstanceID))
|
||||
}
|
||||
|
||||
// expectReconfirmNoRows registers a re-confirm lookup that finds the row
|
||||
// gone (deleted between SELECT and re-confirm) — the reconciler must
|
||||
// treat this as "not a dead EC2" and skip the flip.
|
||||
func expectReconfirmNoRows(mock sqlmock.Sqlmock, wsID string) {
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT status, COALESCE\(instance_id, ''\)\s+FROM workspaces\s+WHERE id = \$1`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status", "instance_id"}))
|
||||
}
|
||||
|
||||
// TestReconcileOnce_NotRunning_FlipsOffline — the core bug (core#2247):
|
||||
// an online SaaS workspace whose EC2 is terminated. CP reports a CLEAN
|
||||
// (false, nil); onOffline MUST be called with that id so the existing
|
||||
@@ -82,7 +114,10 @@ func TestReconcileOnce_NotRunning_FlipsOffline(t *testing.T) {
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-dead": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-dead"))
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-dead", "i-dead"}))
|
||||
// (false,nil) → re-confirm: row still online with the SAME instance →
|
||||
// confirmed-dead → flip.
|
||||
expectReconfirm(mock, "ws-dead", "online", "i-dead")
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
@@ -102,7 +137,8 @@ func TestReconcileOnce_Running_DoesNotFlip(t *testing.T) {
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-alive": true}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-alive"))
|
||||
// Running → no re-confirm, no flip.
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-alive", "i-alive"}))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
@@ -126,7 +162,9 @@ func TestReconcileOnce_TransientError_DoesNotFlip(t *testing.T) {
|
||||
}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-blip"))
|
||||
// (true,err) short-circuits BEFORE the re-confirm — no re-confirm query
|
||||
// is registered, so a stray re-confirm would fail ExpectationsWereMet.
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-blip", "i-blip"}))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
@@ -143,19 +181,20 @@ func TestReconcileOnce_TransientError_DoesNotFlip(t *testing.T) {
|
||||
|
||||
// TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline — pins the
|
||||
// SELECT predicate. The regex in expectReconcileQuery requires
|
||||
// status='online' AND runtime <> 'external'; if a future edit widens the
|
||||
// scope to include paused/hibernated/removed rows or external rows (owned
|
||||
// by the heartbeat pass), this query no longer matches and sqlmock fails
|
||||
// the test. With the predicate intact, a DB that has only out-of-scope
|
||||
// rows returns empty → no IsRunning, no flip.
|
||||
// status IN ('online','degraded') AND runtime <> 'external'; if a future
|
||||
// edit widens the scope to include paused/hibernated/removed rows or
|
||||
// external rows (owned by the heartbeat pass), or narrows it back to drop
|
||||
// 'degraded', this query no longer matches and sqlmock fails the test.
|
||||
// With the predicate intact, a DB that has only out-of-scope rows returns
|
||||
// empty → no IsRunning, no flip.
|
||||
func TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
|
||||
// The predicate filters out external + non-online rows server-side,
|
||||
// modelled as the empty result those filters produce.
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}))
|
||||
// The predicate filters out external + out-of-scope-status rows
|
||||
// server-side, modelled as the empty result those filters produce.
|
||||
expectReconcileQuery(mock, reconcileRows())
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
@@ -180,10 +219,13 @@ func TestReconcileOnce_MixedBatch(t *testing.T) {
|
||||
}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-dead").
|
||||
AddRow("ws-alive").
|
||||
AddRow("ws-blip"))
|
||||
expectReconcileQuery(mock, reconcileRows(
|
||||
[2]string{"ws-dead", "i-dead"},
|
||||
[2]string{"ws-alive", "i-alive"},
|
||||
[2]string{"ws-blip", "i-blip"},
|
||||
))
|
||||
// Only ws-dead reaches the re-confirm ((false,nil)); it confirms.
|
||||
expectReconfirm(mock, "ws-dead", "online", "i-dead")
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
@@ -195,6 +237,147 @@ func TestReconcileOnce_MixedBatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_TOCTOU_InstanceChanged_DoesNotFlip — the HIGH-1
|
||||
// regression guard. IsRunning returns a CLEAN (false, nil), but between
|
||||
// the reconciler's SELECT and the probe the row's instance_id changed
|
||||
// (reprovision attached a fresh EC2). IsRunning's independent
|
||||
// resolveInstanceID is the reason the (false,nil) is stale: it may have
|
||||
// resolved an empty/old instance. The re-confirm sees a DIFFERENT
|
||||
// instance_id and MUST skip — flipping here would knock out a workspace
|
||||
// whose NEW EC2 is not proven dead and fire RestartByID on a just-
|
||||
// reprovisioned row.
|
||||
func TestReconcileOnce_TOCTOU_InstanceChanged_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-race": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-race", "i-old"}))
|
||||
// Re-confirm: row is still online but now points at a DIFFERENT
|
||||
// instance (reprovisioned) → the (false,nil) was about i-old which is
|
||||
// no longer attached → skip.
|
||||
expectReconfirm(mock, "ws-race", "online", "i-new")
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("TOCTOU guard violated: instance_id changed since SELECT must NOT flip, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_TOCTOU_InstanceCleared_DoesNotFlip — same HIGH-1
|
||||
// guard, the instance_id-NULLed variant (CP-orphan-sweeper or a delete
|
||||
// cleared it). Re-confirm sees an empty instance_id → skip.
|
||||
func TestReconcileOnce_TOCTOU_InstanceCleared_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-cleared": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-cleared", "i-gone"}))
|
||||
expectReconfirm(mock, "ws-cleared", "online", "") // instance_id cleared
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("TOCTOU guard violated: cleared instance_id must NOT flip, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_TOCTOU_StatusMoved_DoesNotFlip — same HIGH-1 guard,
|
||||
// the status-moved variant. The row left online/degraded (e.g. paused or
|
||||
// removed) between SELECT and re-confirm → skip.
|
||||
func TestReconcileOnce_TOCTOU_StatusMoved_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-paused": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-paused", "i-keep"}))
|
||||
expectReconfirm(mock, "ws-paused", "paused", "i-keep") // status moved out of scope
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("TOCTOU guard violated: row no longer online/degraded must NOT flip, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_TOCTOU_RowGone_DoesNotFlip — same HIGH-1 guard, the
|
||||
// row-deleted variant. The re-confirm finds no row (concurrent delete) →
|
||||
// skip; a stale (false,nil) about a just-deleted row must never fire
|
||||
// onOffline/RestartByID.
|
||||
func TestReconcileOnce_TOCTOU_RowGone_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-deleted": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-deleted", "i-x"}))
|
||||
expectReconfirmNoRows(mock, "ws-deleted") // row gone
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("TOCTOU guard violated: deleted row must NOT flip, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_Degraded_FlipsOffline — MED-3 scope. A `degraded`
|
||||
// SaaS workspace whose EC2 is gone is otherwise covered by NO sweep. It's
|
||||
// in scope (the SELECT regex requires status IN ('online','degraded')),
|
||||
// CP reports (false,nil), the re-confirm shows it STILL degraded with the
|
||||
// SAME instance → flip.
|
||||
func TestReconcileOnce_Degraded_FlipsOffline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-degraded": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-degraded", "i-deg"}))
|
||||
expectReconfirm(mock, "ws-degraded", "degraded", "i-deg")
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 1 || got[0] != "ws-degraded" {
|
||||
t.Fatalf("expected onOffline(ws-degraded), got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconfirm_DBError_DoesNotFlip — re-confirm fail-safe. If the
|
||||
// re-confirm read itself errors (transient DB blip), we treat it as "not
|
||||
// confirmed" and skip the flip rather than acting on an unverifiable
|
||||
// (false,nil).
|
||||
func TestReconcileOnce_ReconfirmDBError_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-x": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, reconcileRows([2]string{"ws-x", "i-x"}))
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT status, COALESCE\(instance_id, ''\)\s+FROM workspaces\s+WHERE id = \$1`).
|
||||
WithArgs("ws-x").
|
||||
WillReturnError(errors.New("connection reset"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("re-confirm DB error must fail-safe (no flip), got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_QueryError — DB transient failure. Reconcile returns
|
||||
// without panicking and never probes IsRunning or flips anything.
|
||||
func TestReconcileOnce_QueryError(t *testing.T) {
|
||||
@@ -202,7 +385,7 @@ func TestReconcileOnce_QueryError(t *testing.T) {
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text, instance_id\s+FROM workspaces`).
|
||||
WithArgs(CPInstanceReconcileLimit).
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user