Merge pull request #844 from Molecule-AI/feat/slack-bot-api-channels

feat(slack): Bot API adapter with per-agent identity + fix pgvector migration guard
This commit is contained in:
Hongming Wang 2026-04-17 14:16:44 -07:00 committed by GitHub
commit 66fce40d44
7 changed files with 396 additions and 39 deletions

View File

@ -196,6 +196,9 @@ func main() {
channelMgr := channels.NewManager(wh, broadcaster)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
// Wire channel manager into scheduler for auto-posting cron output to Slack
cronSched.SetChannels(channelMgr)
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)

View File

@ -437,6 +437,41 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
return nil
}
// BroadcastToWorkspaceChannels sends a message to ALL enabled channels
// configured for a workspace. Used by the scheduler to auto-post cron
// output summaries and by delegation handlers to post completion notices.
//
// Unlike SendOutbound (which targets a specific channel row by ID), this
// fans out to every enabled channel for the workspace — so a single cron
// completion posts to both #mol-engineering AND #mol-firehose if the
// workspace has both configured via chat_id comma-separation.
func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) {
if text == "" || db.DB == nil {
return
}
// Truncate to keep Slack messages digestible (rune-safe for CJK/emoji)
runes := []rune(text)
if len(runes) > 500 {
text = string(runes[:497]) + "..."
}
rows, err := db.DB.QueryContext(ctx, `
SELECT id FROM workspace_channels
WHERE workspace_id = $1 AND enabled = true
`, workspaceID)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var channelID string
if rows.Scan(&channelID) == nil {
if sendErr := m.SendOutbound(ctx, channelID, text); sendErr != nil {
log.Printf("Channels: broadcast to %s failed: %v", channelID[:12], sendErr)
}
}
}
}
func splitChatIDs(raw string) []string {
var ids []string
for _, s := range strings.Split(raw, ",") {

View File

@ -19,6 +19,8 @@ const (
slackHTTPTimeout = 10 * time.Second
)
var slackHTTPClient = &http.Client{Timeout: slackHTTPTimeout}
// SlackAdapter implements ChannelAdapter for Slack Incoming Webhooks.
//
// Outbound messages are sent via Slack Incoming Webhooks (the simple,
@ -35,19 +37,90 @@ func (s *SlackAdapter) DisplayName() string { return "Slack" }
// Returns an error whose message becomes part of the 400 response body so
// keep it human-readable for the canvas UI.
func (s *SlackAdapter) ValidateConfig(config map[string]interface{}) error {
botToken, _ := config["bot_token"].(string)
webhookURL, _ := config["webhook_url"].(string)
if webhookURL == "" {
return fmt.Errorf("missing required field: webhook_url")
if botToken == "" && webhookURL == "" {
return fmt.Errorf("missing required field: bot_token or webhook_url")
}
if !strings.HasPrefix(webhookURL, slackWebhookPrefix) {
if botToken != "" {
if cid, _ := config["channel_id"].(string); cid == "" {
return fmt.Errorf("bot_token mode requires channel_id")
}
}
if webhookURL != "" && !strings.HasPrefix(webhookURL, slackWebhookPrefix) {
return fmt.Errorf("invalid Slack webhook URL")
}
return nil
}
// SendMessage posts text to the configured Slack Incoming Webhook.
// chatID is ignored for Slack webhooks — the channel is encoded in the URL.
func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, _ string, text string) error {
// SendMessage posts text to Slack. Supports two modes:
//
// - Bot API (bot_token set): uses chat.postMessage with per-agent identity
// via chat:write.customize scope. Supports username + icon_emoji overrides.
// - Webhook (webhook_url set, legacy): simple POST, no identity override.
//
// chatID overrides channel_id from config if non-empty (for multi-channel routing).
func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, chatID string, text string) error {
botToken, _ := config["bot_token"].(string)
if botToken != "" {
return s.sendBotMessage(ctx, config, chatID, text)
}
return s.sendWebhookMessage(ctx, config, text)
}
func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]interface{}, chatID, text string) error {
botToken, _ := config["bot_token"].(string)
channelID := chatID
if channelID == "" {
channelID, _ = config["channel_id"].(string)
}
if channelID == "" {
return fmt.Errorf("slack: no channel_id")
}
username, _ := config["username"].(string)
iconEmoji, _ := config["icon_emoji"].(string)
// Split long messages at newline boundaries
chunks := slackSplitMessage(text, 3000)
for _, chunk := range chunks {
payload := map[string]interface{}{
"channel": channelID,
"text": chunk,
}
if username != "" {
payload["username"] = username
}
if iconEmoji != "" {
payload["icon_emoji"] = iconEmoji
}
body, _ := json.Marshal(payload)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("slack: build request: %w", err)
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Authorization", "Bearer "+botToken)
resp, err := slackHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("slack: send: %w", err)
}
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
resp.Body.Close()
var result struct {
OK bool `json:"ok"`
Error string `json:"error"`
}
if json.Unmarshal(respBody, &result) == nil && !result.OK {
return fmt.Errorf("slack: API error: %s", result.Error)
}
}
return nil
}
func (s *SlackAdapter) sendWebhookMessage(ctx context.Context, config map[string]interface{}, text string) error {
webhookURL, _ := config["webhook_url"].(string)
if webhookURL == "" {
return fmt.Errorf("webhook_url not configured")
@ -67,12 +140,10 @@ func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interf
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: slackHTTPTimeout}
resp, err := client.Do(req)
resp, err := slackHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("slack: send: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
@ -81,6 +152,27 @@ func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interf
return nil
}
func slackSplitMessage(text string, maxLen int) []string {
if len(text) <= maxLen {
return []string{text}
}
var chunks []string
for len(text) > 0 {
end := maxLen
if end > len(text) {
end = len(text)
}
if end < len(text) {
if idx := strings.LastIndex(text[:end], "\n"); idx > 0 {
end = idx + 1
}
}
chunks = append(chunks, text[:end])
text = text[end:]
}
return chunks
}
// ParseWebhook handles a Slack slash command or event API POST.
// The payload is either URL-encoded (slash commands) or JSON (Events API).
// Returns nil, nil for non-message events (e.g. url_verification challenge).
@ -112,27 +204,34 @@ func (s *SlackAdapter) ParseWebhook(c *gin.Context, _ map[string]interface{}) (*
var payload struct {
Type string `json:"type"`
Challenge string `json:"challenge"` // url_verification
Challenge string `json:"challenge"`
Event struct {
Type string `json:"type"`
User string `json:"user"`
Text string `json:"text"`
Channel string `json:"channel"`
Ts string `json:"ts"`
BotID string `json:"bot_id"`
Subtype string `json:"subtype"`
} `json:"event"`
}
if err := json.Unmarshal(body, &payload); err != nil {
return nil, fmt.Errorf("slack: parse event: %w", err)
}
// url_verification handshake — no message, respond via the handler layer
// url_verification handshake — respond with challenge directly
if payload.Type == "url_verification" {
log.Printf("Channels: Slack url_verification challenge (not handled by ParseWebhook)")
c.JSON(200, gin.H{"challenge": payload.Challenge})
return nil, nil
}
// Ignore bot messages to prevent echo loops. Our own auto-posts
// via chat.postMessage fire Events API callbacks with bot_id set.
if payload.Event.BotID != "" || payload.Event.Subtype == "bot_message" {
return nil, nil
}
if payload.Event.Type != "message" || payload.Event.Text == "" {
return nil, nil // Ignore non-message events
return nil, nil
}
text = payload.Event.Text

View File

@ -0,0 +1,115 @@
package channels
import (
"context"
"strings"
"testing"
)
func TestSlackSplitMessage_Short(t *testing.T) {
chunks := slackSplitMessage("hello", 3000)
if len(chunks) != 1 || chunks[0] != "hello" {
t.Errorf("expected 1 chunk 'hello', got %v", chunks)
}
}
func TestSlackSplitMessage_Long(t *testing.T) {
long := strings.Repeat("a", 6000)
chunks := slackSplitMessage(long, 3000)
if len(chunks) != 2 {
t.Errorf("expected 2 chunks, got %d", len(chunks))
}
for _, c := range chunks {
if len(c) > 3000 {
t.Errorf("chunk exceeds max: %d", len(c))
}
}
}
func TestSlackSplitMessage_SplitAtNewline(t *testing.T) {
text := strings.Repeat("x", 2900) + "\n" + strings.Repeat("y", 200)
chunks := slackSplitMessage(text, 3000)
if len(chunks) != 2 {
t.Errorf("expected 2 chunks, got %d", len(chunks))
}
if !strings.HasSuffix(chunks[0], "\n") {
t.Error("first chunk should end at newline boundary")
}
}
func TestSlackValidateConfig_BotToken(t *testing.T) {
a := &SlackAdapter{}
err := a.ValidateConfig(map[string]interface{}{
"bot_token": "xoxb-test",
"channel_id": "C123",
})
if err != nil {
t.Errorf("expected valid, got %v", err)
}
}
func TestSlackValidateConfig_BotTokenMissingChannel(t *testing.T) {
a := &SlackAdapter{}
err := a.ValidateConfig(map[string]interface{}{
"bot_token": "xoxb-test",
})
if err == nil {
t.Error("expected error for missing channel_id")
}
}
func TestSlackValidateConfig_WebhookURL(t *testing.T) {
a := &SlackAdapter{}
err := a.ValidateConfig(map[string]interface{}{
"webhook_url": "https://hooks.slack.com/services/T000/B000/xxx",
})
if err != nil {
t.Errorf("expected valid, got %v", err)
}
}
func TestSlackValidateConfig_InvalidWebhook(t *testing.T) {
a := &SlackAdapter{}
err := a.ValidateConfig(map[string]interface{}{
"webhook_url": "https://evil.com/steal",
})
if err == nil {
t.Error("expected error for invalid webhook URL")
}
}
func TestSlackValidateConfig_NeitherSet(t *testing.T) {
a := &SlackAdapter{}
err := a.ValidateConfig(map[string]interface{}{})
if err == nil {
t.Error("expected error when neither bot_token nor webhook_url set")
}
}
func TestFetchChannelHistory_EmptyToken(t *testing.T) {
msgs, err := FetchChannelHistory(context.Background(), "", "C123", 10)
if err != nil || msgs != nil {
t.Errorf("expected nil,nil for empty token, got %v,%v", msgs, err)
}
}
func TestFetchChannelHistory_EmptyChannel(t *testing.T) {
msgs, err := FetchChannelHistory(context.Background(), "xoxb-test", "", 10)
if err != nil || msgs != nil {
t.Errorf("expected nil,nil for empty channel, got %v,%v", msgs, err)
}
}
func TestSlackAdapter_Type(t *testing.T) {
a := &SlackAdapter{}
if a.Type() != "slack" {
t.Errorf("expected 'slack', got %q", a.Type())
}
}
func TestSlackAdapter_DisplayName(t *testing.T) {
a := &SlackAdapter{}
if a.DisplayName() != "Slack" {
t.Errorf("expected 'Slack', got %q", a.DisplayName())
}
}

View File

@ -12,6 +12,7 @@ import (
"log"
"net/http"
"os"
"regexp"
"strings"
"github.com/gin-gonic/gin"
@ -443,9 +444,27 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
return
}
// [slug] routing: if the message starts with [word], extract it as
// a target agent slug and match against the channel config's username
// field (lowercased). This lets humans type "[backend] what's #800?"
// in a shared channel and route to a specific agent.
targetSlug := ""
routedText := msg.Text
validSlugRe := regexp.MustCompile(`^[a-zA-Z0-9 _-]+$`)
if len(msg.Text) > 2 && msg.Text[0] == '[' {
if idx := strings.Index(msg.Text, "]"); idx > 1 && idx < 40 {
candidate := strings.ToLower(strings.TrimSpace(msg.Text[1:idx]))
if validSlugRe.MatchString(candidate) {
targetSlug = candidate
routedText = strings.TrimSpace(msg.Text[idx+1:])
if routedText == "" {
routedText = msg.Text
}
}
}
}
// Look up channels by type and find one whose chat_id list contains msg.ChatID.
// We can't use SQL LIKE — that matches substrings (chat_id "123" would match "1234").
// Fetch all enabled channels of this type, then exact-match in code.
rows, err := db.DB.QueryContext(ctx, `
SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users
FROM workspace_channels
@ -458,6 +477,7 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
defer rows.Close()
var ch channels.ChannelRow
var candidates []channels.ChannelRow
found := false
for rows.Next() {
var row channels.ChannelRow
@ -467,36 +487,59 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
}
json.Unmarshal(configJSON, &row.Config)
json.Unmarshal(allowedJSON, &row.AllowedUsers)
// #319: decrypt sensitive fields before comparing webhook_secret /
// using bot_token downstream. Skip rows whose decrypt fails so a
// single corrupt channel cannot block webhooks for all others.
if err := channels.DecryptSensitiveFields(row.Config); err != nil {
log.Printf("Channels: decrypt webhook row %s: %v", row.ID, err)
continue
}
// Verify webhook secret_token if the channel has one configured.
// #337: use constant-time comparison. Go's `!=` short-circuits on
// the first mismatched byte and leaks timing information; an
// attacker on the Docker network could enumerate the secret
// byte-by-byte. subtle.ConstantTimeCompare runs in time
// proportional to the length of the shorter input and returns
// 1 on match / 0 otherwise (never -1). Same posture as the
// cdp-proxy token compare in host-bridge.
if expectedSecret, _ := row.Config["webhook_secret"].(string); expectedSecret != "" {
receivedSecret := c.GetHeader("X-Telegram-Bot-Api-Secret-Token")
if subtle.ConstantTimeCompare([]byte(receivedSecret), []byte(expectedSecret)) != 1 {
continue // Wrong secret — try other channels (could be different bot)
continue
}
}
// Exact match against the comma-separated chat_id list
if matchesChatID(row.Config, msg.ChatID) {
ch = row
found = true
break
candidates = append(candidates, row)
}
}
if targetSlug != "" {
// [slug] routing — match against config username (lowercased)
for _, row := range candidates {
username, _ := row.Config["username"].(string)
usernameLC := strings.ToLower(username)
// Match: [backend] → "Backend Engineer", [pm] → "PM", [dev lead] → "Dev Lead"
if usernameLC == targetSlug ||
strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", "-"), targetSlug) ||
strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", ""), targetSlug) {
ch = row
found = true
msg.Text = routedText // Strip the [slug] prefix before routing
break
}
}
if !found {
// No match for slug — respond with available agents
var names []string
for _, row := range candidates {
if u, _ := row.Config["username"].(string); u != "" {
names = append(names, "["+strings.ToLower(strings.ReplaceAll(u, " ", "-"))+"]")
}
}
c.JSON(http.StatusOK, gin.H{
"status": "unknown_agent",
"requested_slug": targetSlug,
"available_slugs": names,
})
return
}
} else if len(candidates) > 0 {
// No [slug] prefix — route to first matching channel (backward compat)
ch = candidates[0]
found = true
}
if !found {
c.JSON(http.StatusOK, gin.H{"status": "no_channel"})
return
@ -505,7 +548,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
// Process asynchronously — don't block the webhook response
go func() {
bgCtx := context.Background()
_ = h.manager.HandleInbound(bgCtx, ch, msg)
if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil {
log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err)
}
}()
c.JSON(http.StatusOK, gin.H{"status": "accepted"})

View File

@ -43,12 +43,19 @@ type scheduleRow struct {
Prompt string
}
// ChannelBroadcaster posts messages to and reads context from workspace channels.
type ChannelBroadcaster interface {
BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string)
FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string
}
// Scheduler polls the workspace_schedules table and fires A2A messages
// when a schedule's next_run_at has passed. Follows the same goroutine
// pattern as registry.StartHealthSweep.
type Scheduler struct {
proxy A2AProxy
broadcaster Broadcaster
channels ChannelBroadcaster
// lastTickAt records the wall-clock time of the most recent tick
// (whether it fired schedules or not). Read by Healthy() and the
@ -67,6 +74,12 @@ func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
}
}
// SetChannels wires the channel manager for auto-posting cron output.
// Called after both scheduler and channel manager are initialized.
func (s *Scheduler) SetChannels(ch ChannelBroadcaster) {
s.channels = ch
}
// LastTickAt returns the wall-clock time of the most recently completed tick.
// Returns a zero time.Time if the scheduler has never completed a tick.
func (s *Scheduler) LastTickAt() time.Time {
@ -248,6 +261,17 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
// Level 3: inject ambient Slack channel context into the cron prompt.
// The agent sees recent peer messages before acting, enabling cross-agent
// awareness without explicit A2A delegation. Best-effort — if the fetch
// fails or the workspace has no Slack channels, the prompt is unchanged.
prompt := sched.Prompt
if s.channels != nil {
if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" {
prompt = channelCtx + "\n" + prompt
}
}
msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8])
a2aBody, _ := json.Marshal(map[string]interface{}{
@ -256,7 +280,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
"message": map[string]interface{}{
"role": "user",
"messageId": msgID,
"parts": []map[string]interface{}{{"kind": "text", "text": sched.Prompt}},
"parts": []map[string]interface{}{{"kind": "text", "text": prompt}},
},
},
})
@ -360,6 +384,20 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
"status": lastStatus,
})
}
// Level 1: auto-post cron output to workspace's Slack channels.
// Only post non-empty successful responses — errors and empties are
// noise that clutters the channel without adding value.
if s.channels != nil && lastStatus == "ok" && !isEmpty {
summary := s.extractResponseSummary(respBody)
if summary != "" {
go func(wsID, text string) {
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer postCancel()
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)
}(sched.WorkspaceID, summary)
}
}
}
// recordSkipped advances next_run_at and logs a cron_run activity entry
@ -475,6 +513,31 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
// produced no meaningful output. Catches "(no response generated)" from
// the workspace runtime + genuinely empty/null responses. Used by the
// consecutive-empty tracker (#795) to detect phantom-producing crons.
// extractResponseSummary pulls the agent's text from the A2A response body.
// Returns empty string if parsing fails or the response has no text content.
func (s *Scheduler) extractResponseSummary(body []byte) string {
if len(body) == 0 {
return ""
}
var resp map[string]interface{}
if json.Unmarshal(body, &resp) != nil {
return ""
}
// A2A response: result.parts[].text
if result, ok := resp["result"].(map[string]interface{}); ok {
if parts, ok := result["parts"].([]interface{}); ok {
for _, p := range parts {
if part, ok := p.(map[string]interface{}); ok {
if text, ok := part["text"].(string); ok && text != "" {
return text
}
}
}
}
}
return ""
}
func isEmptyResponse(body []byte) bool {
if len(body) == 0 {
return true

View File

@ -3,10 +3,9 @@
-- Adds a dense-vector embedding column to agent_memories to power semantic
-- (cosine-similarity) memory recall alongside the existing FTS path.
--
-- Requires the pgvector Postgres extension. The entire migration is wrapped
-- in a single DO block so if pgvector is unavailable, ALL statements are
-- skipped (not just CREATE EXTENSION). This prevents "type vector does not
-- exist" errors on the ALTER TABLE / CREATE INDEX that follow.
-- Requires the pgvector Postgres extension. The DO block is a no-op guard:
-- if the extension is unavailable this migration exits early so a boot
-- without pgvector installed does not break the migration sweep.
--
-- Issue: #576
@ -20,8 +19,6 @@ BEGIN
-- ivfflat approximate nearest-neighbour index for cosine similarity.
-- lists=100 is a reasonable default for tables up to ~1M rows.
-- Partial index (WHERE embedding IS NOT NULL) keeps it lean — unembedded
-- rows are skipped entirely.
CREATE INDEX IF NOT EXISTS agent_memories_embedding_idx
ON agent_memories USING ivfflat (embedding vector_cosine_ops)
WHERE embedding IS NOT NULL;