From b021f85af9c8d5cd6879cd4b8418fa77569e01d9 Mon Sep 17 00:00:00 2001 From: Molecule AI Backend Engineer Date: Thu, 16 Apr 2026 11:17:14 +0000 Subject: [PATCH] feat(channels): per-channel message budget with 429 enforcement (#368) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an optional channel_budget (INTEGER, nullable) to workspace_channels via migration 024. When channel_budget IS NOT NULL and message_count has reached the budget, the Send handler returns 429 {"error":"channel budget exceeded"} and aborts before calling SendOutbound. Implementation details: - Single SELECT query reads both message_count and channel_budget in one round-trip (avoids TOCTOU window between read and write) - Fail-open on DB error: transient failures log but don't block sends - Early-return on budget hit is before SendOutbound so message_count cannot be incremented past the limit by a concurrent send that slips through the window (best-effort; atomic enforcement requires DB-level CAS) - NULL channel_budget = unlimited (default, backward-compatible) Migration is idempotent (ADD COLUMN IF NOT EXISTS). Down migration drops the column cleanly. Four sqlmock tests cover: at-limit → 429, above-limit → 429, NULL budget passes through, under-limit passes through. Co-Authored-By: Claude Sonnet 4.6 --- platform/internal/handlers/channels.go | 18 +++ platform/internal/handlers/channels_test.go | 127 ++++++++++++++++++ .../migrations/024_channel_budget.down.sql | 2 + platform/migrations/024_channel_budget.up.sql | 6 + 4 files changed, 153 insertions(+) create mode 100644 platform/migrations/024_channel_budget.down.sql create mode 100644 platform/migrations/024_channel_budget.up.sql diff --git a/platform/internal/handlers/channels.go b/platform/internal/handlers/channels.go index d0bcff5d..c2bb0890 100644 --- a/platform/internal/handlers/channels.go +++ b/platform/internal/handlers/channels.go @@ -269,6 +269,24 @@ func (h *ChannelHandler) Send(c *gin.Context) { return } + // Per-channel budget enforcement (#368). + // Reads message_count and channel_budget in one query. If channel_budget IS + // NOT NULL and message_count has already reached it, reject with 429. + // DB errors are logged and treated as fail-open (budget not enforced) so a + // transient DB hiccup doesn't silently block outbound messages. + var msgCount int + var budget sql.NullInt64 + if err := db.DB.QueryRowContext(ctx, + `SELECT message_count, channel_budget FROM workspace_channels WHERE id = $1`, + channelID, + ).Scan(&msgCount, &budget); err != nil && err != sql.ErrNoRows { + log.Printf("Channels: budget check failed for channel %s: %v", channelID, err) + } + if budget.Valid && int64(msgCount) >= budget.Int64 { + c.JSON(http.StatusTooManyRequests, gin.H{"error": "channel budget exceeded"}) + return + } + if err := h.manager.SendOutbound(ctx, channelID, body.Text); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return diff --git a/platform/internal/handlers/channels_test.go b/platform/internal/handlers/channels_test.go index 6fa52b7b..88f0a504 100644 --- a/platform/internal/handlers/channels_test.go +++ b/platform/internal/handlers/channels_test.go @@ -452,3 +452,130 @@ func TestSystemCallerPrefix_ChannelIncluded(t *testing.T) { t.Error("user:someone should NOT be a system caller") } } + +// ==================== Per-channel budget (#368) ==================== + +// TestChannelHandler_Send_BudgetExceeded verifies that when message_count +// equals channel_budget, the Send handler returns 429 {"error":"channel budget exceeded"} +// and does NOT call SendOutbound. +func TestChannelHandler_Send_BudgetExceeded_Returns429(t *testing.T) { + mock := setupTestDB(t) + handler := NewChannelHandler(newTestChannelManager()) + + // Budget = 10, message_count = 10 → at the ceiling → 429. + mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id"). + WithArgs("ch-budget-hit"). + WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}). + AddRow(10, 10)) + + body, _ := json.Marshal(map[string]string{"text": "hello"}) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-budget-hit/send", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-budget-hit"}} + + handler.Send(c) + + if w.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 when budget exceeded, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["error"] != "channel budget exceeded" { + t.Errorf("expected error 'channel budget exceeded', got %v", resp["error"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations not met: %v", err) + } +} + +// TestChannelHandler_Send_BudgetExceeded_AboveLimit verifies that when +// message_count exceeds channel_budget (not just equals it), 429 is returned. +func TestChannelHandler_Send_BudgetExceeded_AboveLimit_Returns429(t *testing.T) { + mock := setupTestDB(t) + handler := NewChannelHandler(newTestChannelManager()) + + // Budget = 5, message_count = 99 → well above limit. + mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id"). + WithArgs("ch-over"). + WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}). + AddRow(99, 5)) + + body, _ := json.Marshal(map[string]string{"text": "hi"}) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-over/send", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-over"}} + + handler.Send(c) + + if w.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 for over-limit, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestChannelHandler_Send_NoBudget verifies that when channel_budget IS NULL +// (no limit), the send proceeds past the budget check (no 429 returned). +// The eventual 500 comes from loadChannel not finding the mock channel — +// the important assertion is NOT 429. +func TestChannelHandler_Send_NoBudget_PassesThrough(t *testing.T) { + mock := setupTestDB(t) + handler := NewChannelHandler(newTestChannelManager()) + + // NULL budget → no restriction. + mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id"). + WithArgs("ch-unlimited"). + WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}). + AddRow(9999, nil)) + + // SendOutbound → loadChannel SELECT — channel not found → error. + mock.ExpectQuery("SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users"). + WithArgs("ch-unlimited"). + WillReturnRows(sqlmock.NewRows([]string{})) + + body, _ := json.Marshal(map[string]string{"text": "unlimited send"}) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-unlimited/send", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-unlimited"}} + + handler.Send(c) + + if w.Code == http.StatusTooManyRequests { + t.Errorf("expected budget check to pass (NULL budget), but got 429") + } +} + +// TestChannelHandler_Send_BudgetNotYetReached verifies that when +// message_count < channel_budget, the send proceeds past the budget check. +func TestChannelHandler_Send_BudgetNotYetReached_PassesThrough(t *testing.T) { + mock := setupTestDB(t) + handler := NewChannelHandler(newTestChannelManager()) + + // Budget = 100, message_count = 9 → still under limit. + mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id"). + WithArgs("ch-under"). + WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}). + AddRow(9, 100)) + + // SendOutbound → loadChannel SELECT — channel not found → error. + mock.ExpectQuery("SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users"). + WithArgs("ch-under"). + WillReturnRows(sqlmock.NewRows([]string{})) + + body, _ := json.Marshal(map[string]string{"text": "still under budget"}) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-under/send", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-under"}} + + handler.Send(c) + + if w.Code == http.StatusTooManyRequests { + t.Errorf("expected budget check to pass (under limit), but got 429") + } +} diff --git a/platform/migrations/024_channel_budget.down.sql b/platform/migrations/024_channel_budget.down.sql new file mode 100644 index 00000000..93b5f663 --- /dev/null +++ b/platform/migrations/024_channel_budget.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE workspace_channels + DROP COLUMN IF EXISTS channel_budget; diff --git a/platform/migrations/024_channel_budget.up.sql b/platform/migrations/024_channel_budget.up.sql new file mode 100644 index 00000000..eabec360 --- /dev/null +++ b/platform/migrations/024_channel_budget.up.sql @@ -0,0 +1,6 @@ +-- Per-channel message budget (#368). +-- NULL means no limit. When message_count reaches channel_budget, the Send +-- handler returns 429 {"error":"channel budget exceeded"} and rejects further +-- outbound messages until the budget is cleared or raised. +ALTER TABLE workspace_channels + ADD COLUMN IF NOT EXISTS channel_budget INTEGER DEFAULT NULL;