feat(platform): add per-workspace budget_limit field and A2A enforcement (#541)

- Migration 025: ADD COLUMN budget_limit BIGINT DEFAULT NULL and
  monthly_spend BIGINT NOT NULL DEFAULT 0 to workspaces table
- Models: BudgetLimit *int64 in CreateWorkspacePayload;
  MonthlySpend int64 in HeartbeatPayload
- workspace.go: scanWorkspaceRow, workspaceListQuery, Get, Create, and
  Update all handle budget_limit/monthly_spend; budget_limit is gated
  as a sensitiveUpdateField
- registry.go: heartbeat conditionally writes monthly_spend only when
  payload.MonthlySpend > 0 (avoids overwriting with zero)
- a2a_proxy.go: checkWorkspaceBudget() returns 429 when
  monthly_spend >= budget_limit (NULL = no limit; fail-open on DB error)
- Tests: 8 new workspace_budget_test.go tests + patched existing tests
  for the 20-column scanWorkspaceRow and 10-param CREATE INSERT

Field type: BIGINT (int64), units: USD cents (budget_limit=500 = $5.00/month)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Molecule AI Backend Engineer 2026-04-17 01:33:51 +00:00
parent f5c9132413
commit f99d8b0a1a
10 changed files with 578 additions and 28 deletions

View File

@ -203,6 +203,33 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
c.Data(status, "application/json", respBody)
}
// checkWorkspaceBudget returns a proxyA2AError with 429 when the workspace
// has a budget_limit set and monthly_spend has reached or exceeded it.
// DB errors are logged and treated as fail-open — a budget check failure
// must not block legitimate A2A traffic.
func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID string) *proxyA2AError {
var budgetLimit sql.NullInt64
var monthlySpend int64
err := db.DB.QueryRowContext(ctx,
`SELECT budget_limit, COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&budgetLimit, &monthlySpend)
if err != nil {
if err != sql.ErrNoRows {
log.Printf("ProxyA2A: budget check failed for %s: %v", workspaceID, err)
}
return nil // fail-open
}
if budgetLimit.Valid && monthlySpend >= budgetLimit.Int64 {
log.Printf("ProxyA2A: budget exceeded for %s (spend=%d limit=%d)", workspaceID, monthlySpend, budgetLimit.Int64)
return &proxyA2AError{
Status: http.StatusTooManyRequests,
Response: gin.H{"error": "workspace budget limit exceeded"},
}
}
return nil
}
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) {
// Access control: workspace-to-workspace requests must pass CanCommunicate check.
// Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*)
@ -217,6 +244,14 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
}
// Budget enforcement: reject A2A calls when the workspace has exceeded its
// monthly spend ceiling. Checked after access control so unauthorized calls
// are rejected first (403 > 429 in the denial hierarchy). Fail-open on DB
// errors so a budget check failure never blocks legitimate traffic.
if proxyErr := h.checkWorkspaceBudget(ctx, workspaceID); proxyErr != nil {
return 0, nil, proxyErr
}
agentURL, proxyErr := h.resolveAgentURL(ctx, workspaceID)
if proxyErr != nil {
return 0, nil, proxyErr

View File

@ -30,7 +30,7 @@ func TestWorkspaceCreate_WithParentID(t *testing.T) {
parentID := "parent-ws-123"
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 1, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none").
WithArgs(sqlmock.AnyArg(), "Child Agent", nil, 1, "langgraph", sqlmock.AnyArg(), &parentID, nil, "none", (*int64)(nil)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -65,7 +65,7 @@ func TestWorkspaceCreate_ExplicitClaudeCodeRuntime(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none").
WithArgs(sqlmock.AnyArg(), "CC Agent", nil, 2, "claude-code", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
@ -194,12 +194,13 @@ func TestWorkspaceList_WithData(t *testing.T) {
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001",
nil, 3, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false).
nil, 3, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)).
AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "",
nil, 0, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true)
nil, 0, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0))
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)

View File

@ -253,7 +253,7 @@ func TestWorkspaceCreate(t *testing.T) {
// Expect workspace INSERT (uuid is dynamic, use AnyArg for id, runtime, awareness_namespace)
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none").
WithArgs(sqlmock.AnyArg(), "Test Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect transaction commit (no secrets in this payload)
@ -340,12 +340,13 @@ func TestWorkspaceList(t *testing.T) {
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001",
nil, 0, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false).
nil, 0, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)).
AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "",
nil, 0, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false)
nil, 0, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0))
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
@ -1007,12 +1008,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) {
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("ws-task").
WillReturnRows(sqlmock.NewRows(columns).AddRow(
"ws-task", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000",
nil, 2, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false,
nil, int64(0),
))
w := httptest.NewRecorder()

View File

@ -239,18 +239,40 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
// late heartbeat from a container that's being torn down doesn't
// refresh last_heartbeat_at on a tombstoned workspace (which would
// otherwise confuse the liveness monitor).
_, err := db.DB.ExecContext(ctx, `
UPDATE workspaces SET
last_heartbeat_at = now(),
last_error_rate = $2,
last_sample_error = $3,
active_tasks = $4,
uptime_seconds = $5,
current_task = $6,
updated_at = now()
WHERE id = $1 AND status != 'removed'
`, payload.WorkspaceID, payload.ErrorRate, payload.SampleError,
payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask)
//
// monthly_spend: updated when the agent reports a positive value (cumulative
// USD cents for the current month). Zero means "no update" — never write
// zero to avoid accidentally clearing a previously-reported spend value.
var err error
if payload.MonthlySpend > 0 {
_, err = db.DB.ExecContext(ctx, `
UPDATE workspaces SET
last_heartbeat_at = now(),
last_error_rate = $2,
last_sample_error = $3,
active_tasks = $4,
uptime_seconds = $5,
current_task = $6,
monthly_spend = $7,
updated_at = now()
WHERE id = $1 AND status != 'removed'
`, payload.WorkspaceID, payload.ErrorRate, payload.SampleError,
payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask,
payload.MonthlySpend)
} else {
_, err = db.DB.ExecContext(ctx, `
UPDATE workspaces SET
last_heartbeat_at = now(),
last_error_rate = $2,
last_sample_error = $3,
active_tasks = $4,
uptime_seconds = $5,
current_task = $6,
updated_at = now()
WHERE id = $1 AND status != 'removed'
`, payload.WorkspaceID, payload.ErrorRate, payload.SampleError,
payload.ActiveTasks, payload.UptimeSeconds, payload.CurrentTask)
}
if err != nil {
log.Printf("Heartbeat update error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update"})

View File

@ -150,9 +150,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// Insert workspace with runtime persisted in DB (inside transaction)
_, err := tx.ExecContext(ctx, `
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess)
INSERT INTO workspaces (id, name, role, tier, runtime, awareness_namespace, status, parent_id, workspace_dir, workspace_access, budget_limit)
VALUES ($1, $2, $3, $4, $5, $6, 'provisioning', $7, $8, $9, $10)
`, id, payload.Name, role, payload.Tier, payload.Runtime, awarenessNamespace, payload.ParentID, workspaceDir, workspaceAccess, payload.BudgetLimit)
if err != nil {
tx.Rollback() //nolint:errcheck
log.Printf("Create workspace error: %v", err)
@ -293,10 +293,13 @@ func scanWorkspaceRow(rows interface {
var collapsed bool
var parentID *string
var agentCard []byte
var budgetLimit sql.NullInt64
var monthlySpend int64
err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url,
&parentID, &activeTasks, &errorRate, &sampleError, &uptimeSeconds,
&currentTask, &runtime, &workspaceDir, &x, &y, &collapsed)
&currentTask, &runtime, &workspaceDir, &x, &y, &collapsed,
&budgetLimit, &monthlySpend)
if err != nil {
return nil, err
}
@ -315,11 +318,19 @@ func scanWorkspaceRow(rows interface {
"current_task": currentTask,
"runtime": runtime,
"workspace_dir": nilIfEmpty(workspaceDir),
"monthly_spend": monthlySpend,
"x": x,
"y": y,
"collapsed": collapsed,
}
// budget_limit: nil when no limit set, int64 otherwise
if budgetLimit.Valid {
ws["budget_limit"] = budgetLimit.Int64
} else {
ws["budget_limit"] = nil
}
// Only include non-empty values
if role != "" {
ws["role"] = role
@ -344,7 +355,8 @@ const workspaceListQuery = `
COALESCE(w.last_sample_error, ''), w.uptime_seconds,
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false)
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
w.budget_limit, COALESCE(w.monthly_spend, 0)
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.status != 'removed'
@ -389,7 +401,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
COALESCE(w.last_sample_error, ''), w.uptime_seconds,
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false)
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
w.budget_limit, COALESCE(w.monthly_spend, 0)
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.id = $1
@ -506,6 +519,7 @@ var sensitiveUpdateFields = map[string]struct{}{
"parent_id": {},
"runtime": {},
"workspace_dir": {},
"budget_limit": {}, // cost-control ceiling — requires admin auth to change
}
// Update handles PATCH /workspaces/:id
@ -603,6 +617,26 @@ func (h *WorkspaceHandler) Update(c *gin.Context) {
}
needsRestart = true
}
if budgetLimitVal, ok := body["budget_limit"]; ok {
// Allow null to clear (remove) the budget ceiling.
// Non-null values come in as JSON float64 from map[string]interface{}
// — convert to int64 for storage (USD cents).
var budgetArg interface{}
if budgetLimitVal != nil {
switch v := budgetLimitVal.(type) {
case float64:
budgetArg = int64(v)
case int64:
budgetArg = v
default:
c.JSON(http.StatusBadRequest, gin.H{"error": "budget_limit must be an integer (USD cents) or null"})
return
}
}
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET budget_limit = $2, updated_at = now() WHERE id = $1`, id, budgetArg); err != nil {
log.Printf("Update budget_limit error for %s: %v", id, err)
}
}
// Update canvas position if both x and y provided
if x, xOk := body["x"]; xOk {

View File

@ -0,0 +1,430 @@
package handlers
// Tests for per-workspace budget_limit field and A2A enforcement (#541).
//
// Coverage:
// - GET /workspaces/:id includes budget_limit (nil when unset, int when set)
// - GET /workspaces/:id includes monthly_spend
// - POST /workspaces creates workspace with budget_limit
// - PATCH /workspaces/:id updates budget_limit (nil clears the ceiling)
// - A2A proxy returns 429 when monthly_spend >= budget_limit
// - A2A proxy passes through when monthly_spend < budget_limit
// - A2A proxy passes through when budget_limit is NULL (no limit)
// - A2A proxy fail-open on DB error during budget check
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// wsColumns is the canonical column list for scanWorkspaceRow tests.
var wsColumns = []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}
// ==================== GET — budget_limit serialisation ====================
// TestWorkspaceBudget_Get_NilLimit verifies that budget_limit is null in the
// JSON response when the DB column IS NULL (no ceiling configured).
func TestWorkspaceBudget_Get_NilLimit(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("ws-nobudget").
WillReturnRows(sqlmock.NewRows(wsColumns).
AddRow("ws-nobudget", "Free Agent", "worker", 1, "online",
[]byte(`{}`), "http://localhost:9001",
nil, 0, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
nil, // budget_limit NULL
0)) // monthly_spend 0
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-nobudget"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-nobudget", nil)
handler.Get(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["budget_limit"] != nil {
t.Errorf("expected budget_limit=nil, got %v", resp["budget_limit"])
}
if resp["monthly_spend"] != float64(0) {
t.Errorf("expected monthly_spend=0, got %v", resp["monthly_spend"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_Get_WithLimit verifies that a non-NULL budget_limit is
// returned as the correct integer value (USD cents) in the response.
func TestWorkspaceBudget_Get_WithLimit(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("ws-limited").
WillReturnRows(sqlmock.NewRows(wsColumns).
AddRow("ws-limited", "Capped Agent", "worker", 1, "online",
[]byte(`{}`), "http://localhost:9002",
nil, 0, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
int64(500), // budget_limit = $5.00
int64(123))) // monthly_spend = $1.23
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-limited"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-limited", nil)
handler.Get(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["budget_limit"] != float64(500) {
t.Errorf("expected budget_limit=500, got %v", resp["budget_limit"])
}
if resp["monthly_spend"] != float64(123) {
t.Errorf("expected monthly_spend=123, got %v", resp["monthly_spend"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// ==================== POST — create with budget_limit ====================
// TestWorkspaceBudget_Create_WithLimit verifies that POST /workspaces with
// a budget_limit passes the value as the 10th INSERT parameter ($10).
func TestWorkspaceBudget_Create_WithLimit(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
budgetVal := int64(1000) // $10.00
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(
sqlmock.AnyArg(), // id
"Budgeted Agent", // name
nil, // role
1, // tier
"langgraph", // runtime
sqlmock.AnyArg(), // awareness_namespace
(*string)(nil), // parent_id
nil, // workspace_dir
"none", // workspace_access
&budgetVal, // budget_limit
).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO canvas_layouts").
WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"Budgeted Agent","budget_limit":1000}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Errorf("expected 201, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// ==================== PATCH — update budget_limit ====================
// TestWorkspaceBudget_Update_SetLimit verifies that PATCH /workspaces/:id with
// budget_limit=500 issues an UPDATE workspaces SET budget_limit = 500.
func TestWorkspaceBudget_Update_SetLimit(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Existence probe
mock.ExpectQuery("SELECT EXISTS.*workspaces WHERE id").
WithArgs("ws-upd-budget").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// budget_limit UPDATE
mock.ExpectExec("UPDATE workspaces SET budget_limit").
WithArgs("ws-upd-budget", int64(500)).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-upd-budget"}}
body := `{"budget_limit":500}`
c.Request = httptest.NewRequest("PATCH", "/workspaces/ws-upd-budget", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_Update_ClearLimit verifies that PATCH /workspaces/:id
// with budget_limit=null issues an UPDATE with NULL, clearing the ceiling.
func TestWorkspaceBudget_Update_ClearLimit(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT EXISTS.*workspaces WHERE id").
WithArgs("ws-clear-budget").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// NULL clears the budget ceiling
mock.ExpectExec("UPDATE workspaces SET budget_limit").
WithArgs("ws-clear-budget", nil).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-clear-budget"}}
body := `{"budget_limit":null}`
c.Request = httptest.NewRequest("PATCH", "/workspaces/ws-clear-budget", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// ==================== A2A enforcement ====================
// TestWorkspaceBudget_A2A_ExceededReturns429 verifies that the A2A proxy
// returns HTTP 429 {"error":"workspace budget limit exceeded"} when
// monthly_spend equals budget_limit.
func TestWorkspaceBudget_A2A_ExceededReturns429(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Cache a URL so resolveAgentURL doesn't need a DB query after budget check
mr.Set(fmt.Sprintf("ws:%s:url", "ws-over-budget"), "http://localhost:9999")
// Budget check query: spend = limit → exceeded
mock.ExpectQuery("SELECT budget_limit, COALESCE").
WithArgs("ws-over-budget").
WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}).
AddRow(int64(500), int64(500)))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-over-budget"}}
body := `{"message":{"role":"user","parts":[{"text":"hello"}]}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-over-budget/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
if w.Code != http.StatusTooManyRequests {
t.Errorf("expected 429 when budget exceeded, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["error"] != "workspace budget limit exceeded" {
t.Errorf("expected 'workspace budget limit exceeded', got %v", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_A2A_AboveLimitReturns429 verifies 429 when spend > limit.
func TestWorkspaceBudget_A2A_AboveLimitReturns429(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mr.Set(fmt.Sprintf("ws:%s:url", "ws-way-over"), "http://localhost:9999")
// spend > limit
mock.ExpectQuery("SELECT budget_limit, COALESCE").
WithArgs("ws-way-over").
WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}).
AddRow(int64(100), int64(9999)))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-way-over"}}
body := `{"message":{"role":"user","parts":[{"text":"test"}]}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-way-over/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
if w.Code != http.StatusTooManyRequests {
t.Errorf("expected 429 when spend > limit, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_A2A_UnderLimitPassesThrough verifies that A2A calls
// succeed normally when monthly_spend is below budget_limit.
func TestWorkspaceBudget_A2A_UnderLimitPassesThrough(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Stand up a minimal mock agent that returns a valid A2A response
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-under-budget"), agentServer.URL)
// Budget check: spend (100) < limit (500) → pass-through
mock.ExpectQuery("SELECT budget_limit, COALESCE").
WithArgs("ws-under-budget").
WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}).
AddRow(int64(500), int64(100)))
// Activity log INSERT from logA2ASuccess
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-under-budget"}}
body := `{"jsonrpc":"2.0","id":"1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hello"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-under-budget/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
// Give the async logA2ASuccess goroutine a moment to fire
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Errorf("expected 200 when under budget, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_A2A_NilLimitPassesThrough verifies that when
// budget_limit IS NULL (no ceiling set), A2A calls pass through unconditionally.
func TestWorkspaceBudget_A2A_NilLimitPassesThrough(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"2","result":{"status":"ok"}}`)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-no-limit"), agentServer.URL)
// budget_limit NULL → no enforcement regardless of monthly_spend
mock.ExpectQuery("SELECT budget_limit, COALESCE").
WithArgs("ws-no-limit").
WillReturnRows(sqlmock.NewRows([]string{"budget_limit", "monthly_spend"}).
AddRow(nil, int64(999999))) // huge spend but no limit set
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-no-limit"}}
body := `{"jsonrpc":"2.0","id":"2","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-no-limit/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Errorf("expected 200 when no limit set, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestWorkspaceBudget_A2A_DBErrorFailOpen verifies that a DB error during the
// budget check is fail-open — the request proceeds rather than being blocked.
func TestWorkspaceBudget_A2A_DBErrorFailOpen(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"3","result":{"status":"ok"}}`)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-db-err-budget"), agentServer.URL)
// Budget check fails with DB error → fail-open (request proceeds)
mock.ExpectQuery("SELECT budget_limit, COALESCE").
WithArgs("ws-db-err-budget").
WillReturnError(sql.ErrConnDone)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-db-err-budget"}}
body := `{"jsonrpc":"2.0","id":"3","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"fail-open test"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-db-err-budget/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Errorf("expected 200 on DB error (fail-open), got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}

View File

@ -24,13 +24,15 @@ func TestWorkspaceGet_Success(t *testing.T) {
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("ws-get-1").
WillReturnRows(sqlmock.NewRows(columns).
AddRow("ws-get-1", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`),
"http://localhost:8001", nil, 2, 0.05, "", 3600, "working", "langgraph",
"", 10.0, 20.0, false))
"", 10.0, 20.0, false,
nil, 0))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@ -149,7 +151,7 @@ func TestWorkspaceCreate_DBInsertError(t *testing.T) {
// Transaction begins, workspace INSERT fails, transaction is rolled back.
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none").
WithArgs(sqlmock.AnyArg(), "Failing Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WillReturnError(sql.ErrConnDone)
mock.ExpectRollback()
@ -181,7 +183,7 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
mock.ExpectBegin()
// Expect workspace INSERT with defaulted tier=1, runtime="langgraph"
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none").
WithArgs(sqlmock.AnyArg(), "Default Agent", nil, 1, "langgraph", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
@ -344,6 +346,7 @@ func TestWorkspaceList_Empty(t *testing.T) {
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
}))
w := httptest.NewRecorder()

View File

@ -44,6 +44,11 @@ type HeartbeatPayload struct {
ActiveTasks int `json:"active_tasks"`
UptimeSeconds int `json:"uptime_seconds"`
CurrentTask string `json:"current_task"`
// MonthlySpend is the agent's self-reported accumulated LLM API spend for
// the current month, in USD cents. Zero means "no update" — the platform
// only writes to monthly_spend when this field is > 0. Agents should
// report their cumulative spend each heartbeat (not the delta).
MonthlySpend int64 `json:"monthly_spend"`
}
type UpdateCardPayload struct {
@ -63,6 +68,9 @@ type CreateWorkspacePayload struct {
WorkspaceDir string `json:"workspace_dir"` // host path to mount as /workspace (empty = isolated volume)
WorkspaceAccess string `json:"workspace_access"` // "none" (default), "read_only", or "read_write" — see #65
ParentID *string `json:"parent_id"`
// BudgetLimit is the optional monthly spend ceiling in USD cents.
// NULL (omitted) means no limit. budget_limit=500 means $5.00/month.
BudgetLimit *int64 `json:"budget_limit"`
// Secrets is an optional map of key→plaintext-value pairs to persist as
// workspace secrets at creation time. Stored encrypted (same path as
// POST /workspaces/:id/secrets). Nil/empty map is a no-op.

View File

@ -0,0 +1,3 @@
ALTER TABLE workspaces
DROP COLUMN IF EXISTS budget_limit,
DROP COLUMN IF EXISTS monthly_spend;

View File

@ -0,0 +1,11 @@
-- Per-workspace monthly budget limit (#541).
-- NULL means no limit. When monthly_spend reaches budget_limit, the A2A
-- proxy returns 429 {"error":"workspace budget limit exceeded"} and rejects
-- further A2A calls until budget_limit is raised or monthly_spend is reset.
--
-- Units: USD cents (integer). budget_limit=500 means $5.00/month.
-- monthly_spend is updated by the workspace via the heartbeat endpoint;
-- agents report their accumulated LLM API cost each heartbeat cycle.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS budget_limit BIGINT DEFAULT NULL,
ADD COLUMN IF NOT EXISTS monthly_spend BIGINT NOT NULL DEFAULT 0;