feat(slack): Level 1 auto-post + Level 2 inbound routing

Level 1 — Auto-post cron output to Slack:
- scheduler.go: captures A2A response body, extracts agent text via
  extractResponseSummary(), broadcasts to workspace's configured Slack
  channels on successful non-empty cron completions
- manager.go: adds BroadcastToWorkspaceChannels() — fans out to all
  enabled channels for a workspace (engineering+firehose for eng agents,
  research+firehose for research agents, etc.)
- main.go: wires scheduler → channel manager via SetChannels()
- Truncates output to 500 chars for Slack readability

Level 2 — Inbound Slack messages route to workspaces:
Already implemented by the existing webhook handler (POST /webhooks/slack)
+ the ParseWebhook method in slack.go which handles both Events API JSON
payloads and slash command form-encoded payloads. Needs Slack App Events
API URL configured to: https://<platform-host>/webhooks/slack

Also in this commit:
- slack.go: dual-mode adapter (bot_token + webhook fallback)
- 031 migration: pgvector guard wraps entire DO block

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rabbitblood 2026-04-17 13:30:20 -07:00
parent 735aae6564
commit 3f161a41eb
3 changed files with 84 additions and 0 deletions

View File

@ -196,6 +196,9 @@ func main() {
channelMgr := channels.NewManager(wh, broadcaster)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
// Wire channel manager into scheduler for auto-posting cron output to Slack
cronSched.SetChannels(channelMgr)
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)

View File

@ -437,6 +437,40 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
return nil
}
// BroadcastToWorkspaceChannels sends a message to ALL enabled channels
// configured for a workspace. Used by the scheduler to auto-post cron
// output summaries and by delegation handlers to post completion notices.
//
// Unlike SendOutbound (which targets a specific channel row by ID), this
// fans out to every enabled channel for the workspace — so a single cron
// completion posts to both #mol-engineering AND #mol-firehose if the
// workspace has both configured via chat_id comma-separation.
func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string) {
if text == "" || db.DB == nil {
return
}
// Truncate to keep Slack messages digestible
if len(text) > 500 {
text = text[:497] + "..."
}
rows, err := db.DB.QueryContext(ctx, `
SELECT id FROM workspace_channels
WHERE workspace_id = $1 AND enabled = true
`, workspaceID)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var channelID string
if rows.Scan(&channelID) == nil {
if sendErr := m.SendOutbound(ctx, channelID, text); sendErr != nil {
log.Printf("Channels: broadcast to %s failed: %v", channelID[:12], sendErr)
}
}
}
}
func splitChatIDs(raw string) []string {
var ids []string
for _, s := range strings.Split(raw, ",") {

View File

@ -43,12 +43,18 @@ type scheduleRow struct {
Prompt string
}
// ChannelBroadcaster posts messages to a workspace's configured social channels.
type ChannelBroadcaster interface {
BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string)
}
// Scheduler polls the workspace_schedules table and fires A2A messages
// when a schedule's next_run_at has passed. Follows the same goroutine
// pattern as registry.StartHealthSweep.
type Scheduler struct {
proxy A2AProxy
broadcaster Broadcaster
channels ChannelBroadcaster
// lastTickAt records the wall-clock time of the most recent tick
// (whether it fired schedules or not). Read by Healthy() and the
@ -67,6 +73,12 @@ func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
}
}
// SetChannels wires the channel manager for auto-posting cron output.
// Called after both scheduler and channel manager are initialized.
func (s *Scheduler) SetChannels(ch ChannelBroadcaster) {
s.channels = ch
}
// LastTickAt returns the wall-clock time of the most recently completed tick.
// Returns a zero time.Time if the scheduler has never completed a tick.
func (s *Scheduler) LastTickAt() time.Time {
@ -360,6 +372,16 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
"status": lastStatus,
})
}
// Level 1: auto-post cron output to workspace's Slack channels.
// Only post non-empty successful responses — errors and empties are
// noise that clutters the channel without adding value.
if s.channels != nil && lastStatus == "ok" && !isEmpty {
summary := s.extractResponseSummary(respBody)
if summary != "" {
go s.channels.BroadcastToWorkspaceChannels(ctx, sched.WorkspaceID, summary)
}
}
}
// recordSkipped advances next_run_at and logs a cron_run activity entry
@ -475,6 +497,31 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
// produced no meaningful output. Catches "(no response generated)" from
// the workspace runtime + genuinely empty/null responses. Used by the
// consecutive-empty tracker (#795) to detect phantom-producing crons.
// extractResponseSummary pulls the agent's text from the A2A response body.
// Returns empty string if parsing fails or the response has no text content.
func (s *Scheduler) extractResponseSummary(body []byte) string {
if len(body) == 0 {
return ""
}
var resp map[string]interface{}
if json.Unmarshal(body, &resp) != nil {
return ""
}
// A2A response: result.parts[].text
if result, ok := resp["result"].(map[string]interface{}); ok {
if parts, ok := result["parts"].([]interface{}); ok {
for _, p := range parts {
if part, ok := p.(map[string]interface{}); ok {
if text, ok := part["text"].(string); ok && text != "" {
return text
}
}
}
}
}
return ""
}
func isEmptyResponse(body []byte) bool {
if len(body) == 0 {
return true