Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 013c8cfe58 | |||
| d20147300b | |||
| 4d32736e25 | |||
| 691d341fbb | |||
| ef42e17224 | |||
| b13c9f94f1 | |||
| df94fd1764 |
@@ -234,6 +234,8 @@ jobs:
|
||||
name: Production auto-deploy
|
||||
needs: build-and-push
|
||||
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
|
||||
# Side-effect deploy only; image publish success is the durable artifact. mc#774
|
||||
continue-on-error: true
|
||||
# Publish/release lane (internal#462) — production deploy of a merged
|
||||
# fix; reserved capacity, never queued behind PR-CI.
|
||||
runs-on: publish
|
||||
|
||||
@@ -90,8 +90,6 @@ Poll `GET /workspaces/:id/delegations` to check results. Each entry includes `de
|
||||
|
||||
This is the recommended way for agents to delegate work — it works for all runtimes (Claude Code, LangGraph, etc.) since it operates at the platform level.
|
||||
|
||||
Workspace creation also assigns an `awareness_namespace` on the workspace row. That namespace is later injected into the provisioned runtime.
|
||||
|
||||
### Registry
|
||||
|
||||
| Method | Path | Description | Auth |
|
||||
|
||||
@@ -103,7 +103,7 @@ Migration files live in `workspace-server/migrations/` (latest: `022_workspace_s
|
||||
|
||||
| Table | Description |
|
||||
|-------|-------------|
|
||||
| `workspaces` | Core entity — status, runtime, `agent_card` JSONB, heartbeat columns, `current_task`, `awareness_namespace`, `workspace_dir` |
|
||||
| `workspaces` | Core entity — status, runtime, `agent_card` JSONB, heartbeat columns, `current_task`, `workspace_dir` |
|
||||
| `canvas_layouts` | Per-workspace x/y canvas position |
|
||||
| `structure_events` | Append-only event log (workspace lifecycle, agent, approval events) |
|
||||
| `activity_logs` | A2A communications, task updates, agent logs, errors. `error_detail` is populated by the scheduler so cron run history can surface failure reasons. |
|
||||
|
||||
@@ -111,12 +111,13 @@ const maxProxyResponseBody = 10 << 20
|
||||
// a generic 502 page to canvas. 10s is well above realistic intra-region
|
||||
// latencies and well below CF's edge timeout.
|
||||
//
|
||||
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
|
||||
// 3. Transport.ResponseHeaderTimeout — 5min default. From request-body-end
|
||||
// to response-headers-start. Configurable via
|
||||
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
|
||||
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
|
||||
// turns (big context + internal delegate_task round-trips routinely exceed
|
||||
// the old 60s ceiling). Body streaming after headers is governed by the
|
||||
// turns and Codex scheduled tasks (big context + internal delegate_task
|
||||
// round-trips routinely exceed the old 60s/180s ceilings). Body streaming
|
||||
// after headers is governed by the
|
||||
// per-request context deadline, NOT this timeout — so multi-minute agent
|
||||
// responses still work fine.
|
||||
//
|
||||
@@ -131,7 +132,7 @@ var a2aClient = &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 180*time.Second),
|
||||
ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 5*time.Minute),
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
// MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent
|
||||
// fan-in is bounded by the platform's broadcaster fan-out, not by
|
||||
@@ -228,7 +229,7 @@ func (e *proxyA2AError) Error() string {
|
||||
// cron scheduler and other internal callers that need to send A2A messages
|
||||
// to workspaces programmatically (not from an HTTP handler).
|
||||
func (h *WorkspaceHandler) ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
|
||||
status, resp, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity, false)
|
||||
status, resp, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
|
||||
if proxyErr != nil {
|
||||
return status, resp, proxyErr
|
||||
}
|
||||
@@ -307,21 +308,13 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
|
||||
// The bind is strict: the token must match `callerID`, not
|
||||
// `workspaceID` (the target). A compromised token from workspace A
|
||||
// must never authenticate calls from A pretending to be B.
|
||||
//
|
||||
// Post-RFC#637: canvas users now send X-Workspace-ID (their identity
|
||||
// workspace). validateCallerToken detects canvas/admin auth on a
|
||||
// tokenless workspace and returns isCanvasUser=true so the proxy can
|
||||
// bypass CanCommunicate (human users sit outside the hierarchy).
|
||||
isCanvasUser := false
|
||||
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
|
||||
var err error
|
||||
isCanvasUser, err = validateCallerToken(ctx, c, callerID)
|
||||
if err != nil {
|
||||
if callerID != "" && callerID != workspaceID {
|
||||
if err := validateCallerToken(ctx, c, callerID); err != nil {
|
||||
return // response already written with 401
|
||||
}
|
||||
}
|
||||
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser)
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true)
|
||||
if proxyErr != nil {
|
||||
for k, v := range proxyErr.Headers {
|
||||
c.Header(k, v)
|
||||
@@ -360,13 +353,11 @@ func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool, isCanvasUser bool) (int, []byte, *proxyA2AError) {
|
||||
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
// Access control: workspace-to-workspace requests must pass CanCommunicate check.
|
||||
// Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*)
|
||||
// are trusted. Self-calls (callerID == workspaceID) are always allowed.
|
||||
// Post-RFC#637: canvas-user identity workspaces also bypass CanCommunicate
|
||||
// because human users sit outside the org hierarchy.
|
||||
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
|
||||
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
|
||||
if !registry.CanCommunicate(callerID, workspaceID) {
|
||||
log.Printf("ProxyA2A: access denied %s → %s", callerID, workspaceID)
|
||||
return 0, nil, &proxyA2AError{
|
||||
|
||||
@@ -5,21 +5,17 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/subtle"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/orgtoken"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
@@ -32,8 +28,8 @@ type proxyDispatchBuildError struct{ err error }
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and (when `logActivity` is true)
|
||||
// schedules a detached LogActivity goroutine for the failed attempt.
|
||||
// runs the reactive container-health check, and records the outcome. Busy
|
||||
// targets that are successfully queued are logged as queued, not failed.
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
// Build-time failure (couldn't even create the http.Request) — return
|
||||
// a 500 without the reactive-health / busy-retry paths.
|
||||
@@ -49,10 +45,10 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
if containerDead {
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true},
|
||||
@@ -112,6 +108,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey, expiresAt,
|
||||
); qerr == nil {
|
||||
log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth)
|
||||
if logActivity {
|
||||
h.logA2ABusyQueued(ctx, workspaceID, callerID, body, a2aMethod, durationMs)
|
||||
}
|
||||
respBody, _ := json.Marshal(gin.H{
|
||||
"queued": true,
|
||||
"queue_id": qid,
|
||||
@@ -125,6 +124,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
// make delegation silently disappear.
|
||||
log.Printf("ProxyA2A: enqueue for %s failed (%v) — falling back to 503", workspaceID, qerr)
|
||||
}
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)},
|
||||
@@ -135,6 +137,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
},
|
||||
}
|
||||
}
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusBadGateway,
|
||||
Response: gin.H{"error": "failed to reach workspace agent"},
|
||||
@@ -315,6 +320,33 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
})
|
||||
}
|
||||
|
||||
// logA2ABusyQueued records that a push attempt reached a live but busy
|
||||
// workspace and was durably queued for heartbeat drain.
|
||||
func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, durationMs int) {
|
||||
var wsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if wsName == "" {
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (queued: target busy)"
|
||||
parent := ctx
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
DurationMs: &durationMs,
|
||||
Status: "ok",
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
@@ -387,53 +419,31 @@ func nilIfEmpty(s string) *string {
|
||||
// (their next /registry/register will mint their first token, after
|
||||
// which this branch never fires again for them).
|
||||
//
|
||||
// Post-RFC#637 addition: when the tokenless workspace is accompanied by
|
||||
// canvas or admin auth (same-origin request, admin bearer, or org-level
|
||||
// token), the caller is identified as a canvas-user identity rather than
|
||||
// a legacy peer agent. The returned isCanvasUser flag lets the A2A proxy
|
||||
// bypass CanCommunicate for human users, who sit outside the workspace
|
||||
// hierarchy.
|
||||
//
|
||||
// On auth failure this writes the 401 via c and returns an error so the
|
||||
// handler aborts without running the proxy.
|
||||
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (isCanvasUser bool, err error) {
|
||||
hasLive, dbErr := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
|
||||
if dbErr != nil {
|
||||
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) error {
|
||||
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
|
||||
if err != nil {
|
||||
// Fail-open here matches the heartbeat path — A2A caller auth is
|
||||
// defense-in-depth on top of access-control hierarchy, not the
|
||||
// sole gate on the secret material. A DB hiccup shouldn't take
|
||||
// the whole A2A path down.
|
||||
log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, dbErr)
|
||||
return false, nil
|
||||
log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, err)
|
||||
return nil
|
||||
}
|
||||
if !hasLive {
|
||||
// Tokenless workspace — could be legacy/pre-upgrade caller or
|
||||
// canvas-user identity. Distinguish by request auth signals.
|
||||
if middleware.IsSameOriginCanvas(c) {
|
||||
return true, nil
|
||||
}
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
if tok != "" {
|
||||
adminSecret := os.Getenv("ADMIN_TOKEN")
|
||||
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
|
||||
return true, nil
|
||||
}
|
||||
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil // legacy / pre-upgrade caller
|
||||
return nil // legacy / pre-upgrade caller
|
||||
}
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
if tok == "" {
|
||||
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing caller auth token"})
|
||||
return false, errInvalidCallerToken
|
||||
return errInvalidCallerToken
|
||||
}
|
||||
if err := wsauth.ValidateToken(ctx, db.DB, callerID, tok); err != nil {
|
||||
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid caller auth token"})
|
||||
return false, err
|
||||
return err
|
||||
}
|
||||
return false, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// errInvalidCallerToken is a sentinel for validateCallerToken's "missing
|
||||
|
||||
@@ -1112,13 +1112,9 @@ func TestValidateCallerToken_LegacyCallerGrandfathered(t *testing.T) {
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-legacy")
|
||||
if err != nil {
|
||||
if err := validateCallerToken(context.Background(), c, "ws-legacy"); err != nil {
|
||||
t.Errorf("legacy caller should grandfather through; got %v", err)
|
||||
}
|
||||
if isCanvasUser {
|
||||
t.Errorf("legacy caller should NOT be identified as canvas user")
|
||||
}
|
||||
if w.Code != 200 {
|
||||
// gin default before c.JSON is 200; we want no error response written
|
||||
if w.Body.Len() != 0 {
|
||||
@@ -1140,13 +1136,10 @@ func TestValidateCallerToken_MissingTokenWhenOnFile(t *testing.T) {
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
|
||||
// No Authorization header set
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
|
||||
err := validateCallerToken(context.Background(), c, "ws-authed")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for missing token")
|
||||
}
|
||||
if isCanvasUser {
|
||||
t.Errorf("authed workspace with missing token should NOT be canvas user")
|
||||
}
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("expected 401, got %d", w.Code)
|
||||
}
|
||||
@@ -1171,13 +1164,9 @@ func TestValidateCallerToken_InvalidToken(t *testing.T) {
|
||||
req.Header.Set("Authorization", "Bearer wrong")
|
||||
c.Request = req
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
|
||||
if err == nil {
|
||||
if err := validateCallerToken(context.Background(), c, "ws-authed"); err == nil {
|
||||
t.Fatal("expected error for bad token")
|
||||
}
|
||||
if isCanvasUser {
|
||||
t.Errorf("authed workspace with bad token should NOT be canvas user")
|
||||
}
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("expected 401, got %d", w.Code)
|
||||
}
|
||||
@@ -1203,13 +1192,9 @@ func TestValidateCallerToken_ValidToken(t *testing.T) {
|
||||
req.Header.Set("Authorization", "Bearer goodtok")
|
||||
c.Request = req
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
|
||||
if err != nil {
|
||||
if err := validateCallerToken(context.Background(), c, "ws-authed"); err != nil {
|
||||
t.Errorf("valid token should pass; got %v", err)
|
||||
}
|
||||
if isCanvasUser {
|
||||
t.Errorf("authed workspace with valid token should NOT be canvas user")
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
|
||||
@@ -1231,86 +1216,14 @@ func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
|
||||
req.Header.Set("Authorization", "Bearer tok-for-A")
|
||||
c.Request = req
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-b-attacker")
|
||||
if err == nil {
|
||||
if err := validateCallerToken(context.Background(), c, "ws-b-attacker"); err == nil {
|
||||
t.Fatal("token from A must not authenticate caller B")
|
||||
}
|
||||
if isCanvasUser {
|
||||
t.Errorf("cross-workspace token replay should NOT be identified as canvas user")
|
||||
}
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("expected 401, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateCallerToken_CanvasUser_AdminToken(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
// Tokenless workspace
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs("ws-canvas-admin").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
t.Setenv("ADMIN_TOKEN", "admin-secret-42")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
req := httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
|
||||
req.Header.Set("Authorization", "Bearer admin-secret-42")
|
||||
c.Request = req
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-canvas-admin")
|
||||
if err != nil {
|
||||
t.Errorf("admin token should identify canvas user; got error: %v", err)
|
||||
}
|
||||
if !isCanvasUser {
|
||||
t.Errorf("admin token bearer should be identified as canvas user")
|
||||
}
|
||||
if w.Code != 200 || w.Body.Len() != 0 {
|
||||
t.Errorf("admin token path should not write a response body; got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateCallerToken_CanvasUser_OrgToken(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
// Tokenless workspace
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs("ws-canvas-org").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
// orgtoken.Validate lookup
|
||||
mock.ExpectQuery(`SELECT id, prefix, org_id FROM org_api_tokens WHERE token_hash = .* AND revoked_at IS NULL`).
|
||||
WithArgs(sqlmock.AnyArg()).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).AddRow("orgtok-1", "pref1234", "org-1"))
|
||||
mock.ExpectExec(`UPDATE org_api_tokens SET last_used_at`).
|
||||
WithArgs("orgtok-1").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
req := httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
|
||||
req.Header.Set("Authorization", "Bearer org-token-plaintext-xyz")
|
||||
c.Request = req
|
||||
|
||||
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-canvas-org")
|
||||
if err != nil {
|
||||
t.Errorf("org token should identify canvas user; got error: %v", err)
|
||||
}
|
||||
if !isCanvasUser {
|
||||
t.Errorf("org token bearer should be identified as canvas user")
|
||||
}
|
||||
if w.Code != 200 || w.Body.Len() != 0 {
|
||||
t.Errorf("org token path should not write a response body; got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Direct unit tests for normalizeA2APayload (extracted from proxyA2ARequest) ---
|
||||
|
||||
func TestNormalizeA2APayload_InvalidJSON(t *testing.T) {
|
||||
@@ -1866,6 +1779,58 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleA2ADispatchError_BusyEnqueueLogsQueuedNotFailure(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
mock.ExpectQuery(`INSERT INTO a2a_queue`).
|
||||
WithArgs("ws-busy", nil, PriorityTask, "{}", "message/send", nil, nil).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("11111111-1111-1111-1111-111111111111"))
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue`).
|
||||
WithArgs("ws-busy").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-busy").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Busy Target"))
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(
|
||||
"ws-busy",
|
||||
"a2a_receive",
|
||||
nil,
|
||||
sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(),
|
||||
nil,
|
||||
nil,
|
||||
sqlmock.AnyArg(),
|
||||
"ok",
|
||||
nil,
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
status, body, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-busy", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 180002, true,
|
||||
)
|
||||
if perr != nil {
|
||||
t.Fatalf("expected busy enqueue success, got proxy error: %+v", perr)
|
||||
}
|
||||
if status != http.StatusAccepted {
|
||||
t.Fatalf("got status %d, want 202", status)
|
||||
}
|
||||
if !bytes.Contains(body, []byte(`"queued":true`)) {
|
||||
t.Fatalf("expected queued response body, got %s", string(body))
|
||||
}
|
||||
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations; busy enqueue must log status=ok, not error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleA2ADispatchError_BuildError(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
@@ -2441,7 +2406,7 @@ func TestLookupDeliveryMode_ContextCanceled_FailsClosed(t *testing.T) {
|
||||
// ==================== a2aClient ResponseHeaderTimeout config ====================
|
||||
|
||||
func TestA2AClientResponseHeaderTimeout(t *testing.T) {
|
||||
const defaultTimeout = 180 * time.Second
|
||||
const defaultTimeout = 5 * time.Minute
|
||||
|
||||
// Default (unset env) — a2aClient was initialised at package load time.
|
||||
if a2aClient.Transport.(*http.Transport).ResponseHeaderTimeout != defaultTimeout {
|
||||
@@ -2465,7 +2430,7 @@ func TestA2AClientResponseHeaderTimeout(t *testing.T) {
|
||||
t.Run("invalid A2A_PROXY_RESPONSE_HEADER_TIMEOUT falls back to default", func(t *testing.T) {
|
||||
t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "not-a-duration")
|
||||
// Simulate what envx.Duration does with an invalid value.
|
||||
var fallback = 180 * time.Second
|
||||
var fallback = 5 * time.Minute
|
||||
override := fallback
|
||||
if v := os.Getenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT"); v != "" {
|
||||
if d, err := time.ParseDuration(v); err == nil && d > 0 {
|
||||
|
||||
@@ -333,7 +333,7 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
|
||||
}
|
||||
// logActivity=false: the original EnqueueA2A callsite already logged
|
||||
// the dispatch attempt; re-logging here would double-count events.
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false, false)
|
||||
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
|
||||
|
||||
// 202 Accepted = the dispatch was itself queued again (target still busy).
|
||||
// That's not a failure — the queued item just stays queued naturally on
|
||||
|
||||
@@ -389,7 +389,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
|
||||
})
|
||||
log.Printf("Delegation %s: step=proxying_a2a_request", delegationID)
|
||||
|
||||
status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true, false)
|
||||
status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true)
|
||||
log.Printf("Delegation %s: step=proxy_done status=%d bodyLen=%d err=%v", delegationID, status, len(respBody), proxyErr)
|
||||
|
||||
// When proxyA2ARequest returns an error but we have a non-empty response body
|
||||
@@ -418,7 +418,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
|
||||
case <-ctx.Done():
|
||||
// outer timeout hit before retry window elapsed
|
||||
case <-time.After(delegationRetryDelay):
|
||||
status, respBody, proxyErr = h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true, false)
|
||||
status, respBody, proxyErr = h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
|
||||
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 3, "langgraph", &parentID, nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -69,7 +69,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
|
||||
mock.ExpectBegin()
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -291,7 +291,7 @@ func TestWorkspaceCreate_MaxConcurrentTasksOverride(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), 3, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Leader Agent", nil, 3, "claude-code", (*string)(nil), nil, "none", (*int64)(nil), 3, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
|
||||
@@ -364,11 +364,11 @@ func TestWorkspaceCreate(t *testing.T) {
|
||||
// Expect transaction begin for atomic workspace+secrets creation
|
||||
mock.ExpectBegin()
|
||||
|
||||
// Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace).
|
||||
// Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime).
|
||||
// Default tier is 3 (Privileged) — see workspace.go create-handler comment.
|
||||
// delivery_mode defaults to "push" when payload omits it (#2339).
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 3, "langgraph", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Expect transaction commit (no secrets in this payload)
|
||||
@@ -412,24 +412,17 @@ func TestWorkspaceCreate(t *testing.T) {
|
||||
if resp["id"] == nil || resp["id"] == "" {
|
||||
t.Error("expected non-empty id in response")
|
||||
}
|
||||
if resp["awareness_namespace"] != "workspace:"+resp["id"].(string) {
|
||||
t.Errorf("expected awareness namespace derived from workspace id, got %v", resp["awareness_namespace"])
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
|
||||
func TestBuildProvisionerConfig_WorkspacePathFromPayload(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
// runtime_image_pins reader removed by RFC internal#617 / task #335
|
||||
// — CP is the SSOT for runtime image pins. No DB lookup here anymore.
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
|
||||
|
||||
t.Setenv("AWARENESS_URL", "http://awareness:37800")
|
||||
t.Setenv("WORKSPACE_DIR", "/tmp/workspace")
|
||||
|
||||
cfg := handler.buildProvisionerConfig(
|
||||
@@ -440,17 +433,10 @@ func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
|
||||
models.CreateWorkspacePayload{Tier: 2, Runtime: "claude-code", WorkspaceDir: "/tmp/workspace", WorkspaceAccess: "read_write"},
|
||||
map[string]string{"OPENAI_API_KEY": "sk-test"},
|
||||
"/tmp/plugins",
|
||||
"workspace:ws-123",
|
||||
)
|
||||
|
||||
if cfg.AwarenessURL != "http://awareness:37800" {
|
||||
t.Fatalf("expected awareness URL to be injected, got %q", cfg.AwarenessURL)
|
||||
}
|
||||
if cfg.AwarenessNamespace != "workspace:ws-123" {
|
||||
t.Fatalf("expected awareness namespace to be injected, got %q", cfg.AwarenessNamespace)
|
||||
}
|
||||
if cfg.WorkspacePath != "/tmp/workspace" {
|
||||
t.Fatalf("expected workspace path from env, got %q", cfg.WorkspacePath)
|
||||
t.Fatalf("expected workspace path from payload, got %q", cfg.WorkspacePath)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -799,13 +799,12 @@ func (h *OrgHandler) Import(c *gin.Context) {
|
||||
if len(tmpl.GlobalMemories) > 0 && len(results) > 0 {
|
||||
rootID, _ := results[0]["id"].(string)
|
||||
if rootID != "" {
|
||||
rootNS := workspaceAwarenessNamespace(rootID)
|
||||
// Force scope to GLOBAL regardless of what the YAML says.
|
||||
globalSeeds := make([]models.MemorySeed, len(tmpl.GlobalMemories))
|
||||
for i, gm := range tmpl.GlobalMemories {
|
||||
globalSeeds[i] = models.MemorySeed{Content: gm.Content, Scope: "GLOBAL"}
|
||||
}
|
||||
seedInitialMemories(context.Background(), rootID, globalSeeds, rootNS)
|
||||
seedInitialMemories(context.Background(), rootID, globalSeeds)
|
||||
log.Printf("Org import: seeded %d global memories on root workspace %s", len(globalSeeds), rootID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +102,6 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
awarenessNS := workspaceAwarenessNamespace(id)
|
||||
|
||||
var role interface{}
|
||||
if ws.Role != "" {
|
||||
@@ -168,13 +167,13 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
|
||||
// EXACTLY for Postgres to consider the index applicable.
|
||||
var insertedID string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, max_concurrent_tasks)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, status, parent_id, workspace_dir, workspace_access, max_concurrent_tasks)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (COALESCE(parent_id, '00000000-0000-0000-0000-000000000000'::uuid), name)
|
||||
WHERE status != 'removed'
|
||||
DO NOTHING
|
||||
RETURNING id
|
||||
`, id, ws.Name, role, tier, runtime, awarenessNS, "provisioning", parentID, workspaceDir, workspaceAccess, maxConcurrent).Scan(&insertedID)
|
||||
`, id, ws.Name, role, tier, runtime, "provisioning", parentID, workspaceDir, workspaceAccess, maxConcurrent).Scan(&insertedID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
// Skip path — a non-removed row already exists for
|
||||
// (parent_id, name). Re-select its id; idempotency-friendly
|
||||
@@ -259,7 +258,7 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
|
||||
if len(wsMemories) == 0 {
|
||||
wsMemories = defaults.InitialMemories
|
||||
}
|
||||
seedInitialMemories(ctx, id, wsMemories, awarenessNS)
|
||||
seedInitialMemories(ctx, id, wsMemories)
|
||||
|
||||
// Handle external workspaces
|
||||
if ws.External {
|
||||
|
||||
@@ -470,19 +470,14 @@ func (h *ScheduleHandler) Health(c *gin.Context) {
|
||||
|
||||
// Validate the caller's own bearer token (Phase 30.5 contract).
|
||||
// Skip for system callers and self-calls, same as the A2A proxy.
|
||||
// Post-RFC#637: canvas users may read schedule health too.
|
||||
isCanvasUser := false
|
||||
if !isSystemCaller(callerID) && callerID != workspaceID {
|
||||
var err error
|
||||
isCanvasUser, err = validateCallerToken(ctx, c, callerID)
|
||||
if err != nil {
|
||||
if err := validateCallerToken(ctx, c, callerID); err != nil {
|
||||
return // response already written with 401
|
||||
}
|
||||
}
|
||||
|
||||
// CanCommunicate gate — only peers in the org hierarchy may read health.
|
||||
// Canvas users (human operators) bypass this gate.
|
||||
if callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
|
||||
if callerID != workspaceID && !isSystemCaller(callerID) {
|
||||
if !registry.CanCommunicate(callerID, workspaceID) {
|
||||
log.Printf("ScheduleHealth: access denied %s → %s", callerID, workspaceID)
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "access denied"})
|
||||
|
||||
@@ -107,7 +107,6 @@ func (h *WebhookHandler) GitHub(c *gin.Context) {
|
||||
forwardBody,
|
||||
"webhook:github",
|
||||
true,
|
||||
false,
|
||||
)
|
||||
if proxyErr != nil {
|
||||
c.JSON(proxyErr.Status, proxyErr.Response)
|
||||
|
||||
@@ -216,7 +216,6 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
awarenessNamespace := workspaceAwarenessNamespace(id)
|
||||
if h.IsSaaS() {
|
||||
// SaaS hard gate: every hosted workspace gets its own sibling
|
||||
// EC2 instance, so T4 is the only meaningful runtime boundary.
|
||||
@@ -448,10 +447,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// returns the actually-persisted name (which we MUST thread back into
|
||||
// payload + broadcast so the canvas displays what the DB has).
|
||||
const insertWorkspaceSQL = `
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10, $11, $12)
|
||||
INSERT INTO workspaces (id, name, role, tier, runtime, status, parent_id, workspace_dir, workspace_access, budget_limit, max_concurrent_tasks, delivery_mode)
|
||||
VALUES ($1, $2, $3, $4, $5, 'provisioning', $6, $7, $8, $9, $10, $11)
|
||||
`
|
||||
insertArgs := []any{id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode}
|
||||
insertArgs := []any{id, payload.Name, role, payload.Tier, payload.Runtime, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit, maxConcurrent, deliveryMode}
|
||||
persistedName, currentTx, err := insertWorkspaceWithNameRetry(
|
||||
ctx,
|
||||
tx,
|
||||
@@ -572,7 +571,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
|
||||
// Seed initial memories from the create payload (issue #1050).
|
||||
// Non-fatal: failures are logged but don't block workspace creation.
|
||||
seedInitialMemories(ctx, id, payload.InitialMemories, awarenessNamespace)
|
||||
seedInitialMemories(ctx, id, payload.InitialMemories)
|
||||
|
||||
// Broadcast provisioning event. Include `runtime` so the canvas can
|
||||
// populate the Runtime pill on the side panel immediately — without it
|
||||
@@ -707,10 +706,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{
|
||||
"id": id,
|
||||
"status": "provisioning",
|
||||
"awareness_namespace": awarenessNamespace,
|
||||
"workspace_access": workspaceAccess,
|
||||
"id": id,
|
||||
"status": "provisioning",
|
||||
"workspace_access": workspaceAccess,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -152,7 +152,6 @@ func TestWorkspaceBudget_Create_WithLimit(t *testing.T) {
|
||||
nil, // role
|
||||
3, // tier (default, workspace.go create-handler)
|
||||
"langgraph", // runtime
|
||||
sqlmock.AnyArg(), // awareness_namespace
|
||||
(*string)(nil), // parent_id
|
||||
nil, // workspace_dir
|
||||
"none", // workspace_access
|
||||
|
||||
@@ -160,7 +160,6 @@ func TestBuildProvisionerConfig_CopiesComputeSizingFromPayload(t *testing.T) {
|
||||
},
|
||||
nil,
|
||||
t.TempDir(),
|
||||
"workspace:ws-compute",
|
||||
)
|
||||
|
||||
if cfg.InstanceType != "m6i.xlarge" {
|
||||
|
||||
@@ -103,13 +103,13 @@ func cleanupTestRows(t *testing.T, conn *sql.DB, namePrefix string) {
|
||||
// TestIntegration_WorkspaceCreate_NameRetry_AutoSuffixesOnCollision
|
||||
// exercises the helper end-to-end against a real Postgres:
|
||||
//
|
||||
// 1. INSERT a row with name "<prefix>-Repro" — succeeds.
|
||||
// 2. Run insertWorkspaceWithNameRetry with the same name —
|
||||
// partial-unique violation fires, helper retries with
|
||||
// " (2)", that succeeds.
|
||||
// 3. SELECT the row by id, confirm name = "<prefix>-Repro (2)".
|
||||
// 4. Run helper AGAIN — second collision, helper retries with
|
||||
// " (3)".
|
||||
// 1. INSERT a row with name "<prefix>-Repro" — succeeds.
|
||||
// 2. Run insertWorkspaceWithNameRetry with the same name —
|
||||
// partial-unique violation fires, helper retries with
|
||||
// " (2)", that succeeds.
|
||||
// 3. SELECT the row by id, confirm name = "<prefix>-Repro (2)".
|
||||
// 4. Run helper AGAIN — second collision, helper retries with
|
||||
// " (3)".
|
||||
//
|
||||
// This is the live-test that proves the partial-index behaviour
|
||||
// matches the migration's intent — sqlmock cannot reach this depth.
|
||||
@@ -130,9 +130,9 @@ func TestIntegration_WorkspaceCreate_NameRetry_AutoSuffixesOnCollision(t *testin
|
||||
// targets + the NOT NULL columns required by the schema).
|
||||
firstID := uuid.New().String()
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, awareness_namespace, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', $3, 'provisioning')
|
||||
`, firstID, baseName, "workspace:"+firstID); err != nil {
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'provisioning')
|
||||
`, firstID, baseName); err != nil {
|
||||
t.Fatalf("seed first row: %v", err)
|
||||
}
|
||||
|
||||
@@ -145,10 +145,10 @@ func TestIntegration_WorkspaceCreate_NameRetry_AutoSuffixesOnCollision(t *testin
|
||||
}
|
||||
secondID := uuid.New().String()
|
||||
query := `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, awareness_namespace, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', $3, 'provisioning')
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'provisioning')
|
||||
`
|
||||
args := []any{secondID, baseName, "workspace:" + secondID}
|
||||
args := []any{secondID, baseName}
|
||||
persistedName, finalTx, err := insertWorkspaceWithNameRetry(
|
||||
ctx, tx, beginTx, baseName, 1, query, args,
|
||||
)
|
||||
@@ -179,7 +179,7 @@ func TestIntegration_WorkspaceCreate_NameRetry_AutoSuffixesOnCollision(t *testin
|
||||
t.Fatalf("begin tx3: %v", err)
|
||||
}
|
||||
thirdID := uuid.New().String()
|
||||
args3 := []any{thirdID, baseName, "workspace:" + thirdID}
|
||||
args3 := []any{thirdID, baseName}
|
||||
persistedName3, finalTx3, err := insertWorkspaceWithNameRetry(
|
||||
ctx, tx3, beginTx, baseName, 1, query, args3,
|
||||
)
|
||||
@@ -216,9 +216,9 @@ func TestIntegration_WorkspaceCreate_NameRetry_TombstonedRowDoesNotCollide(t *te
|
||||
// Seed a row, then tombstone it.
|
||||
firstID := uuid.New().String()
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, awareness_namespace, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', $3, 'removed')
|
||||
`, firstID, baseName, "workspace:"+firstID); err != nil {
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'removed')
|
||||
`, firstID, baseName); err != nil {
|
||||
t.Fatalf("seed tombstoned row: %v", err)
|
||||
}
|
||||
|
||||
@@ -231,10 +231,10 @@ func TestIntegration_WorkspaceCreate_NameRetry_TombstonedRowDoesNotCollide(t *te
|
||||
}
|
||||
secondID := uuid.New().String()
|
||||
query := `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, awareness_namespace, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', $3, 'provisioning')
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'provisioning')
|
||||
`
|
||||
args := []any{secondID, baseName, "workspace:" + secondID}
|
||||
args := []any{secondID, baseName}
|
||||
persistedName, finalTx, err := insertWorkspaceWithNameRetry(
|
||||
ctx, tx, beginTx, baseName, 1, query, args,
|
||||
)
|
||||
|
||||
@@ -128,7 +128,7 @@ func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath stri
|
||||
workspaceID, filepath.Base(runtimeTemplate))
|
||||
templatePath = runtimeTemplate
|
||||
// Rebuild cfg with the recovered template path so Start() sees it.
|
||||
cfg = h.buildProvisionerConfig(ctx, workspaceID, templatePath, configFiles, payload, prepared.EnvVars, prepared.PluginsPath, prepared.AwarenessNamespace)
|
||||
cfg = h.buildProvisionerConfig(ctx, workspaceID, templatePath, configFiles, payload, prepared.EnvVars, prepared.PluginsPath)
|
||||
cfg.ResetClaudeSession = resetClaudeSession
|
||||
recovered = true
|
||||
break
|
||||
@@ -194,10 +194,11 @@ func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath stri
|
||||
// a ~64k context window worth of text — but small enough to prevent abuse.
|
||||
const maxMemoryContentLength = 100_000 // ~100 KiB of text
|
||||
|
||||
func seedInitialMemories(ctx context.Context, workspaceID string, memories []models.MemorySeed, awarenessNamespace string) {
|
||||
func seedInitialMemories(ctx context.Context, workspaceID string, memories []models.MemorySeed) {
|
||||
if len(memories) == 0 {
|
||||
return
|
||||
}
|
||||
namespace := workspaceMemoryNamespace(workspaceID)
|
||||
for _, mem := range memories {
|
||||
scope := strings.ToUpper(mem.Scope)
|
||||
if scope == "" {
|
||||
@@ -223,33 +224,27 @@ func seedInitialMemories(ctx context.Context, workspaceID string, memories []mod
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO agent_memories (workspace_id, content, scope, namespace)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
`, workspaceID, redactedContent, scope, awarenessNamespace); err != nil {
|
||||
`, workspaceID, redactedContent, scope, namespace); err != nil {
|
||||
log.Printf("seedInitialMemories: failed to insert memory for %s (scope=%s): %v", workspaceID, scope, err)
|
||||
}
|
||||
}
|
||||
log.Printf("seedInitialMemories: seeded %d memories for workspace %s", len(memories), workspaceID)
|
||||
}
|
||||
|
||||
func workspaceAwarenessNamespace(workspaceID string) string {
|
||||
// workspaceMemoryNamespace returns the canonical v2 memory namespace
|
||||
// string for a workspace. Matches the form produced by
|
||||
// internal/memory/namespace/resolver.go for self-reads (issue #1735).
|
||||
func workspaceMemoryNamespace(workspaceID string) string {
|
||||
return fmt.Sprintf("workspace:%s", workspaceID)
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) loadAwarenessNamespace(ctx context.Context, workspaceID string) string {
|
||||
var awarenessNamespace string
|
||||
err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(awareness_namespace, '') FROM workspaces WHERE id = $1`, workspaceID).Scan(&awarenessNamespace)
|
||||
if err != nil || awarenessNamespace == "" {
|
||||
return workspaceAwarenessNamespace(workspaceID)
|
||||
}
|
||||
return awarenessNamespace
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) buildProvisionerConfig(
|
||||
ctx context.Context,
|
||||
workspaceID, templatePath string,
|
||||
configFiles map[string][]byte,
|
||||
payload models.CreateWorkspacePayload,
|
||||
envVars map[string]string,
|
||||
pluginsPath, awarenessNamespace string,
|
||||
pluginsPath string,
|
||||
) provisioner.WorkspaceConfig {
|
||||
// Per-workspace workspace_dir takes priority over global WORKSPACE_DIR env var.
|
||||
// If neither is set, the provisioner creates an isolated Docker volume.
|
||||
@@ -304,10 +299,8 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
|
||||
Height: payload.Compute.Display.Height,
|
||||
Protocol: payload.Compute.Display.Protocol,
|
||||
},
|
||||
EnvVars: envVars,
|
||||
PlatformURL: h.platformURL,
|
||||
AwarenessURL: os.Getenv("AWARENESS_URL"),
|
||||
AwarenessNamespace: awarenessNamespace,
|
||||
EnvVars: envVars,
|
||||
PlatformURL: h.platformURL,
|
||||
// Image left empty — molecule-core's runtime_image_pins table (mig
|
||||
// 047, dead reader removed by RFC internal#617 / task #335) was an
|
||||
// aspirational SSOT that never received a writer. CP's
|
||||
|
||||
@@ -85,10 +85,9 @@ func readOrLazyHealInboundSecret(ctx context.Context, workspaceID, opLabel strin
|
||||
// prepareProvisionContext when the caller proceeds; nil + non-empty
|
||||
// abort message when the caller must mark the workspace failed.
|
||||
type preparedProvisionContext struct {
|
||||
EnvVars map[string]string
|
||||
PluginsPath string
|
||||
AwarenessNamespace string
|
||||
Config provisioner.WorkspaceConfig
|
||||
EnvVars map[string]string
|
||||
PluginsPath string
|
||||
Config provisioner.WorkspaceConfig
|
||||
}
|
||||
|
||||
// provisionAbort describes why prepareProvisionContext refused to
|
||||
@@ -170,7 +169,6 @@ func (h *WorkspaceHandler) prepareProvisionContext(
|
||||
}
|
||||
|
||||
pluginsPath, _ := filepath.Abs(filepath.Join(h.configsDir, "..", "plugins"))
|
||||
awarenessNamespace := h.loadAwarenessNamespace(ctx, workspaceID)
|
||||
|
||||
// Per-agent git identity (#1957) — must run after secret loads so
|
||||
// a workspace_secret named GIT_AUTHOR_NAME can override.
|
||||
@@ -231,14 +229,13 @@ func (h *WorkspaceHandler) prepareProvisionContext(
|
||||
}
|
||||
}
|
||||
|
||||
cfg := h.buildProvisionerConfig(ctx, workspaceID, templatePath, configFiles, payload, envVars, pluginsPath, awarenessNamespace)
|
||||
cfg := h.buildProvisionerConfig(ctx, workspaceID, templatePath, configFiles, payload, envVars, pluginsPath)
|
||||
cfg.ResetClaudeSession = resetClaudeSession
|
||||
|
||||
return &preparedProvisionContext{
|
||||
EnvVars: envVars,
|
||||
PluginsPath: pluginsPath,
|
||||
AwarenessNamespace: awarenessNamespace,
|
||||
Config: cfg,
|
||||
EnvVars: envVars,
|
||||
PluginsPath: pluginsPath,
|
||||
Config: cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -17,9 +17,9 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// ==================== workspaceAwarenessNamespace ====================
|
||||
// ==================== workspaceMemoryNamespace ====================
|
||||
|
||||
func TestWorkspaceAwarenessNamespace(t *testing.T) {
|
||||
func TestWorkspaceMemoryNamespace(t *testing.T) {
|
||||
tests := []struct {
|
||||
workspaceID string
|
||||
expected string
|
||||
@@ -31,9 +31,9 @@ func TestWorkspaceAwarenessNamespace(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.workspaceID, func(t *testing.T) {
|
||||
result := workspaceAwarenessNamespace(tt.workspaceID)
|
||||
result := workspaceMemoryNamespace(tt.workspaceID)
|
||||
if result != tt.expected {
|
||||
t.Errorf("workspaceAwarenessNamespace(%q) = %q, want %q", tt.workspaceID, result, tt.expected)
|
||||
t.Errorf("workspaceMemoryNamespace(%q) = %q, want %q", tt.workspaceID, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -645,7 +645,7 @@ func TestSeedInitialMemories_TruncatesOversizedContent(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
}
|
||||
|
||||
seedInitialMemories(context.Background(), workspaceID, memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), workspaceID, memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v", err)
|
||||
@@ -674,7 +674,7 @@ func TestSeedInitialMemories_RedactsSecrets(t *testing.T) {
|
||||
WithArgs(workspaceID, wantRedacted, "LOCAL", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
seedInitialMemories(context.Background(), workspaceID, memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), workspaceID, memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v", err)
|
||||
@@ -691,7 +691,7 @@ func TestSeedInitialMemories_InvalidScopeSkipped(t *testing.T) {
|
||||
{Content: "this should be skipped", Scope: "NOT_A_REAL_SCOPE"},
|
||||
}
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-bad-scope", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-bad-scope", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls for invalid scope: %v", err)
|
||||
@@ -704,7 +704,7 @@ func TestSeedInitialMemories_EmptyMemoriesNil(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectationsWereMet()
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-nil", nil, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-nil", nil)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unexpected DB calls for nil slice: %v", err)
|
||||
@@ -733,7 +733,6 @@ func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
|
||||
models.CreateWorkspacePayload{Tier: 1, Runtime: "langgraph"},
|
||||
map[string]string{"API_KEY": "secret"},
|
||||
pluginsPath,
|
||||
"workspace:ws-basic",
|
||||
)
|
||||
|
||||
if cfg.WorkspaceID != "ws-basic" {
|
||||
@@ -748,9 +747,6 @@ func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
|
||||
if cfg.PlatformURL != "http://localhost:8080" {
|
||||
t.Errorf("expected PlatformURL 'http://localhost:8080', got %q", cfg.PlatformURL)
|
||||
}
|
||||
if cfg.AwarenessNamespace != "workspace:ws-basic" {
|
||||
t.Errorf("expected AwarenessNamespace 'workspace:ws-basic', got %q", cfg.AwarenessNamespace)
|
||||
}
|
||||
if cfg.PluginsPath != pluginsPath {
|
||||
t.Errorf("expected PluginsPath %q, got %q", pluginsPath, cfg.PluginsPath)
|
||||
}
|
||||
@@ -775,7 +771,6 @@ func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
|
||||
|
||||
workspaceDir := t.TempDir()
|
||||
t.Setenv("WORKSPACE_DIR", workspaceDir)
|
||||
t.Setenv("AWARENESS_URL", "http://awareness:37800")
|
||||
|
||||
pluginsPath := t.TempDir()
|
||||
cfg := handler.buildProvisionerConfig(
|
||||
@@ -786,15 +781,11 @@ func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
|
||||
models.CreateWorkspacePayload{Tier: 2, Runtime: "claude-code"},
|
||||
nil,
|
||||
pluginsPath,
|
||||
"workspace:ws-env",
|
||||
)
|
||||
|
||||
if cfg.WorkspacePath != workspaceDir {
|
||||
t.Errorf("expected WorkspacePath from env, got %q", cfg.WorkspacePath)
|
||||
}
|
||||
if cfg.AwarenessURL != "http://awareness:37800" {
|
||||
t.Errorf("expected AwarenessURL from env, got %q", cfg.AwarenessURL)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== issueAndInjectToken (issue #418) ====================
|
||||
@@ -1007,7 +998,7 @@ func TestSeedInitialMemories_Truncation(t *testing.T) {
|
||||
WithArgs(sqlmock.AnyArg(), expectTruncated, "LOCAL", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-1066-test", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-1066-test", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met: %v\n"+
|
||||
@@ -1027,7 +1018,7 @@ func TestSeedInitialMemories_ContentUnderLimit(t *testing.T) {
|
||||
WithArgs(sqlmock.AnyArg(), "short content", "TEAM", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-1066-under", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-1066-under", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met: %v", err)
|
||||
@@ -1052,7 +1043,7 @@ func TestSeedInitialMemories_ExactlyAtLimit(t *testing.T) {
|
||||
WithArgs(sqlmock.AnyArg(), atLimitContent, "LOCAL", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-boundary", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-boundary", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met: %v", err)
|
||||
@@ -1068,7 +1059,7 @@ func TestSeedInitialMemories_EmptyContent(t *testing.T) {
|
||||
}
|
||||
|
||||
// seedInitialMemories skips empty content at line 234 — no DB call expected.
|
||||
seedInitialMemories(context.Background(), "ws-empty", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-empty", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met: %v", err)
|
||||
@@ -1092,7 +1083,7 @@ func TestSeedInitialMemories_OversizedWithSecrets(t *testing.T) {
|
||||
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), "GLOBAL", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
seedInitialMemories(context.Background(), "ws-secrets", memories, "test-ns")
|
||||
seedInitialMemories(context.Background(), "ws-secrets", memories)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met: %v", err)
|
||||
|
||||
@@ -342,7 +342,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
|
||||
// Transaction begins, workspace INSERT fails, transaction is rolled back.
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 3, "langgraph", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
mock.ExpectRollback()
|
||||
|
||||
@@ -375,7 +375,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
|
||||
// Expect workspace INSERT with defaulted tier=3 (Privileged — the
|
||||
// handler default in workspace.go), runtime="langgraph"
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 3, "langgraph", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
@@ -423,7 +423,7 @@ func TestWorkspaceCreate_SaaSHardForcesTier4(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -464,7 +464,7 @@ func TestWorkspaceCreate_WithSecrets_Persists(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Secret inserted inside the same transaction.
|
||||
mock.ExpectExec("INSERT INTO workspace_secrets").
|
||||
@@ -576,7 +576,7 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
// External URL update (localhost is explicitly allowed by validateAgentURL).
|
||||
@@ -615,7 +615,7 @@ func TestWorkspaceCreate_KimiRuntime_PreservesLabel(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
// Pre-register flow: awaiting_agent + runtime preserved as "kimi"
|
||||
@@ -1639,7 +1639,7 @@ runtime_config:
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Hermes Agent", nil, 3, "hermes",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
(*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1696,7 +1696,7 @@ model: anthropic:claude-sonnet-4-5
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Legacy Agent", nil, 3, "langgraph",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
(*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1749,7 +1749,7 @@ runtime_config:
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), "Custom Hermes", nil, 3, "hermes",
|
||||
sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
(*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
@@ -1894,7 +1894,7 @@ func TestWorkspaceCreate_188_ExplicitRuntimeNoTemplate_OK(t *testing.T) {
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Explicit Codex", nil, 3, "codex", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WithArgs(sqlmock.AnyArg(), "Explicit Codex", nil, 3, "codex", (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
mock.ExpectExec("INSERT INTO canvas_layouts").
|
||||
|
||||
@@ -17,7 +17,6 @@ type Workspace struct {
|
||||
Name string `json:"name" db:"name"`
|
||||
Role sql.NullString `json:"role" db:"role"`
|
||||
Tier int `json:"tier" db:"tier"`
|
||||
AwarenessNamespace sql.NullString `json:"awareness_namespace" db:"awareness_namespace"`
|
||||
Status string `json:"status" db:"status"`
|
||||
SourceBundleID sql.NullString `json:"source_bundle_id" db:"source_bundle_id"`
|
||||
AgentCard json.RawMessage `json:"agent_card" db:"agent_card"`
|
||||
@@ -207,7 +206,8 @@ type CreateWorkspacePayload struct {
|
||||
} `json:"canvas"`
|
||||
// InitialMemories is an optional list of memories to seed into the
|
||||
// workspace immediately after creation. Each entry is inserted into
|
||||
// agent_memories with the workspace's awareness namespace. Issue #1050.
|
||||
// agent_memories under the workspace's v2 memory namespace
|
||||
// ("workspace:<id>"). Issue #1050.
|
||||
InitialMemories []MemorySeed `json:"initial_memories"`
|
||||
}
|
||||
|
||||
|
||||
@@ -103,8 +103,6 @@ type WorkspaceConfig struct {
|
||||
Display WorkspaceDisplayConfig
|
||||
EnvVars map[string]string // Additional env vars (API keys, etc.)
|
||||
PlatformURL string
|
||||
AwarenessURL string
|
||||
AwarenessNamespace string
|
||||
WorkspaceAccess string // #65: "none" (default), "read_only", or "read_write"
|
||||
ResetClaudeSession bool // #12: if true, discard the claude-sessions volume before start (fresh session dir)
|
||||
|
||||
@@ -714,10 +712,6 @@ func buildContainerEnv(cfg WorkspaceConfig) []string {
|
||||
// still override (Dockerfile ENV is overridden by docker -e at runtime).
|
||||
"PYTHONPATH=/app",
|
||||
}
|
||||
if cfg.AwarenessNamespace != "" && cfg.AwarenessURL != "" {
|
||||
env = append(env, fmt.Sprintf("AWARENESS_NAMESPACE=%s", cfg.AwarenessNamespace))
|
||||
env = append(env, fmt.Sprintf("AWARENESS_URL=%s", cfg.AwarenessURL))
|
||||
}
|
||||
// #1687: track explicit GH_TOKEN / GITHUB_TOKEN so they win over GH_PAT
|
||||
// alias. These are normally stripped by the SCM-write guard below, but
|
||||
// when a user explicitly sets them we preserve the value.
|
||||
|
||||
@@ -692,39 +692,6 @@ func TestBuildContainerEnv_MoleculeAIURLAlwaysMatchesPlatformURL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildContainerEnv_AwarenessOnlyWhenBothSet(t *testing.T) {
|
||||
// Both set → both injected.
|
||||
cfg := WorkspaceConfig{
|
||||
WorkspaceID: "ws-x",
|
||||
PlatformURL: "http://localhost:8080",
|
||||
AwarenessURL: "http://awareness:9000",
|
||||
AwarenessNamespace: "ns-1",
|
||||
}
|
||||
env := buildContainerEnv(cfg)
|
||||
hasNS := false
|
||||
hasURL := false
|
||||
for _, e := range env {
|
||||
if e == "AWARENESS_NAMESPACE=ns-1" {
|
||||
hasNS = true
|
||||
}
|
||||
if e == "AWARENESS_URL=http://awareness:9000" {
|
||||
hasURL = true
|
||||
}
|
||||
}
|
||||
if !hasNS || !hasURL {
|
||||
t.Errorf("both awareness vars must be present: env=%v", env)
|
||||
}
|
||||
|
||||
// Only namespace set → neither injected (must be both-or-nothing).
|
||||
cfg.AwarenessURL = ""
|
||||
env2 := buildContainerEnv(cfg)
|
||||
for _, e := range env2 {
|
||||
if strings.HasPrefix(e, "AWARENESS_") {
|
||||
t.Errorf("awareness vars must NOT be injected when URL is missing: got %q", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildContainerEnv_CustomEnvVarsAppended(t *testing.T) {
|
||||
// NOTE: this test previously asserted GITHUB_TOKEN passed through
|
||||
// verbatim. That assertion encoded the forensic #145 latent leak as
|
||||
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
-- Reverse of 20260523130000_drop_workspaces_awareness_namespace.up.sql.
|
||||
--
|
||||
-- Restores the workspaces.awareness_namespace column verbatim from
|
||||
-- migration 010_workspace_awareness.sql so a down-cycle leaves the
|
||||
-- schema bit-identical to the pre-drop state. The column will be
|
||||
-- NULL on all rows after re-add — handlers no longer write to it and
|
||||
-- callers no longer read it, so this is functionally inert without
|
||||
-- a paired code revert.
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ADD COLUMN IF NOT EXISTS awareness_namespace TEXT;
|
||||
@@ -0,0 +1,19 @@
|
||||
-- Issue #1735 — drop the workspaces.awareness_namespace column.
|
||||
--
|
||||
-- "Awareness namespaces" were a memory-routing surface (env vars
|
||||
-- AWARENESS_URL / AWARENESS_NAMESPACE) that was plumbed across the
|
||||
-- platform but never wired in any production or staging environment
|
||||
-- (verified 2026-05-23 via Railway GraphQL on the controlplane service:
|
||||
-- AWARENESS_* unset in both env IDs 59227671-… and 639539ec-…).
|
||||
--
|
||||
-- The column added by migration 010_workspace_awareness.sql was only
|
||||
-- ever populated with the canonical "workspace:<id>" string, which is
|
||||
-- also the v2 memory namespace string (see internal/memory/namespace/
|
||||
-- resolver.go:186). Removing the column does not change any agent-
|
||||
-- visible memory namespace — handlers now compute the same
|
||||
-- "workspace:<id>" string inline when inserting into agent_memories.
|
||||
--
|
||||
-- Related: #1733 (memory SSOT consolidation), #1734 (Memory tab bug).
|
||||
|
||||
ALTER TABLE workspaces
|
||||
DROP COLUMN IF EXISTS awareness_namespace;
|
||||
Reference in New Issue
Block a user