Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ce89cc17a1 | |||
| a18d13a351 | |||
| 432873d261 | |||
| 540222220a | |||
| 7f703b3feb | |||
| ac61332098 | |||
| f8c64fadb6 | |||
| aaeb0411b2 |
@@ -19,7 +19,6 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/envx"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
@@ -367,14 +366,8 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// are trusted. Self-calls (callerID == workspaceID) are always allowed.
|
||||
// Post-RFC#637: canvas-user identity workspaces also bypass CanCommunicate
|
||||
// because human users sit outside the org hierarchy.
|
||||
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
|
||||
if !registry.CanCommunicate(callerID, workspaceID) {
|
||||
log.Printf("ProxyA2A: access denied %s → %s", callerID, workspaceID)
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusForbidden,
|
||||
Response: gin.H{"error": "access denied: workspaces cannot communicate per hierarchy rules"},
|
||||
}
|
||||
}
|
||||
if proxyErr := requireCanCommunicate(callerID, workspaceID, isCanvasUser, "ProxyA2A"); proxyErr != nil {
|
||||
return 0, nil, proxyErr
|
||||
}
|
||||
|
||||
// Budget enforcement: reject A2A calls when the workspace has exceeded its
|
||||
|
||||
@@ -14,12 +14,12 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/middleware"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/orgtoken"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
@@ -31,6 +31,23 @@ type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// requireCanCommunicate returns a proxyA2AError when callerID is not allowed to
|
||||
// reach workspaceID. Self-calls, system callers, and canvas users are permitted
|
||||
// without a registry check.
|
||||
func requireCanCommunicate(callerID, workspaceID string, isCanvasUser bool, logPrefix string) *proxyA2AError {
|
||||
if callerID == "" || callerID == workspaceID || isSystemCaller(callerID) || isCanvasUser {
|
||||
return nil
|
||||
}
|
||||
if !registry.CanCommunicate(callerID, workspaceID) {
|
||||
log.Printf("%s: access denied %s → %s", logPrefix, callerID, workspaceID)
|
||||
return &proxyA2AError{
|
||||
Status: http.StatusForbidden,
|
||||
Response: gin.H{"error": "access denied: workspaces cannot communicate per hierarchy rules"},
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and records the outcome. Busy
|
||||
// targets that are successfully queued are logged as queued, not failed.
|
||||
@@ -432,7 +449,7 @@ func nilIfEmpty(s string) *string {
|
||||
//
|
||||
// On auth failure this writes the 401 via c and returns an error so the
|
||||
// handler aborts without running the proxy.
|
||||
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (isCanvasUser bool, err error) {
|
||||
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (bool, error) {
|
||||
hasLive, dbErr := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
|
||||
if dbErr != nil {
|
||||
// Fail-open here matches the heartbeat path — A2A caller auth is
|
||||
@@ -442,25 +459,28 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (
|
||||
log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, dbErr)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
|
||||
if !hasLive {
|
||||
// Tokenless workspace — could be legacy/pre-upgrade caller or
|
||||
// canvas-user identity. Distinguish by request auth signals.
|
||||
if middleware.IsSameOriginCanvas(c) {
|
||||
return true, nil
|
||||
}
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
if tok != "" {
|
||||
adminSecret := os.Getenv("ADMIN_TOKEN")
|
||||
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
|
||||
return true, nil
|
||||
}
|
||||
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
if tok == "" {
|
||||
return false, nil // legacy / pre-upgrade caller
|
||||
}
|
||||
adminSecret := os.Getenv("ADMIN_TOKEN")
|
||||
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
|
||||
return true, nil
|
||||
}
|
||||
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil // legacy / pre-upgrade caller
|
||||
}
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
|
||||
if tok == "" {
|
||||
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing caller auth token"})
|
||||
return false, errInvalidCallerToken
|
||||
|
||||
@@ -2341,6 +2341,106 @@ func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_PollMode_CanvasUserCallerID_PropagatesToActivityLog pins
|
||||
// the specific contract that broke in molecule-core#1675 (2026-05-22):
|
||||
// canvas chat messages from a user with an identity workspace (RFC#637
|
||||
// canvas-user-identity rollout) MUST write an activity_logs row whose
|
||||
// source_id matches the canvas user's workspace UUID, NOT NULL — so the
|
||||
// channel plugin's poll path can deliver them as `<channel kind="canvas_user">`
|
||||
// tags to the bound Claude Code session, AND the canvas chat-history can
|
||||
// re-render the user's own message on reopen.
|
||||
//
|
||||
// The sibling test TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch
|
||||
// covers the legacy "no callerID" path (anonymous canvas without RFC#637).
|
||||
// THIS test covers the post-RFC#637 path with an explicit X-Workspace-ID
|
||||
// header naming the canvas user's identity workspace.
|
||||
//
|
||||
// Empirical trigger: Hongming's tenant 30ba7f0b had 3+ hours of silent
|
||||
// canvas-message loss while peer-agent A2A (PM→CEO_Assistant) kept
|
||||
// arriving — the breakage was specific to canvas user → target workspace
|
||||
// routing. The fix MUST ensure logA2AReceiveQueued runs synchronously
|
||||
// before the queued 200, and source_id is populated from callerID.
|
||||
func TestProxyA2A_PollMode_CanvasUserCallerID_PropagatesToActivityLog(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
const targetWS = "ws-canvas-target-1675"
|
||||
const canvasUserWS = "344a2623-50bf-4ab9-9732-220779305c8f" // shape from #1675 evidence
|
||||
|
||||
// Post-fix (PR #1756): validateCallerToken checks whether the caller
|
||||
// workspace has live tokens. Canvas-user identity workspaces are
|
||||
// tokenless, so they fall through to the admin/org-token detection
|
||||
// path. We set ADMIN_TOKEN + Authorization so the caller is identified
|
||||
// as a canvas user and CanCommunicate is bypassed.
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
|
||||
WithArgs(canvasUserWS).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
expectBudgetCheck(mock, targetWS)
|
||||
|
||||
t.Setenv("ADMIN_TOKEN", "test-admin-secret-1675")
|
||||
|
||||
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
|
||||
WithArgs(targetWS).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
|
||||
|
||||
// CRITICAL: the activity_logs INSERT MUST happen, and its source_id
|
||||
// argument MUST match the canvas user's workspace UUID. The previous
|
||||
// behaviour (sqlmock.ExpectExec with no WithArgs) accepted any args —
|
||||
// which is exactly how the regression in #1675 escaped CI: the INSERT
|
||||
// fired, but with source_id=NULL because callerID propagation was
|
||||
// bypassed somewhere upstream. Pin the source_id position explicitly.
|
||||
mock.ExpectQuery("SELECT name FROM workspaces WHERE id").
|
||||
WithArgs(targetWS).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Canvas Target"))
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(
|
||||
targetWS, // workspace_id
|
||||
"a2a_receive", // activity_type
|
||||
canvasUserWS, // source_id (NOT NULL — the contract this test exists to pin)
|
||||
targetWS, // target_id
|
||||
"message/send", // method
|
||||
sqlmock.AnyArg(), // summary
|
||||
sqlmock.AnyArg(), // request_body
|
||||
sqlmock.AnyArg(), // response_body (nil for queued)
|
||||
sqlmock.AnyArg(), // tool_trace
|
||||
sqlmock.AnyArg(), // duration_ms
|
||||
"ok", // status
|
||||
sqlmock.AnyArg(), // error_detail
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: targetWS}}
|
||||
// X-Workspace-ID is the canonical way canvas Next.js identifies the
|
||||
// signed-in user's identity workspace to the platform (per RFC#637).
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+targetWS+"/a2a",
|
||||
bytes.NewBufferString(`{"jsonrpc":"2.0","id":"canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hello from canvas"}]}}}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Request.Header.Set("X-Workspace-ID", canvasUserWS)
|
||||
c.Request.Header.Set("Authorization", "Bearer test-admin-secret-1675")
|
||||
|
||||
handler.ProxyA2A(c)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 queued, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response is not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "queued" {
|
||||
t.Errorf("response.status = %v, want %q", resp["status"], "queued")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations — the activity INSERT may have been skipped OR fired with a different source_id (the #1675 regression shape): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract:
|
||||
// a push-mode workspace (default) is NOT affected by the new short-circuit.
|
||||
// It still proceeds to resolveAgentURL + dispatch. Without this guard, a
|
||||
|
||||
@@ -1146,7 +1146,7 @@ func TestIsSafeURL_Blocks169_254_Metadata(t *testing.T) {
|
||||
|
||||
func TestIsSafeURL_Blocks10xPrivate(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
err := isSafeURL("http://10.0.0.1/agent")
|
||||
if err == nil {
|
||||
t.Errorf("isSafeURL: expected 10.x.x.x to be blocked, got nil")
|
||||
@@ -1155,7 +1155,7 @@ func TestIsSafeURL_Blocks10xPrivate(t *testing.T) {
|
||||
|
||||
func TestIsSafeURL_Blocks172Private(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
err := isSafeURL("http://172.16.0.1/agent")
|
||||
if err == nil {
|
||||
t.Errorf("isSafeURL: expected 172.16.0.0/12 to be blocked, got nil")
|
||||
@@ -1164,7 +1164,7 @@ func TestIsSafeURL_Blocks172Private(t *testing.T) {
|
||||
|
||||
func TestIsSafeURL_Blocks192_168Private(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
err := isSafeURL("http://192.168.1.100/agent")
|
||||
if err == nil {
|
||||
t.Errorf("isSafeURL: expected 192.168.x.x to be blocked, got nil")
|
||||
@@ -1189,7 +1189,7 @@ func TestIsSafeURL_BlocksInvalidURL(t *testing.T) {
|
||||
|
||||
func TestIsPrivateOrMetadataIP_10Range(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
tests := []string{"10.0.0.0", "10.255.255.255", "10.1.2.3"}
|
||||
for _, ip := range tests {
|
||||
if !isPrivateOrMetadataIP(net.ParseIP(ip)) {
|
||||
@@ -1200,7 +1200,7 @@ func TestIsPrivateOrMetadataIP_10Range(t *testing.T) {
|
||||
|
||||
func TestIsPrivateOrMetadataIP_172Range(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
tests := []string{"172.16.0.0", "172.31.255.255", "172.20.1.1"}
|
||||
for _, ip := range tests {
|
||||
if !isPrivateOrMetadataIP(net.ParseIP(ip)) {
|
||||
@@ -1211,7 +1211,7 @@ func TestIsPrivateOrMetadataIP_172Range(t *testing.T) {
|
||||
|
||||
func TestIsPrivateOrMetadataIP_192_168Range(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
tests := []string{"192.168.0.0", "192.168.255.255", "192.168.1.1"}
|
||||
for _, ip := range tests {
|
||||
if !isPrivateOrMetadataIP(net.ParseIP(ip)) {
|
||||
|
||||
@@ -713,7 +713,7 @@ func TestHeartbeat_SkipsRemovedRows(t *testing.T) {
|
||||
|
||||
func TestValidateAgentURL(t *testing.T) {
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
url string
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/registry"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/scheduler"
|
||||
@@ -472,7 +471,7 @@ func (h *ScheduleHandler) Health(c *gin.Context) {
|
||||
// Skip for system callers and self-calls, same as the A2A proxy.
|
||||
// Post-RFC#637: canvas users may read schedule health too.
|
||||
isCanvasUser := false
|
||||
if !isSystemCaller(callerID) && callerID != workspaceID {
|
||||
if callerID != workspaceID && !isSystemCaller(callerID) {
|
||||
var err error
|
||||
isCanvasUser, err = validateCallerToken(ctx, c, callerID)
|
||||
if err != nil {
|
||||
@@ -482,12 +481,9 @@ func (h *ScheduleHandler) Health(c *gin.Context) {
|
||||
|
||||
// CanCommunicate gate — only peers in the org hierarchy may read health.
|
||||
// Canvas users (human operators) bypass this gate.
|
||||
if callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
|
||||
if !registry.CanCommunicate(callerID, workspaceID) {
|
||||
log.Printf("ScheduleHealth: access denied %s → %s", callerID, workspaceID)
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "access denied"})
|
||||
return
|
||||
}
|
||||
if proxyErr := requireCanCommunicate(callerID, workspaceID, isCanvasUser, "ScheduleHealth"); proxyErr != nil {
|
||||
c.JSON(proxyErr.Status, proxyErr.Response)
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
|
||||
@@ -96,6 +96,7 @@ func TestSecurity_GetTemplates_FreshInstall_FailsOpen(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
t.Setenv("ADMIN_TOKEN", "")
|
||||
|
||||
authDB, authMock := newFreshInstallAuthDB(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
@@ -154,6 +155,7 @@ func TestSecurity_GetOrgTemplates_FreshInstall_FailsOpen(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
t.Setenv("ADMIN_TOKEN", "")
|
||||
|
||||
authDB, authMock := newFreshInstallAuthDB(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
@@ -840,6 +840,8 @@ func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
|
||||
// into cfg.ConfigFiles[".auth_token"].
|
||||
func TestIssueAndInjectToken_HappyPath(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// Hermetic: container may have MOLECULE_ORG_ID set (SaaS mode skips injection).
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
@@ -879,6 +881,8 @@ func TestIssueAndInjectToken_HappyPath(t *testing.T) {
|
||||
// issuing a fresh one so we never accumulate stale live tokens in the DB.
|
||||
func TestIssueAndInjectToken_RotatesExistingToken(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// Hermetic: container may have MOLECULE_ORG_ID set (SaaS mode skips injection).
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
@@ -924,6 +928,8 @@ func TestIssueAndInjectToken_RotatesExistingToken(t *testing.T) {
|
||||
// live token that the old file might accidentally present.
|
||||
func TestIssueAndInjectToken_RevokeFailSkipsInjection(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// Hermetic: container may have MOLECULE_ORG_ID set (SaaS mode skips injection).
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
@@ -947,6 +953,8 @@ func TestIssueAndInjectToken_RevokeFailSkipsInjection(t *testing.T) {
|
||||
// IssueToken also skips injection without panicking.
|
||||
func TestIssueAndInjectToken_IssueFailSkipsInjection(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// Hermetic: container may have MOLECULE_ORG_ID set (SaaS mode skips injection).
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
@@ -975,6 +983,8 @@ func TestIssueAndInjectToken_IssueFailSkipsInjection(t *testing.T) {
|
||||
// ConfigFiles map is allocated before the token is written.
|
||||
func TestIssueAndInjectToken_NilConfigFilesAllocated(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// Hermetic: container may have MOLECULE_ORG_ID set (SaaS mode skips injection).
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
|
||||
Reference in New Issue
Block a user