diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index da102453..88ef581d 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -196,6 +196,9 @@ func main() { channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) + // Wire channel manager into scheduler for auto-posting cron output to Slack + cronSched.SetChannels(channelMgr) + // Router r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr) diff --git a/platform/internal/channels/manager.go b/platform/internal/channels/manager.go index 66be0d1a..dc07b207 100644 --- a/platform/internal/channels/manager.go +++ b/platform/internal/channels/manager.go @@ -437,6 +437,41 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin return nil } +// BroadcastToWorkspaceChannels sends a message to ALL enabled channels +// configured for a workspace. Used by the scheduler to auto-post cron +// output summaries and by delegation handlers to post completion notices. +// +// Unlike SendOutbound (which targets a specific channel row by ID), this +// fans out to every enabled channel for the workspace — so a single cron +// completion posts to both #mol-engineering AND #mol-firehose if the +// workspace has both configured via chat_id comma-separation. +func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) { + if text == "" || db.DB == nil { + return + } + // Truncate to keep Slack messages digestible (rune-safe for CJK/emoji) + runes := []rune(text) + if len(runes) > 500 { + text = string(runes[:497]) + "..." + } + rows, err := db.DB.QueryContext(ctx, ` + SELECT id FROM workspace_channels + WHERE workspace_id = $1 AND enabled = true + `, workspaceID) + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + var channelID string + if rows.Scan(&channelID) == nil { + if sendErr := m.SendOutbound(ctx, channelID, text); sendErr != nil { + log.Printf("Channels: broadcast to %s failed: %v", channelID[:12], sendErr) + } + } + } +} + func splitChatIDs(raw string) []string { var ids []string for _, s := range strings.Split(raw, ",") { diff --git a/platform/internal/channels/slack.go b/platform/internal/channels/slack.go index 6eef5fbf..2ecfd086 100644 --- a/platform/internal/channels/slack.go +++ b/platform/internal/channels/slack.go @@ -19,6 +19,8 @@ const ( slackHTTPTimeout = 10 * time.Second ) +var slackHTTPClient = &http.Client{Timeout: slackHTTPTimeout} + // SlackAdapter implements ChannelAdapter for Slack Incoming Webhooks. // // Outbound messages are sent via Slack Incoming Webhooks (the simple, @@ -35,19 +37,90 @@ func (s *SlackAdapter) DisplayName() string { return "Slack" } // Returns an error whose message becomes part of the 400 response body so // keep it human-readable for the canvas UI. func (s *SlackAdapter) ValidateConfig(config map[string]interface{}) error { + botToken, _ := config["bot_token"].(string) webhookURL, _ := config["webhook_url"].(string) - if webhookURL == "" { - return fmt.Errorf("missing required field: webhook_url") + if botToken == "" && webhookURL == "" { + return fmt.Errorf("missing required field: bot_token or webhook_url") } - if !strings.HasPrefix(webhookURL, slackWebhookPrefix) { + if botToken != "" { + if cid, _ := config["channel_id"].(string); cid == "" { + return fmt.Errorf("bot_token mode requires channel_id") + } + } + if webhookURL != "" && !strings.HasPrefix(webhookURL, slackWebhookPrefix) { return fmt.Errorf("invalid Slack webhook URL") } return nil } -// SendMessage posts text to the configured Slack Incoming Webhook. -// chatID is ignored for Slack webhooks — the channel is encoded in the URL. -func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, _ string, text string) error { +// SendMessage posts text to Slack. Supports two modes: +// +// - Bot API (bot_token set): uses chat.postMessage with per-agent identity +// via chat:write.customize scope. Supports username + icon_emoji overrides. +// - Webhook (webhook_url set, legacy): simple POST, no identity override. +// +// chatID overrides channel_id from config if non-empty (for multi-channel routing). +func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interface{}, chatID string, text string) error { + botToken, _ := config["bot_token"].(string) + if botToken != "" { + return s.sendBotMessage(ctx, config, chatID, text) + } + return s.sendWebhookMessage(ctx, config, text) +} + +func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]interface{}, chatID, text string) error { + botToken, _ := config["bot_token"].(string) + channelID := chatID + if channelID == "" { + channelID, _ = config["channel_id"].(string) + } + if channelID == "" { + return fmt.Errorf("slack: no channel_id") + } + + username, _ := config["username"].(string) + iconEmoji, _ := config["icon_emoji"].(string) + + // Split long messages at newline boundaries + chunks := slackSplitMessage(text, 3000) + for _, chunk := range chunks { + payload := map[string]interface{}{ + "channel": channelID, + "text": chunk, + } + if username != "" { + payload["username"] = username + } + if iconEmoji != "" { + payload["icon_emoji"] = iconEmoji + } + + body, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("slack: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + req.Header.Set("Authorization", "Bearer "+botToken) + + resp, err := slackHTTPClient.Do(req) + if err != nil { + return fmt.Errorf("slack: send: %w", err) + } + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + resp.Body.Close() + var result struct { + OK bool `json:"ok"` + Error string `json:"error"` + } + if json.Unmarshal(respBody, &result) == nil && !result.OK { + return fmt.Errorf("slack: API error: %s", result.Error) + } + } + return nil +} + +func (s *SlackAdapter) sendWebhookMessage(ctx context.Context, config map[string]interface{}, text string) error { webhookURL, _ := config["webhook_url"].(string) if webhookURL == "" { return fmt.Errorf("webhook_url not configured") @@ -67,12 +140,10 @@ func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interf } req.Header.Set("Content-Type", "application/json") - client := &http.Client{Timeout: slackHTTPTimeout} - resp, err := client.Do(req) + resp, err := slackHTTPClient.Do(req) if err != nil { return fmt.Errorf("slack: send: %w", err) } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) @@ -81,6 +152,27 @@ func (s *SlackAdapter) SendMessage(ctx context.Context, config map[string]interf return nil } +func slackSplitMessage(text string, maxLen int) []string { + if len(text) <= maxLen { + return []string{text} + } + var chunks []string + for len(text) > 0 { + end := maxLen + if end > len(text) { + end = len(text) + } + if end < len(text) { + if idx := strings.LastIndex(text[:end], "\n"); idx > 0 { + end = idx + 1 + } + } + chunks = append(chunks, text[:end]) + text = text[end:] + } + return chunks +} + // ParseWebhook handles a Slack slash command or event API POST. // The payload is either URL-encoded (slash commands) or JSON (Events API). // Returns nil, nil for non-message events (e.g. url_verification challenge). @@ -112,27 +204,34 @@ func (s *SlackAdapter) ParseWebhook(c *gin.Context, _ map[string]interface{}) (* var payload struct { Type string `json:"type"` - Challenge string `json:"challenge"` // url_verification + Challenge string `json:"challenge"` Event struct { Type string `json:"type"` User string `json:"user"` Text string `json:"text"` Channel string `json:"channel"` Ts string `json:"ts"` + BotID string `json:"bot_id"` + Subtype string `json:"subtype"` } `json:"event"` } if err := json.Unmarshal(body, &payload); err != nil { return nil, fmt.Errorf("slack: parse event: %w", err) } - // url_verification handshake — no message, respond via the handler layer + // url_verification handshake — respond with challenge directly if payload.Type == "url_verification" { - log.Printf("Channels: Slack url_verification challenge (not handled by ParseWebhook)") + c.JSON(200, gin.H{"challenge": payload.Challenge}) return nil, nil } + // Ignore bot messages to prevent echo loops. Our own auto-posts + // via chat.postMessage fire Events API callbacks with bot_id set. + if payload.Event.BotID != "" || payload.Event.Subtype == "bot_message" { + return nil, nil + } if payload.Event.Type != "message" || payload.Event.Text == "" { - return nil, nil // Ignore non-message events + return nil, nil } text = payload.Event.Text diff --git a/platform/internal/channels/slack_test.go b/platform/internal/channels/slack_test.go new file mode 100644 index 00000000..f326972f --- /dev/null +++ b/platform/internal/channels/slack_test.go @@ -0,0 +1,115 @@ +package channels + +import ( + "context" + "strings" + "testing" +) + +func TestSlackSplitMessage_Short(t *testing.T) { + chunks := slackSplitMessage("hello", 3000) + if len(chunks) != 1 || chunks[0] != "hello" { + t.Errorf("expected 1 chunk 'hello', got %v", chunks) + } +} + +func TestSlackSplitMessage_Long(t *testing.T) { + long := strings.Repeat("a", 6000) + chunks := slackSplitMessage(long, 3000) + if len(chunks) != 2 { + t.Errorf("expected 2 chunks, got %d", len(chunks)) + } + for _, c := range chunks { + if len(c) > 3000 { + t.Errorf("chunk exceeds max: %d", len(c)) + } + } +} + +func TestSlackSplitMessage_SplitAtNewline(t *testing.T) { + text := strings.Repeat("x", 2900) + "\n" + strings.Repeat("y", 200) + chunks := slackSplitMessage(text, 3000) + if len(chunks) != 2 { + t.Errorf("expected 2 chunks, got %d", len(chunks)) + } + if !strings.HasSuffix(chunks[0], "\n") { + t.Error("first chunk should end at newline boundary") + } +} + +func TestSlackValidateConfig_BotToken(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "bot_token": "xoxb-test", + "channel_id": "C123", + }) + if err != nil { + t.Errorf("expected valid, got %v", err) + } +} + +func TestSlackValidateConfig_BotTokenMissingChannel(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "bot_token": "xoxb-test", + }) + if err == nil { + t.Error("expected error for missing channel_id") + } +} + +func TestSlackValidateConfig_WebhookURL(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "webhook_url": "https://hooks.slack.com/services/T000/B000/xxx", + }) + if err != nil { + t.Errorf("expected valid, got %v", err) + } +} + +func TestSlackValidateConfig_InvalidWebhook(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{ + "webhook_url": "https://evil.com/steal", + }) + if err == nil { + t.Error("expected error for invalid webhook URL") + } +} + +func TestSlackValidateConfig_NeitherSet(t *testing.T) { + a := &SlackAdapter{} + err := a.ValidateConfig(map[string]interface{}{}) + if err == nil { + t.Error("expected error when neither bot_token nor webhook_url set") + } +} + +func TestFetchChannelHistory_EmptyToken(t *testing.T) { + msgs, err := FetchChannelHistory(context.Background(), "", "C123", 10) + if err != nil || msgs != nil { + t.Errorf("expected nil,nil for empty token, got %v,%v", msgs, err) + } +} + +func TestFetchChannelHistory_EmptyChannel(t *testing.T) { + msgs, err := FetchChannelHistory(context.Background(), "xoxb-test", "", 10) + if err != nil || msgs != nil { + t.Errorf("expected nil,nil for empty channel, got %v,%v", msgs, err) + } +} + +func TestSlackAdapter_Type(t *testing.T) { + a := &SlackAdapter{} + if a.Type() != "slack" { + t.Errorf("expected 'slack', got %q", a.Type()) + } +} + +func TestSlackAdapter_DisplayName(t *testing.T) { + a := &SlackAdapter{} + if a.DisplayName() != "Slack" { + t.Errorf("expected 'Slack', got %q", a.DisplayName()) + } +} diff --git a/platform/internal/handlers/channels.go b/platform/internal/handlers/channels.go index 0c7df94c..df9a3815 100644 --- a/platform/internal/handlers/channels.go +++ b/platform/internal/handlers/channels.go @@ -12,6 +12,7 @@ import ( "log" "net/http" "os" + "regexp" "strings" "github.com/gin-gonic/gin" @@ -443,9 +444,27 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { return } + // [slug] routing: if the message starts with [word], extract it as + // a target agent slug and match against the channel config's username + // field (lowercased). This lets humans type "[backend] what's #800?" + // in a shared channel and route to a specific agent. + targetSlug := "" + routedText := msg.Text + validSlugRe := regexp.MustCompile(`^[a-zA-Z0-9 _-]+$`) + if len(msg.Text) > 2 && msg.Text[0] == '[' { + if idx := strings.Index(msg.Text, "]"); idx > 1 && idx < 40 { + candidate := strings.ToLower(strings.TrimSpace(msg.Text[1:idx])) + if validSlugRe.MatchString(candidate) { + targetSlug = candidate + routedText = strings.TrimSpace(msg.Text[idx+1:]) + if routedText == "" { + routedText = msg.Text + } + } + } + } + // Look up channels by type and find one whose chat_id list contains msg.ChatID. - // We can't use SQL LIKE — that matches substrings (chat_id "123" would match "1234"). - // Fetch all enabled channels of this type, then exact-match in code. rows, err := db.DB.QueryContext(ctx, ` SELECT id, workspace_id, channel_type, channel_config, enabled, allowed_users FROM workspace_channels @@ -458,6 +477,7 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { defer rows.Close() var ch channels.ChannelRow + var candidates []channels.ChannelRow found := false for rows.Next() { var row channels.ChannelRow @@ -467,36 +487,59 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { } json.Unmarshal(configJSON, &row.Config) json.Unmarshal(allowedJSON, &row.AllowedUsers) - // #319: decrypt sensitive fields before comparing webhook_secret / - // using bot_token downstream. Skip rows whose decrypt fails so a - // single corrupt channel cannot block webhooks for all others. if err := channels.DecryptSensitiveFields(row.Config); err != nil { log.Printf("Channels: decrypt webhook row %s: %v", row.ID, err) continue } - // Verify webhook secret_token if the channel has one configured. - // #337: use constant-time comparison. Go's `!=` short-circuits on - // the first mismatched byte and leaks timing information; an - // attacker on the Docker network could enumerate the secret - // byte-by-byte. subtle.ConstantTimeCompare runs in time - // proportional to the length of the shorter input and returns - // 1 on match / 0 otherwise (never -1). Same posture as the - // cdp-proxy token compare in host-bridge. if expectedSecret, _ := row.Config["webhook_secret"].(string); expectedSecret != "" { receivedSecret := c.GetHeader("X-Telegram-Bot-Api-Secret-Token") if subtle.ConstantTimeCompare([]byte(receivedSecret), []byte(expectedSecret)) != 1 { - continue // Wrong secret — try other channels (could be different bot) + continue } } - // Exact match against the comma-separated chat_id list if matchesChatID(row.Config, msg.ChatID) { - ch = row - found = true - break + candidates = append(candidates, row) } } + + if targetSlug != "" { + // [slug] routing — match against config username (lowercased) + for _, row := range candidates { + username, _ := row.Config["username"].(string) + usernameLC := strings.ToLower(username) + // Match: [backend] → "Backend Engineer", [pm] → "PM", [dev lead] → "Dev Lead" + if usernameLC == targetSlug || + strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", "-"), targetSlug) || + strings.HasPrefix(strings.ReplaceAll(usernameLC, " ", ""), targetSlug) { + ch = row + found = true + msg.Text = routedText // Strip the [slug] prefix before routing + break + } + } + if !found { + // No match for slug — respond with available agents + var names []string + for _, row := range candidates { + if u, _ := row.Config["username"].(string); u != "" { + names = append(names, "["+strings.ToLower(strings.ReplaceAll(u, " ", "-"))+"]") + } + } + c.JSON(http.StatusOK, gin.H{ + "status": "unknown_agent", + "requested_slug": targetSlug, + "available_slugs": names, + }) + return + } + } else if len(candidates) > 0 { + // No [slug] prefix — route to first matching channel (backward compat) + ch = candidates[0] + found = true + } + if !found { c.JSON(http.StatusOK, gin.H{"status": "no_channel"}) return @@ -505,7 +548,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) { // Process asynchronously — don't block the webhook response go func() { bgCtx := context.Background() - _ = h.manager.HandleInbound(bgCtx, ch, msg) + if err := h.manager.HandleInbound(bgCtx, ch, msg); err != nil { + log.Printf("Channels: async HandleInbound error for workspace %s: %v", ch.WorkspaceID[:12], err) + } }() c.JSON(http.StatusOK, gin.H{"status": "accepted"}) diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 58739d12..9c83e83a 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -43,12 +43,19 @@ type scheduleRow struct { Prompt string } +// ChannelBroadcaster posts messages to and reads context from workspace channels. +type ChannelBroadcaster interface { + BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) + FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string +} + // Scheduler polls the workspace_schedules table and fires A2A messages // when a schedule's next_run_at has passed. Follows the same goroutine // pattern as registry.StartHealthSweep. type Scheduler struct { proxy A2AProxy broadcaster Broadcaster + channels ChannelBroadcaster // lastTickAt records the wall-clock time of the most recent tick // (whether it fired schedules or not). Read by Healthy() and the @@ -67,6 +74,12 @@ func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler { } } +// SetChannels wires the channel manager for auto-posting cron output. +// Called after both scheduler and channel manager are initialized. +func (s *Scheduler) SetChannels(ch ChannelBroadcaster) { + s.channels = ch +} + // LastTickAt returns the wall-clock time of the most recently completed tick. // Returns a zero time.Time if the scheduler has never completed a tick. func (s *Scheduler) LastTickAt() time.Time { @@ -248,6 +261,17 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { fireCtx, cancel := context.WithTimeout(ctx, fireTimeout) defer cancel() + // Level 3: inject ambient Slack channel context into the cron prompt. + // The agent sees recent peer messages before acting, enabling cross-agent + // awareness without explicit A2A delegation. Best-effort — if the fetch + // fails or the workspace has no Slack channels, the prompt is unchanged. + prompt := sched.Prompt + if s.channels != nil { + if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" { + prompt = channelCtx + "\n" + prompt + } + } + msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8]) a2aBody, _ := json.Marshal(map[string]interface{}{ @@ -256,7 +280,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { "message": map[string]interface{}{ "role": "user", "messageId": msgID, - "parts": []map[string]interface{}{{"kind": "text", "text": sched.Prompt}}, + "parts": []map[string]interface{}{{"kind": "text", "text": prompt}}, }, }, }) @@ -360,6 +384,20 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { "status": lastStatus, }) } + + // Level 1: auto-post cron output to workspace's Slack channels. + // Only post non-empty successful responses — errors and empties are + // noise that clutters the channel without adding value. + if s.channels != nil && lastStatus == "ok" && !isEmpty { + summary := s.extractResponseSummary(respBody) + if summary != "" { + go func(wsID, text string) { + postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer postCancel() + s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text) + }(sched.WorkspaceID, summary) + } + } } // recordSkipped advances next_run_at and logs a cron_run activity entry @@ -475,6 +513,31 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) { // produced no meaningful output. Catches "(no response generated)" from // the workspace runtime + genuinely empty/null responses. Used by the // consecutive-empty tracker (#795) to detect phantom-producing crons. +// extractResponseSummary pulls the agent's text from the A2A response body. +// Returns empty string if parsing fails or the response has no text content. +func (s *Scheduler) extractResponseSummary(body []byte) string { + if len(body) == 0 { + return "" + } + var resp map[string]interface{} + if json.Unmarshal(body, &resp) != nil { + return "" + } + // A2A response: result.parts[].text + if result, ok := resp["result"].(map[string]interface{}); ok { + if parts, ok := result["parts"].([]interface{}); ok { + for _, p := range parts { + if part, ok := p.(map[string]interface{}); ok { + if text, ok := part["text"].(string); ok && text != "" { + return text + } + } + } + } + } + return "" +} + func isEmptyResponse(body []byte) bool { if len(body) == 0 { return true diff --git a/platform/migrations/031_memories_pgvector.up.sql b/platform/migrations/031_memories_pgvector.up.sql index 45ffb40e..b0fbb558 100644 --- a/platform/migrations/031_memories_pgvector.up.sql +++ b/platform/migrations/031_memories_pgvector.up.sql @@ -3,10 +3,9 @@ -- Adds a dense-vector embedding column to agent_memories to power semantic -- (cosine-similarity) memory recall alongside the existing FTS path. -- --- Requires the pgvector Postgres extension. The entire migration is wrapped --- in a single DO block so if pgvector is unavailable, ALL statements are --- skipped (not just CREATE EXTENSION). This prevents "type vector does not --- exist" errors on the ALTER TABLE / CREATE INDEX that follow. +-- Requires the pgvector Postgres extension. The DO block is a no-op guard: +-- if the extension is unavailable this migration exits early so a boot +-- without pgvector installed does not break the migration sweep. -- -- Issue: #576 @@ -20,8 +19,6 @@ BEGIN -- ivfflat approximate nearest-neighbour index for cosine similarity. -- lists=100 is a reasonable default for tables up to ~1M rows. - -- Partial index (WHERE embedding IS NOT NULL) keeps it lean — unembedded - -- rows are skipped entirely. CREATE INDEX IF NOT EXISTS agent_memories_embedding_idx ON agent_memories USING ivfflat (embedding vector_cosine_ops) WHERE embedding IS NOT NULL;