From 3f161a41ebeb841492fc9ffa9ae350152776932f Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Fri, 17 Apr 2026 13:30:20 -0700 Subject: [PATCH] feat(slack): Level 1 auto-post + Level 2 inbound routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Level 1 — Auto-post cron output to Slack: - scheduler.go: captures A2A response body, extracts agent text via extractResponseSummary(), broadcasts to workspace's configured Slack channels on successful non-empty cron completions - manager.go: adds BroadcastToWorkspaceChannels() — fans out to all enabled channels for a workspace (engineering+firehose for eng agents, research+firehose for research agents, etc.) - main.go: wires scheduler → channel manager via SetChannels() - Truncates output to 500 chars for Slack readability Level 2 — Inbound Slack messages route to workspaces: Already implemented by the existing webhook handler (POST /webhooks/slack) + the ParseWebhook method in slack.go which handles both Events API JSON payloads and slash command form-encoded payloads. Needs Slack App Events API URL configured to: https:///webhooks/slack Also in this commit: - slack.go: dual-mode adapter (bot_token + webhook fallback) - 031 migration: pgvector guard wraps entire DO block Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/cmd/server/main.go | 3 ++ platform/internal/channels/manager.go | 34 +++++++++++++++++ platform/internal/scheduler/scheduler.go | 47 ++++++++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index da102453..88ef581d 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -196,6 +196,9 @@ func main() { channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) + // Wire channel manager into scheduler for auto-posting cron output to Slack + cronSched.SetChannels(channelMgr) + // Router r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr) diff --git a/platform/internal/channels/manager.go b/platform/internal/channels/manager.go index 66be0d1a..7aff16a4 100644 --- a/platform/internal/channels/manager.go +++ b/platform/internal/channels/manager.go @@ -437,6 +437,40 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin return nil } +// BroadcastToWorkspaceChannels sends a message to ALL enabled channels +// configured for a workspace. Used by the scheduler to auto-post cron +// output summaries and by delegation handlers to post completion notices. +// +// Unlike SendOutbound (which targets a specific channel row by ID), this +// fans out to every enabled channel for the workspace — so a single cron +// completion posts to both #mol-engineering AND #mol-firehose if the +// workspace has both configured via chat_id comma-separation. +func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) { + if text == "" || db.DB == nil { + return + } + // Truncate to keep Slack messages digestible + if len(text) > 500 { + text = text[:497] + "..." + } + rows, err := db.DB.QueryContext(ctx, ` + SELECT id FROM workspace_channels + WHERE workspace_id = $1 AND enabled = true + `, workspaceID) + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + var channelID string + if rows.Scan(&channelID) == nil { + if sendErr := m.SendOutbound(ctx, channelID, text); sendErr != nil { + log.Printf("Channels: broadcast to %s failed: %v", channelID[:12], sendErr) + } + } + } +} + func splitChatIDs(raw string) []string { var ids []string for _, s := range strings.Split(raw, ",") { diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 58739d12..ae7a023b 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -43,12 +43,18 @@ type scheduleRow struct { Prompt string } +// ChannelBroadcaster posts messages to a workspace's configured social channels. +type ChannelBroadcaster interface { + BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) +} + // Scheduler polls the workspace_schedules table and fires A2A messages // when a schedule's next_run_at has passed. Follows the same goroutine // pattern as registry.StartHealthSweep. type Scheduler struct { proxy A2AProxy broadcaster Broadcaster + channels ChannelBroadcaster // lastTickAt records the wall-clock time of the most recent tick // (whether it fired schedules or not). Read by Healthy() and the @@ -67,6 +73,12 @@ func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler { } } +// SetChannels wires the channel manager for auto-posting cron output. +// Called after both scheduler and channel manager are initialized. +func (s *Scheduler) SetChannels(ch ChannelBroadcaster) { + s.channels = ch +} + // LastTickAt returns the wall-clock time of the most recently completed tick. // Returns a zero time.Time if the scheduler has never completed a tick. func (s *Scheduler) LastTickAt() time.Time { @@ -360,6 +372,16 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { "status": lastStatus, }) } + + // Level 1: auto-post cron output to workspace's Slack channels. + // Only post non-empty successful responses — errors and empties are + // noise that clutters the channel without adding value. + if s.channels != nil && lastStatus == "ok" && !isEmpty { + summary := s.extractResponseSummary(respBody) + if summary != "" { + go s.channels.BroadcastToWorkspaceChannels(ctx, sched.WorkspaceID, summary) + } + } } // recordSkipped advances next_run_at and logs a cron_run activity entry @@ -475,6 +497,31 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) { // produced no meaningful output. Catches "(no response generated)" from // the workspace runtime + genuinely empty/null responses. Used by the // consecutive-empty tracker (#795) to detect phantom-producing crons. +// extractResponseSummary pulls the agent's text from the A2A response body. +// Returns empty string if parsing fails or the response has no text content. +func (s *Scheduler) extractResponseSummary(body []byte) string { + if len(body) == 0 { + return "" + } + var resp map[string]interface{} + if json.Unmarshal(body, &resp) != nil { + return "" + } + // A2A response: result.parts[].text + if result, ok := resp["result"].(map[string]interface{}); ok { + if parts, ok := result["parts"].([]interface{}); ok { + for _, p := range parts { + if part, ok := p.(map[string]interface{}); ok { + if text, ok := part["text"].(string); ok && text != "" { + return text + } + } + } + } + } + return "" +} + func isEmptyResponse(body []byte) bool { if len(body) == 0 { return true