chore: quality pass — native dialogs, env sync, Go handler splits
chore: quality pass — native dialogs, env sync, Go handler splits
This commit is contained in:
commit
3e1e46faa5
25
.env.example
25
.env.example
@ -12,6 +12,30 @@ PORT=8080
|
||||
SECRETS_ENCRYPTION_KEY= # 32-byte key (raw or base64). Leave empty for plaintext (dev only).
|
||||
CONFIGS_DIR= # Path to workspace-configs-templates/ (auto-discovered if empty)
|
||||
PLUGINS_DIR= # Path to plugins/ directory (default: /plugins in container)
|
||||
# PLATFORM_URL=http://host.docker.internal:8080 # URL agent containers use to reach the platform; injected into workspace env. Default derives from PORT.
|
||||
# MOLECULE_URL=http://localhost:8080 # Canonical MCP-client URL (mirrors PLATFORM_URL inside containers). Read by the MCP server (mcp-server/) and Molecule MCP tooling.
|
||||
# WORKSPACE_DIR= # Optional global host path bind-mounted to /workspace in every container. Per-workspace workspace_dir column overrides this; if neither is set each workspace gets an isolated Docker named volume.
|
||||
# MOLECULE_ENV=development # Environment label (development/staging/production). Used for log tagging and conditional behaviour.
|
||||
|
||||
# CORS / rate limiting
|
||||
# CORS_ORIGINS=http://localhost:3000,http://localhost:3001 # Comma-separated allowed origins for the HTTP API.
|
||||
# RATE_LIMIT=600 # Requests/minute per client (default 600).
|
||||
|
||||
# Activity retention
|
||||
# ACTIVITY_RETENTION_DAYS=7 # Days to keep rows in activity_logs before pruning.
|
||||
# ACTIVITY_CLEANUP_INTERVAL_HOURS=6 # How often the background pruner runs.
|
||||
|
||||
# Container/runtime detection
|
||||
# MOLECULE_IN_DOCKER= # Set when running the platform inside Docker (accepts 1/0, true/false). Triggers A2A proxy to rewrite 127.0.0.1:<port> agent URLs to Docker bridge hostnames. Auto-detected via /.dockerenv; only set if detection fails or to force off.
|
||||
|
||||
# Observability (Awareness)
|
||||
# AWARENESS_URL= # If set, injected into workspace containers along with a deterministic AWARENESS_NAMESPACE derived from workspace ID. Enables the cross-session memory MCP server.
|
||||
|
||||
# Webhooks
|
||||
# GITHUB_WEBHOOK_SECRET= # HMAC secret used to verify incoming GitHub webhook payloads at /webhooks/github.
|
||||
|
||||
# CLI clients
|
||||
# MOLECLI_URL=http://localhost:8080 # URL the molecli TUI uses to reach the platform.
|
||||
|
||||
# Plugin install safeguards (POST /workspaces/:id/plugins)
|
||||
# All three bound the cost of a single install so a slow/malicious
|
||||
@ -39,7 +63,6 @@ CEREBRAS_API_KEY= # Cerebras API key (cloud.cerebras.ai). Use with
|
||||
GOOGLE_API_KEY= # Google AI API key (aistudio.google.com). Use with model: google_genai:gemini-2.5-flash
|
||||
MAX_TOKENS=2048 # Max output tokens for OpenRouter requests (default: 2048)
|
||||
LANGGRAPH_RECURSION_LIMIT=500 # LangGraph/DeepAgents max ReAct steps per turn (lib default: 25; raised to 500 — PM fan-out to 6+ reports + synthesis routinely exceeds 100)
|
||||
MOLECULE_IN_DOCKER= # Set when running the platform inside Docker (accepts 1/0, true/false — anything strconv.ParseBool recognises). Triggers A2A proxy to rewrite 127.0.0.1:<port> agent URLs to Docker bridge hostnames. Auto-detected via /.dockerenv; only set if detection fails (e.g. Podman, custom runtimes) or to force off.
|
||||
MODEL_PROVIDER=anthropic:claude-sonnet-4-6 # Format: provider:model. Providers: anthropic, openai, openrouter, groq, cerebras, google_genai, ollama
|
||||
|
||||
# Social Channels (optional — configure per-workspace via API or Canvas)
|
||||
|
||||
@ -11,6 +11,9 @@ interface Props {
|
||||
confirmVariant?: "danger" | "primary" | "warning";
|
||||
onConfirm: () => void;
|
||||
onCancel: () => void;
|
||||
// Hide the Cancel button for single-action info toasts.
|
||||
// onCancel is still invoked on Esc / backdrop-click.
|
||||
singleButton?: boolean;
|
||||
}
|
||||
|
||||
export function ConfirmDialog({
|
||||
@ -21,6 +24,7 @@ export function ConfirmDialog({
|
||||
confirmVariant = "primary",
|
||||
onConfirm,
|
||||
onCancel,
|
||||
singleButton = false,
|
||||
}: Props) {
|
||||
const dialogRef = useRef<HTMLDivElement>(null);
|
||||
const [mounted, setMounted] = useState(false);
|
||||
@ -71,12 +75,14 @@ export function ConfirmDialog({
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-end gap-2 px-5 py-3 border-t border-zinc-800 bg-zinc-950/50">
|
||||
<button
|
||||
onClick={onCancel}
|
||||
className="px-3.5 py-1.5 text-[13px] text-zinc-400 hover:text-zinc-200 bg-zinc-800 hover:bg-zinc-700 border border-zinc-700 rounded-lg transition-colors"
|
||||
>
|
||||
Cancel
|
||||
</button>
|
||||
{!singleButton && (
|
||||
<button
|
||||
onClick={onCancel}
|
||||
className="px-3.5 py-1.5 text-[13px] text-zinc-400 hover:text-zinc-200 bg-zinc-800 hover:bg-zinc-700 border border-zinc-700 rounded-lg transition-colors"
|
||||
>
|
||||
Cancel
|
||||
</button>
|
||||
)}
|
||||
<button
|
||||
onClick={onConfirm}
|
||||
className={`px-3.5 py-1.5 text-[13px] rounded-lg transition-colors ${confirmColors}`}
|
||||
|
||||
@ -41,11 +41,10 @@ export class ErrorBoundary extends React.Component<
|
||||
};
|
||||
// Log the full report to console for collection by monitoring tools
|
||||
console.error("Error Report:", JSON.stringify(errorDetails, null, 2));
|
||||
// Copy error info to clipboard for manual reporting
|
||||
navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2)).then(
|
||||
() => window.alert("Error details copied to clipboard."),
|
||||
() => window.alert("Error details logged to console.")
|
||||
);
|
||||
// Copy error info to clipboard for manual reporting (button click is its
|
||||
// own affordance — no native alert needed). On clipboard failure the
|
||||
// console.error above still surfaces the report.
|
||||
void navigator.clipboard?.writeText(JSON.stringify(errorDetails, null, 2)).catch(() => {});
|
||||
};
|
||||
|
||||
render() {
|
||||
|
||||
@ -4,6 +4,7 @@ import { useState, useEffect, useCallback, useRef } from "react";
|
||||
import { api } from "@/lib/api";
|
||||
import { checkDeploySecrets, type PreflightResult } from "@/lib/deploy-preflight";
|
||||
import { MissingKeysModal } from "./MissingKeysModal";
|
||||
import { ConfirmDialog } from "./ConfirmDialog";
|
||||
|
||||
interface Template {
|
||||
id: string;
|
||||
@ -144,6 +145,7 @@ const TIER_LABELS: Record<number, { label: string; color: string }> = {
|
||||
|
||||
function ImportAgentButton({ onImported }: { onImported: () => void }) {
|
||||
const [importing, setImporting] = useState(false);
|
||||
const [notice, setNotice] = useState<string | null>(null);
|
||||
const fileInputRef = useRef<HTMLInputElement>(null);
|
||||
|
||||
const handleFiles = async (fileList: FileList) => {
|
||||
@ -173,7 +175,7 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
|
||||
}
|
||||
|
||||
if (Object.keys(files).length === 0) {
|
||||
alert("No files found in the selected folder");
|
||||
setNotice("No files found in the selected folder");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -181,7 +183,7 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
|
||||
await api.post("/templates/import", { name, files });
|
||||
onImported();
|
||||
} catch (e) {
|
||||
alert(e instanceof Error ? e.message : "Import failed");
|
||||
setNotice(e instanceof Error ? e.message : "Import failed");
|
||||
} finally {
|
||||
setImporting(false);
|
||||
}
|
||||
@ -205,6 +207,16 @@ function ImportAgentButton({ onImported }: { onImported: () => void }) {
|
||||
>
|
||||
{importing ? "Importing..." : "Import Agent Folder"}
|
||||
</button>
|
||||
<ConfirmDialog
|
||||
open={!!notice}
|
||||
title="Import"
|
||||
message={notice ?? ""}
|
||||
confirmLabel="OK"
|
||||
confirmVariant="primary"
|
||||
singleButton
|
||||
onConfirm={() => setNotice(null)}
|
||||
onCancel={() => setNotice(null)}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { api } from "@/lib/api";
|
||||
import { ConfirmDialog } from "@/components/ConfirmDialog";
|
||||
|
||||
interface ChannelAdapter {
|
||||
type: string;
|
||||
@ -39,6 +40,7 @@ export function ChannelsTab({ workspaceId }: Props) {
|
||||
const [loading, setLoading] = useState(true);
|
||||
const [showForm, setShowForm] = useState(false);
|
||||
const [testing, setTesting] = useState<string | null>(null);
|
||||
const [pendingDelete, setPendingDelete] = useState<Channel | null>(null);
|
||||
|
||||
// Form state
|
||||
const [formType, setFormType] = useState("telegram");
|
||||
@ -146,8 +148,10 @@ export function ChannelsTab({ workspaceId }: Props) {
|
||||
load();
|
||||
};
|
||||
|
||||
const handleDelete = async (ch: Channel) => {
|
||||
if (!confirm(`Delete ${ch.channel_type} channel?`)) return;
|
||||
const confirmDelete = async () => {
|
||||
if (!pendingDelete) return;
|
||||
const ch = pendingDelete;
|
||||
setPendingDelete(null);
|
||||
await api.del(`/workspaces/${workspaceId}/channels/${ch.id}`);
|
||||
load();
|
||||
};
|
||||
@ -338,7 +342,7 @@ export function ChannelsTab({ workspaceId }: Props) {
|
||||
{ch.enabled ? "On" : "Off"}
|
||||
</button>
|
||||
<button
|
||||
onClick={() => handleDelete(ch)}
|
||||
onClick={() => setPendingDelete(ch)}
|
||||
className="text-[10px] px-2 py-0.5 rounded bg-red-900/20 text-red-400 hover:bg-red-900/40 transition"
|
||||
>
|
||||
Remove
|
||||
@ -354,6 +358,16 @@ export function ChannelsTab({ workspaceId }: Props) {
|
||||
</div>
|
||||
</div>
|
||||
))}
|
||||
|
||||
<ConfirmDialog
|
||||
open={!!pendingDelete}
|
||||
title="Remove channel"
|
||||
message={`Delete ${pendingDelete?.channel_type ?? ""} channel? This will stop messages flowing through this integration.`}
|
||||
confirmLabel="Remove"
|
||||
confirmVariant="danger"
|
||||
onConfirm={confirmDelete}
|
||||
onCancel={() => setPendingDelete(null)}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import { type ChatMessage, createMessage } from "./chat/types";
|
||||
import { extractResponseText, extractRequestText } from "./chat/message-parser";
|
||||
import { AgentCommsPanel } from "./chat/AgentCommsPanel";
|
||||
import { runtimeDisplayName } from "@/lib/runtime-names";
|
||||
import { ConfirmDialog } from "@/components/ConfirmDialog";
|
||||
|
||||
interface Props {
|
||||
workspaceId: string;
|
||||
@ -145,6 +146,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
const sendingFromAPIRef = useRef(false);
|
||||
const [agentReachable, setAgentReachable] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [confirmRestart, setConfirmRestart] = useState(false);
|
||||
const bottomRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// Load chat history from database on mount
|
||||
@ -392,11 +394,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
<span className="text-[10px] text-red-400">{error}</span>
|
||||
{!isOnline && (
|
||||
<button
|
||||
onClick={() => {
|
||||
if (confirm("Restart this workspace?")) {
|
||||
useCanvasStore.getState().restartWorkspace(workspaceId);
|
||||
}
|
||||
}}
|
||||
onClick={() => setConfirmRestart(true)}
|
||||
className="text-[9px] px-2 py-0.5 bg-red-800/40 text-red-300 rounded hover:bg-red-700/50"
|
||||
>
|
||||
Restart
|
||||
@ -432,6 +430,19 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<ConfirmDialog
|
||||
open={confirmRestart}
|
||||
title="Restart workspace"
|
||||
message="Restart this workspace? The agent container will be stopped and re-provisioned."
|
||||
confirmLabel="Restart"
|
||||
confirmVariant="warning"
|
||||
onConfirm={() => {
|
||||
useCanvasStore.getState().restartWorkspace(workspaceId);
|
||||
setConfirmRestart(false);
|
||||
}}
|
||||
onCancel={() => setConfirmRestart(false)}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { api } from "@/lib/api";
|
||||
import { ConfirmDialog } from "@/components/ConfirmDialog";
|
||||
|
||||
interface Schedule {
|
||||
id: string;
|
||||
@ -64,6 +65,7 @@ export function ScheduleTab({ workspaceId }: Props) {
|
||||
const [formPrompt, setFormPrompt] = useState("");
|
||||
const [formEnabled, setFormEnabled] = useState(true);
|
||||
const [error, setError] = useState("");
|
||||
const [pendingDelete, setPendingDelete] = useState<{ id: string; name: string } | null>(null);
|
||||
|
||||
const fetchSchedules = useCallback(async () => {
|
||||
try {
|
||||
@ -120,8 +122,10 @@ export function ScheduleTab({ workspaceId }: Props) {
|
||||
}
|
||||
};
|
||||
|
||||
const handleDelete = async (id: string, name: string) => {
|
||||
if (!window.confirm(`Delete schedule "${name || "Unnamed"}"? This cannot be undone.`)) return;
|
||||
const confirmDelete = async () => {
|
||||
if (!pendingDelete) return;
|
||||
const { id } = pendingDelete;
|
||||
setPendingDelete(null);
|
||||
await api.del(`/workspaces/${workspaceId}/schedules/${id}`);
|
||||
fetchSchedules();
|
||||
};
|
||||
@ -343,7 +347,7 @@ export function ScheduleTab({ workspaceId }: Props) {
|
||||
✎
|
||||
</button>
|
||||
<button
|
||||
onClick={() => handleDelete(sched.id, sched.name)}
|
||||
onClick={() => setPendingDelete({ id: sched.id, name: sched.name })}
|
||||
className="text-[8px] px-1.5 py-0.5 text-red-400 hover:bg-red-600/20 rounded transition-colors"
|
||||
title="Delete"
|
||||
>
|
||||
@ -355,6 +359,16 @@ export function ScheduleTab({ workspaceId }: Props) {
|
||||
))
|
||||
)}
|
||||
</div>
|
||||
|
||||
<ConfirmDialog
|
||||
open={!!pendingDelete}
|
||||
title="Delete schedule"
|
||||
message={`Delete schedule "${pendingDelete?.name || "Unnamed"}"? This cannot be undone.`}
|
||||
confirmLabel="Delete"
|
||||
confirmVariant="danger"
|
||||
onConfirm={confirmDelete}
|
||||
onCancel={() => setPendingDelete(null)}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@ -217,7 +217,49 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve workspace URL (cache first, then DB)
|
||||
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
|
||||
if proxyErr != nil {
|
||||
return 0, nil, proxyErr
|
||||
}
|
||||
|
||||
normalizedBody, a2aMethod, proxyErr := normalizeA2APayload(body)
|
||||
if proxyErr != nil {
|
||||
return 0, nil, proxyErr
|
||||
}
|
||||
body = normalizedBody
|
||||
|
||||
startTime := time.Now()
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, agentURL, body, callerID)
|
||||
if cancelFwd != nil {
|
||||
defer cancelFwd()
|
||||
}
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to read agent response"},
|
||||
}
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
}
|
||||
return resp.StatusCode, respBody, nil
|
||||
}
|
||||
|
||||
// resolveAgentURL returns a routable URL for the target workspace agent. It
|
||||
// checks the Redis URL cache first, then falls back to a DB lookup, caching
|
||||
// the result on success. When the platform runs inside Docker, 127.0.0.1:<host
|
||||
// port> is rewritten to the container's Docker-bridge hostname (host-side
|
||||
// platforms keep the original URL because the bridge name wouldn't resolve).
|
||||
func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID string) (string, *proxyA2AError) {
|
||||
agentURL, err := db.GetCachedURL(ctx, workspaceID)
|
||||
if err != nil {
|
||||
var urlNullable sql.NullString
|
||||
@ -226,20 +268,20 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
`SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&urlNullable, &status)
|
||||
if err == sql.ErrNoRows {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusNotFound,
|
||||
Response: gin.H{"error": "workspace not found"},
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("ProxyA2A lookup error: %v", err)
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "lookup failed"},
|
||||
}
|
||||
}
|
||||
if !urlNullable.Valid || urlNullable.String == "" {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace has no URL", "status": status},
|
||||
}
|
||||
@ -251,19 +293,20 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// When the platform runs inside Docker, 127.0.0.1:{host_port} is
|
||||
// unreachable (it's the platform container's own localhost, not the
|
||||
// Docker host). Rewrite to the container's Docker-bridge hostname.
|
||||
//
|
||||
// But ONLY when we're actually inside Docker. If the platform runs
|
||||
// on the host (the default dev setup via infra/scripts/setup.sh),
|
||||
// 127.0.0.1:<ephemeral> IS the reachable URL and the container
|
||||
// hostname wouldn't resolve.
|
||||
if strings.HasPrefix(agentURL, "http://127.0.0.1:") && h.provisioner != nil && platformInDocker {
|
||||
agentURL = provisioner.InternalURL(workspaceID)
|
||||
}
|
||||
return agentURL, nil
|
||||
}
|
||||
|
||||
// Normalize the request into a valid A2A JSON-RPC 2.0 message
|
||||
// normalizeA2APayload parses the incoming body, wraps it in a JSON-RPC 2.0
|
||||
// envelope if absent, ensures params.message.messageId is set, and re-marshals
|
||||
// to bytes. Also returns the A2A method name (for logging) extracted from the
|
||||
// payload.
|
||||
func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
var payload map[string]interface{}
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return nil, "", &proxyA2AError{
|
||||
Status: http.StatusBadRequest,
|
||||
Response: gin.H{"error": "invalid JSON"},
|
||||
}
|
||||
@ -290,175 +333,194 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
|
||||
marshaledBody, marshalErr := json.Marshal(payload)
|
||||
if marshalErr != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
return nil, "", &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "failed to marshal request"},
|
||||
}
|
||||
}
|
||||
body = marshaledBody
|
||||
|
||||
// Extract method for logging
|
||||
var a2aMethod string
|
||||
if m, ok := payload["method"].(string); ok {
|
||||
a2aMethod = m
|
||||
}
|
||||
return marshaledBody, a2aMethod, nil
|
||||
}
|
||||
|
||||
// Forward to the agent. Uses WithoutCancel so delegation chains survive client
|
||||
// disconnect (browser tab close).
|
||||
// Default timeouts: canvas = 5 min, agent-to-agent = 30 min.
|
||||
// Callers can override via X-Timeout header (handled in ProxyA2A handler above).
|
||||
startTime := time.Now()
|
||||
// dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation
|
||||
// chains survive client disconnect (browser tab close). Default timeouts:
|
||||
// canvas (callerID == "") = 5 min, agent-to-agent = 30 min. Callers can
|
||||
// override via the X-Timeout header (applied to ctx upstream in ProxyA2A).
|
||||
func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, agentURL string, body []byte, callerID string) (*http.Response, context.CancelFunc, error) {
|
||||
forwardCtx := context.WithoutCancel(ctx)
|
||||
var cancel context.CancelFunc
|
||||
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
|
||||
// No caller-specified deadline — apply defaults
|
||||
if callerID == "" {
|
||||
var cancel context.CancelFunc
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 5*time.Minute)
|
||||
defer cancel()
|
||||
} else {
|
||||
var cancel context.CancelFunc
|
||||
forwardCtx, cancel = context.WithTimeout(forwardCtx, 30*time.Minute)
|
||||
defer cancel()
|
||||
}
|
||||
}
|
||||
req, err := http.NewRequestWithContext(forwardCtx, "POST", agentURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
// Wrap the construction failure so the caller can distinguish it
|
||||
// from an upstream Do() error and produce the correct 500 response.
|
||||
return nil, nil, &proxyDispatchBuildError{err: err}
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, doErr := a2aClient.Do(req)
|
||||
return resp, cancel, doErr
|
||||
}
|
||||
|
||||
// proxyDispatchBuildError is a sentinel wrapper for failures inside
|
||||
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
|
||||
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
|
||||
type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and (when `logActivity` is true)
|
||||
// schedules a detached LogActivity goroutine for the failed attempt.
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
// Build-time failure (couldn't even create the http.Request) — return
|
||||
// a 500 without the reactive-health / busy-retry paths.
|
||||
if buildErr, ok := err.(*proxyDispatchBuildError); ok {
|
||||
_ = buildErr
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusInternalServerError,
|
||||
Response: gin.H{"error": "failed to create proxy request"},
|
||||
}
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := a2aClient.Do(req)
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
log.Printf("ProxyA2A forward error: %v", err)
|
||||
log.Printf("ProxyA2A forward error: %v", err)
|
||||
|
||||
// Reactive health check: if the request failed, check if the container is actually dead.
|
||||
// Skip for external workspaces (no Docker container).
|
||||
containerDead := false
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if h.provisioner != nil && wsRuntime != "external" {
|
||||
if running, _ := h.provisioner.IsRunning(ctx, workspaceID); !running {
|
||||
containerDead = true
|
||||
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
|
||||
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
|
||||
go h.RestartByID(workspaceID)
|
||||
}
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
// Log failed A2A attempt (detached context — request may be done)
|
||||
errMsg := err.Error()
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
errWsName = workspaceID
|
||||
}
|
||||
summary := "A2A request to " + errWsName + " failed: " + errMsg
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
DurationMs: &durationMs,
|
||||
Status: "error",
|
||||
ErrorDetail: &errMsg,
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
|
||||
}
|
||||
}
|
||||
// Container is alive but upstream Do() failed with a timeout/EOF-
|
||||
// shaped error — the agent is most likely mid-synthesis on a
|
||||
// previous request (single-threaded main loop). Surface as 503
|
||||
// Busy with a Retry-After hint so callers can distinguish this
|
||||
// from a real unreachable-agent (502) and retry with backoff.
|
||||
// Issue #110.
|
||||
if isUpstreamBusyError(err) {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
Response: gin.H{
|
||||
"error": "workspace agent busy — retry after a short backoff",
|
||||
"busy": true,
|
||||
"retry_after": busyRetryAfterSeconds,
|
||||
},
|
||||
}
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to reach workspace agent"},
|
||||
}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if err != nil {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to read agent response"},
|
||||
}
|
||||
}
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
|
||||
if logActivity {
|
||||
// Log successful A2A communication
|
||||
logStatus := "ok"
|
||||
if resp.StatusCode >= 400 {
|
||||
logStatus = "error"
|
||||
}
|
||||
// Resolve workspace name for readable summary
|
||||
var wsNameForLog string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsNameForLog)
|
||||
if wsNameForLog == "" {
|
||||
wsNameForLog = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
}(ctx)
|
||||
|
||||
// For canvas-initiated requests, broadcast the response via WebSocket
|
||||
// so the frontend receives it instantly without polling.
|
||||
if callerID == "" && resp.StatusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
|
||||
}
|
||||
}
|
||||
return resp.StatusCode, respBody, nil
|
||||
// Container is alive but upstream Do() failed with a timeout/EOF-
|
||||
// shaped error — the agent is most likely mid-synthesis on a
|
||||
// previous request (single-threaded main loop). Surface as 503
|
||||
// Busy with a Retry-After hint so callers can distinguish this
|
||||
// from a real unreachable-agent (502) and retry with backoff.
|
||||
// Issue #110.
|
||||
if isUpstreamBusyError(err) {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
Response: gin.H{
|
||||
"error": "workspace agent busy — retry after a short backoff",
|
||||
"busy": true,
|
||||
"retry_after": busyRetryAfterSeconds,
|
||||
},
|
||||
}
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to reach workspace agent"},
|
||||
}
|
||||
}
|
||||
|
||||
// maybeMarkContainerDead runs the reactive health check after a forward error.
|
||||
// If the workspace's Docker container is no longer running (and the workspace
|
||||
// isn't external), it marks the workspace offline, clears Redis state,
|
||||
// broadcasts WORKSPACE_OFFLINE, and triggers an async restart. Returns true
|
||||
// when the container was found dead.
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if h.provisioner == nil || wsRuntime == "external" {
|
||||
return false
|
||||
}
|
||||
if running, _ := h.provisioner.IsRunning(ctx, workspaceID); running {
|
||||
return false
|
||||
}
|
||||
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'offline', updated_at = now() WHERE id = $1 AND status NOT IN ('removed', 'provisioning')`, workspaceID); err != nil {
|
||||
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
|
||||
}
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_OFFLINE", workspaceID, map[string]interface{}{})
|
||||
go h.RestartByID(workspaceID)
|
||||
return true
|
||||
}
|
||||
|
||||
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
|
||||
// goroutine (the request context may already be done by the time it runs).
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
|
||||
errMsg := err.Error()
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
errWsName = workspaceID
|
||||
}
|
||||
summary := "A2A request to " + errWsName + " failed: " + errMsg
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
DurationMs: &durationMs,
|
||||
Status: "error",
|
||||
ErrorDetail: &errMsg,
|
||||
})
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
}
|
||||
var wsNameForLog string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsNameForLog)
|
||||
if wsNameForLog == "" {
|
||||
wsNameForLog = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
go func(parent context.Context) {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
}(ctx)
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "A2A_RESPONSE", map[string]interface{}{
|
||||
"response_body": json.RawMessage(respBody),
|
||||
"method": a2aMethod,
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) *string {
|
||||
|
||||
@ -733,3 +733,83 @@ func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
|
||||
t.Errorf("expected 401, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Direct unit tests for normalizeA2APayload (extracted from proxyA2ARequest) ---
|
||||
|
||||
func TestNormalizeA2APayload_InvalidJSON(t *testing.T) {
|
||||
_, _, perr := normalizeA2APayload([]byte("not json"))
|
||||
if perr == nil {
|
||||
t.Fatal("expected error for invalid JSON, got nil")
|
||||
}
|
||||
if perr.Status != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d", perr.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_WrapsBareMessage(t *testing.T) {
|
||||
raw := []byte(`{"method":"message/send","params":{"message":{"role":"user","parts":[{"type":"text","text":"hi"}]}}}`)
|
||||
out, method, perr := normalizeA2APayload(raw)
|
||||
if perr != nil {
|
||||
t.Fatalf("unexpected error: %+v", perr)
|
||||
}
|
||||
if method != "message/send" {
|
||||
t.Errorf("expected method=message/send, got %q", method)
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(out, &parsed); err != nil {
|
||||
t.Fatalf("output is not valid JSON: %v", err)
|
||||
}
|
||||
if parsed["jsonrpc"] != "2.0" {
|
||||
t.Errorf("expected jsonrpc=2.0 wrapper, got %v", parsed["jsonrpc"])
|
||||
}
|
||||
if parsed["id"] == nil || parsed["id"] == "" {
|
||||
t.Error("expected generated id, got empty")
|
||||
}
|
||||
params := parsed["params"].(map[string]interface{})
|
||||
msg := params["message"].(map[string]interface{})
|
||||
if msg["messageId"] == nil || msg["messageId"] == "" {
|
||||
t.Error("expected messageId injected, got empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_PreservesExistingJSONRPC(t *testing.T) {
|
||||
raw := []byte(`{"jsonrpc":"2.0","id":"custom-id","method":"tasks/list","params":{}}`)
|
||||
out, method, perr := normalizeA2APayload(raw)
|
||||
if perr != nil {
|
||||
t.Fatalf("unexpected error: %+v", perr)
|
||||
}
|
||||
if method != "tasks/list" {
|
||||
t.Errorf("expected method=tasks/list, got %q", method)
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
_ = json.Unmarshal(out, &parsed)
|
||||
if parsed["id"] != "custom-id" {
|
||||
t.Errorf("existing id overwritten: got %v", parsed["id"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_PreservesExistingMessageId(t *testing.T) {
|
||||
raw := []byte(`{"method":"message/send","params":{"message":{"messageId":"fixed-mid","role":"user","parts":[]}}}`)
|
||||
out, _, perr := normalizeA2APayload(raw)
|
||||
if perr != nil {
|
||||
t.Fatalf("unexpected error: %+v", perr)
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
_ = json.Unmarshal(out, &parsed)
|
||||
params := parsed["params"].(map[string]interface{})
|
||||
msg := params["message"].(map[string]interface{})
|
||||
if msg["messageId"] != "fixed-mid" {
|
||||
t.Errorf("existing messageId overwritten: got %v", msg["messageId"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_MissingMethodReturnsEmpty(t *testing.T) {
|
||||
raw := []byte(`{"params":{"message":{"role":"user"}}}`)
|
||||
_, method, perr := normalizeA2APayload(raw)
|
||||
if perr != nil {
|
||||
t.Fatalf("unexpected error: %+v", perr)
|
||||
}
|
||||
if method != "" {
|
||||
t.Errorf("expected empty method, got %q", method)
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,17 +118,44 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
// It searches the workspace's own activity logs and memories without adding a new storage layer.
|
||||
func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
query := strings.TrimSpace(c.DefaultQuery("q", ""))
|
||||
limitStr := c.DefaultQuery("limit", "50")
|
||||
query, limit := parseSessionSearchParams(c)
|
||||
|
||||
sqlQuery, args := buildSessionSearchQuery(workspaceID, query, limit)
|
||||
|
||||
rows, err := db.DB.QueryContext(c.Request.Context(), sqlQuery, args...)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "session search failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
items, scanErr := scanSessionSearchRows(rows)
|
||||
if scanErr != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query iteration failed"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, items)
|
||||
}
|
||||
|
||||
// parseSessionSearchParams extracts the `q` and `limit` query params for SessionSearch,
|
||||
// applying the default limit (50) and cap (200).
|
||||
func parseSessionSearchParams(c *gin.Context) (string, int) {
|
||||
query := strings.TrimSpace(c.DefaultQuery("q", ""))
|
||||
limit := 50
|
||||
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
|
||||
if n, err := strconv.Atoi(c.DefaultQuery("limit", "50")); err == nil && n > 0 {
|
||||
limit = n
|
||||
if limit > 200 {
|
||||
limit = 200
|
||||
}
|
||||
}
|
||||
return query, limit
|
||||
}
|
||||
|
||||
// buildSessionSearchQuery composes the UNION-ALL SQL across activity_logs and
|
||||
// agent_memories with an optional ILIKE filter, returning the SQL string and
|
||||
// positional args ready for QueryContext.
|
||||
func buildSessionSearchQuery(workspaceID, query string, limit int) (string, []interface{}) {
|
||||
sqlQuery := `
|
||||
WITH session_items AS (
|
||||
SELECT
|
||||
@ -179,14 +206,17 @@ func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
|
||||
sqlQuery += ` ORDER BY created_at DESC LIMIT $` + strconv.Itoa(len(args)+1)
|
||||
args = append(args, limit)
|
||||
return sqlQuery, args
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(c.Request.Context(), sqlQuery, args...)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "session search failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// scanSessionSearchRows materialises rows from the SessionSearch query into the
|
||||
// JSON-shaped maps the endpoint returns. Per-row scan errors are logged and
|
||||
// skipped (matches prior behavior); a rows.Err() failure is surfaced.
|
||||
func scanSessionSearchRows(rows interface {
|
||||
Next() bool
|
||||
Scan(dest ...interface{}) error
|
||||
Err() error
|
||||
}) ([]map[string]interface{}, error) {
|
||||
items := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
var (
|
||||
@ -219,11 +249,9 @@ func (h *ActivityHandler) SessionSearch(c *gin.Context) {
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Session search rows error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query iteration failed"})
|
||||
return
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, items)
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// Notify handles POST /workspaces/:id/notify — agents push messages to the canvas chat.
|
||||
|
||||
@ -35,6 +35,13 @@ func NewDelegationHandler(wh *WorkspaceHandler, b *events.Broadcaster) *Delegati
|
||||
return &DelegationHandler{workspace: wh, broadcaster: b}
|
||||
}
|
||||
|
||||
// delegateRequest is the bound POST /workspaces/:id/delegate body.
|
||||
type delegateRequest struct {
|
||||
TargetID string `json:"target_id" binding:"required"`
|
||||
Task string `json:"task" binding:"required"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
}
|
||||
|
||||
// Delegate handles POST /workspaces/:id/delegate
|
||||
// Sends an A2A message to the target workspace in the background.
|
||||
// Returns immediately with a delegation_id.
|
||||
@ -42,97 +49,27 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
sourceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var body struct {
|
||||
TargetID string `json:"target_id" binding:"required"`
|
||||
Task string `json:"task" binding:"required"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Validate target_id is a valid UUID
|
||||
if _, err := uuid.Parse(body.TargetID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "target_id must be a valid UUID"})
|
||||
return
|
||||
var body delegateRequest
|
||||
if err := bindDelegateRequest(c, &body); err != nil {
|
||||
return // response already written
|
||||
}
|
||||
|
||||
// #124 — idempotency. If the caller supplies an idempotency_key, return
|
||||
// the existing delegation when (workspace_id, idempotency_key) already
|
||||
// exists and is not in a failed terminal state. Failed rows are NOT
|
||||
// reused so callers can retry after a real failure (the unique index
|
||||
// is on the column itself; we delete-then-insert below to release it).
|
||||
if body.IdempotencyKey != "" {
|
||||
var existingID, existingStatus, existingTarget string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status, target_id
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&existingID, &existingStatus, &existingTarget)
|
||||
if err == nil && existingID != "" {
|
||||
if existingStatus == "failed" {
|
||||
// Release the unique slot so the retry can take it. Bounded
|
||||
// to this exact row by id-of-key; safe.
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
DELETE FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2 AND status = 'failed'
|
||||
`, sourceID, body.IdempotencyKey)
|
||||
} else {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": existingID,
|
||||
"status": existingStatus,
|
||||
"target_id": existingTarget,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
// exists and is not in a failed terminal state.
|
||||
if hit := lookupIdempotentDelegation(ctx, c, sourceID, body.IdempotencyKey); hit {
|
||||
return
|
||||
}
|
||||
|
||||
delegationID := uuid.New().String()
|
||||
|
||||
// Store delegation in DB (request_body must be JSONB, include delegation_id for correlation)
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
outcome := insertDelegationRow(ctx, c, sourceID, body, delegationID)
|
||||
if outcome == insertHandledByIdempotent {
|
||||
return // idempotency-conflict response already written
|
||||
}
|
||||
var trackingOK bool
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
|
||||
if err != nil {
|
||||
// A unique-constraint hit means a concurrent request just took the
|
||||
// slot — rare, but worth surfacing as the same idempotent response
|
||||
// rather than a generic 500. Re-query to fetch the winner's id.
|
||||
if body.IdempotencyKey != "" {
|
||||
var winnerID, winnerStatus string
|
||||
if qerr := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&winnerID, &winnerStatus); qerr == nil && winnerID != "" {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": winnerID,
|
||||
"status": winnerStatus,
|
||||
"target_id": body.TargetID,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
log.Printf("Delegation: failed to store: %v", err)
|
||||
} else {
|
||||
trackingOK = true
|
||||
}
|
||||
_ = trackingOK
|
||||
// insertTrackingUnavailable means insert failed for a non-idempotency
|
||||
// reason (logged); we still dispatch the A2A request and surface the
|
||||
// warning in the response.
|
||||
|
||||
// Build A2A payload
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
@ -160,12 +97,122 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
"status": "delegated",
|
||||
"target_id": body.TargetID,
|
||||
}
|
||||
if !trackingOK {
|
||||
if outcome == insertTrackingUnavailable {
|
||||
resp["warning"] = "delegation dispatched but status tracking unavailable"
|
||||
}
|
||||
c.JSON(http.StatusAccepted, resp)
|
||||
}
|
||||
|
||||
// bindDelegateRequest binds and validates the JSON body. On error it writes
|
||||
// the 400 response and returns the error so the caller can return.
|
||||
func bindDelegateRequest(c *gin.Context, body *delegateRequest) error {
|
||||
if err := c.ShouldBindJSON(body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return err
|
||||
}
|
||||
if _, err := uuid.Parse(body.TargetID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "target_id must be a valid UUID"})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// lookupIdempotentDelegation returns true (and writes the response) when an
|
||||
// existing non-failed delegation matches the (sourceID, idempotencyKey) pair.
|
||||
// Failed rows are deleted to release the unique slot so the retry can take it.
|
||||
// Returns false when there's no key, no existing row, or the existing row was
|
||||
// failed and just deleted.
|
||||
func lookupIdempotentDelegation(ctx context.Context, c *gin.Context, sourceID, idempotencyKey string) bool {
|
||||
if idempotencyKey == "" {
|
||||
return false
|
||||
}
|
||||
var existingID, existingStatus, existingTarget string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status, target_id
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, idempotencyKey).Scan(&existingID, &existingStatus, &existingTarget)
|
||||
if err != nil || existingID == "" {
|
||||
return false
|
||||
}
|
||||
if existingStatus == "failed" {
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
DELETE FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2 AND status = 'failed'
|
||||
`, sourceID, idempotencyKey)
|
||||
return false
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": existingID,
|
||||
"status": existingStatus,
|
||||
"target_id": existingTarget,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return true
|
||||
}
|
||||
|
||||
// insertDelegationOutcome captures the three distinct results of storing
|
||||
// the pending delegation row, so callers never have to decode a positional
|
||||
// (bool, bool) tuple.
|
||||
type insertDelegationOutcome int
|
||||
|
||||
const (
|
||||
// insertOK — row stored; caller continues with dispatch and does NOT
|
||||
// surface a tracking warning.
|
||||
insertOK insertDelegationOutcome = iota
|
||||
// insertHandledByIdempotent — a concurrent idempotent request took the
|
||||
// slot; the winner's JSON response is already written and the caller
|
||||
// MUST return without further writes.
|
||||
insertHandledByIdempotent
|
||||
// insertTrackingUnavailable — insert failed for a non-idempotency
|
||||
// reason (logged by this function); caller continues with dispatch
|
||||
// and surfaces a tracking-unavailable warning in the response.
|
||||
insertTrackingUnavailable
|
||||
)
|
||||
|
||||
// insertDelegationRow stores the pending delegation row. See
|
||||
// insertDelegationOutcome for the three possible return values.
|
||||
func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, body delegateRequest, delegationID string) insertDelegationOutcome {
|
||||
taskJSON, _ := json.Marshal(map[string]interface{}{
|
||||
"task": body.Task,
|
||||
"delegation_id": delegationID,
|
||||
})
|
||||
var idemArg interface{}
|
||||
if body.IdempotencyKey != "" {
|
||||
idemArg = body.IdempotencyKey
|
||||
}
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
|
||||
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
|
||||
if err == nil {
|
||||
return insertOK
|
||||
}
|
||||
// A unique-constraint hit means a concurrent request just took the
|
||||
// slot — rare, but worth surfacing as the same idempotent response
|
||||
// rather than a generic 500. Re-query to fetch the winner's id.
|
||||
if body.IdempotencyKey != "" {
|
||||
var winnerID, winnerStatus string
|
||||
if qerr := db.DB.QueryRowContext(ctx, `
|
||||
SELECT request_body->>'delegation_id', status
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND idempotency_key = $2
|
||||
LIMIT 1
|
||||
`, sourceID, body.IdempotencyKey).Scan(&winnerID, &winnerStatus); qerr == nil && winnerID != "" {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"delegation_id": winnerID,
|
||||
"status": winnerStatus,
|
||||
"target_id": body.TargetID,
|
||||
"idempotent_hit": true,
|
||||
})
|
||||
return insertHandledByIdempotent
|
||||
}
|
||||
}
|
||||
log.Printf("Delegation: failed to store: %v", err)
|
||||
return insertTrackingUnavailable
|
||||
}
|
||||
|
||||
// executeDelegation runs in a goroutine — sends A2A and stores the result.
|
||||
// Updates delegation status through: pending → dispatched → received → completed/failed
|
||||
// delegationRetryDelay is the pause between the first failed proxy attempt
|
||||
|
||||
@ -40,70 +40,30 @@ func (h *DiscoveryHandler) Discover(c *gin.Context) {
|
||||
return // response already written
|
||||
}
|
||||
|
||||
if callerID != "" {
|
||||
if !registry.CanCommunicate(callerID, targetID) {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not authorized to discover this workspace"})
|
||||
return
|
||||
}
|
||||
if !registry.CanCommunicate(callerID, targetID) {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "not authorized to discover this workspace"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Workspace-to-workspace: return Docker-internal URL (containers can't reach host ports)
|
||||
// External workspaces: return their registered URL with host.docker.internal
|
||||
// Canvas/external: return host-accessible URL
|
||||
// Workspace-to-workspace: return Docker-internal URL (containers can't
|
||||
// reach host ports). External targets need their registered URL with
|
||||
// 127.0.0.1/localhost rewritten to host.docker.internal when the caller
|
||||
// is itself a Docker container.
|
||||
if callerID != "" {
|
||||
var wsName, wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(name,''), COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, targetID).Scan(&wsName, &wsRuntime)
|
||||
|
||||
// External workspaces: return their registered URL.
|
||||
// Rewrite 127.0.0.1/localhost → host.docker.internal ONLY when the
|
||||
// caller itself is a Docker container; a remote (external) caller
|
||||
// lives on the other side of the wire and needs the URL as-is
|
||||
// (localhost rewrites wouldn't resolve from its host anyway).
|
||||
// Phase 30.6.
|
||||
if wsRuntime == "external" {
|
||||
var wsURL string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(url,'') FROM workspaces WHERE id = $1`, targetID).Scan(&wsURL)
|
||||
if wsURL != "" {
|
||||
outURL := wsURL
|
||||
var callerRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
|
||||
if callerRuntime != "external" {
|
||||
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
|
||||
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": outURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Try cached internal URL first
|
||||
if internalURL, err := db.GetCachedInternalURL(ctx, targetID); err == nil && internalURL != "" {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Fallback: only synthesize a URL if the workspace exists and is online/degraded
|
||||
var wsStatus string
|
||||
dbErr := db.DB.QueryRowContext(ctx,
|
||||
`SELECT status FROM workspaces WHERE id = $1`, targetID,
|
||||
).Scan(&wsStatus)
|
||||
if dbErr == nil && (wsStatus == "online" || wsStatus == "degraded") {
|
||||
internalURL := provisioner.InternalURL(targetID)
|
||||
if cacheErr := db.CacheInternalURL(ctx, targetID, internalURL); cacheErr != nil {
|
||||
log.Printf("Discovery: failed to cache internal URL for %s: %v", targetID, cacheErr)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Workspace is not reachable — don't fall through to host URL path
|
||||
if dbErr == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not available", "status": wsStatus})
|
||||
} else {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
}
|
||||
discoverWorkspacePeer(ctx, c, callerID, targetID)
|
||||
return
|
||||
}
|
||||
discoverHostPeer(ctx, c, targetID)
|
||||
}
|
||||
|
||||
// discoverHostPeer handles the canvas/external (no X-Workspace-ID) branch of
|
||||
// Discover. It returns the host-accessible URL for `targetID`, following any
|
||||
// forwarding chain (max 5 hops). Currently unreachable because Discover
|
||||
// requires the X-Workspace-ID header up front, but kept to preserve the
|
||||
// original code path 1:1 in case the requirement is relaxed.
|
||||
func discoverHostPeer(ctx context.Context, c *gin.Context, targetID string) {
|
||||
if url, err := db.GetCachedURL(ctx, targetID); err == nil {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": url})
|
||||
return
|
||||
@ -149,6 +109,72 @@ func (h *DiscoveryHandler) Discover(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// discoverWorkspacePeer handles the workspace-to-workspace branch of Discover —
|
||||
// resolves an internal/Docker-routable URL for `targetID` from the perspective
|
||||
// of `callerID` and writes the JSON response (or an appropriate 404/503 error).
|
||||
func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, targetID string) {
|
||||
var wsName, wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(name,''), COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, targetID).Scan(&wsName, &wsRuntime)
|
||||
|
||||
// External workspaces: return their registered URL.
|
||||
// Rewrite 127.0.0.1/localhost → host.docker.internal ONLY when the
|
||||
// caller itself is a Docker container; a remote (external) caller
|
||||
// lives on the other side of the wire and needs the URL as-is
|
||||
// (localhost rewrites wouldn't resolve from its host anyway).
|
||||
// Phase 30.6.
|
||||
if wsRuntime == "external" {
|
||||
if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Try cached internal URL first
|
||||
if internalURL, err := db.GetCachedInternalURL(ctx, targetID); err == nil && internalURL != "" {
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Fallback: only synthesize a URL if the workspace exists and is online/degraded
|
||||
var wsStatus string
|
||||
dbErr := db.DB.QueryRowContext(ctx,
|
||||
`SELECT status FROM workspaces WHERE id = $1`, targetID,
|
||||
).Scan(&wsStatus)
|
||||
if dbErr == nil && (wsStatus == "online" || wsStatus == "degraded") {
|
||||
internalURL := provisioner.InternalURL(targetID)
|
||||
if cacheErr := db.CacheInternalURL(ctx, targetID, internalURL); cacheErr != nil {
|
||||
log.Printf("Discovery: failed to cache internal URL for %s: %v", targetID, cacheErr)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": internalURL, "name": wsName})
|
||||
return
|
||||
}
|
||||
// Workspace is not reachable — don't fall through to host URL path
|
||||
if dbErr == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "workspace not available", "status": wsStatus})
|
||||
} else {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
}
|
||||
}
|
||||
|
||||
// writeExternalWorkspaceURL resolves the registered URL for an external-runtime
|
||||
// target and writes the response. Returns true when a response was written
|
||||
// (URL present); returns false when the external workspace has no URL on
|
||||
// file, leaving the caller to fall through to the internal-URL path.
|
||||
func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, targetID, wsName string) bool {
|
||||
var wsURL string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(url,'') FROM workspaces WHERE id = $1`, targetID).Scan(&wsURL)
|
||||
if wsURL == "" {
|
||||
return false
|
||||
}
|
||||
outURL := wsURL
|
||||
var callerRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
|
||||
if callerRuntime != "external" {
|
||||
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
|
||||
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"id": targetID, "url": outURL, "name": wsName})
|
||||
return true
|
||||
}
|
||||
|
||||
// Peers handles GET /registry/:id/peers
|
||||
func (h *DiscoveryHandler) Peers(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user