From f99d8b0a1ad7c74595dd4ea538001e04785656fa Mon Sep 17 00:00:00 2001 From: Molecule AI Backend Engineer Date: Fri, 17 Apr 2026 01:33:51 +0000 Subject: [PATCH] feat(platform): add per-workspace budget_limit field and A2A enforcement (#541) - Migration 025: ADD COLUMN budget_limit BIGINT DEFAULT NULL and monthly_spend BIGINT NOT NULL DEFAULT 0 to workspaces table - Models: BudgetLimit *int64 in CreateWorkspacePayload; MonthlySpend int64 in HeartbeatPayload - workspace.go: scanWorkspaceRow, workspaceListQuery, Get, Create, and Update all handle budget_limit/monthly_spend; budget_limit is gated as a sensitiveUpdateField - registry.go: heartbeat conditionally writes monthly_spend only when payload.MonthlySpend > 0 (avoids overwriting with zero) - a2a_proxy.go: checkWorkspaceBudget() returns 429 when monthly_spend >= budget_limit (NULL = no limit; fail-open on DB error) - Tests: 8 new workspace_budget_test.go tests + patched existing tests for the 20-column scanWorkspaceRow and 10-param CREATE INSERT Field type: BIGINT (int64), units: USD cents (budget_limit=500 = $5.00/month) Co-Authored-By: Claude Sonnet 4.6 --- platform/internal/handlers/a2a_proxy.go | 35 ++ .../handlers/handlers_additional_test.go | 9 +- platform/internal/handlers/handlers_test.go | 9 +- platform/internal/handlers/registry.go | 46 +- platform/internal/handlers/workspace.go | 46 +- .../handlers/workspace_budget_test.go | 430 ++++++++++++++++++ platform/internal/handlers/workspace_test.go | 9 +- platform/internal/models/workspace.go | 8 + .../migrations/025_workspace_budget.down.sql | 3 + .../migrations/025_workspace_budget.up.sql | 11 + 10 files changed, 578 insertions(+), 28 deletions(-) create mode 100644 platform/internal/handlers/workspace_budget_test.go create mode 100644 platform/migrations/025_workspace_budget.down.sql create mode 100644 platform/migrations/025_workspace_budget.up.sql diff --git a/platform/internal/handlers/a2a_proxy.go b/platform/internal/handlers/a2a_proxy.go index 307c3311..32c2966f 100644 --- a/platform/internal/handlers/a2a_proxy.go +++ b/platform/internal/handlers/a2a_proxy.go @@ -203,6 +203,33 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) { c.Data(status, "application/json", respBody) } +// checkWorkspaceBudget returns a proxyA2AError with 429 when the workspace +// has a budget_limit set and monthly_spend has reached or exceeded it. +// DB errors are logged and treated as fail-open — a budget check failure +// must not block legitimate A2A traffic. +func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID string) *proxyA2AError { + var budgetLimit sql.NullInt64 + var monthlySpend int64 + err := db.DB.QueryRowContext(ctx, + `SELECT budget_limit, COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`, + workspaceID, + ).Scan(&budgetLimit, &monthlySpend) + if err != nil { + if err != sql.ErrNoRows { + log.Printf("ProxyA2A: budget check failed for %s: %v", workspaceID, err) + } + return nil // fail-open + } + if budgetLimit.Valid && monthlySpend >= budgetLimit.Int64 { + log.Printf("ProxyA2A: budget exceeded for %s (spend=%d limit=%d)", workspaceID, monthlySpend, budgetLimit.Int64) + return &proxyA2AError{ + Status: http.StatusTooManyRequests, + Response: gin.H{"error": "workspace budget limit exceeded"}, + } + } + return nil +} + func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) { // Access control: workspace-to-workspace requests must pass CanCommunicate check. // Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*) @@ -217,6 +244,14 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } } + // Budget enforcement: reject A2A calls when the workspace has exceeded its + // monthly spend ceiling. Checked after access control so unauthorized calls + // are rejected first (403 > 429 in the denial hierarchy). Fail-open on DB + // errors so a budget check failure never blocks legitimate traffic. + if proxyErr := h.checkWorkspaceBudget(ctx, workspaceID); proxyErr != nil { + return 0, nil, proxyErr + } + agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID) if proxyErr != nil { return 0, nil, proxyErr diff --git a/platform/internal/handlers/handlers_additional_test.go b/platform/internal/handlers/handlers_additional_test.go index 1ca55547..5316497c 100644 --- a/platform/internal/handlers/handlers_additional_test.go +++ b/platform/internal/handlers/handlers_additional_test.go @@ -30,7 +30,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) { parentID := "parent-ws-123" mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 1, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none"). + WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 1, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil)). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -65,7 +65,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none"). + WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() mock.ExpectExec("INSERT INTO canvas_layouts"). @@ -194,12 +194,13 @@ func TestWorkspaceList_WithData(t *testing.T) { "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", } rows := sqlmock.NewRows(columns). AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001", - nil, 3, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false). + nil, 3, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)). AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "", - nil, 0, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true) + nil, 0, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0)) mock.ExpectQuery("SELECT w.id, w.name"). WillReturnRows(rows) diff --git a/platform/internal/handlers/handlers_test.go b/platform/internal/handlers/handlers_test.go index c8dae41e..25a67578 100644 --- a/platform/internal/handlers/handlers_test.go +++ b/platform/internal/handlers/handlers_test.go @@ -253,7 +253,7 @@ func TestWorkspaceCreate(t *testing.T) { // Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace) mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none"). + WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). WillReturnResult(sqlmock.NewResult(0, 1)) // Expect transaction commit (no secrets in this payload) @@ -340,12 +340,13 @@ func TestWorkspaceList(t *testing.T) { "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", } rows := sqlmock.NewRows(columns). AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001", - nil, 0, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false). + nil, 0, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)). AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "", - nil, 0, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false) + nil, 0, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0)) mock.ExpectQuery("SELECT w.id, w.name"). WillReturnRows(rows) @@ -1007,12 +1008,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) { "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs("ws-task"). WillReturnRows(sqlmock.NewRows(columns).AddRow( "ws-task", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000", nil, 2, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false, + nil, int64(0), )) w := httptest.NewRecorder() diff --git a/platform/internal/handlers/registry.go b/platform/internal/handlers/registry.go index 445d6903..b07bc1b7 100644 --- a/platform/internal/handlers/registry.go +++ b/platform/internal/handlers/registry.go @@ -239,18 +239,40 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { // late heartbeat from a container that's being torn down doesn't // refresh last_heartbeat_at on a tombstoned workspace (which would // otherwise confuse the liveness monitor). - _, err := db.DB.ExecContext(ctx, ` - UPDATE workspaces SET - last_heartbeat_at = now(), - last_error_rate = $2, - last_sample_error = $3, - active_tasks = $4, - uptime_seconds = $5, - current_task = $6, - updated_at = now() - WHERE id = $1 AND status != 'removed' - `, payload.WorkspaceID, payload.ErrorRate, payload.SampleError, - payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask) + // + // monthly_spend: updated when the agent reports a positive value (cumulative + // USD cents for the current month). Zero means "no update" — never write + // zero to avoid accidentally clearing a previously-reported spend value. + var err error + if payload.MonthlySpend > 0 { + _, err = db.DB.ExecContext(ctx, ` + UPDATE workspaces SET + last_heartbeat_at = now(), + last_error_rate = $2, + last_sample_error = $3, + active_tasks = $4, + uptime_seconds = $5, + current_task = $6, + monthly_spend = $7, + updated_at = now() + WHERE id = $1 AND status != 'removed' + `, payload.WorkspaceID, payload.ErrorRate, payload.SampleError, + payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask, + payload.MonthlySpend) + } else { + _, err = db.DB.ExecContext(ctx, ` + UPDATE workspaces SET + last_heartbeat_at = now(), + last_error_rate = $2, + last_sample_error = $3, + active_tasks = $4, + uptime_seconds = $5, + current_task = $6, + updated_at = now() + WHERE id = $1 AND status != 'removed' + `, payload.WorkspaceID, payload.ErrorRate, payload.SampleError, + payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask) + } if err != nil { log.Printf("Heartbeat update error: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update"}) diff --git a/platform/internal/handlers/workspace.go b/platform/internal/handlers/workspace.go index dc727833..99b83eeb 100644 --- a/platform/internal/handlers/workspace.go +++ b/platform/internal/handlers/workspace.go @@ -150,9 +150,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { // Insert workspace with runtime persisted in DB (inside transaction) _, err := tx.ExecContext(ctx, ` - INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access) - VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9) - `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess) + INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit) + VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10) + `, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit) if err != nil { tx.Rollback() //nolint:errcheck log.Printf("Create workspace error: %v", err) @@ -293,10 +293,13 @@ func scanWorkspaceRow(rows interface { var collapsed bool var parentID *string var agentCard []byte + var budgetLimit sql.NullInt64 + var monthlySpend int64 err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url, &parentID, &activeTasks, &errorRate, &sampleError, &uptimeSeconds, - ¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed) + ¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed, + &budgetLimit, &monthlySpend) if err != nil { return nil, err } @@ -315,11 +318,19 @@ func scanWorkspaceRow(rows interface { "current_task": currentTask, "runtime": runtime, "workspace_dir": nilIfEmpty(workspaceDir), + "monthly_spend": monthlySpend, "x": x, "y": y, "collapsed": collapsed, } + // budget_limit: nil when no limit set, int64 otherwise + if budgetLimit.Valid { + ws["budget_limit"] = budgetLimit.Int64 + } else { + ws["budget_limit"] = nil + } + // Only include non-empty values if role != "" { ws["role"] = role @@ -344,7 +355,8 @@ const workspaceListQuery = ` COALESCE(w.last_sample_error, ''), w.uptime_seconds, COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'), COALESCE(w.workspace_dir, ''), - COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false) + COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false), + w.budget_limit, COALESCE(w.monthly_spend, 0) FROM workspaces w LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id WHERE w.status != 'removed' @@ -389,7 +401,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) { COALESCE(w.last_sample_error, ''), w.uptime_seconds, COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'), COALESCE(w.workspace_dir, ''), - COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false) + COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false), + w.budget_limit, COALESCE(w.monthly_spend, 0) FROM workspaces w LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id WHERE w.id = $1 @@ -506,6 +519,7 @@ var sensitiveUpdateFields = map[string]struct{}{ "parent_id": {}, "runtime": {}, "workspace_dir": {}, + "budget_limit": {}, // cost-control ceiling — requires admin auth to change } // Update handles PATCH /workspaces/:id @@ -603,6 +617,26 @@ func (h *WorkspaceHandler) Update(c *gin.Context) { } needsRestart = true } + if budgetLimitVal, ok := body["budget_limit"]; ok { + // Allow null to clear (remove) the budget ceiling. + // Non-null values come in as JSON float64 from map[string]interface{} + // — convert to int64 for storage (USD cents). + var budgetArg interface{} + if budgetLimitVal != nil { + switch v := budgetLimitVal.(type) { + case float64: + budgetArg = int64(v) + case int64: + budgetArg = v + default: + c.JSON(http.StatusBadRequest, gin.H{"error": "budget_limit must be an integer (USD cents) or null"}) + return + } + } + if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET budget_limit = $2, updated_at = now() WHERE id = $1`, id, budgetArg); err != nil { + log.Printf("Update budget_limit error for %s: %v", id, err) + } + } // Update canvas position if both x and y provided if x, xOk := body["x"]; xOk { diff --git a/platform/internal/handlers/workspace_budget_test.go b/platform/internal/handlers/workspace_budget_test.go new file mode 100644 index 00000000..345467c9 --- /dev/null +++ b/platform/internal/handlers/workspace_budget_test.go @@ -0,0 +1,430 @@ +package handlers + +// Tests for per-workspace budget_limit field and A2A enforcement (#541). +// +// Coverage: +// - GET /workspaces/:id includes budget_limit (nil when unset, int when set) +// - GET /workspaces/:id includes monthly_spend +// - POST /workspaces creates workspace with budget_limit +// - PATCH /workspaces/:id updates budget_limit (nil clears the ceiling) +// - A2A proxy returns 429 when monthly_spend >= budget_limit +// - A2A proxy passes through when monthly_spend < budget_limit +// - A2A proxy passes through when budget_limit is NULL (no limit) +// - A2A proxy fail-open on DB error during budget check + +import ( + "bytes" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// wsColumns is the canonical column list for scanWorkspaceRow tests. +var wsColumns = []string{ + "id", "name", "role", "tier", "status", "agent_card", "url", + "parent_id", "active_tasks", "last_error_rate", "last_sample_error", + "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", +} + +// ==================== GET — budget_limit serialisation ==================== + +// TestWorkspaceBudget_Get_NilLimit verifies that budget_limit is null in the +// JSON response when the DB column IS NULL (no ceiling configured). +func TestWorkspaceBudget_Get_NilLimit(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery("SELECT w.id, w.name"). + WithArgs("ws-nobudget"). + WillReturnRows(sqlmock.NewRows(wsColumns). + AddRow("ws-nobudget", "Free Agent", "worker", 1, "online", + []byte(`{}`), "http://localhost:9001", + nil, 0, 0.0, "", 0, "", "langgraph", "", + 0.0, 0.0, false, + nil, // budget_limit NULL + 0)) // monthly_spend 0 + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-nobudget"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-nobudget", nil) + handler.Get(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to parse response: %v", err) + } + if resp["budget_limit"] != nil { + t.Errorf("expected budget_limit=nil, got %v", resp["budget_limit"]) + } + if resp["monthly_spend"] != float64(0) { + t.Errorf("expected monthly_spend=0, got %v", resp["monthly_spend"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_Get_WithLimit verifies that a non-NULL budget_limit is +// returned as the correct integer value (USD cents) in the response. +func TestWorkspaceBudget_Get_WithLimit(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery("SELECT w.id, w.name"). + WithArgs("ws-limited"). + WillReturnRows(sqlmock.NewRows(wsColumns). + AddRow("ws-limited", "Capped Agent", "worker", 1, "online", + []byte(`{}`), "http://localhost:9002", + nil, 0, 0.0, "", 0, "", "langgraph", "", + 0.0, 0.0, false, + int64(500), // budget_limit = $5.00 + int64(123))) // monthly_spend = $1.23 + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-limited"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-limited", nil) + handler.Get(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to parse response: %v", err) + } + if resp["budget_limit"] != float64(500) { + t.Errorf("expected budget_limit=500, got %v", resp["budget_limit"]) + } + if resp["monthly_spend"] != float64(123) { + t.Errorf("expected monthly_spend=123, got %v", resp["monthly_spend"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// ==================== POST — create with budget_limit ==================== + +// TestWorkspaceBudget_Create_WithLimit verifies that POST /workspaces with +// a budget_limit passes the value as the 10th INSERT parameter ($10). +func TestWorkspaceBudget_Create_WithLimit(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + budgetVal := int64(1000) // $10.00 + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs( + sqlmock.AnyArg(), // id + "Budgeted Agent", // name + nil, // role + 1, // tier + "langgraph", // runtime + sqlmock.AnyArg(), // awareness_namespace + (*string)(nil), // parent_id + nil, // workspace_dir + "none", // workspace_access + &budgetVal, // budget_limit + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO canvas_layouts"). + WithArgs(sqlmock.AnyArg(), float64(0), float64(0)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"name":"Budgeted Agent","budget_limit":1000}` + c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Create(c) + + if w.Code != http.StatusCreated { + t.Errorf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// ==================== PATCH — update budget_limit ==================== + +// TestWorkspaceBudget_Update_SetLimit verifies that PATCH /workspaces/:id with +// budget_limit=500 issues an UPDATE workspaces SET budget_limit = 500. +func TestWorkspaceBudget_Update_SetLimit(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Existence probe + mock.ExpectQuery("SELECT EXISTS.*workspaces WHERE id"). + WithArgs("ws-upd-budget"). + WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true)) + // budget_limit UPDATE + mock.ExpectExec("UPDATE workspaces SET budget_limit"). + WithArgs("ws-upd-budget", int64(500)). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-upd-budget"}} + body := `{"budget_limit":500}` + c.Request = httptest.NewRequest("PATCH", "/workspaces/ws-upd-budget", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Update(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_Update_ClearLimit verifies that PATCH /workspaces/:id +// with budget_limit=null issues an UPDATE with NULL, clearing the ceiling. +func TestWorkspaceBudget_Update_ClearLimit(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery("SELECT EXISTS.*workspaces WHERE id"). + WithArgs("ws-clear-budget"). + WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true)) + // NULL clears the budget ceiling + mock.ExpectExec("UPDATE workspaces SET budget_limit"). + WithArgs("ws-clear-budget", nil). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-clear-budget"}} + body := `{"budget_limit":null}` + c.Request = httptest.NewRequest("PATCH", "/workspaces/ws-clear-budget", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Update(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// ==================== A2A enforcement ==================== + +// TestWorkspaceBudget_A2A_ExceededReturns429 verifies that the A2A proxy +// returns HTTP 429 {"error":"workspace budget limit exceeded"} when +// monthly_spend equals budget_limit. +func TestWorkspaceBudget_A2A_ExceededReturns429(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Cache a URL so resolveAgentURL doesn't need a DB query after budget check + mr.Set(fmt.Sprintf("ws:%s:url", "ws-over-budget"), "http://localhost:9999") + + // Budget check query: spend = limit → exceeded + mock.ExpectQuery("SELECT budget_limit, COALESCE"). + WithArgs("ws-over-budget"). + WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}). + AddRow(int64(500), int64(500))) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-over-budget"}} + body := `{"message":{"role":"user","parts":[{"text":"hello"}]}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-over-budget/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.ProxyA2A(c) + + if w.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 when budget exceeded, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["error"] != "workspace budget limit exceeded" { + t.Errorf("expected 'workspace budget limit exceeded', got %v", resp["error"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_A2A_AboveLimitReturns429 verifies 429 when spend > limit. +func TestWorkspaceBudget_A2A_AboveLimitReturns429(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-way-over"), "http://localhost:9999") + + // spend > limit + mock.ExpectQuery("SELECT budget_limit, COALESCE"). + WithArgs("ws-way-over"). + WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}). + AddRow(int64(100), int64(9999))) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-way-over"}} + body := `{"message":{"role":"user","parts":[{"text":"test"}]}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-way-over/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.ProxyA2A(c) + + if w.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 when spend > limit, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_A2A_UnderLimitPassesThrough verifies that A2A calls +// succeed normally when monthly_spend is below budget_limit. +func TestWorkspaceBudget_A2A_UnderLimitPassesThrough(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Stand up a minimal mock agent that returns a valid A2A response + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-under-budget"), agentServer.URL) + + // Budget check: spend (100) < limit (500) → pass-through + mock.ExpectQuery("SELECT budget_limit, COALESCE"). + WithArgs("ws-under-budget"). + WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}). + AddRow(int64(500), int64(100))) + + // Activity log INSERT from logA2ASuccess + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-under-budget"}} + body := `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hello"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-under-budget/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.ProxyA2A(c) + + // Give the async logA2ASuccess goroutine a moment to fire + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 when under budget, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_A2A_NilLimitPassesThrough verifies that when +// budget_limit IS NULL (no ceiling set), A2A calls pass through unconditionally. +func TestWorkspaceBudget_A2A_NilLimitPassesThrough(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"2","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-no-limit"), agentServer.URL) + + // budget_limit NULL → no enforcement regardless of monthly_spend + mock.ExpectQuery("SELECT budget_limit, COALESCE"). + WithArgs("ws-no-limit"). + WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}). + AddRow(nil, int64(999999))) // huge spend but no limit set + + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-no-limit"}} + body := `{"jsonrpc":"2.0","id":"2","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-no-limit/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 when no limit set, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestWorkspaceBudget_A2A_DBErrorFailOpen verifies that a DB error during the +// budget check is fail-open — the request proceeds rather than being blocked. +func TestWorkspaceBudget_A2A_DBErrorFailOpen(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"3","result":{"status":"ok"}}`) + })) + defer agentServer.Close() + + mr.Set(fmt.Sprintf("ws:%s:url", "ws-db-err-budget"), agentServer.URL) + + // Budget check fails with DB error → fail-open (request proceeds) + mock.ExpectQuery("SELECT budget_limit, COALESCE"). + WithArgs("ws-db-err-budget"). + WillReturnError(sql.ErrConnDone) + + mock.ExpectExec("INSERT INTO activity_logs"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-db-err-budget"}} + body := `{"jsonrpc":"2.0","id":"3","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"fail-open test"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-db-err-budget/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.ProxyA2A(c) + + time.Sleep(50 * time.Millisecond) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 on DB error (fail-open), got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} diff --git a/platform/internal/handlers/workspace_test.go b/platform/internal/handlers/workspace_test.go index e36665d0..cb458332 100644 --- a/platform/internal/handlers/workspace_test.go +++ b/platform/internal/handlers/workspace_test.go @@ -24,13 +24,15 @@ func TestWorkspaceGet_Success(t *testing.T) { "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs("ws-get-1"). WillReturnRows(sqlmock.NewRows(columns). AddRow("ws-get-1", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`), "http://localhost:8001", nil, 2, 0.05, "", 3600, "working", "langgraph", - "", 10.0, 20.0, false)) + "", 10.0, 20.0, false, + nil, 0)) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -149,7 +151,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) { // Transaction begins, workspace INSERT fails, transaction is rolled back. mock.ExpectBegin() mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none"). + WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). WillReturnError(sql.ErrConnDone) mock.ExpectRollback() @@ -181,7 +183,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) { mock.ExpectBegin() // Expect workspace INSERT with defaulted tier=1, runtime="langgraph" mock.ExpectExec("INSERT INTO workspaces"). - WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none"). + WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() @@ -344,6 +346,7 @@ func TestWorkspaceList_Empty(t *testing.T) { "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", + "budget_limit", "monthly_spend", })) w := httptest.NewRecorder() diff --git a/platform/internal/models/workspace.go b/platform/internal/models/workspace.go index 4bf9ed9a..e7c642f2 100644 --- a/platform/internal/models/workspace.go +++ b/platform/internal/models/workspace.go @@ -44,6 +44,11 @@ type HeartbeatPayload struct { ActiveTasks int `json:"active_tasks"` UptimeSeconds int `json:"uptime_seconds"` CurrentTask string `json:"current_task"` + // MonthlySpend is the agent's self-reported accumulated LLM API spend for + // the current month, in USD cents. Zero means "no update" — the platform + // only writes to monthly_spend when this field is > 0. Agents should + // report their cumulative spend each heartbeat (not the delta). + MonthlySpend int64 `json:"monthly_spend"` } type UpdateCardPayload struct { @@ -63,6 +68,9 @@ type CreateWorkspacePayload struct { WorkspaceDir string `json:"workspace_dir"` // host path to mount as /workspace (empty = isolated volume) WorkspaceAccess string `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65 ParentID *string `json:"parent_id"` + // BudgetLimit is the optional monthly spend ceiling in USD cents. + // NULL (omitted) means no limit. budget_limit=500 means $5.00/month. + BudgetLimit *int64 `json:"budget_limit"` // Secrets is an optional map of key→plaintext-value pairs to persist as // workspace secrets at creation time. Stored encrypted (same path as // POST /workspaces/:id/secrets). Nil/empty map is a no-op. diff --git a/platform/migrations/025_workspace_budget.down.sql b/platform/migrations/025_workspace_budget.down.sql new file mode 100644 index 00000000..c7cd48e7 --- /dev/null +++ b/platform/migrations/025_workspace_budget.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE workspaces + DROP COLUMN IF EXISTS budget_limit, + DROP COLUMN IF EXISTS monthly_spend; diff --git a/platform/migrations/025_workspace_budget.up.sql b/platform/migrations/025_workspace_budget.up.sql new file mode 100644 index 00000000..28334047 --- /dev/null +++ b/platform/migrations/025_workspace_budget.up.sql @@ -0,0 +1,11 @@ +-- Per-workspace monthly budget limit (#541). +-- NULL means no limit. When monthly_spend reaches budget_limit, the A2A +-- proxy returns 429 {"error":"workspace budget limit exceeded"} and rejects +-- further A2A calls until budget_limit is raised or monthly_spend is reset. +-- +-- Units: USD cents (integer). budget_limit=500 means $5.00/month. +-- monthly_spend is updated by the workspace via the heartbeat endpoint; +-- agents report their accumulated LLM API cost each heartbeat cycle. +ALTER TABLE workspaces + ADD COLUMN IF NOT EXISTS budget_limit BIGINT DEFAULT NULL, + ADD COLUMN IF NOT EXISTS monthly_spend BIGINT NOT NULL DEFAULT 0;