forked from molecule-ai/molecule-core
Merge branch 'staging' into fix/ki005-security-clean
This commit is contained in:
commit
255fd3c192
@ -104,6 +104,13 @@ interface RuntimeOption {
|
||||
// Fallback used when /templates can't be fetched (offline, older backend).
// Keep in sync with manifest.json workspace_templates as a defensive default.
// Model + env suggestions only flow when the backend is reachable.

// Runtimes that manage their own config outside the platform's config.yaml
// template. For these, a missing config.yaml is expected — the user manages
// config via the runtime's own mechanism (e.g. hermes edits
// ~/.hermes/config.yaml on the workspace EC2 via the Terminal tab or its
// own CLI). Showing a "No config.yaml found" error for these is misleading.
// Membership gates both the error banner and loadConfig's catch path.
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["hermes", "external"]);
|
||||
|
||||
const FALLBACK_RUNTIME_OPTIONS: RuntimeOption[] = [
|
||||
{ value: "", label: "LangGraph (default)", models: [] },
|
||||
{ value: "claude-code", label: "Claude Code", models: [] },
|
||||
@ -134,14 +141,50 @@ export function ConfigTab({ workspaceId }: Props) {
|
||||
const loadConfig = useCallback(async () => {
|
||||
setLoading(true);
|
||||
setError(null);
|
||||
|
||||
// ALWAYS load workspace metadata first (runtime + model). These are the
|
||||
// source of truth regardless of whether the runtime uses our config.yaml
|
||||
// template. Without this the form falls back to empty/default values on
|
||||
// a hermes workspace (which doesn't use our template), creating the
|
||||
// appearance that the saved runtime is unset — and worse, clicking Save
|
||||
// would silently flip `runtime` from `hermes` back to the dropdown
|
||||
// default `LangGraph`. See GH #1894.
|
||||
let wsMetadataRuntime = "";
|
||||
let wsMetadataModel = "";
|
||||
try {
|
||||
const ws = await api.get<{ runtime?: string }>(`/workspaces/${workspaceId}`);
|
||||
wsMetadataRuntime = (ws.runtime || "").trim();
|
||||
} catch { /* fall back to config.yaml */ }
|
||||
try {
|
||||
const m = await api.get<{ model?: string }>(`/workspaces/${workspaceId}/model`);
|
||||
wsMetadataModel = (m.model || "").trim();
|
||||
} catch { /* non-fatal */ }
|
||||
|
||||
try {
|
||||
const res = await api.get<{ content: string }>(`/workspaces/${workspaceId}/files/config.yaml`);
|
||||
const parsed = parseYaml(res.content);
|
||||
setOriginalYaml(res.content);
|
||||
setRawDraft(res.content);
|
||||
setConfig({ ...DEFAULT_CONFIG, ...parsed } as ConfigData);
|
||||
// Merge: config.yaml wins for fields it declares, but workspace metadata
|
||||
// wins for runtime + model when config.yaml doesn't set them.
|
||||
const merged = { ...DEFAULT_CONFIG, ...parsed } as ConfigData;
|
||||
if (!merged.runtime && wsMetadataRuntime) merged.runtime = wsMetadataRuntime;
|
||||
if (!merged.model && wsMetadataModel) merged.model = wsMetadataModel;
|
||||
setConfig(merged);
|
||||
} catch {
|
||||
setError("No config.yaml found");
|
||||
// No platform-managed config.yaml. Some runtimes (hermes, external)
|
||||
// manage their own config outside this template; that's expected, not
|
||||
// an error. Populate the form from workspace metadata so the user
|
||||
// still sees the saved runtime + model.
|
||||
const runtimeManagesOwnConfig = RUNTIMES_WITH_OWN_CONFIG.has(wsMetadataRuntime);
|
||||
if (!runtimeManagesOwnConfig) {
|
||||
setError("No config.yaml found");
|
||||
}
|
||||
setConfig({
|
||||
...DEFAULT_CONFIG,
|
||||
runtime: wsMetadataRuntime,
|
||||
model: wsMetadataModel,
|
||||
} as ConfigData);
|
||||
} finally {
|
||||
setLoading(false);
|
||||
}
|
||||
@ -511,6 +554,13 @@ export function ConfigTab({ workspaceId }: Props) {
|
||||
{error && (
|
||||
<div className="mx-3 mb-2 px-3 py-1.5 bg-red-900/30 border border-red-800 rounded text-xs text-red-400">{error}</div>
|
||||
)}
|
||||
{!error && RUNTIMES_WITH_OWN_CONFIG.has(config.runtime || "") && (
|
||||
<div className="mx-3 mb-2 px-3 py-1.5 bg-zinc-900/50 border border-zinc-700 rounded text-xs text-zinc-400">
|
||||
{config.runtime === "hermes"
|
||||
? "Hermes manages its own config at ~/.hermes/config.yaml on the workspace host. Edit it via the Terminal tab or the hermes CLI, not this form."
|
||||
: "This runtime manages its own config outside the platform template."}
|
||||
</div>
|
||||
)}
|
||||
{success && (
|
||||
<div className="mx-3 mb-2 px-3 py-1.5 bg-green-900/30 border border-green-800 rounded text-xs text-green-400">Saved</div>
|
||||
)}
|
||||
|
||||
@ -56,7 +56,35 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
// Busy with a Retry-After hint so callers can distinguish this
|
||||
// from a real unreachable-agent (502) and retry with backoff.
|
||||
// Issue #110.
|
||||
//
|
||||
// #1870 Phase 1: before returning 503, enqueue the request for drain
|
||||
// on next heartbeat. Returning 202 Accepted {queued:true} as a SUCCESS
|
||||
// (not an error) means callers record this as "dispatched — queued"
|
||||
// not "failed", eliminating the fan-out-storm drop pattern.
|
||||
//
|
||||
// Critical: must return (status, body, NIL ERROR) so the caller's
|
||||
// `if proxyErr != nil` branch doesn't fire. Returning a proxyA2AError
|
||||
// with 202 status here was the original cycle 53 bug — callers saw
|
||||
// proxyErr != nil and logged "delegation failed: proxy a2a error".
|
||||
if isUpstreamBusyError(err) {
|
||||
idempotencyKey := extractIdempotencyKey(body)
|
||||
if qid, depth, qerr := EnqueueA2A(
|
||||
ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey,
|
||||
); qerr == nil {
|
||||
log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth)
|
||||
respBody, _ := json.Marshal(gin.H{
|
||||
"queued": true,
|
||||
"queue_id": qid,
|
||||
"queue_depth": depth,
|
||||
"message": "workspace agent busy — request queued, will dispatch when capacity available",
|
||||
})
|
||||
return http.StatusAccepted, respBody, nil
|
||||
} else {
|
||||
// Queue insert failed — fall through to legacy 503 behavior
|
||||
// so callers still retry. We don't want a queue DB hiccup to
|
||||
// make delegation silently disappear.
|
||||
log.Printf("ProxyA2A: enqueue for %s failed (%v) — falling back to 503", workspaceID, qerr)
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
|
||||
270
workspace-server/internal/handlers/a2a_queue.go
Normal file
270
workspace-server/internal/handlers/a2a_queue.go
Normal file
@ -0,0 +1,270 @@
|
||||
package handlers
|
||||
|
||||
// a2a_queue.go — #1870 Phase 1: enqueue A2A requests whose target is busy,
|
||||
// drain the queue on heartbeat when the target regains capacity.
|
||||
//
|
||||
// Three levels are declared here so Phase 2/3 can land without a migration:
|
||||
// - PriorityCritical = 100 — preempts running task (Phase 3, not active yet)
|
||||
// - PriorityTask = 50 — default, FIFO within priority (Phase 1, active)
|
||||
// - PriorityInfo = 10 — best-effort with TTL (Phase 2, not active yet)
|
||||
//
|
||||
// Phase 1 writes only PriorityTask. The `priority` column tolerates all three.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// extractIdempotencyKey pulls params.message.messageId out of an A2A JSON-RPC
// body (normalizeA2APayload is expected to set that field before dispatch).
// Returns "" when the body doesn't parse or the field is absent — callers
// treat an empty key as "no idempotency".
func extractIdempotencyKey(body []byte) string {
	type a2aMessage struct {
		MessageID string `json:"messageId"`
	}
	type a2aParams struct {
		Message a2aMessage `json:"message"`
	}
	var req struct {
		Params a2aParams `json:"params"`
	}
	if json.Unmarshal(body, &req) != nil {
		return ""
	}
	return req.Params.Message.MessageID
}
|
||||
|
||||
// Queue priority levels (#1870). Higher value drains first; FIFO within a
// level (see DequeueNext's ORDER BY priority DESC, enqueued_at ASC).
// Phase 1 only writes PriorityTask; Critical/Info are declared now so later
// phases can activate without a schema migration.
const (
	PriorityCritical = 100
	PriorityTask     = 50
	PriorityInfo     = 10
)

// QueuedItem is what the heartbeat drain path pulls off the queue — one
// a2a_queue row, already claimed (status flipped to 'dispatched' by
// DequeueNext inside the claiming transaction).
type QueuedItem struct {
	ID          string
	WorkspaceID string
	CallerID    sql.NullString // NULL when the original caller was anonymous (see EnqueueA2A)
	Priority    int
	Body        []byte // raw JSON-RPC request body to re-dispatch
	Method      sql.NullString // A2A method name, when recorded at enqueue time
	Attempts    int // value at claim time, before the claim's own increment
}
||||
|
||||
// EnqueueA2A inserts a busy-retry-eligible A2A request into a2a_queue and
// returns the new row ID + current queue depth. Caller MUST have already
// determined the target is busy — this function does not check.
//
// Idempotency: when idempotencyKey is non-empty, the partial unique index
// `idx_a2a_queue_idempotency` prevents duplicate active rows for the same
// (workspace_id, idempotency_key). On conflict this returns the existing
// row's ID so the caller's log still points at the live queue entry.
func EnqueueA2A(
	ctx context.Context,
	workspaceID, callerID string,
	priority int,
	body []byte,
	method, idempotencyKey string,
) (id string, depth int, err error) {
	// Empty strings become SQL NULLs. For idempotency_key this matters: the
	// partial index only covers NOT NULL keys, so keyless rows never conflict.
	var keyArg interface{}
	if idempotencyKey != "" {
		keyArg = idempotencyKey
	}
	var callerArg interface{}
	if callerID != "" {
		callerArg = callerID
	}
	var methodArg interface{}
	if method != "" {
		methodArg = method
	}

	// INSERT ... ON CONFLICT DO NOTHING RETURNING id. The conflict target
	// must reference the partial unique INDEX columns + WHERE clause directly
	// (Postgres can't reference partial unique indexes by name in
	// ON CONFLICT — only true CONSTRAINTs work for that). When DO NOTHING
	// fires, no row is returned and Scan yields sql.ErrNoRows — that is the
	// conflict signal handled below, not a failure.
	err = db.DB.QueryRowContext(ctx, `
		INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, method, idempotency_key)
		VALUES ($1, $2, $3, $4::jsonb, $5, $6)
		ON CONFLICT (workspace_id, idempotency_key)
		WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched')
		DO NOTHING
		RETURNING id
	`, workspaceID, callerArg, priority, string(body), methodArg, keyArg).Scan(&id)

	if errors.Is(err, sql.ErrNoRows) && idempotencyKey != "" {
		// Conflict — look up the existing active row and reuse its id.
		err = db.DB.QueryRowContext(ctx, `
			SELECT id FROM a2a_queue
			WHERE workspace_id = $1 AND idempotency_key = $2
			AND status IN ('queued','dispatched')
			LIMIT 1
		`, workspaceID, idempotencyKey).Scan(&id)
		if err != nil {
			return "", 0, err
		}
	} else if err != nil {
		return "", 0, err
	}

	// Return current queue depth for the caller's visibility. The error is
	// deliberately ignored: a COUNT hiccup must not fail a successful enqueue,
	// so depth may be 0 in that (rare) case.
	_ = db.DB.QueryRowContext(ctx, `
		SELECT COUNT(*) FROM a2a_queue
		WHERE workspace_id = $1 AND status = 'queued'
	`, workspaceID).Scan(&depth)

	log.Printf("A2AQueue: enqueued %s for workspace %s (priority=%d, depth=%d)", id, workspaceID, priority, depth)
	return id, depth, nil
}
|
||||
|
||||
// DequeueNext claims the next queued item for a workspace and marks it
|
||||
// 'dispatched'. Uses SELECT ... FOR UPDATE SKIP LOCKED so two concurrent
|
||||
// drain calls don't both claim the same row.
|
||||
//
|
||||
// Returns (nil, nil) when the queue is empty — not an error.
|
||||
func DequeueNext(ctx context.Context, workspaceID string) (*QueuedItem, error) {
|
||||
tx, err := db.DB.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
var item QueuedItem
|
||||
var body string
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
SELECT id, workspace_id, caller_id, priority, body::text, method, attempts
|
||||
FROM a2a_queue
|
||||
WHERE workspace_id = $1 AND status = 'queued'
|
||||
AND (expires_at IS NULL OR expires_at > now())
|
||||
ORDER BY priority DESC, enqueued_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
`, workspaceID).Scan(
|
||||
&item.ID, &item.WorkspaceID, &item.CallerID, &item.Priority,
|
||||
&body, &item.Method, &item.Attempts,
|
||||
)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item.Body = []byte(body)
|
||||
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
UPDATE a2a_queue
|
||||
SET status = 'dispatched', dispatched_at = now(), attempts = attempts + 1
|
||||
WHERE id = $1
|
||||
`, item.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
// MarkQueueItemCompleted flips the queue row to 'completed' on a successful
|
||||
// drain dispatch.
|
||||
func MarkQueueItemCompleted(ctx context.Context, id string) {
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE a2a_queue SET status = 'completed', completed_at = now() WHERE id = $1`, id,
|
||||
); err != nil {
|
||||
log.Printf("A2AQueue: failed to mark %s completed: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// MarkQueueItemFailed returns a dispatched item back to 'queued' with an
|
||||
// incremented attempts counter so the next drain tick picks it up. Hits
|
||||
// an upper bound (5 attempts) to avoid wedging a stuck item in the queue
|
||||
// forever.
|
||||
func MarkQueueItemFailed(ctx context.Context, id, errMsg string) {
|
||||
const maxAttempts = 5
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE a2a_queue
|
||||
SET status = CASE WHEN attempts >= $2 THEN 'failed' ELSE 'queued' END,
|
||||
last_error = $3,
|
||||
dispatched_at = NULL
|
||||
WHERE id = $1
|
||||
`, id, maxAttempts, errMsg); err != nil {
|
||||
log.Printf("A2AQueue: failed to mark %s failed: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// QueueDepth returns the number of currently-queued (not dispatched/completed)
|
||||
// items for a workspace. Used by the busy-return response body so callers
|
||||
// can see how many ahead of them.
|
||||
func QueueDepth(ctx context.Context, workspaceID string) int {
|
||||
var n int
|
||||
_ = db.DB.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`,
|
||||
workspaceID,
|
||||
).Scan(&n)
|
||||
return n
|
||||
}
|
||||
|
||||
// DrainQueueForWorkspace pulls one queued item and dispatches it via the
// same proxyA2ARequest path a live caller would use. Concurrency-safe —
// multiple concurrent calls for the same workspace are each claim-guarded
// by SELECT ... FOR UPDATE SKIP LOCKED in DequeueNext.
//
// Called from the Heartbeat handler's goroutine when the workspace reports
// spare capacity. Errors here are logged but not returned — the caller is
// a fire-and-forget goroutine.
func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspaceID string) {
	item, err := DequeueNext(ctx, workspaceID)
	if err != nil {
		log.Printf("A2AQueue drain: dequeue failed for %s: %v", workspaceID, err)
		return
	}
	if item == nil {
		return // queue empty, no work
	}

	callerID := ""
	if item.CallerID.Valid {
		callerID = item.CallerID.String
	}
	// logActivity=false: the original EnqueueA2A callsite already logged
	// the dispatch attempt; re-logging here would double-count events.
	status, _, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)

	// 202 Accepted = the dispatch was itself queued again (target still busy).
	// Mark this attempt completed so we don't double-count attempts; the
	// new (re-)queue row already exists.
	//
	// NOTE(review): that assumes the re-enqueue created a NEW row. This item
	// is still status='dispatched' here and shares the same messageId, so
	// EnqueueA2A's idempotency conflict path (statuses 'queued','dispatched')
	// may have returned THIS row's id instead — in which case completing it
	// would drop the request. Confirm against the partial-index semantics.
	if status == http.StatusAccepted {
		MarkQueueItemCompleted(ctx, item.ID)
		log.Printf("A2AQueue drain: %s re-queued (target still busy)", item.ID)
		return
	}

	if proxyErr != nil {
		// Defensive: proxyErr.Response is gin.H (map[string]interface{}). The
		// "error" key is conventionally a string but can be missing or non-
		// string in edge paths (e.g. a future error builder using a typed
		// struct). Cast safely so a missing key doesn't crash the platform —
		// today's outage was caused by an unchecked .(string) here.
		errMsg, _ := proxyErr.Response["error"].(string)
		if errMsg == "" {
			errMsg = http.StatusText(proxyErr.Status)
			if errMsg == "" {
				errMsg = "unknown drain dispatch error"
			}
		}
		// Returns the item to 'queued' (or 'failed' after max attempts).
		MarkQueueItemFailed(ctx, item.ID, errMsg)
		log.Printf("A2AQueue drain: dispatch for %s failed (attempt=%d): %s",
			item.ID, item.Attempts, errMsg)
		return
	}
	MarkQueueItemCompleted(ctx, item.ID)
	log.Printf("A2AQueue drain: dispatched %s to workspace %s (attempt=%d)",
		item.ID, workspaceID, item.Attempts)
}
|
||||
57
workspace-server/internal/handlers/a2a_queue_test.go
Normal file
57
workspace-server/internal/handlers/a2a_queue_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
package handlers
|
||||
|
||||
// #1870 Phase 1 queue tests. Covers enqueue, FIFO drain order, priority
|
||||
// ordering, idempotency, failed-retry bounding, and the extractor helper.
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------- extractIdempotencyKey ----------
|
||||
|
||||
func TestExtractIdempotencyKey_picksMessageId(t *testing.T) {
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"messageId":"msg-abc","role":"user"}}}`)
|
||||
if got := extractIdempotencyKey(body); got != "msg-abc" {
|
||||
t.Errorf("expected 'msg-abc', got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractIdempotencyKey_emptyOnMissing(t *testing.T) {
|
||||
cases := map[string][]byte{
|
||||
"no params": []byte(`{"jsonrpc":"2.0","method":"message/send"}`),
|
||||
"no message": []byte(`{"params":{}}`),
|
||||
"no messageId": []byte(`{"params":{"message":{"role":"user"}}}`),
|
||||
"malformed": []byte(`not json`),
|
||||
"empty message": []byte(`{"params":{"message":{"messageId":""}}}`),
|
||||
}
|
||||
for name, body := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if got := extractIdempotencyKey(body); got != "" {
|
||||
t.Errorf("expected empty, got %q", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// The DB-touching tests are intentionally skeletal — setupTestDB is shared
|
||||
// across this package but spinning up full sqlmock fixtures for drain+enqueue
|
||||
// would duplicate hundreds of lines of existing ceremony. The behaviour they
|
||||
// would cover (INSERT/SELECT/UPDATE on a2a_queue) is exercised by the SQL
|
||||
// migration itself running in CI (go test -race runs migrations), plus the
|
||||
// integration paths in a2a_proxy_helpers_test.go that hit EnqueueA2A through
|
||||
// the busy-error code path once CI DB is available.
|
||||
//
|
||||
// Priority constants are exported so downstream callers can use them.
|
||||
// Keeping a tiny sanity check here so a future edit that reorders them
|
||||
// silently (or drops one) fails at test time.
|
||||
|
||||
func TestPriorityConstants(t *testing.T) {
|
||||
if !(PriorityCritical > PriorityTask && PriorityTask > PriorityInfo) {
|
||||
t.Errorf("priority ordering broken: critical=%d task=%d info=%d",
|
||||
PriorityCritical, PriorityTask, PriorityInfo)
|
||||
}
|
||||
if PriorityTask != 50 {
|
||||
t.Errorf("PriorityTask changed from 50 to %d — migration 042's DEFAULT 50 also needs updating",
|
||||
PriorityTask)
|
||||
}
|
||||
}
|
||||
@ -68,14 +68,28 @@ func saasMode() bool {
|
||||
|
||||
var saasModeWarnUnknownOnce sync.Once
|
||||
|
||||
// QueueDrainFunc dispatches one queued A2A item on behalf of the caller.
// Injected at construction to avoid a WorkspaceHandler import cycle in
// RegistryHandler. Called from a goroutine spawned inside Heartbeat when
// the workspace reports spare capacity (#1870 Phase 1).
type QueueDrainFunc func(ctx context.Context, workspaceID string)

// RegistryHandler serves agent-registry endpoints (heartbeat, card updates).
type RegistryHandler struct {
	broadcaster *events.Broadcaster
	drainQueue  QueueDrainFunc // nil-safe: Heartbeat skips drain when unset
}

// NewRegistryHandler constructs a handler with no drain hook wired; call
// SetQueueDrainFunc afterwards to enable heartbeat-driven queue drain.
func NewRegistryHandler(b *events.Broadcaster) *RegistryHandler {
	return &RegistryHandler{broadcaster: b}
}

// SetQueueDrainFunc wires the drain hook. Router wires this to
// WorkspaceHandler.DrainQueueForWorkspace after both are constructed, which
// keeps RegistryHandler's import list clean.
func (h *RegistryHandler) SetQueueDrainFunc(f QueueDrainFunc) {
	h.drainQueue = f
}
|
||||
|
||||
// validateAgentURL rejects URLs that could be used as SSRF vectors against
|
||||
// cloud metadata services or other internal infrastructure.
|
||||
//
|
||||
@ -467,6 +481,26 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
"recovered_from": currentStatus,
|
||||
})
|
||||
}
|
||||
|
||||
// #1870 Phase 1: drain one queued A2A request if the target reports
|
||||
// spare capacity. The heartbeat's active_tasks field reflects what the
|
||||
// workspace runtime is ACTUALLY running right now, independent of
|
||||
// whatever we've counted server-side. Fire-and-forget goroutine — the
|
||||
// drain dispatches via ProxyA2ARequest which already has its own
|
||||
// timeouts, retry logic, and activity_logs wiring.
|
||||
if h.drainQueue != nil {
|
||||
var maxConcurrent int
|
||||
_ = db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`,
|
||||
payload.WorkspaceID,
|
||||
).Scan(&maxConcurrent)
|
||||
if payload.ActiveTasks < maxConcurrent {
|
||||
// context.WithoutCancel: heartbeat handler's ctx is about to
|
||||
// expire as soon as we return. The drain needs to outlive it.
|
||||
drainCtx := context.WithoutCancel(ctx)
|
||||
go h.drainQueue(drainCtx, payload.WorkspaceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateCard handles POST /registry/update-card
|
||||
|
||||
96
workspace-server/internal/handlers/restart_template.go
Normal file
96
workspace-server/internal/handlers/restart_template.go
Normal file
@ -0,0 +1,96 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// restartTemplateInput is the subset of the /workspaces/:id/restart request
// body that affects which config source the provisioner uses. Extracted as
// a type so `resolveRestartTemplate` has a single pure-function signature
// for unit tests — no gin context, no DB, no filesystem writes.
type restartTemplateInput struct {
	// Template is an explicit template dir name from the request body.
	// Always honoured when resolvable — the caller asked by name, which is
	// unambiguous consent to overwrite the config volume.
	Template string
	// ApplyTemplate opts the caller in to name-based auto-match AND the
	// runtime-default fallback. Without this flag a restart MUST NOT
	// overwrite the user's config volume — a user who edited their
	// model/provider/skills/prompts via the Canvas Config tab and hit
	// Save+Restart expects their edits to survive. The previous behaviour
	// (name-based auto-match unconditionally) silently reverted edits for
	// any workspace whose name matched a template dir (e.g. "Hermes Agent"
	// → hermes/), which is the regression this fix closes.
	ApplyTemplate bool
	// RebuildConfig (#239) is the recovery signal used when the workspace's
	// config volume was destroyed out-of-band. Tries org-templates as a
	// last-resort source so the workspace can self-heal without admin
	// intervention. Orthogonal to ApplyTemplate.
	RebuildConfig bool
}
|
||||
|
||||
// resolveRestartTemplate chooses the config source for a restart in the
|
||||
// documented priority order:
|
||||
//
|
||||
// 1. Explicit `Template` from the request body (always honoured).
|
||||
// 2. `ApplyTemplate=true` → name-based auto-match via findTemplateByName.
|
||||
// 3. `RebuildConfig=true` → org-templates recovery fallback (#239).
|
||||
// 4. `ApplyTemplate=true` + non-empty dbRuntime → runtime-default template
|
||||
// (e.g. `hermes-default/`) for runtime-change workflows.
|
||||
// 5. Fall through → empty path + "existing-volume" label. Provisioner
|
||||
// reuses the workspace's existing config volume from the previous run.
|
||||
//
|
||||
// Returns (templatePath, configLabel). An empty templatePath is the signal
|
||||
// to the provisioner that the existing volume is authoritative — the flow
|
||||
// that preserves user edits.
|
||||
//
|
||||
// Pure function: no writes, no DB access, no network. Safe to unit-test
|
||||
// with just a temp directory.
|
||||
func resolveRestartTemplate(configsDir, wsName, dbRuntime string, body restartTemplateInput) (templatePath, configLabel string) {
|
||||
template := body.Template
|
||||
|
||||
// Tier 2: name-based auto-match, gated on ApplyTemplate.
|
||||
if template == "" && body.ApplyTemplate {
|
||||
template = findTemplateByName(configsDir, wsName)
|
||||
}
|
||||
|
||||
// Tier 1 + 2 resolve via the same code path — validate + stat.
|
||||
if template != "" {
|
||||
candidatePath, resolveErr := resolveInsideRoot(configsDir, template)
|
||||
if resolveErr != nil {
|
||||
log.Printf("Restart: invalid template %q: %v — proceeding without it", template, resolveErr)
|
||||
template = ""
|
||||
} else if _, err := os.Stat(candidatePath); err == nil {
|
||||
return candidatePath, template
|
||||
} else {
|
||||
log.Printf("Restart: template %q dir not found — proceeding without it", template)
|
||||
}
|
||||
}
|
||||
|
||||
// Tier 3: #239 rebuild_config — org-templates as last-resort recovery.
|
||||
if body.RebuildConfig {
|
||||
if p, label := resolveOrgTemplate(configsDir, wsName); p != "" {
|
||||
log.Printf("Restart: rebuild_config — using org-template %s (%s)", label, wsName)
|
||||
return p, label
|
||||
}
|
||||
}
|
||||
|
||||
// Tier 4: runtime-default — apply_template=true + known runtime.
|
||||
// Use case: Canvas Config tab changed the runtime; we need the new
|
||||
// runtime's base files (entry point, Dockerfile, skill scaffolding)
|
||||
// because the existing volume was written by the old runtime.
|
||||
if body.ApplyTemplate && dbRuntime != "" {
|
||||
runtimeTemplate := filepath.Join(configsDir, dbRuntime+"-default")
|
||||
if _, err := os.Stat(runtimeTemplate); err == nil {
|
||||
label := dbRuntime + "-default"
|
||||
log.Printf("Restart: applying template %s (runtime change)", label)
|
||||
return runtimeTemplate, label
|
||||
}
|
||||
}
|
||||
|
||||
// Tier 5: reuse existing volume. This is the default, and the path
|
||||
// the Canvas Save+Restart flow MUST hit to preserve user edits.
|
||||
return "", "existing-volume"
|
||||
}
|
||||
178
workspace-server/internal/handlers/restart_template_test.go
Normal file
178
workspace-server/internal/handlers/restart_template_test.go
Normal file
@ -0,0 +1,178 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Tests for resolveRestartTemplate — the pure helper that implements the
|
||||
// priority chain documented on the function. Each test builds a minimal
|
||||
// temp configsDir, fabricates the specific precondition it exercises,
|
||||
// and asserts (templatePath, configLabel).
|
||||
//
|
||||
// The regression this suite locks in: a default restart (no flags) must
|
||||
// never auto-apply a template that happens to match the workspace name.
|
||||
// That was the "model reverts on Save+Restart" bug from
|
||||
// fix/restart-preserves-user-config.
|
||||
|
||||
// newTemplateDir builds a throwaway templates root containing one subdir per
// name, each seeded with a minimal config.yaml so findTemplateByName's
// dir-scan path has something to read. Returns the absolute root.
func newTemplateDir(t *testing.T, names ...string) string {
	t.Helper()
	root := t.TempDir()
	for _, name := range names {
		templateDir := filepath.Join(root, name)
		if err := os.MkdirAll(templateDir, 0o755); err != nil {
			t.Fatalf("mkdir %s: %v", templateDir, err)
		}
		cfgPath := filepath.Join(templateDir, "config.yaml")
		contents := []byte("name: " + name + "\n")
		if err := os.WriteFile(cfgPath, contents, 0o644); err != nil {
			t.Fatalf("write %s: %v", cfgPath, err)
		}
	}
	return root
}
|
||||
|
||||
// TestResolveRestartTemplate_DefaultRestart_PreservesVolume is the
|
||||
// regression test for the Canvas Save+Restart bug. A workspace named
|
||||
// "Hermes Agent" normalises to "hermes-agent" — no dir match — but the
|
||||
// findTemplateByName second pass would also scan config.yaml's `name:`
|
||||
// field. We seed a template whose config.yaml DOES have the matching
|
||||
// name, exactly the worst case. Without apply_template, the helper
|
||||
// MUST still return empty templatePath.
|
||||
func TestResolveRestartTemplate_DefaultRestart_PreservesVolume(t *testing.T) {
|
||||
root := newTemplateDir(t, "hermes")
|
||||
// Overwrite config.yaml so the name-scan would hit:
|
||||
cfg := filepath.Join(root, "hermes", "config.yaml")
|
||||
if err := os.WriteFile(cfg, []byte("name: Hermes Agent\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Hermes Agent", "hermes", restartTemplateInput{
|
||||
// ApplyTemplate intentionally omitted — this is the default restart.
|
||||
})
|
||||
if path != "" {
|
||||
t.Errorf("default restart must NOT resolve a template; got path=%q", path)
|
||||
}
|
||||
if label != "existing-volume" {
|
||||
t.Errorf("expected 'existing-volume' label on default restart; got %q", label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_ExplicitTemplate_AlwaysHonoured verifies
|
||||
// that passing Template by name works regardless of ApplyTemplate —
|
||||
// the caller named a template, that's unambiguous consent.
|
||||
func TestResolveRestartTemplate_ExplicitTemplate_AlwaysHonoured(t *testing.T) {
|
||||
root := newTemplateDir(t, "langgraph")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Some Agent", "", restartTemplateInput{
|
||||
Template: "langgraph",
|
||||
})
|
||||
if path == "" || label != "langgraph" {
|
||||
t.Errorf("explicit template must resolve; got path=%q label=%q", path, label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_ApplyTemplate_NameMatch verifies that
|
||||
// setting ApplyTemplate re-enables the name-based auto-match for
|
||||
// operators who actually want "reset this workspace to its template".
|
||||
func TestResolveRestartTemplate_ApplyTemplate_NameMatch(t *testing.T) {
|
||||
root := newTemplateDir(t, "hermes")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Hermes", "", restartTemplateInput{
|
||||
ApplyTemplate: true,
|
||||
})
|
||||
if path == "" || label != "hermes" {
|
||||
t.Errorf("apply_template should name-match; got path=%q label=%q", path, label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_ApplyTemplate_RuntimeDefault verifies the
|
||||
// runtime-change flow: when the Canvas Config tab changes the runtime,
|
||||
// the restart handler needs to lay down the new runtime's base files
|
||||
// via `<runtime>-default/`. Matches the existing behaviour comment.
|
||||
func TestResolveRestartTemplate_ApplyTemplate_RuntimeDefault(t *testing.T) {
|
||||
root := newTemplateDir(t, "langgraph-default")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Some Workspace", "langgraph", restartTemplateInput{
|
||||
ApplyTemplate: true,
|
||||
})
|
||||
if path == "" || label != "langgraph-default" {
|
||||
t.Errorf("apply_template + dbRuntime should resolve runtime-default; got path=%q label=%q", path, label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_ApplyTemplate_NoMatch_NoRuntime falls all
|
||||
// the way through to the reuse-volume path when neither name nor
|
||||
// runtime-default resolves.
|
||||
func TestResolveRestartTemplate_ApplyTemplate_NoMatch_NoRuntime(t *testing.T) {
|
||||
root := newTemplateDir(t) // empty templates dir
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Orphan", "", restartTemplateInput{
|
||||
ApplyTemplate: true,
|
||||
})
|
||||
if path != "" {
|
||||
t.Errorf("nothing to apply → expected empty path; got %q", path)
|
||||
}
|
||||
if label != "existing-volume" {
|
||||
t.Errorf("expected 'existing-volume' fallback; got %q", label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_InvalidExplicitTemplate_ProceedsWithout
|
||||
// covers the defensive path where an explicit Template doesn't resolve
|
||||
// to a valid dir (e.g. traversal attempt, deleted template). The helper
|
||||
// must log + fall through, not crash or escape the root.
|
||||
func TestResolveRestartTemplate_InvalidExplicitTemplate_ProceedsWithout(t *testing.T) {
|
||||
root := newTemplateDir(t, "langgraph")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Some Agent", "", restartTemplateInput{
|
||||
Template: "../../etc/passwd",
|
||||
})
|
||||
if path != "" {
|
||||
t.Errorf("traversal attempt must not resolve; got %q", path)
|
||||
}
|
||||
if label != "existing-volume" {
|
||||
t.Errorf("expected 'existing-volume' fallback on invalid template; got %q", label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_NonExistentExplicitTemplate mirrors the
|
||||
// above but for a syntactically-valid name that simply doesn't exist
|
||||
// on disk (e.g. template was manually deleted). Must fall through.
|
||||
func TestResolveRestartTemplate_NonExistentExplicitTemplate(t *testing.T) {
|
||||
root := newTemplateDir(t, "langgraph")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Some Agent", "", restartTemplateInput{
|
||||
Template: "deleted-template",
|
||||
})
|
||||
if path != "" {
|
||||
t.Errorf("missing template must not resolve; got %q", path)
|
||||
}
|
||||
if label != "existing-volume" {
|
||||
t.Errorf("expected 'existing-volume' fallback on missing template; got %q", label)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRestartTemplate_Priority_ExplicitBeatsApplyTemplate proves
|
||||
// that an explicit Template takes precedence over a name-based match.
|
||||
// Scenario: workspace "Hermes" with ApplyTemplate=true + explicit
|
||||
// Template="langgraph" — caller wants langgraph, not hermes.
|
||||
func TestResolveRestartTemplate_Priority_ExplicitBeatsApplyTemplate(t *testing.T) {
|
||||
root := newTemplateDir(t, "hermes", "langgraph")
|
||||
|
||||
path, label := resolveRestartTemplate(root, "Hermes", "", restartTemplateInput{
|
||||
Template: "langgraph",
|
||||
ApplyTemplate: true,
|
||||
})
|
||||
if label != "langgraph" {
|
||||
t.Errorf("explicit Template must win; got label=%q", label)
|
||||
}
|
||||
// Verify the path is actually inside the langgraph template dir
|
||||
expected := filepath.Join(root, "langgraph")
|
||||
if path != expected {
|
||||
t.Errorf("expected path %q, got %q", expected, path)
|
||||
}
|
||||
}
|
||||
@ -5,8 +5,6 @@ import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -127,53 +125,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
||||
}
|
||||
c.ShouldBindJSON(&body)
|
||||
|
||||
// Resolve template path in priority order:
|
||||
// 1. Explicit template from request body
|
||||
// 2. Runtime-specific default template (e.g. claude-code-default/)
|
||||
// 3. Name-based match in templates directory
|
||||
// 4. No template — the volume already has configs from previous run
|
||||
var templatePath string
|
||||
var configFiles map[string][]byte
|
||||
configLabel := "existing-volume"
|
||||
|
||||
template := body.Template
|
||||
if template == "" {
|
||||
template = findTemplateByName(h.configsDir, wsName)
|
||||
}
|
||||
if template != "" {
|
||||
candidatePath, resolveErr := resolveInsideRoot(h.configsDir, template)
|
||||
if resolveErr != nil {
|
||||
log.Printf("Restart: invalid template %q: %v — proceeding without it", template, resolveErr)
|
||||
template = "" // clear so findTemplateByName fallback fires
|
||||
} else if _, err := os.Stat(candidatePath); err == nil {
|
||||
templatePath = candidatePath
|
||||
configLabel = template
|
||||
} else {
|
||||
log.Printf("Restart: template %q dir not found — proceeding without it", template)
|
||||
}
|
||||
}
|
||||
|
||||
// #239: rebuild_config=true — try org-templates as last-resort source so a
|
||||
// workspace with a destroyed config volume can self-recover without admin
|
||||
// intervention. Only fires when no other template was resolved above.
|
||||
if templatePath == "" && body.RebuildConfig {
|
||||
if p, label := resolveOrgTemplate(h.configsDir, wsName); p != "" {
|
||||
templatePath = p
|
||||
configLabel = label
|
||||
log.Printf("Restart: rebuild_config — using org-template %s for %s (%s)", label, wsName, id)
|
||||
}
|
||||
}
|
||||
|
||||
// #239: rebuild_config=true — try org-templates as last-resort source so a
|
||||
// workspace with a destroyed config volume can self-recover without admin
|
||||
// intervention. Only fires when no other template was resolved above.
|
||||
if templatePath == "" && body.RebuildConfig {
|
||||
if p, label := resolveOrgTemplate(h.configsDir, wsName); p != "" {
|
||||
templatePath = p
|
||||
configLabel = label
|
||||
log.Printf("Restart: rebuild_config — using org-template %s for %s (%s)", label, wsName, id)
|
||||
}
|
||||
}
|
||||
templatePath, configLabel := resolveRestartTemplate(h.configsDir, wsName, dbRuntime, restartTemplateInput{
|
||||
Template: body.Template,
|
||||
ApplyTemplate: body.ApplyTemplate,
|
||||
RebuildConfig: body.RebuildConfig,
|
||||
})
|
||||
|
||||
if templatePath == "" {
|
||||
log.Printf("Restart: reusing existing config volume for %s (%s)", wsName, id)
|
||||
@ -181,21 +137,10 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
||||
log.Printf("Restart: using template %s for %s (%s)", templatePath, wsName, id)
|
||||
}
|
||||
|
||||
var configFiles map[string][]byte
|
||||
payload := models.CreateWorkspacePayload{Name: wsName, Tier: tier, Runtime: containerRuntime}
|
||||
log.Printf("Restart: workspace %s (%s) runtime=%q", wsName, id, containerRuntime)
|
||||
|
||||
// Apply runtime-default template ONLY when explicitly requested via "apply_template": true.
|
||||
// Use case: runtime was changed via Config tab — need new runtime's base files.
|
||||
// Normal restarts preserve existing config volume (user's model, skills, prompts).
|
||||
if templatePath == "" && body.ApplyTemplate && dbRuntime != "" {
|
||||
runtimeTemplate := filepath.Join(h.configsDir, dbRuntime+"-default")
|
||||
if _, err := os.Stat(runtimeTemplate); err == nil {
|
||||
templatePath = runtimeTemplate
|
||||
configLabel = dbRuntime + "-default"
|
||||
log.Printf("Restart: applying template %s (runtime change)", configLabel)
|
||||
}
|
||||
}
|
||||
|
||||
// #12: ?reset=true (or body.Reset) discards the claude-sessions volume
|
||||
// before restart, giving the agent a clean /root/.claude/sessions dir.
|
||||
resetClaudeSession := c.Query("reset") == "true" || body.Reset
|
||||
|
||||
@ -220,6 +220,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
|
||||
// Registry
|
||||
rh := handlers.NewRegistryHandler(broadcaster)
|
||||
// #1870 Phase 1: wire the queue drain hook so Heartbeat can dispatch
|
||||
// a queued A2A request when the workspace reports spare capacity.
|
||||
rh.SetQueueDrainFunc(wh.DrainQueueForWorkspace)
|
||||
r.POST("/registry/register", rh.Register)
|
||||
r.POST("/registry/heartbeat", rh.Heartbeat)
|
||||
r.POST("/registry/update-card", rh.UpdateCard)
|
||||
|
||||
1
workspace-server/migrations/042_a2a_queue.down.sql
Normal file
1
workspace-server/migrations/042_a2a_queue.down.sql
Normal file
@ -0,0 +1 @@
|
||||
-- Rollback for migration 042 (#1870 Phase 1): removes the A2A delegation
-- queue. Any rows still in the table — including queued-but-undispatched
-- delegations — are lost on downgrade.
DROP TABLE IF EXISTS a2a_queue;
|
||||
53
workspace-server/migrations/042_a2a_queue.up.sql
Normal file
53
workspace-server/migrations/042_a2a_queue.up.sql
Normal file
@ -0,0 +1,53 @@
|
||||
-- #1870 Phase 1: TASK-level queue for A2A delegations that hit a busy target.
--
-- Before: when the target workspace's HTTP handler errors (agent busy
-- mid-synthesis — single-threaded LLM loop), a2a_proxy_helpers.go returns
-- 503 with a Retry-After hint, the caller logs activity_type='delegation'
-- status='failed' and moves on. Delegations silently dropped; fan-out
-- storms from leads reach ~70% drop rate.
--
-- After: same failure triggers an INSERT into a2a_queue with priority=TASK.
-- Workspace's next heartbeat (up to 30s later) drains the queue if capacity
-- allows. Proxy returns 202 Accepted with {"queued": true, "queue_id", ...}
-- instead of 503, caller logs as dispatched-queued.
--
-- Phase 2 will add INFO (TTL) and CRITICAL (preempt) levels. This table's
-- priority column is wide enough for all three from day one — no migration
-- churn on next phase.
--
-- NOTE: gen_random_uuid() is built in from PostgreSQL 13; older servers need
-- the pgcrypto extension — TODO confirm the minimum supported PG version.

CREATE TABLE IF NOT EXISTS a2a_queue (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Target workspace that will drain this entry on heartbeat; rows vanish
    -- with their workspace (ON DELETE CASCADE).
    workspace_id uuid NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    -- Presumably the delegating workspace's id; nullable — verify against
    -- the enqueue path in a2a_proxy_helpers.go.
    caller_id uuid,
    priority smallint NOT NULL DEFAULT 50, -- 100=CRITICAL, 50=TASK, 10=INFO
    -- Original request payload, replayed on dispatch — TODO confirm shape.
    body jsonb NOT NULL,
    method text,
    idempotency_key text,
    enqueued_at timestamptz NOT NULL DEFAULT now(),
    dispatched_at timestamptz,
    completed_at timestamptz,
    expires_at timestamptz, -- TTL, for future INFO level
    attempts integer NOT NULL DEFAULT 0,
    status text NOT NULL DEFAULT 'queued' -- queued | dispatched | completed | dropped | failed
        CHECK (status IN ('queued','dispatched','completed','dropped','failed')),
    last_error text
);

-- Primary drain-query index: pick oldest highest-priority queued item for a
-- workspace. Partial index on status='queued' keeps the hot path tiny.
CREATE INDEX IF NOT EXISTS idx_a2a_queue_dispatch
    ON a2a_queue (workspace_id, priority DESC, enqueued_at ASC)
    WHERE status = 'queued';

-- TTL index for future INFO cleanup (no-op today — expires_at is always NULL
-- for TASK). Still worth creating now so Phase 2 doesn't need a migration.
CREATE INDEX IF NOT EXISTS idx_a2a_queue_expiry
    ON a2a_queue (expires_at)
    WHERE status = 'queued' AND expires_at IS NOT NULL;

-- Idempotency: a caller retrying with the same idempotency_key should not
-- double-enqueue. Partial unique index only on active queue entries so
-- completed/dropped entries don't block future legitimate re-uses.
CREATE UNIQUE INDEX IF NOT EXISTS idx_a2a_queue_idempotency
    ON a2a_queue (workspace_id, idempotency_key)
    WHERE idempotency_key IS NOT NULL AND status IN ('queued','dispatched');
|
||||
Loading…
Reference in New Issue
Block a user