forked from molecule-ai/molecule-core
fix(slack): address code review — 6 critical + improvement fixes
Code review findings addressed:
Critical:
1. Bot echo loop: add bot_id + subtype='bot_message' check in ParseWebhook
to prevent outbound auto-posts from triggering inbound → infinite loop
2. Connection leak: close resp.Body immediately after reading instead of
defer inside loop (was holding N connections open for N chunks)
3. Cancelled context: auto-post goroutine now uses context.Background()
with 30s timeout instead of inheriting fireCtx (which gets cancelled
by deferred cancel() when fireSchedule returns)
4. Slug validation: regex ^[a-zA-Z0-9 _-]+$ rejects path traversal and
special chars in [slug] routing
Improvements:
5. Shared HTTP client (slackHTTPClient) for connection pooling instead of
per-request &http.Client{}
6. Rune-safe truncation in BroadcastToWorkspaceChannels for CJK/emoji
7. Log async HandleInbound errors instead of silently discarding
8. url_verification challenge properly returned (c.JSON with challenge)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
65bc6a8ca5
commit
95d0bc25a3
@ -449,9 +449,10 @@ func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID,
|
||||
if text == "" || db.DB == nil {
|
||||
return
|
||||
}
|
||||
// Truncate to keep Slack messages digestible
|
||||
if len(text) > 500 {
|
||||
text = text[:497] + "..."
|
||||
// Truncate to keep Slack messages digestible (rune-safe for CJK/emoji)
|
||||
runes := []rune(text)
|
||||
if len(runes) > 500 {
|
||||
text = string(runes[:497]) + "..."
|
||||
}
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id FROM workspace_channels
|
||||
|
||||
@ -19,6 +19,8 @@ const (
|
||||
slackHTTPTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
var slackHTTPClient = &http.Client{Timeout: slackHTTPTimeout}
|
||||
|
||||
// SlackAdapter implements ChannelAdapter for Slack Incoming Webhooks.
|
||||
//
|
||||
// Outbound messages are sent via Slack Incoming Webhooks (the simple,
|
||||
@ -101,14 +103,12 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int
|
||||
req.Header.Set("Content-Type", "application/json; charset=utf-8")
|
||||
req.Header.Set("Authorization", "Bearer "+botToken)
|
||||
|
||||
client := &http.Client{Timeout: slackHTTPTimeout}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := slackHTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("slack: send: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
resp.Body.Close()
|
||||
var result struct {
|
||||
OK bool `json:"ok"`
|
||||
Error string `json:"error"`
|
||||
@ -140,12 +140,10 @@ func (s *SlackAdapter) sendWebhookMessage(ctx context.Context, config map[string
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: slackHTTPTimeout}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := slackHTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("slack: send: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
@ -206,27 +204,34 @@ func (s *SlackAdapter) ParseWebhook(c *gin.Context, _ map[string]interface{}) (*
|
||||
|
||||
var payload struct {
|
||||
Type string `json:"type"`
|
||||
Challenge string `json:"challenge"` // url_verification
|
||||
Challenge string `json:"challenge"`
|
||||
Event struct {
|
||||
Type string `json:"type"`
|
||||
User string `json:"user"`
|
||||
Text string `json:"text"`
|
||||
Channel string `json:"channel"`
|
||||
Ts string `json:"ts"`
|
||||
BotID string `json:"bot_id"`
|
||||
Subtype string `json:"subtype"`
|
||||
} `json:"event"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
return nil, fmt.Errorf("slack: parse event: %w", err)
|
||||
}
|
||||
|
||||
// url_verification handshake — no message, respond via the handler layer
|
||||
// url_verification handshake — respond with challenge directly
|
||||
if payload.Type == "url_verification" {
|
||||
log.Printf("Channels: Slack url_verification challenge (not handled by ParseWebhook)")
|
||||
c.JSON(200, gin.H{"challenge": payload.Challenge})
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Ignore bot messages to prevent echo loops. Our own auto-posts
|
||||
// via chat.postMessage fire Events API callbacks with bot_id set.
|
||||
if payload.Event.BotID != "" || payload.Event.Subtype == "bot_message" {
|
||||
return nil, nil
|
||||
}
|
||||
if payload.Event.Type != "message" || payload.Event.Text == "" {
|
||||
return nil, nil // Ignore non-message events
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
text = payload.Event.Text
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -449,12 +450,16 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
|
||||
// in a shared channel and route to a specific agent.
|
||||
targetSlug := ""
|
||||
routedText := msg.Text
|
||||
validSlugRe := regexp.MustCompile(`^[a-zA-Z0-9 _-]+$`)
|
||||
if len(msg.Text) > 2 && msg.Text[0] == '[' {
|
||||
if idx := strings.Index(msg.Text, "]"); idx > 1 && idx < 40 {
|
||||
targetSlug = strings.ToLower(strings.TrimSpace(msg.Text[1:idx]))
|
||||
routedText = strings.TrimSpace(msg.Text[idx+1:])
|
||||
if routedText == "" {
|
||||
routedText = msg.Text // Don't send empty — keep original
|
||||
candidate := strings.ToLower(strings.TrimSpace(msg.Text[1:idx]))
|
||||
if validSlugRe.MatchString(candidate) {
|
||||
targetSlug = candidate
|
||||
routedText = strings.TrimSpace(msg.Text[idx+1:])
|
||||
if routedText == "" {
|
||||
routedText = msg.Text
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -543,7 +548,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
|
||||
// Process asynchronously — don't block the webhook response
|
||||
go func() {
|
||||
bgCtx := context.Background()
|
||||
_ = h.manager.HandleInbound(bgCtx, ch, msg)
|
||||
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
|
||||
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
|
||||
}
|
||||
}()
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "accepted"})
|
||||
|
||||
@ -379,7 +379,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
if s.channels != nil && lastStatus == "ok" && !isEmpty {
|
||||
summary := s.extractResponseSummary(respBody)
|
||||
if summary != "" {
|
||||
go s.channels.BroadcastToWorkspaceChannels(ctx, sched.WorkspaceID, summary)
|
||||
go func(wsID, text string) {
|
||||
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer postCancel()
|
||||
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)
|
||||
}(sched.WorkspaceID, summary)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user