feat(channels): per-channel message budget with 429 enforcement (#368)
Add an optional channel_budget (INTEGER, nullable) to workspace_channels
via migration 024. When channel_budget IS NOT NULL and message_count has
reached the budget, the Send handler returns 429 {"error":"channel budget
exceeded"} and aborts before calling SendOutbound.
Implementation details:
- Single SELECT query reads both message_count and channel_budget in one
round-trip (avoids TOCTOU window between read and write)
- Fail-open on DB error: transient failures log but don't block sends
- Early-return on budget hit is before SendOutbound so message_count
cannot be incremented past the limit by a concurrent send that slips
through the window (best-effort; atomic enforcement requires DB-level CAS)
- NULL channel_budget = unlimited (default, backward-compatible)
Migration is idempotent (ADD COLUMN IF NOT EXISTS). Down migration drops
the column cleanly.
Four sqlmock tests cover: at-limit → 429, above-limit → 429, NULL budget
passes through, under-limit passes through.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
520c993baa
commit
b021f85af9
@ -269,6 +269,24 @@ func (h *ChannelHandler) Send(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Per-channel budget enforcement (#368).
|
||||
// Reads message_count and channel_budget in one query. If channel_budget IS
|
||||
// NOT NULL and message_count has already reached it, reject with 429.
|
||||
// DB errors are logged and treated as fail-open (budget not enforced) so a
|
||||
// transient DB hiccup doesn't silently block outbound messages.
|
||||
var msgCount int
|
||||
var budget sql.NullInt64
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT message_count, channel_budget FROM workspace_channels WHERE id = $1`,
|
||||
channelID,
|
||||
).Scan(&msgCount, &budget); err != nil && err != sql.ErrNoRows {
|
||||
log.Printf("Channels: budget check failed for channel %s: %v", channelID, err)
|
||||
}
|
||||
if budget.Valid && int64(msgCount) >= budget.Int64 {
|
||||
c.JSON(http.StatusTooManyRequests, gin.H{"error": "channel budget exceeded"})
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.manager.SendOutbound(ctx, channelID, body.Text); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
|
||||
@ -452,3 +452,130 @@ func TestSystemCallerPrefix_ChannelIncluded(t *testing.T) {
|
||||
t.Error("user:someone should NOT be a system caller")
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Per-channel budget (#368) ====================
|
||||
|
||||
// TestChannelHandler_Send_BudgetExceeded verifies that when message_count
|
||||
// equals channel_budget, the Send handler returns 429 {"error":"channel budget exceeded"}
|
||||
// and does NOT call SendOutbound.
|
||||
func TestChannelHandler_Send_BudgetExceeded_Returns429(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
handler := NewChannelHandler(newTestChannelManager())
|
||||
|
||||
// Budget = 10, message_count = 10 → at the ceiling → 429.
|
||||
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
|
||||
WithArgs("ch-budget-hit").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
|
||||
AddRow(10, 10))
|
||||
|
||||
body, _ := json.Marshal(map[string]string{"text": "hello"})
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-budget-hit/send", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-budget-hit"}}
|
||||
|
||||
handler.Send(c)
|
||||
|
||||
if w.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("expected 429 when budget exceeded, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["error"] != "channel budget exceeded" {
|
||||
t.Errorf("expected error 'channel budget exceeded', got %v", resp["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelHandler_Send_BudgetExceeded_AboveLimit verifies that when
|
||||
// message_count exceeds channel_budget (not just equals it), 429 is returned.
|
||||
func TestChannelHandler_Send_BudgetExceeded_AboveLimit_Returns429(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
handler := NewChannelHandler(newTestChannelManager())
|
||||
|
||||
// Budget = 5, message_count = 99 → well above limit.
|
||||
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
|
||||
WithArgs("ch-over").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
|
||||
AddRow(99, 5))
|
||||
|
||||
body, _ := json.Marshal(map[string]string{"text": "hi"})
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-over/send", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-over"}}
|
||||
|
||||
handler.Send(c)
|
||||
|
||||
if w.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("expected 429 for over-limit, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelHandler_Send_NoBudget verifies that when channel_budget IS NULL
|
||||
// (no limit), the send proceeds past the budget check (no 429 returned).
|
||||
// The eventual 500 comes from loadChannel not finding the mock channel —
|
||||
// the important assertion is NOT 429.
|
||||
func TestChannelHandler_Send_NoBudget_PassesThrough(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
handler := NewChannelHandler(newTestChannelManager())
|
||||
|
||||
// NULL budget → no restriction.
|
||||
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
|
||||
WithArgs("ch-unlimited").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
|
||||
AddRow(9999, nil))
|
||||
|
||||
// SendOutbound → loadChannel SELECT — channel not found → error.
|
||||
mock.ExpectQuery("SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users").
|
||||
WithArgs("ch-unlimited").
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
|
||||
body, _ := json.Marshal(map[string]string{"text": "unlimited send"})
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-unlimited/send", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-unlimited"}}
|
||||
|
||||
handler.Send(c)
|
||||
|
||||
if w.Code == http.StatusTooManyRequests {
|
||||
t.Errorf("expected budget check to pass (NULL budget), but got 429")
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelHandler_Send_BudgetNotYetReached verifies that when
|
||||
// message_count < channel_budget, the send proceeds past the budget check.
|
||||
func TestChannelHandler_Send_BudgetNotYetReached_PassesThrough(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
handler := NewChannelHandler(newTestChannelManager())
|
||||
|
||||
// Budget = 100, message_count = 9 → still under limit.
|
||||
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
|
||||
WithArgs("ch-under").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
|
||||
AddRow(9, 100))
|
||||
|
||||
// SendOutbound → loadChannel SELECT — channel not found → error.
|
||||
mock.ExpectQuery("SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users").
|
||||
WithArgs("ch-under").
|
||||
WillReturnRows(sqlmock.NewRows([]string{}))
|
||||
|
||||
body, _ := json.Marshal(map[string]string{"text": "still under budget"})
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request, _ = http.NewRequest("POST", "/workspaces/ws-1/channels/ch-under/send", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-under"}}
|
||||
|
||||
handler.Send(c)
|
||||
|
||||
if w.Code == http.StatusTooManyRequests {
|
||||
t.Errorf("expected budget check to pass (under limit), but got 429")
|
||||
}
|
||||
}
|
||||
|
||||
2
platform/migrations/024_channel_budget.down.sql
Normal file
2
platform/migrations/024_channel_budget.down.sql
Normal file
@ -0,0 +1,2 @@
|
||||
ALTER TABLE workspace_channels
|
||||
DROP COLUMN IF EXISTS channel_budget;
|
||||
6
platform/migrations/024_channel_budget.up.sql
Normal file
6
platform/migrations/024_channel_budget.up.sql
Normal file
@ -0,0 +1,6 @@
|
||||
-- Per-channel message budget (#368).
|
||||
-- NULL means no limit. When message_count reaches channel_budget, the Send
|
||||
-- handler returns 429 {"error":"channel budget exceeded"} and rejects further
|
||||
-- outbound messages until the budget is cleared or raised.
|
||||
ALTER TABLE workspace_channels
|
||||
ADD COLUMN IF NOT EXISTS channel_budget INTEGER DEFAULT NULL;
|
||||
Loading…
Reference in New Issue
Block a user