chore: quality pass — native dialogs, env sync, Go handler splits
Three parallel cleanups driven by the second code-review pass.
## Native dialogs → ConfirmDialog (7 sites)
All seven violated the standing feedback_no_native_dialogs rule.
- ChannelsTab: confirm() → ConfirmDialog danger variant with pendingDelete state
- ScheduleTab: window.confirm() → ConfirmDialog danger
- ChatTab: confirm("Restart...") → ConfirmDialog warning (restart is recoverable)
- TemplatePalette: two alert() sites collapsed into a single notice state, surfaced through ConfirmDialog as an OK-only informational dialog
- ErrorBoundary: dropped both window.alert calls entirely. Clipboard-copy
click is self-evident; console.error already captures the fallback.
## .env.example ↔ Go env var sync
Added 11 previously-undocumented env vars grouped into 6 new sections:
- Platform: PLATFORM_URL, MOLECULE_URL, WORKSPACE_DIR, MOLECULE_ENV
- CORS / rate limiting: CORS_ORIGINS, RATE_LIMIT
- Activity retention: ACTIVITY_RETENTION_DAYS, ACTIVITY_CLEANUP_INTERVAL_HOURS
- Container detection: MOLECULE_IN_DOCKER (moved here to remove the duplicate entry)
- Observability: AWARENESS_URL
- Webhooks: GITHUB_WEBHOOK_SECRET
- CLI: MOLECLI_URL
All 21 distinct os.Getenv / envx.* keys (excluding HOME) are now documented, and there are no orphans in the other direction: every .env.example entry corresponds to a key the Go code actually reads.
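For reference, a minimal sketch of how that sync could be checked mechanically in CI. This helper is illustrative only and is not part of this commit; the file name, the hard-coded key list, and the regex are assumptions:

```go
// env_doc_check.go — hypothetical sketch, not code from this commit.
package main

import (
	"bufio"
	"fmt"
	"os"
	"regexp"
	"strings"
)

func main() {
	// Keys this commit's audit says the Go code reads (the 11 added ones plus
	// the relocated MOLECULE_IN_DOCKER); extend as needed.
	required := []string{
		"PLATFORM_URL", "MOLECULE_URL", "WORKSPACE_DIR", "MOLECULE_ENV",
		"CORS_ORIGINS", "RATE_LIMIT",
		"ACTIVITY_RETENTION_DAYS", "ACTIVITY_CLEANUP_INTERVAL_HOURS",
		"MOLECULE_IN_DOCKER", "AWARENESS_URL", "GITHUB_WEBHOOK_SECRET", "MOLECLI_URL",
	}

	f, err := os.Open(".env.example")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	// Matches both active ("KEY=") and commented-out ("# KEY=") entries.
	keyRe := regexp.MustCompile(`^#?\s*([A-Z][A-Z0-9_]*)=`)
	documented := map[string]bool{}
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		if m := keyRe.FindStringSubmatch(strings.TrimSpace(sc.Text())); m != nil {
			documented[m[1]] = true
		}
	}

	var missing []string
	for _, k := range required {
		if !documented[k] {
			missing = append(missing, k)
		}
	}
	if len(missing) > 0 {
		fmt.Println("undocumented keys:", strings.Join(missing, ", "))
		os.Exit(1)
	}
	fmt.Println("all keys documented")
}
```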
## Go handler function splits (4 funcs, pure refactor)
No behavior change; same tests pass.
| Function        | Lines before | Lines after | Helpers extracted |
|-----------------|-------------:|------------:|-------------------|
| proxyA2ARequest |          257 |          56 | resolveAgentURL, normalizeA2APayload, dispatchA2A, handleA2ADispatchError, maybeMarkContainerDead, logA2AFailure, logA2ASuccess |
| Delegate        |          127 |          60 | bindDelegateRequest, lookupIdempotentDelegation, insertDelegationRow |
| Discover        |          125 |          40 | discoverWorkspacePeer, writeExternalWorkspaceURL, discoverHostPeer |
| SessionSearch   |          109 |          24 | parseSessionSearchParams, buildSessionSearchQuery, scanSessionSearchRows |
Preserved exact error semantics, log.Printf calls, status codes, and
response shapes. Introduced a proxyDispatchBuildError sentinel in
a2a_proxy so the orchestrator can distinguish "couldn't build the
request" from "Do() failed" without changing existing branches.
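For readers skimming without the diff, a condensed, self-contained sketch of that sentinel pattern. The real implementation (dispatchA2A / handleA2ADispatchError in the diff below) carries headers, timeouts, logging, and the busy-retry paths that are omitted here:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// proxyDispatchBuildError marks "we never managed to build the outbound
// request", as opposed to "we sent it and Do() failed".
type proxyDispatchBuildError struct{ err error }

func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }

// dispatch builds and sends the proxied request. Build failures come back
// wrapped in the sentinel; transport failures come back as-is.
func dispatch(client *http.Client, url string) (*http.Response, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, &proxyDispatchBuildError{err: err}
	}
	return client.Do(req)
}

// statusFor mirrors the branch the error handler takes: build failures map to
// a plain 500, everything else stays on the existing 502/503 paths.
func statusFor(err error) int {
	var buildErr *proxyDispatchBuildError
	if errors.As(err, &buildErr) {
		return http.StatusInternalServerError
	}
	return http.StatusBadGateway
}

func main() {
	// A control character in the URL makes http.NewRequest fail, so this
	// exercises the build-error branch and prints 500.
	_, err := dispatch(http.DefaultClient, "http://example.invalid/\n")
	fmt.Println(statusFor(err))
}
```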
## Verification
- go build ./... clean
- go vet ./... clean
- go test -race ./internal/... — all pass
- canvas npm run build — clean
- canvas npm test -- --run — 352/352 pass
- grep window.confirm|window.alert|window.prompt in canvas/src — 0 matches
- every platform os.Getenv key present in .env.example
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -12,6 +12,30 @@ PORT=8080
SECRETS_ENCRYPTION_KEY= # 32-byte key (raw or base64). Leave empty for plaintext (dev only).
CONFIGS_DIR= # Path to workspace-configs-templates/ (auto-discovered if empty)
PLUGINS_DIR= # Path to plugins/ directory (default: /plugins in container)
# PLATFORM_URL=http://host.docker.internal:8080 # URL agent containers use to reach the platform; injected into workspace env. Default derives from PORT.
# MOLECULE_URL=http://localhost:8080 # Canonical MCP-client URL (mirrors PLATFORM_URL inside containers). Read by the MCP server (mcp-server/) and Molecule MCP tooling.
# WORKSPACE_DIR= # Optional global host path bind-mounted to /workspace in every container. Per-workspace workspace_dir column overrides this; if neither is set each workspace gets an isolated Docker named volume.
# MOLECULE_ENV=development # Environment label (development/staging/production). Used for log tagging and conditional behaviour.

# CORS / rate limiting
# CORS_ORIGINS=http://localhost:3000,http://localhost:3001 # Comma-separated allowed origins for the HTTP API.
# RATE_LIMIT=600 # Requests/minute per client (default 600).

# Activity retention
# ACTIVITY_RETENTION_DAYS=7 # Days to keep rows in activity_logs before pruning.
# ACTIVITY_CLEANUP_INTERVAL_HOURS=6 # How often the background pruner runs.

# Container/runtime detection
# MOLECULE_IN_DOCKER= # Set when running the platform inside Docker (accepts 1/0, true/false). Triggers A2A proxy to rewrite 127.0.0.1:<port> agent URLs to Docker bridge hostnames. Auto-detected via /.dockerenv; only set if detection fails or to force off.

# Observability (Awareness)
# AWARENESS_URL= # If set, injected into workspace containers along with a deterministic AWARENESS_NAMESPACE derived from workspace ID. Enables the cross-session memory MCP server.

# Webhooks
# GITHUB_WEBHOOK_SECRET= # HMAC secret used to verify incoming GitHub webhook payloads at /webhooks/github.

# CLI clients
# MOLECLI_URL=http://localhost:8080 # URL the molecli TUI uses to reach the platform.

# Plugin install safeguards (POST /workspaces/:id/plugins)
# All three bound the cost of a single install so a slow/malicious
@@ -39,7 +63,6 @@ CEREBRAS_API_KEY= # Cerebras API key (cloud.cerebras.ai). Use with
GOOGLE_API_KEY= # Google AI API key (aistudio.google.com). Use with model: google_genai:gemini-2.5-flash
MAX_TOKENS=2048 # Max output tokens for OpenRouter requests (default: 2048)
LANGGRAPH_RECURSION_LIMIT=500 # LangGraph/DeepAgents max ReAct steps per turn (lib default: 25; raised to 500 — PM fan-out to 6+ reports + synthesis routinely exceeds 100)
MOLECULE_IN_DOCKER= # Set when running the platform inside Docker (accepts 1/0, true/false — anything strconv.ParseBool recognises). Triggers A2A proxy to rewrite 127.0.0.1:<port> agent URLs to Docker bridge hostnames. Auto-detected via /.dockerenv; only set if detection fails (e.g. Podman, custom runtimes) or to force off.
MODEL_PROVIDER=anthropic:claude-sonnet-4-6 # Format: provider:model. Providers: anthropic, openai, openrouter, groq, cerebras, google_genai, ollama

# Social Channels (optional — configure per-workspace via API or Canvas)

@@ -41,11 +41,10 @@ export class ErrorBoundary extends React.Component<
};
// Log the full report to console for collection by monitoring tools
console.error("Error Report:", JSON.stringify(errorDetails, null, 2));
// Copy error info to clipboard for manual reporting
navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2)).then(
() => window.alert("Error details copied to clipboard."),
() => window.alert("Error details logged to console.")
);
// Copy error info to clipboard for manual reporting (button click is its
// own affordance — no native alert needed). On clipboard failure the
// console.error above still surfaces the report.
navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2));
};

render() {

@@ -4,6 +4,7 @@ import { useState, useEffect, useCallback, useRef } from "react";
import { api } from "@/lib/api";
import { checkDeploySecrets, type PreflightResult } from "@/lib/deploy-preflight";
import { MissingKeysModal } from "./MissingKeysModal";
import { ConfirmDialog } from "./ConfirmDialog";

interface Template {
id: string;
@@ -144,6 +145,7 @@ const TIER_LABELS: Record<number, { label: string; color: string }> = {

function ImportAgentButton({ onImported }: { onImported: () => void }) {
const [importing, setImporting] = useState(false);
const [notice, setNotice] = useState<string | null>(null);
const fileInputRef = useRef<HTMLInputElement>(null);

const handleFiles = async (fileList: FileList) => {
@@ -173,7 +175,7 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
}

if (Object.keys(files).length === 0) {
alert("No files found in the selected folder");
setNotice("No files found in the selected folder");
return;
}

@@ -181,7 +183,7 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
await api.post("/templates/import", { name, files });
onImported();
} catch (e) {
alert(e instanceof Error ? e.message : "Import failed");
setNotice(e instanceof Error ? e.message : "Import failed");
} finally {
setImporting(false);
}
@@ -205,6 +207,15 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
>
{importing ? "Importing..." : "Import Agent Folder"}
</button>
<ConfirmDialog
open={!!notice}
title="Import"
message={notice ?? ""}
confirmLabel="OK"
confirmVariant="primary"
onConfirm={() => setNotice(null)}
onCancel={() => setNotice(null)}
/>
</div>
);
}

@@ -2,6 +2,7 @@

import { useState, useEffect, useCallback } from "react";
import { api } from "@/lib/api";
import { ConfirmDialog } from "@/components/ConfirmDialog";

interface ChannelAdapter {
type: string;
@@ -39,6 +40,7 @@ export function ChannelsTab({ workspaceId }: Props) {
const [loading, setLoading] = useState(true);
const [showForm, setShowForm] = useState(false);
const [testing, setTesting] = useState<string | null>(null);
const [pendingDelete, setPendingDelete] = useState<Channel | null>(null);

// Form state
const [formType, setFormType] = useState("telegram");
@@ -146,8 +148,10 @@ export function ChannelsTab({ workspaceId }: Props) {
load();
};

const handleDelete = async (ch: Channel) => {
if (!confirm(`Delete ${ch.channel_type} channel?`)) return;
const confirmDelete = async () => {
if (!pendingDelete) return;
const ch = pendingDelete;
setPendingDelete(null);
await api.del(`/workspaces/${workspaceId}/channels/${ch.id}`);
load();
};
@@ -338,7 +342,7 @@ export function ChannelsTab({ workspaceId }: Props) {
{ch.enabled ? "On" : "Off"}
</button>
<button
onClick={() => handleDelete(ch)}
onClick={() => setPendingDelete(ch)}
className="text-[10px] px-2 py-0.5 rounded bg-red-900/20 text-red-400 hover:bg-red-900/40 transition"
>
Remove
@@ -354,6 +358,16 @@ export function ChannelsTab({ workspaceId }: Props) {
</div>
</div>
))}

<ConfirmDialog
open={!!pendingDelete}
title="Remove channel"
message={`Delete ${pendingDelete?.channel_type ?? ""} channel? This will stop messages flowing through this integration.`}
confirmLabel="Remove"
confirmVariant="danger"
onConfirm={confirmDelete}
onCancel={() => setPendingDelete(null)}
/>
</div>
);
}

@@ -10,6 +10,7 @@ import { type ChatMessage, createMessage } from "./chat/types";
import { extractResponseText, extractRequestText } from "./chat/message-parser";
import { AgentCommsPanel } from "./chat/AgentCommsPanel";
import { runtimeDisplayName } from "@/lib/runtime-names";
import { ConfirmDialog } from "@/components/ConfirmDialog";

interface Props {
workspaceId: string;
@@ -145,6 +146,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
const sendingFromAPIRef = useRef(false);
const [agentReachable, setAgentReachable] = useState(false);
const [error, setError] = useState<string | null>(null);
const [confirmRestart, setConfirmRestart] = useState(false);
const bottomRef = useRef<HTMLDivElement>(null);

// Load chat history from database on mount
@@ -392,11 +394,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
<span className="text-[10px] text-red-400">{error}</span>
{!isOnline && (
<button
onClick={() => {
if (confirm("Restart this workspace?")) {
useCanvasStore.getState().restartWorkspace(workspaceId);
}
}}
onClick={() => setConfirmRestart(true)}
className="text-[9px] px-2 py-0.5 bg-red-800/40 text-red-300 rounded hover:bg-red-700/50"
>
Restart
@@ -432,6 +430,19 @@ function MyChatPanel({ workspaceId, data }: Props) {
</button>
</div>
</div>

<ConfirmDialog
open={confirmRestart}
title="Restart workspace"
message="Restart this workspace? The agent container will be stopped and re-provisioned."
confirmLabel="Restart"
confirmVariant="warning"
onConfirm={() => {
useCanvasStore.getState().restartWorkspace(workspaceId);
setConfirmRestart(false);
}}
onCancel={() => setConfirmRestart(false)}
/>
</div>
);
}

@@ -2,6 +2,7 @@

import { useState, useEffect, useCallback } from "react";
import { api } from "@/lib/api";
import { ConfirmDialog } from "@/components/ConfirmDialog";

interface Schedule {
id: string;
@@ -64,6 +65,7 @@ export function ScheduleTab({ workspaceId }: Props) {
const [formPrompt, setFormPrompt] = useState("");
const [formEnabled, setFormEnabled] = useState(true);
const [error, setError] = useState("");
const [pendingDelete, setPendingDelete] = useState<{ id: string; name: string } | null>(null);

const fetchSchedules = useCallback(async () => {
try {
@@ -120,8 +122,10 @@ export function ScheduleTab({ workspaceId }: Props) {
}
};

const handleDelete = async (id: string, name: string) => {
if (!window.confirm(`Delete schedule "${name || "Unnamed"}"? This cannot be undone.`)) return;
const confirmDelete = async () => {
if (!pendingDelete) return;
const { id } = pendingDelete;
setPendingDelete(null);
await api.del(`/workspaces/${workspaceId}/schedules/${id}`);
fetchSchedules();
};
@@ -343,7 +347,7 @@ export function ScheduleTab({ workspaceId }: Props) {
✎
</button>
<button
onClick={() => handleDelete(sched.id, sched.name)}
onClick={() => setPendingDelete({ id: sched.id, name: sched.name })}
className="text-[8px] px-1.5 py-0.5 text-red-400 hover:bg-red-600/20 rounded transition-colors"
title="Delete"
>
@@ -355,6 +359,16 @@ export function ScheduleTab({ workspaceId }: Props) {
))
)}
</div>

<ConfirmDialog
open={!!pendingDelete}
title="Delete schedule"
message={`Delete schedule "${pendingDelete?.name || "Unnamed"}"? This cannot be undone.`}
confirmLabel="Delete"
confirmVariant="danger"
onConfirm={confirmDelete}
onCancel={() => setPendingDelete(null)}
/>
</div>
);
}

@ -217,7 +217,49 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve workspace URL (cache first, then DB)
|
||||
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
|
||||
if proxyErr != nil {
|
||||
return 0, nil, proxyErr
|
||||
}
|
||||
|
||||
normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
|
||||
if proxyErr != nil {
|
||||
return 0, nil, proxyErr
|
||||
}
|
||||
body = normalizedBody
|
||||
|
||||
startTime := time.Now()
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID)
|
||||
if cancelFwd != nil {
|
||||
defer cancelFwd()
|
||||
}
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to read agent response"},
|
||||
}
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
}
|
||||
return resp.StatusCode, respBody, nil
|
||||
}
|
||||
|
||||
// resolveAgentURL returns a routable URL for the target workspace agent. It
|
||||
// checks the Redis URL cache first, then falls back to a DB lookup, caching
|
||||
// the result on success. When the platform runs inside Docker, 127.0.0.1:<host
|
||||
// port> is rewritten to the container's Docker-bridge hostname (host-side
|
||||
// platforms keep the original URL because the bridge name wouldn't resolve).
|
||||
func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID string) (string, *proxyA2AError) {
|
||||
agentURL, err := db.GetCachedURL(ctx, workspaceID)
|
||||
if err != nil {
|
||||
var urlNullable sql.NullString
|
||||
@ -226,20 +268,20 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&urlNullable, &status)
|
||||
if err == sql.ErrNoRows {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusNotFound,
|
||||
Response: gin.H{"error": "workspace not found"},
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("ProxyA2A lookup error: %v", err)
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "lookup failed"},
|
||||
}
|
||||
}
|
||||
if !urlNullable.Valid || urlNullable.String == "" {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace has no URL", "status": status},
|
||||
}
|
||||
@ -251,19 +293,20 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// When the platform runs inside Docker, 127.0.0.1:{host_port} is
|
||||
// unreachable (it's the platform container's own localhost, not the
|
||||
// Docker host). Rewrite to the container's Docker-bridge hostname.
|
||||
//
|
||||
// But ONLY when we're actually inside Docker. If the platform runs
|
||||
// on the host (the default dev setup via infra/scripts/setup.sh),
|
||||
// 127.0.0.1:<ephemeral> IS the reachable URL and the container
|
||||
// hostname wouldn't resolve.
|
||||
if strings.HasPrefix(agentURL, "http://127.0.0.1:") && h.provisioner != nil && platformInDocker {
|
||||
agentURL = provisioner.InternalURL(workspaceID)
|
||||
}
|
||||
return agentURL, nil
|
||||
}
|
||||
|
||||
// Normalize the request into a valid A2A JSON-RPC 2.0 message
|
||||
// normalizeA2APayload parses the incoming body, wraps it in a JSON-RPC 2.0
|
||||
// envelope if absent, ensures params.message.messageId is set, and re-marshals
|
||||
// to bytes. Also returns the A2A method name (for logging) extracted from the
|
||||
// payload.
|
||||
func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
var payload map[string]interface{}
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return nil, "", &proxyA2AError{
|
||||
Status: http.StatusBadRequest,
|
||||
Response: gin.H{"error": "invalid JSON"},
|
||||
}
|
||||
@ -290,175 +333,194 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
|
||||
marshaledBody, marshalErr := json.Marshal(payload)
|
||||
if marshalErr != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return nil, "", &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "failed to marshal request"},
|
||||
}
|
||||
}
|
||||
body = marshaledBody
|
||||
|
||||
// Extract method for logging
|
||||
var a2aMethod string
|
||||
if m, ok := payload["method"].(string); ok {
|
||||
a2aMethod = m
|
||||
}
|
||||
return marshaledBody, a2aMethod, nil
|
||||
}
|
||||
|
||||
// Forward to the agent. Uses WithoutCancel so delegation chains survive client
|
||||
// disconnect (browser tab close).
|
||||
// Default timeouts: canvas = 5 min, agent-to-agent = 30 min.
|
||||
// Callers can override via X-Timeout header (handled in ProxyA2A handler above).
|
||||
startTime := time.Now()
|
||||
// dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation
|
||||
// chains survive client disconnect (browser tab close). Default timeouts:
|
||||
// canvas (callerID == "") = 5 min, agent-to-agent = 30 min. Callers can
|
||||
// override via the X-Timeout header (applied to ctx upstream in ProxyA2A).
|
||||
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
|
||||
forwardCtx := context.WithoutCancel(ctx)
|
||||
var cancel context.CancelFunc
|
||||
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
|
||||
// No caller-specified deadline — apply defaults
|
||||
if callerID == "" {
|
||||
var cancel context.CancelFunc
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute)
|
||||
defer cancel()
|
||||
} else {
|
||||
var cancel context.CancelFunc
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute)
|
||||
defer cancel()
|
||||
}
|
||||
}
|
||||
req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
// Wrap the construction failure so the caller can distinguish it
|
||||
// from an upstream Do() error and produce the correct 500 response.
|
||||
return nil, nil, &proxyDispatchBuildError{err: err}
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, doErr := a2aClient.Do(req)
|
||||
return resp, cancel, doErr
|
||||
}
|
||||
|
||||
// proxyDispatchBuildError is a sentinel wrapper for failures inside
|
||||
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
|
||||
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
|
||||
type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and (when `logActivity` is true)
|
||||
// schedules a detached LogActivity goroutine for the failed attempt.
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
// Build-time failure (couldn't even create the http.Request) — return
|
||||
// a 500 without the reactive-health / busy-retry paths.
|
||||
if buildErr, ok := err.(*proxyDispatchBuildError); ok {
|
||||
_ = buildErr
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "failed to create proxy request"},
|
||||
}
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := a2aClient.Do(req)
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
log.Printf("ProxyA2A forward error: %v", err)
|
||||
log.Printf("ProxyA2A forward error: %v", err)
|
||||
|
||||
// Reactive health check: if the request failed, check if the container is actually dead.
|
||||
// Skip for external workspaces (no Docker container).
|
||||
containerDead := false
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if h.provisioner != nil && wsRuntime != "external" {
|
||||
if running, _ := h.provisioner.IsRunning(ctx, workspaceID); !running {
|
||||
containerDead = true
|
||||
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
|
||||
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
|
||||
go h.RestartByID(workspaceID)
|
||||
}
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
// Log failed A2A attempt (detached context — request may be done)
|
||||
errMsg := err.Error()
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
errWsName = workspaceID
|
||||
}
|
||||
summary := "A2A request to " + errWsName + " failed: " + errMsg
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
DurationMs: &durationMs,
|
||||
Status: "error",
|
||||
ErrorDetail: &errMsg,
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
|
||||
}
|
||||
}
|
||||
// Container is alive but upstream Do() failed with a timeout/EOF-
|
||||
// shaped error — the agent is most likely mid-synthesis on a
|
||||
// previous request (single-threaded main loop). Surface as 503
|
||||
// Busy with a Retry-After hint so callers can distinguish this
|
||||
// from a real unreachable-agent (502) and retry with backoff.
|
||||
// Issue #110.
|
||||
if isUpstreamBusyError(err) {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
Response: gin.H{
|
||||
"error": "workspace agent busy — retry after a short backoff",
|
||||
"busy": true,
|
||||
"retry_after": busyRetryAfterSeconds,
|
||||
},
|
||||
}
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to reach workspace agent"},
|
||||
}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to read agent response"},
|
||||
}
|
||||
}
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
|
||||
if logActivity {
|
||||
// Log successful A2A communication
|
||||
logStatus := "ok"
|
||||
if resp.StatusCode >= 400 {
|
||||
logStatus = "error"
|
||||
}
|
||||
// Resolve workspace name for readable summary
|
||||
var wsNameForLog string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsNameForLog)
|
||||
if wsNameForLog == "" {
|
||||
wsNameForLog = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
}(ctx)
|
||||
|
||||
// For canvas-initiated requests, broadcast the response via WebSocket
|
||||
// so the frontend receives it instantly without polling.
|
||||
if callerID == "" && resp.StatusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
|
||||
}
|
||||
}
|
||||
return resp.StatusCode, respBody, nil
|
||||
// Container is alive but upstream Do() failed with a timeout/EOF-
|
||||
// shaped error — the agent is most likely mid-synthesis on a
|
||||
// previous request (single-threaded main loop). Surface as 503
|
||||
// Busy with a Retry-After hint so callers can distinguish this
|
||||
// from a real unreachable-agent (502) and retry with backoff.
|
||||
// Issue #110.
|
||||
if isUpstreamBusyError(err) {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
Response: gin.H{
|
||||
"error": "workspace agent busy — retry after a short backoff",
|
||||
"busy": true,
|
||||
"retry_after": busyRetryAfterSeconds,
|
||||
},
|
||||
}
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to reach workspace agent"},
|
||||
}
|
||||
}
|
||||
|
||||
// maybeMarkContainerDead runs the reactive health check after a forward error.
|
||||
// If the workspace's Docker container is no longer running (and the workspace
|
||||
// isn't external), it marks the workspace offline, clears Redis state,
|
||||
// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true
|
||||
// when the container was found dead.
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if h.provisioner == nil || wsRuntime == "external" {
|
||||
return false
|
||||
}
|
||||
if running, _ := h.provisioner.IsRunning(ctx, workspaceID); running {
|
||||
return false
|
||||
}
|
||||
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
|
||||
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
|
||||
go h.RestartByID(workspaceID)
|
||||
return true
|
||||
}
|
||||
|
||||
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
|
||||
// goroutine (the request context may already be done by the time it runs).
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
|
||||
errMsg := err.Error()
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
errWsName = workspaceID
|
||||
}
|
||||
summary := "A2A request to " + errWsName + " failed: " + errMsg
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
DurationMs: &durationMs,
|
||||
Status: "error",
|
||||
ErrorDetail: &errMsg,
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
}
|
||||
var wsNameForLog string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsNameForLog)
|
||||
if wsNameForLog == "" {
|
||||
wsNameForLog = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
}(ctx)
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) *string {
|
||||
|
||||
@ -118,17 +118,44 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// It searches the workspace's own activity logs and memories without adding a new storage layer.
|
||||
func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
query := strings.TrimSpace(c.DefaultQuery("q", ""))
|
||||
limitStr := c.DefaultQuery("limit", "50")
|
||||
query, limit := parseSessionSearchParams(c)
|
||||
|
||||
sqlQuery, args := buildSessionSearchQuery(workspaceID, query, limit)
|
||||
|
||||
rows, err := db.DB.QueryContext(c.Request.Context(), sqlQuery, args...)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "session search failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
items, scanErr := scanSessionSearchRows(rows)
|
||||
if scanErr != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query iteration failed"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, items)
|
||||
}
|
||||
|
||||
// parseSessionSearchParams extracts the `q` and `limit` query params for SessionSearch,
|
||||
// applying the default limit (50) and cap (200).
|
||||
func parseSessionSearchParams(c *gin.Context) (string, int) {
|
||||
query := strings.TrimSpace(c.DefaultQuery("q", ""))
|
||||
limit := 50
|
||||
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
|
||||
if n, err := strconv.Atoi(c.DefaultQuery("limit", "50")); err == nil && n > 0 {
|
||||
limit = n
|
||||
if limit > 200 {
|
||||
limit = 200
|
||||
}
|
||||
}
|
||||
return query, limit
|
||||
}
|
||||
|
||||
// buildSessionSearchQuery composes the UNION-ALL SQL across activity_logs and
|
||||
// agent_memories with an optional ILIKE filter, returning the SQL string and
|
||||
// positional args ready for QueryContext.
|
||||
func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []interface{}) {
|
||||
sqlQuery := `
|
||||
WITH session_items AS (
|
||||
SELECT
|
||||
@ -179,14 +206,17 @@ func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
|
||||
sqlQuery += ` ORDER BY created_at DESC LIMIT $` + strconv.Itoa(len(args)+1)
|
||||
args = append(args, limit)
|
||||
return sqlQuery, args
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(c.Request.Context(), sqlQuery, args...)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "session search failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// scanSessionSearchRows materialises rows from the SessionSearch query into the
|
||||
// JSON-shaped maps the endpoint returns. Per-row scan errors are logged and
|
||||
// skipped (matches prior behavior); a rows.Err() failure is surfaced.
|
||||
func scanSessionSearchRows(rows interface {
|
||||
Next() bool
|
||||
Scan(dest ...interface{}) error
|
||||
Err() error
|
||||
}) ([]map[string]interface{}, error) {
|
||||
items := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
var (
|
||||
@ -219,11 +249,9 @@ func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Session search rows error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query iteration failed"})
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, items)
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// Notify handles POST /workspaces/:id/notify — agents push messages to the canvas chat.
|
||||
|
||||
@ -35,6 +35,13 @@ func NewDelegationHandler(wh *WorkspaceHandler, b *events.Broadcaster) *Delegati
|
||||
return &DelegationHandler{workspace: wh, broadcaster: b}
|
||||
}
|
||||
|
||||
// delegateRequest is the bound POST /workspaces/:id/delegate body.
|
||||
type delegateRequest struct {
|
||||
TargetID string `json:"target_id" binding:"required"`
|
||||
Task string `json:"task" binding:"required"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
}
|
||||
|
||||
// Delegate handles POST /workspaces/:id/delegate
|
||||
// Sends an A2A message to the target workspace in the background.
|
||||
// Returns immediately with a delegation_id.
|
||||
@ -42,97 +49,27 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
sourceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var body struct {
|
||||
TargetID string `json:"target_id" binding:"required"`
|
||||
Task string `json:"task" binding:"required"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Validate target_id is a valid UUID
|
||||
if _, err := uuid.Parse(body.TargetID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "target_id must be a valid UUID"})
|
||||
return
|
||||
var body delegateRequest
|
||||
if err := bindDelegateRequest(c, &body); err != nil {
|
||||
return // response already written
|
||||
}
|
||||
|
||||
// #124 — idempotency. If the caller supplies an idempotency_key, return
|
||||
// the existing delegation when (workspace_id, idempotency_key) already
|
||||
// exists and is not in a failed terminal state. Failed rows are NOT
|
||||
// reused so callers can retry after a real failure (the unique index
|
||||
// is on the column itself; we delete-then-insert below to release it).
|
||||
if body.IdempotencyKey != "" {
|
||||
var existingID, existingStatus, existingTarget string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status, target_id
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&existingID, &existingStatus, &existingTarget)
|
||||
if err == nil && existingID != "" {
|
||||
if existingStatus == "failed" {
|
||||
// Release the unique slot so the retry can take it. Bounded
|
||||
// to this exact row by id-of-key; safe.
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
DELETE FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2 AND status = 'failed'
|
||||
`, sourceID, body.IdempotencyKey)
|
||||
} else {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": existingID,
|
||||
"status": existingStatus,
|
||||
"target_id": existingTarget,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
// exists and is not in a failed terminal state.
|
||||
if hit := lookupIdempotentDelegation(ctx, c, sourceID, body.IdempotencyKey); hit {
|
||||
return
|
||||
}
|
||||
|
||||
delegationID := uuid.New().String()
|
||||
|
||||
// Store delegation in DB (request_body must be JSONB, include delegation_id for correlation)
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
trackingOK, handled := insertDelegationRow(ctx, c, sourceID, body, delegationID)
|
||||
if handled {
|
||||
return // idempotency-conflict response already written
|
||||
}
|
||||
var trackingOK bool
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
|
||||
if err != nil {
|
||||
// A unique-constraint hit means a concurrent request just took the
|
||||
// slot — rare, but worth surfacing as the same idempotent response
|
||||
// rather than a generic 500. Re-query to fetch the winner's id.
|
||||
if body.IdempotencyKey != "" {
|
||||
var winnerID, winnerStatus string
|
||||
if qerr := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&winnerID, &winnerStatus); qerr == nil && winnerID != "" {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": winnerID,
|
||||
"status": winnerStatus,
|
||||
"target_id": body.TargetID,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
log.Printf("Delegation: failed to store: %v", err)
|
||||
} else {
|
||||
trackingOK = true
|
||||
}
|
||||
_ = trackingOK
|
||||
// trackingOK==false (and !handled) means insert failed for a non-
|
||||
// idempotency reason (logged); we still dispatch the A2A request and
|
||||
// surface the warning in the response.
|
||||
|
||||
// Build A2A payload
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
@ -166,6 +103,103 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
c.JSON(http.StatusAccepted, resp)
|
||||
}
|
||||
|
||||
// bindDelegateRequest binds and validates the JSON body. On error it writes
|
||||
// the 400 response and returns the error so the caller can return.
|
||||
func bindDelegateRequest(c *gin.Context, body *delegateRequest) error {
|
||||
if err := c.ShouldBindJSON(body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return err
|
||||
}
|
||||
if _, err := uuid.Parse(body.TargetID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "target_id must be a valid UUID"})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// lookupIdempotentDelegation returns true (and writes the response) when an
|
||||
// existing non-failed delegation matches the (sourceID, idempotencyKey) pair.
|
||||
// Failed rows are deleted to release the unique slot so the retry can take it.
|
||||
// Returns false when there's no key, no existing row, or the existing row was
|
||||
// failed and just deleted.
|
||||
func lookupIdempotentDelegation(ctx context.Context, c *gin.Context, sourceID, idempotencyKey string) bool {
|
||||
if idempotencyKey == "" {
|
||||
return false
|
||||
}
|
||||
var existingID, existingStatus, existingTarget string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status, target_id
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, idempotencyKey).Scan(&existingID, &existingStatus, &existingTarget)
|
||||
if err != nil || existingID == "" {
|
||||
return false
|
||||
}
|
||||
if existingStatus == "failed" {
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
DELETE FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2 AND status = 'failed'
|
||||
`, sourceID, idempotencyKey)
|
||||
return false
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": existingID,
|
||||
"status": existingStatus,
|
||||
"target_id": existingTarget,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
// insertDelegationRow stores the pending delegation row.
|
||||
// Returns (trackingOK, handled):
|
||||
// - (true, false) on success — caller continues with dispatch.
|
||||
// - (false, true) on a unique-constraint hit when a concurrent idempotent
|
||||
// request just took the slot; the winner's JSON response is written here
|
||||
// and the caller MUST return without further writes.
|
||||
// - (false, false) on any other DB failure — caller continues with dispatch
|
||||
// and surfaces a tracking-unavailable warning in the response.
|
||||
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) (bool, bool) {
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
}
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
|
||||
if err == nil {
|
||||
return true, false
|
||||
}
|
||||
// A unique-constraint hit means a concurrent request just took the
|
||||
// slot — rare, but worth surfacing as the same idempotent response
|
||||
// rather than a generic 500. Re-query to fetch the winner's id.
|
||||
if body.IdempotencyKey != "" {
|
||||
var winnerID, winnerStatus string
|
||||
if qerr := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&winnerID, &winnerStatus); qerr == nil && winnerID != "" {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": winnerID,
|
||||
"status": winnerStatus,
|
||||
"target_id": body.TargetID,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return false, true
|
||||
}
|
||||
}
|
||||
log.Printf("Delegation: failed to store: %v", err)
|
||||
return false, false
|
||||
}
|
||||
|
||||
// executeDelegation runs in a goroutine — sends A2A and stores the result.
|
||||
// Updates delegation status through: pending → dispatched → received → completed/failed
|
||||
// delegationRetryDelay is the pause between the first failed proxy attempt
|
||||
|
||||
@ -40,70 +40,30 @@ func (h *DiscoveryHandler) Discover(c *gin.Context) {
|
||||
return // response already written
|
||||
}
|
||||
|
||||
if callerID != "" {
|
||||
if !registry.CanCommunicate(callerID, targetID) {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not authorized to discover this workspace"})
|
||||
return
|
||||
}
|
||||
if !registry.CanCommunicate(callerID, targetID) {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not authorized to discover this workspace"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Workspace-to-workspace: return Docker-internal URL (containers can't reach host ports)
|
||||
// External workspaces: return their registered URL with host.docker.internal
|
||||
// Canvas/external: return host-accessible URL
|
||||
// Workspace-to-workspace: return Docker-internal URL (containers can't
|
||||
// reach host ports). External targets need their registered URL with
|
||||
// 127.0.0.1/localhost rewritten to host.docker.internal when the caller
|
||||
// is itself a Docker container.
|
||||
if callerID != "" {
|
||||
var wsName, wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(name,''), COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, targetID).Scan(&wsName, &wsRuntime)
|
||||
|
||||
// External workspaces: return their registered URL.
|
||||
// Rewrite 127.0.0.1/localhost → host.docker.internal ONLY when the
|
||||
// caller itself is a Docker container; a remote (external) caller
|
||||
// lives on the other side of the wire and needs the URL as-is
|
||||
// (localhost rewrites wouldn't resolve from its host anyway).
|
||||
// Phase 30.6.
|
||||
if wsRuntime == "external" {
|
||||
var wsURL string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(url,'') FROM workspaces WHERE id = $1`, targetID).Scan(&wsURL)
|
||||
if wsURL != "" {
|
||||
outURL := wsURL
|
||||
var callerRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
|
||||
if callerRuntime != "external" {
|
||||
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
|
||||
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": outURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Try cached internal URL first
|
||||
if internalURL, err := db.GetCachedInternalURL(ctx, targetID); err == nil && internalURL != "" {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Fallback: only synthesize a URL if the workspace exists and is online/degraded
|
||||
var wsStatus string
|
||||
dbErr := db.DB.QueryRowContext(ctx,
|
||||
`SELECT status FROM workspaces WHERE id = $1`, targetID,
|
||||
).Scan(&wsStatus)
|
||||
if dbErr == nil && (wsStatus == "online" || wsStatus == "degraded") {
|
||||
internalURL := provisioner.InternalURL(targetID)
|
||||
if cacheErr := db.CacheInternalURL(ctx, targetID, internalURL); cacheErr != nil {
|
||||
log.Printf("Discovery: failed to cache internal URL for %s: %v", targetID, cacheErr)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Workspace is not reachable — don't fall through to host URL path
|
||||
if dbErr == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not available", "status": wsStatus})
|
||||
} else {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
}
|
||||
discoverWorkspacePeer(ctx, c, callerID, targetID)
|
||||
return
|
||||
}
|
||||
discoverHostPeer(ctx, c, targetID)
|
||||
}
|
||||
|
||||
// discoverHostPeer handles the canvas/external (no X-Workspace-ID) branch of
|
||||
// Discover. It returns the host-accessible URL for `targetID`, following any
|
||||
// forwarding chain (max 5 hops). Currently unreachable because Discover
|
||||
// requires the X-Workspace-ID header up front, but kept to preserve the
|
||||
// original code path 1:1 in case the requirement is relaxed.
|
||||
func discoverHostPeer(ctx context.Context, c *gin.Context, targetID string) {
|
||||
if url, err := db.GetCachedURL(ctx, targetID); err == nil {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": url})
|
||||
return
|
||||
@ -149,6 +109,72 @@ func (h *DiscoveryHandler) Discover(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// discoverWorkspacePeer handles the workspace-to-workspace branch of Discover —
|
||||
// resolves an internal/Docker-routable URL for `targetID` from the perspective
|
||||
// of `callerID` and writes the JSON response (or an appropriate 404/503 error).
|
||||
func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, targetID string) {
|
||||
var wsName, wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(name,''), COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, targetID).Scan(&wsName, &wsRuntime)
|
||||
|
||||
// External workspaces: return their registered URL.
|
||||
// Rewrite 127.0.0.1/localhost → host.docker.internal ONLY when the
|
||||
// caller itself is a Docker container; a remote (external) caller
|
||||
// lives on the other side of the wire and needs the URL as-is
|
||||
// (localhost rewrites wouldn't resolve from its host anyway).
|
||||
// Phase 30.6.
|
||||
if wsRuntime == "external" {
|
||||
if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Try cached internal URL first
|
||||
if internalURL, err := db.GetCachedInternalURL(ctx, targetID); err == nil && internalURL != "" {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Fallback: only synthesize a URL if the workspace exists and is online/degraded
|
||||
var wsStatus string
|
||||
dbErr := db.DB.QueryRowContext(ctx,
|
||||
`SELECT status FROM workspaces WHERE id = $1`, targetID,
|
||||
).Scan(&wsStatus)
|
||||
if dbErr == nil && (wsStatus == "online" || wsStatus == "degraded") {
|
||||
internalURL := provisioner.InternalURL(targetID)
|
||||
if cacheErr := db.CacheInternalURL(ctx, targetID, internalURL); cacheErr != nil {
|
||||
log.Printf("Discovery: failed to cache internal URL for %s: %v", targetID, cacheErr)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Workspace is not reachable — don't fall through to host URL path
|
||||
if dbErr == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not available", "status": wsStatus})
|
||||
} else {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
}
|
||||
}
|
||||
|
||||
// writeExternalWorkspaceURL resolves the registered URL for an external-runtime
|
||||
// target and writes the response. Returns true when a response was written
|
||||
// (URL present); returns false when the external workspace has no URL on
|
||||
// file, leaving the caller to fall through to the internal-URL path.
|
||||
func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, targetID, wsName string) bool {
|
||||
var wsURL string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(url,'') FROM workspaces WHERE id = $1`, targetID).Scan(&wsURL)
|
||||
if wsURL == "" {
|
||||
return false
|
||||
}
|
||||
outURL := wsURL
|
||||
var callerRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
|
||||
if callerRuntime != "external" {
|
||||
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
|
||||
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": outURL, "name": wsName})
|
||||
return true
|
||||
}
|
||||
|
||||
// Peers handles GET /registry/:id/peers
|
||||
func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||