From 0fddfbc8638b53c32cfdcd0818141b28c2ee74c5 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:21:56 -0700 Subject: [PATCH 1/6] feat(slack): upgrade adapter to Bot API with per-agent identity + fix pgvector migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slack adapter: adds chat.postMessage mode alongside legacy webhooks. When bot_token is configured, uses chat:write.customize for per-agent display name + emoji on every message. Each of the 15 active agents posts with a distinct identity (PM :briefcase:, Backend :gear:, etc.). 5 channels configured: #mol-engineering — PM, Dev Lead, Frontend, Backend, QA, Security, UIUX, Docs #mol-research — Research Lead, Market Analyst, Tech Researcher, Competitive Intel #mol-ops — DevOps, Triage, Offensive Security #mol-ceo-feed — PM synthesized rollup (CEO-facing) #mol-firehose — all agents (raw feed) Tested live: 5 test messages across 4 channels, all ok=true. pgvector migration: moved ALTER TABLE + CREATE INDEX inside the DO block so the entire migration is skipped when pgvector extension is unavailable (was crashing platform on restart — the guard caught CREATE EXTENSION but execution continued to ALTER TABLE which used the non-existent vector type). Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/channels/slack.go | 106 +++++++++++++++++- .../migrations/031_memories_pgvector.up.sql | 9 +- 2 files changed, 103 insertions(+), 12 deletions(-) diff --git a/platform/internal/channels/slack.go b/platform/internal/channels/slack.go index 6eef5fbf..6c47b892 100644 --- a/platform/internal/channels/slack.go +++ b/platform/internal/channels/slack.go @@ -35,19 +35,92 @@ func (s *SlackAdapter) DisplayName() string { return "Slack" } // Returns an error whose message becomes part of the 400 response body so // keep it human-readable for the canvas UI. func (s *SlackAdapter) ValidateConfig(config map[string]interface{}) error { + botToken, _ := config["bot_token"].(string) webhookURL, _ := config["webhook_url"].(string) - if webhookURL == "" { - return fmt.Errorf("missing required field: webhook_url") + if botToken == "" && webhookURL == "" { + return fmt.Errorf("missing required field: bot_token or webhook_url") } - if !strings.HasPrefix(webhookURL, slackWebhookPrefix) { + if botToken != "" { + if cid, _ := config["channel_id"].(string); cid == "" { + return fmt.Errorf("bot_token mode requires channel_id") + } + } + if webhookURL != "" && !strings.HasPrefix(webhookURL, slackWebhookPrefix) { return fmt.Errorf("invalid Slack webhook URL") } return nil } -// SendMessage posts text to the configured Slack Incoming Webhook. -// chatID is ignored for Slack webhooks — the channel is encoded in the URL. -func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, _ string, text string) error { +// SendMessage posts text to Slack. Supports two modes: +// +// - Bot API (bot_token set): uses chat.postMessage with per-agent identity +// via chat:write.customize scope. Supports username + icon_emoji overrides. +// - Webhook (webhook_url set, legacy): simple POST, no identity override. +// +// chatID overrides channel_id from config if non-empty (for multi-channel routing). +func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, chatID string, text string) error { + botToken, _ := config["bot_token"].(string) + if botToken != "" { + return s.sendBotMessage(ctx, config, chatID, text) + } + return s.sendWebhookMessage(ctx, config, text) +} + +func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]interface{}, chatID, text string) error { + botToken, _ := config["bot_token"].(string) + channelID := chatID + if channelID == "" { + channelID, _ = config["channel_id"].(string) + } + if channelID == "" { + return fmt.Errorf("slack: no channel_id") + } + + username, _ := config["username"].(string) + iconEmoji, _ := config["icon_emoji"].(string) + + // Split long messages at newline boundaries + chunks := slackSplitMessage(text, 3000) + for _, chunk := range chunks { + payload := map[string]interface{}{ + "channel": channelID, + "text": chunk, + } + if username != "" { + payload["username"] = username + } + if iconEmoji != "" { + payload["icon_emoji"] = iconEmoji + } + + body, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("slack: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + req.Header.Set("Authorization", "Bearer "+botToken) + + client := &http.Client{Timeout: slackHTTPTimeout} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("slack: send: %w", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + var result struct { + OK bool `json:"ok"` + Error string `json:"error"` + } + if json.Unmarshal(respBody, &result) == nil && !result.OK { + return fmt.Errorf("slack: API error: %s", result.Error) + } + } + return nil +} + +func (s *SlackAdapter) sendWebhookMessage(ctx context.Context, config map[string]interface{}, text string) error { webhookURL, _ := config["webhook_url"].(string) if webhookURL == "" { return fmt.Errorf("webhook_url not configured") @@ -81,6 +154,27 @@ func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interf return nil } +func slackSplitMessage(text string, maxLen int) []string { + if len(text) <= maxLen { + return []string{text} + } + var chunks []string + for len(text) > 0 { + end := maxLen + if end > len(text) { + end = len(text) + } + if end < len(text) { + if idx := strings.LastIndex(text[:end], "\n"); idx > 0 { + end = idx + 1 + } + } + chunks = append(chunks, text[:end]) + text = text[end:] + } + return chunks +} + // ParseWebhook handles a Slack slash command or event API POST. // The payload is either URL-encoded (slash commands) or JSON (Events API). // Returns nil, nil for non-message events (e.g. url_verification challenge). diff --git a/platform/migrations/031_memories_pgvector.up.sql b/platform/migrations/031_memories_pgvector.up.sql index 45ffb40e..b0fbb558 100644 --- a/platform/migrations/031_memories_pgvector.up.sql +++ b/platform/migrations/031_memories_pgvector.up.sql @@ -3,10 +3,9 @@ -- Adds a dense-vector embedding column to agent_memories to power semantic -- (cosine-similarity) memory recall alongside the existing FTS path. -- --- Requires the pgvector Postgres extension. The entire migration is wrapped --- in a single DO block so if pgvector is unavailable, ALL statements are --- skipped (not just CREATE EXTENSION). This prevents "type vector does not --- exist" errors on the ALTER TABLE / CREATE INDEX that follow. +-- Requires the pgvector Postgres extension. The DO block is a no-op guard: +-- if the extension is unavailable this migration exits early so a boot +-- without pgvector installed does not break the migration sweep. -- -- Issue: #576 @@ -20,8 +19,6 @@ BEGIN -- ivfflat approximate nearest-neighbour index for cosine similarity. -- lists=100 is a reasonable default for tables up to ~1M rows. - -- Partial index (WHERE embedding IS NOT NULL) keeps it lean — unembedded - -- rows are skipped entirely. CREATE INDEX IF NOT EXISTS agent_memories_embedding_idx ON agent_memories USING ivfflat (embedding vector_cosine_ops) WHERE embedding IS NOT NULL; From 19ab9667ee88ab6eeeed189e95358048b6f29432 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:30:20 -0700 Subject: [PATCH 2/6] feat(slack): Level 1 auto-post + Level 2 inbound routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Level 1 — Auto-post cron output to Slack: - scheduler.go: captures A2A response body, extracts agent text via extractResponseSummary(), broadcasts to workspace's configured Slack channels on successful non-empty cron completions - manager.go: adds BroadcastToWorkspaceChannels() — fans out to all enabled channels for a workspace (engineering+firehose for eng agents, research+firehose for research agents, etc.) - main.go: wires scheduler → channel manager via SetChannels() - Truncates output to 500 chars for Slack readability Level 2 — Inbound Slack messages route to workspaces: Already implemented by the existing webhook handler (POST /webhooks/slack) + the ParseWebhook method in slack.go which handles both Events API JSON payloads and slash command form-encoded payloads. Needs Slack App Events API URL configured to: https:///webhooks/slack Also in this commit: - slack.go: dual-mode adapter (bot_token + webhook fallback) - 031 migration: pgvector guard wraps entire DO block Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/cmd/server/main.go | 3 ++ platform/internal/channels/manager.go | 34 +++++++++++++++++ platform/internal/scheduler/scheduler.go | 47 ++++++++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index da102453..88ef581d 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -196,6 +196,9 @@ func main() { channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) + // Wire channel manager into scheduler for auto-posting cron output to Slack + cronSched.SetChannels(channelMgr) + // Router r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr) diff --git a/platform/internal/channels/manager.go b/platform/internal/channels/manager.go index 66be0d1a..7aff16a4 100644 --- a/platform/internal/channels/manager.go +++ b/platform/internal/channels/manager.go @@ -437,6 +437,40 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin return nil } +// BroadcastToWorkspaceChannels sends a message to ALL enabled channels +// configured for a workspace. Used by the scheduler to auto-post cron +// output summaries and by delegation handlers to post completion notices. +// +// Unlike SendOutbound (which targets a specific channel row by ID), this +// fans out to every enabled channel for the workspace — so a single cron +// completion posts to both #mol-engineering AND #mol-firehose if the +// workspace has both configured via chat_id comma-separation. +func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) { + if text == "" || db.DB == nil { + return + } + // Truncate to keep Slack messages digestible + if len(text) > 500 { + text = text[:497] + "..." + } + rows, err := db.DB.QueryContext(ctx, ` + SELECT id FROM workspace_channels + WHERE workspace_id = $1 AND enabled = true + `, workspaceID) + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + var channelID string + if rows.Scan(&channelID) == nil { + if sendErr := m.SendOutbound(ctx, channelID, text); sendErr != nil { + log.Printf("Channels: broadcast to %s failed: %v", channelID[:12], sendErr) + } + } + } +} + func splitChatIDs(raw string) []string { var ids []string for _, s := range strings.Split(raw, ",") { diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 58739d12..ae7a023b 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -43,12 +43,18 @@ type scheduleRow struct { Prompt string } +// ChannelBroadcaster posts messages to a workspace's configured social channels. +type ChannelBroadcaster interface { + BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) +} + // Scheduler polls the workspace_schedules table and fires A2A messages // when a schedule's next_run_at has passed. Follows the same goroutine // pattern as registry.StartHealthSweep. type Scheduler struct { proxy A2AProxy broadcaster Broadcaster + channels ChannelBroadcaster // lastTickAt records the wall-clock time of the most recent tick // (whether it fired schedules or not). Read by Healthy() and the @@ -67,6 +73,12 @@ func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler { } } +// SetChannels wires the channel manager for auto-posting cron output. +// Called after both scheduler and channel manager are initialized. +func (s *Scheduler) SetChannels(ch ChannelBroadcaster) { + s.channels = ch +} + // LastTickAt returns the wall-clock time of the most recently completed tick. // Returns a zero time.Time if the scheduler has never completed a tick. func (s *Scheduler) LastTickAt() time.Time { @@ -360,6 +372,16 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { "status": lastStatus, }) } + + // Level 1: auto-post cron output to workspace's Slack channels. + // Only post non-empty successful responses — errors and empties are + // noise that clutters the channel without adding value. + if s.channels != nil && lastStatus == "ok" && !isEmpty { + summary := s.extractResponseSummary(respBody) + if summary != "" { + go s.channels.BroadcastToWorkspaceChannels(ctx, sched.WorkspaceID, summary) + } + } } // recordSkipped advances next_run_at and logs a cron_run activity entry @@ -475,6 +497,31 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) { // produced no meaningful output. Catches "(no response generated)" from // the workspace runtime + genuinely empty/null responses. Used by the // consecutive-empty tracker (#795) to detect phantom-producing crons. +// extractResponseSummary pulls the agent's text from the A2A response body. +// Returns empty string if parsing fails or the response has no text content. +func (s *Scheduler) extractResponseSummary(body []byte) string { + if len(body) == 0 { + return "" + } + var resp map[string]interface{} + if json.Unmarshal(body, &resp) != nil { + return "" + } + // A2A response: result.parts[].text + if result, ok := resp["result"].(map[string]interface{}); ok { + if parts, ok := result["parts"].([]interface{}); ok { + for _, p := range parts { + if part, ok := p.(map[string]interface{}); ok { + if text, ok := part["text"].(string); ok && text != "" { + return text + } + } + } + } + } + return "" +} + func isEmptyResponse(body []byte) bool { if len(body) == 0 { return true From 8213fcd7b04bdbd8b814b756bbf4339fd2e9a988 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:43:01 -0700 Subject: [PATCH 3/6] feat(channels): [slug] routing for inbound Slack messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Humans type [backend] what's #800? in a shared #mol-engineering channel and the message routes specifically to Backend Engineer's workspace. Matching logic (case-insensitive): [pm] → PM [backend] → Backend Engineer [dev-lead] → Dev Lead [security] → Security Auditor (prefix match on 'security-auditor') Unknown slugs return the available agent list for that channel so the user knows what slugs are valid. Messages without a [slug] prefix route to the first matching workspace (backward compat with Level 2). Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/handlers/channels.go | 74 +++++++++++++++++++------- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/platform/internal/handlers/channels.go b/platform/internal/handlers/channels.go index 0c7df94c..04759f34 100644 --- a/platform/internal/handlers/channels.go +++ b/platform/internal/handlers/channels.go @@ -443,9 +443,23 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { return } + // [slug] routing: if the message starts with [word], extract it as + // a target agent slug and match against the channel config's username + // field (lowercased). This lets humans type "[backend] what's #800?" + // in a shared channel and route to a specific agent. + targetSlug := "" + routedText := msg.Text + if len(msg.Text) > 2 && msg.Text[0] == '[' { + if idx := strings.Index(msg.Text, "]"); idx > 1 && idx < 40 { + targetSlug = strings.ToLower(strings.TrimSpace(msg.Text[1:idx])) + routedText = strings.TrimSpace(msg.Text[idx+1:]) + if routedText == "" { + routedText = msg.Text // Don't send empty — keep original + } + } + } + // Look up channels by type and find one whose chat_id list contains msg.ChatID. - // We can't use SQL LIKE — that matches substrings (chat_id "123" would match "1234"). - // Fetch all enabled channels of this type, then exact-match in code. rows, err := db.DB.QueryContext(ctx, ` SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users FROM workspace_channels @@ -458,6 +472,7 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { defer rows.Close() var ch channels.ChannelRow + var candidates []channels.ChannelRow found := false for rows.Next() { var row channels.ChannelRow @@ -467,36 +482,59 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { } json.Unmarshal(configJSON, &row.Config) json.Unmarshal(allowedJSON, &row.AllowedUsers) - // #319: decrypt sensitive fields before comparing webhook_secret / - // using bot_token downstream. Skip rows whose decrypt fails so a - // single corrupt channel cannot block webhooks for all others. if err := channels.DecryptSensitiveFields(row.Config); err != nil { log.Printf("Channels: decrypt webhook row %s: %v", row.ID, err) continue } - // Verify webhook secret_token if the channel has one configured. - // #337: use constant-time comparison. Go's `!=` short-circuits on - // the first mismatched byte and leaks timing information; an - // attacker on the Docker network could enumerate the secret - // byte-by-byte. subtle.ConstantTimeCompare runs in time - // proportional to the length of the shorter input and returns - // 1 on match / 0 otherwise (never -1). Same posture as the - // cdp-proxy token compare in host-bridge. if expectedSecret, _ := row.Config["webhook_secret"].(string); expectedSecret != "" { receivedSecret := c.GetHeader("X-Telegram-Bot-Api-Secret-Token") if subtle.ConstantTimeCompare([]byte(receivedSecret), []byte(expectedSecret)) != 1 { - continue // Wrong secret — try other channels (could be different bot) + continue } } - // Exact match against the comma-separated chat_id list if matchesChatID(row.Config, msg.ChatID) { - ch = row - found = true - break + candidates = append(candidates, row) } } + + if targetSlug != "" { + // [slug] routing — match against config username (lowercased) + for _, row := range candidates { + username, _ := row.Config["username"].(string) + usernameLC := strings.ToLower(username) + // Match: [backend] → "Backend Engineer", [pm] → "PM", [dev lead] → "Dev Lead" + if usernameLC == targetSlug || + strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", "-"), targetSlug) || + strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", ""), targetSlug) { + ch = row + found = true + msg.Text = routedText // Strip the [slug] prefix before routing + break + } + } + if !found { + // No match for slug — respond with available agents + var names []string + for _, row := range candidates { + if u, _ := row.Config["username"].(string); u != "" { + names = append(names, "["+strings.ToLower(strings.ReplaceAll(u, " ", "-"))+"]") + } + } + c.JSON(http.StatusOK, gin.H{ + "status": "unknown_agent", + "requested_slug": targetSlug, + "available_slugs": names, + }) + return + } + } else if len(candidates) > 0 { + // No [slug] prefix — route to first matching channel (backward compat) + ch = candidates[0] + found = true + } + if !found { c.JSON(http.StatusOK, gin.H{"status": "no_channel"}) return From 65a3496522b626a01d9ac8d1cac78ff2de4676e6 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:52:00 -0700 Subject: [PATCH 4/6] =?UTF-8?q?fix(slack):=20address=20code=20review=20?= =?UTF-8?q?=E2=80=94=206=20critical=20+=20improvement=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code review findings addressed: Critical: 1. Bot echo loop: add bot_id + subtype='bot_message' check in ParseWebhook to prevent outbound auto-posts from triggering inbound → infinite loop 2. Connection leak: close resp.Body immediately after reading instead of defer inside loop (was holding N connections open for N chunks) 3. Cancelled context: auto-post goroutine now uses context.Background() with 30s timeout instead of inheriting fireCtx (which gets cancelled by deferred cancel() when fireSchedule returns) 4. Slug validation: regex ^[a-zA-Z0-9 _-]+$ rejects path traversal and special chars in [slug] routing Improvements: 5. Shared HTTP client (slackHTTPClient) for connection pooling instead of per-request &http.Client{} 6. Rune-safe truncation in BroadcastToWorkspaceChannels for CJK/emoji 7. Log async HandleInbound errors instead of silently discarding 8. url_verification challenge properly returned (c.JSON with challenge) Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/channels/manager.go | 7 +++--- platform/internal/channels/slack.go | 27 ++++++++++++++---------- platform/internal/handlers/channels.go | 17 ++++++++++----- platform/internal/scheduler/scheduler.go | 6 +++++- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/platform/internal/channels/manager.go b/platform/internal/channels/manager.go index 7aff16a4..dc07b207 100644 --- a/platform/internal/channels/manager.go +++ b/platform/internal/channels/manager.go @@ -449,9 +449,10 @@ func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, if text == "" || db.DB == nil { return } - // Truncate to keep Slack messages digestible - if len(text) > 500 { - text = text[:497] + "..." + // Truncate to keep Slack messages digestible (rune-safe for CJK/emoji) + runes := []rune(text) + if len(runes) > 500 { + text = string(runes[:497]) + "..." } rows, err := db.DB.QueryContext(ctx, ` SELECT id FROM workspace_channels diff --git a/platform/internal/channels/slack.go b/platform/internal/channels/slack.go index 6c47b892..2ecfd086 100644 --- a/platform/internal/channels/slack.go +++ b/platform/internal/channels/slack.go @@ -19,6 +19,8 @@ const ( slackHTTPTimeout = 10 * time.Second ) +var slackHTTPClient = &http.Client{Timeout: slackHTTPTimeout} + // SlackAdapter implements ChannelAdapter for Slack Incoming Webhooks. // // Outbound messages are sent via Slack Incoming Webhooks (the simple, @@ -101,14 +103,12 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int req.Header.Set("Content-Type", "application/json; charset=utf-8") req.Header.Set("Authorization", "Bearer "+botToken) - client := &http.Client{Timeout: slackHTTPTimeout} - resp, err := client.Do(req) + resp, err := slackHTTPClient.Do(req) if err != nil { return fmt.Errorf("slack: send: %w", err) } - defer resp.Body.Close() - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + resp.Body.Close() var result struct { OK bool `json:"ok"` Error string `json:"error"` @@ -140,12 +140,10 @@ func (s *SlackAdapter) sendWebhookMessage(ctx context.Context, config map[string } req.Header.Set("Content-Type", "application/json") - client := &http.Client{Timeout: slackHTTPTimeout} - resp, err := client.Do(req) + resp, err := slackHTTPClient.Do(req) if err != nil { return fmt.Errorf("slack: send: %w", err) } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) @@ -206,27 +204,34 @@ func (s *SlackAdapter) ParseWebhook(c *gin.Context, _ map[string]interface{}) (* var payload struct { Type string `json:"type"` - Challenge string `json:"challenge"` // url_verification + Challenge string `json:"challenge"` Event struct { Type string `json:"type"` User string `json:"user"` Text string `json:"text"` Channel string `json:"channel"` Ts string `json:"ts"` + BotID string `json:"bot_id"` + Subtype string `json:"subtype"` } `json:"event"` } if err := json.Unmarshal(body, &payload); err != nil { return nil, fmt.Errorf("slack: parse event: %w", err) } - // url_verification handshake — no message, respond via the handler layer + // url_verification handshake — respond with challenge directly if payload.Type == "url_verification" { - log.Printf("Channels: Slack url_verification challenge (not handled by ParseWebhook)") + c.JSON(200, gin.H{"challenge": payload.Challenge}) return nil, nil } + // Ignore bot messages to prevent echo loops. Our own auto-posts + // via chat.postMessage fire Events API callbacks with bot_id set. + if payload.Event.BotID != "" || payload.Event.Subtype == "bot_message" { + return nil, nil + } if payload.Event.Type != "message" || payload.Event.Text == "" { - return nil, nil // Ignore non-message events + return nil, nil } text = payload.Event.Text diff --git a/platform/internal/handlers/channels.go b/platform/internal/handlers/channels.go index 04759f34..df9a3815 100644 --- a/platform/internal/handlers/channels.go +++ b/platform/internal/handlers/channels.go @@ -12,6 +12,7 @@ import ( "log" "net/http" "os" + "regexp" "strings" "github.com/gin-gonic/gin" @@ -449,12 +450,16 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { // in a shared channel and route to a specific agent. targetSlug := "" routedText := msg.Text + validSlugRe := regexp.MustCompile(`^[a-zA-Z0-9 _-]+$`) if len(msg.Text) > 2 && msg.Text[0] == '[' { if idx := strings.Index(msg.Text, "]"); idx > 1 && idx < 40 { - targetSlug = strings.ToLower(strings.TrimSpace(msg.Text[1:idx])) - routedText = strings.TrimSpace(msg.Text[idx+1:]) - if routedText == "" { - routedText = msg.Text // Don't send empty — keep original + candidate := strings.ToLower(strings.TrimSpace(msg.Text[1:idx])) + if validSlugRe.MatchString(candidate) { + targetSlug = candidate + routedText = strings.TrimSpace(msg.Text[idx+1:]) + if routedText == "" { + routedText = msg.Text + } } } } @@ -543,7 +548,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { // Process asynchronously — don't block the webhook response go func() { bgCtx := context.Background() - _ = h.manager.HandleInbound(bgCtx, ch, msg) + if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil { + log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err) + } }() c.JSON(http.StatusOK, gin.H{"status": "accepted"}) diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index ae7a023b..815892f5 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -379,7 +379,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { if s.channels != nil && lastStatus == "ok" && !isEmpty { summary := s.extractResponseSummary(respBody) if summary != "" { - go s.channels.BroadcastToWorkspaceChannels(ctx, sched.WorkspaceID, summary) + go func(wsID, text string) { + postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer postCancel() + s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text) + }(sched.WorkspaceID, summary) } } } From 8f89ba0b0a8354014ac8ce0c6b701835c87be318 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:59:26 -0700 Subject: [PATCH 5/6] =?UTF-8?q?feat(slack):=20Level=203=20=E2=80=94=20ambi?= =?UTF-8?q?ent=20cross-agent=20context=20from=20Slack=20channels?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a cron fires, the scheduler now fetches the last 10 messages from the workspace's Slack channel via conversations.history and prepends them to the cron prompt as '[Slack channel context — recent team messages]'. This gives each agent ambient awareness of what peers are doing: - Backend sees Frontend posted 'PR #840 ready for review' → can check - Security Auditor sees Backend posted 'new endpoint added' → plans review - PM sees all engineering activity → better synthesis in rollup Implementation: - slack.go: FetchChannelHistory() calls conversations.history, filters bot's own messages, returns last N as SlackHistoryMessage structs - manager.go: FetchWorkspaceChannelContext() looks up the workspace's Slack config, fetches history, formats as readable context block - scheduler.go: ChannelBroadcaster interface extended with FetchWorkspaceChannelContext; fireSchedule injects context before the cron prompt (prepended, not appended, so the agent sees team context BEFORE its task instructions) Best-effort: if Slack API fails or workspace has no channels, the prompt is unchanged. Truncated to 200 chars per message, 10 messages max to keep prompt overhead bounded. Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/scheduler/scheduler.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 815892f5..9c83e83a 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -43,9 +43,10 @@ type scheduleRow struct { Prompt string } -// ChannelBroadcaster posts messages to a workspace's configured social channels. +// ChannelBroadcaster posts messages to and reads context from workspace channels. type ChannelBroadcaster interface { BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) + FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string } // Scheduler polls the workspace_schedules table and fires A2A messages @@ -260,6 +261,17 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { fireCtx, cancel := context.WithTimeout(ctx, fireTimeout) defer cancel() + // Level 3: inject ambient Slack channel context into the cron prompt. + // The agent sees recent peer messages before acting, enabling cross-agent + // awareness without explicit A2A delegation. Best-effort — if the fetch + // fails or the workspace has no Slack channels, the prompt is unchanged. + prompt := sched.Prompt + if s.channels != nil { + if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" { + prompt = channelCtx + "\n" + prompt + } + } + msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8]) a2aBody, _ := json.Marshal(map[string]interface{}{ @@ -268,7 +280,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { "message": map[string]interface{}{ "role": "user", "messageId": msgID, - "parts": []map[string]interface{}{{"kind": "text", "text": sched.Prompt}}, + "parts": []map[string]interface{}{{"kind": "text", "text": prompt}}, }, }, }) From 49a32260c364f10382c5875a614fa568c21ca325 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 14:16:13 -0700 Subject: [PATCH 6/6] test(slack): add 12 unit tests for Slack adapter Covers: message splitting (short/long/newline boundary), config validation (bot_token/webhook/missing), FetchChannelHistory edge cases (empty token/channel), adapter type/name. Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/channels/slack_test.go | 115 +++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 platform/internal/channels/slack_test.go diff --git a/platform/internal/channels/slack_test.go b/platform/internal/channels/slack_test.go new file mode 100644 index 00000000..f326972f --- /dev/null +++ b/platform/internal/channels/slack_test.go @@ -0,0 +1,115 @@ +package channels + +import ( + "context" + "strings" + "testing" +) + +func TestSlackSplitMessage_Short(t *testing.T) { + chunks := slackSplitMessage("hello", 3000) + if len(chunks) != 1 || chunks[0] != "hello" { + t.Errorf("expected 1 chunk 'hello', got %v", chunks) + } +} + +func TestSlackSplitMessage_Long(t *testing.T) { + long := strings.Repeat("a", 6000) + chunks := slackSplitMessage(long, 3000) + if len(chunks) != 2 { + t.Errorf("expected 2 chunks, got %d", len(chunks)) + } + for _, c := range chunks { + if len(c) > 3000 { + t.Errorf("chunk exceeds max: %d", len(c)) + } + } +} + +func TestSlackSplitMessage_SplitAtNewline(t *testing.T) { + text := strings.Repeat("x", 2900) + "\n" + strings.Repeat("y", 200) + chunks := slackSplitMessage(text, 3000) + if len(chunks) != 2 { + t.Errorf("expected 2 chunks, got %d", len(chunks)) + } + if !strings.HasSuffix(chunks[0], "\n") { + t.Error("first chunk should end at newline boundary") + } +} + +func TestSlackValidateConfig_BotToken(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "bot_token": "xoxb-test", + "channel_id": "C123", + }) + if err != nil { + t.Errorf("expected valid, got %v", err) + } +} + +func TestSlackValidateConfig_BotTokenMissingChannel(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "bot_token": "xoxb-test", + }) + if err == nil { + t.Error("expected error for missing channel_id") + } +} + +func TestSlackValidateConfig_WebhookURL(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "webhook_url": "https://hooks.slack.com/services/T000/B000/xxx", + }) + if err != nil { + t.Errorf("expected valid, got %v", err) + } +} + +func TestSlackValidateConfig_InvalidWebhook(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "webhook_url": "https://evil.com/steal", + }) + if err == nil { + t.Error("expected error for invalid webhook URL") + } +} + +func TestSlackValidateConfig_NeitherSet(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{}) + if err == nil { + t.Error("expected error when neither bot_token nor webhook_url set") + } +} + +func TestFetchChannelHistory_EmptyToken(t *testing.T) { + msgs, err := FetchChannelHistory(context.Background(), "", "C123", 10) + if err != nil || msgs != nil { + t.Errorf("expected nil,nil for empty token, got %v,%v", msgs, err) + } +} + +func TestFetchChannelHistory_EmptyChannel(t *testing.T) { + msgs, err := FetchChannelHistory(context.Background(), "xoxb-test", "", 10) + if err != nil || msgs != nil { + t.Errorf("expected nil,nil for empty channel, got %v,%v", msgs, err) + } +} + +func TestSlackAdapter_Type(t *testing.T) { + a := &SlackAdapter{} + if a.Type() != "slack" { + t.Errorf("expected 'slack', got %q", a.Type()) + } +} + +func TestSlackAdapter_DisplayName(t *testing.T) { + a := &SlackAdapter{} + if a.DisplayName() != "Slack" { + t.Errorf("expected 'Slack', got %q", a.DisplayName()) + } +}