fix(security): scope PausePollersForToken to requesting workspace (closes #329)

CI 5/6 pass (E2E cancel = run-supersession pattern). Dev Lead review 04:21:  Approved. Fixes cross-tenant token exposure: PausePollersForToken now scoped to requesting workspace_id via SQL WHERE clause. Closes #329.
This commit is contained in:
Hongming Wang 2026-04-15 21:22:50 -07:00 committed by GitHub
parent 12dc0ebdf2
commit a205c92428
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 66 additions and 15 deletions

View File

@ -87,7 +87,7 @@ export function ChannelsTab({ workspaceId }: Props) {
try {
const res = await api.post<{ chats: { chat_id: string; name: string; type: string }[]; hint: string }>(
`/channels/discover`,
{ channel_type: formType, bot_token: formBotToken }
{ channel_type: formType, bot_token: formBotToken, workspace_id: workspaceId }
);
const chats = res.chats || [];
setDiscoveredChats(chats);

View File

@ -102,23 +102,29 @@ func (m *Manager) Start(ctx context.Context) {
m.Reload(ctx)
}
// PausePollersForToken stops any pollers that share the given bot token,
// then returns a resume function. Used during discovery to avoid Telegram's
// "only one getUpdates at a time" 409 Conflict.
// PausePollersForToken stops any pollers in the given workspace that share
// the given bot token, then returns a resume function. Used during discovery
// to avoid Telegram's "only one getUpdates at a time" 409 Conflict.
//
// #319: bot_token is stored encrypted in channel_config so we cannot match
// with SQL `channel_config->>'bot_token' = $1` anymore. Load all enabled
// channels, decrypt each, and compare the plaintext in Go. The cardinality
// is small (typically <10 enabled channels per install) so the extra work
// is negligible.
func (m *Manager) PausePollersForToken(botToken string) func() {
if botToken == "" {
// with SQL `channel_config->>'bot_token' = $1` anymore. Load channels,
// decrypt each, and compare the plaintext in Go.
//
// #329: scope the lookup to the requesting workspace. The unscoped variant
// loaded plaintext tokens for every tenant into memory on each discovery
// call — blast-radius concern if a heap dump / profiler leaked process
// memory. Reload() keeps the unscoped query since it legitimately needs
// every workspace's pollers at startup; PausePollersForToken operates in
// the context of a single workspace's API request and does not.
func (m *Manager) PausePollersForToken(workspaceID, botToken string) func() {
if botToken == "" || workspaceID == "" {
return func() {}
}
rows, err := db.DB.QueryContext(context.Background(), `
SELECT id, channel_config FROM workspace_channels WHERE enabled = true
`)
SELECT id, channel_config FROM workspace_channels
WHERE enabled = true AND workspace_id = $1
`, workspaceID)
if err != nil {
return func() {}
}

View File

@ -310,11 +310,22 @@ func (h *ChannelHandler) Discover(c *gin.Context) {
var body struct {
ChannelType string `json:"channel_type"`
BotToken string `json:"bot_token"`
WorkspaceID string `json:"workspace_id"`
}
if err := c.ShouldBindJSON(&body); err != nil || body.BotToken == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "bot_token is required"})
return
}
// #329: workspace_id is required so PausePollersForToken can scope the
// decryption lookup to the caller's tenant. Legacy clients that omit
// the field still work — they just won't be able to pause a previously-
// saved poller sharing the same token, which fails loudly at Telegram
// with a 409 Conflict rather than silently decrypting every tenant's
// token.
if body.WorkspaceID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace_id is required"})
return
}
adapter, ok := channels.GetAdapter(body.ChannelType)
if !ok {
@ -329,9 +340,10 @@ func (h *ChannelHandler) Discover(c *gin.Context) {
return
}
// Pause any active poller using this bot token to avoid Telegram's
// "only one getUpdates at a time" 409 Conflict.
resumeFn := h.manager.PausePollersForToken(body.BotToken)
// Pause any active poller in THIS workspace using this bot token to
// avoid Telegram's "only one getUpdates at a time" 409 Conflict.
// #329: scoped to workspace_id so we never decrypt other tenants' tokens.
resumeFn := h.manager.PausePollersForToken(body.WorkspaceID, body.BotToken)
defer resumeFn()
result, err := tg.DiscoverChats(c.Request.Context(), body.BotToken)

View File

@ -361,9 +361,12 @@ func TestChannelHandler_Discover_MissingToken(t *testing.T) {
func TestChannelHandler_Discover_UnsupportedType(t *testing.T) {
handler := NewChannelHandler(newTestChannelManager())
// #329: workspace_id required — include so we actually reach the
// unsupported-type check instead of bouncing at the new scope gate.
body, _ := json.Marshal(map[string]interface{}{
"channel_type": "whatsapp",
"bot_token": "fake",
"workspace_id": "ws-test",
})
w := httptest.NewRecorder()
@ -384,6 +387,7 @@ func TestChannelHandler_Discover_InvalidBotToken(t *testing.T) {
body, _ := json.Marshal(map[string]interface{}{
"channel_type": "telegram",
"bot_token": "clearly-not-a-real-token",
"workspace_id": "ws-test",
})
w := httptest.NewRecorder()
@ -406,6 +410,35 @@ func TestChannelHandler_Discover_InvalidBotToken(t *testing.T) {
}
}
// #329: workspace_id is now required. Without it, Discover must 400
// *before* issuing the unscoped DB query that would decrypt every
// tenant's bot tokens.
func TestChannelHandler_Discover_329_RequiresWorkspaceID(t *testing.T) {
handler := NewChannelHandler(newTestChannelManager())
body, _ := json.Marshal(map[string]interface{}{
"channel_type": "telegram",
"bot_token": "any-non-empty-token",
// workspace_id intentionally omitted
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request, _ = http.NewRequest("POST", "/channels/discover", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Discover(c)
if w.Code != 400 {
t.Errorf("expected 400 when workspace_id missing, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if errMsg, _ := resp["error"].(string); errMsg != "workspace_id is required" {
t.Errorf("expected workspace_id error, got %q", errMsg)
}
}
// ==================== System Caller Prefix ====================
func TestSystemCallerPrefix_ChannelIncluded(t *testing.T) {