fix: add panic recovery in goroutines across channels, handlers, scheduler (re-created from staging #2044) #2117

Closed
core-be wants to merge 1 commits from fix/panic-recovery-goroutines-channels-handlers-scheduler-main into main
3 changed files with 31 additions and 0 deletions
@@ -271,6 +271,11 @@ func (m *Manager) Reload(ctx context.Context) {
ch.Config["_channel_id"] = ch.ID
go func(a ChannelAdapter, c ChannelRow, pCtx context.Context) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in channel polling goroutine: %v", r)
}
}()
if err := a.StartPolling(pCtx, c.Config, m.onInboundMessage); err != nil {
log.Printf("Channels: polling error for %s/%s: %v", c.ChannelType, truncID(c.ID), err)
}
@@ -354,6 +359,11 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
typingCtx, typingCancel := context.WithCancel(fireCtx)
defer typingCancel()
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in typing indicator goroutine: %v", r)
}
}()
typer.SendTyping(ch.Config, msg.ChatID)
ticker := time.NewTicker(4 * time.Second)
defer ticker.Stop()
@@ -14,6 +14,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"time"
@@ -113,6 +114,11 @@ func (h *WorkspaceHandler) goAsync(fn func()) {
h.asyncWG.Add(1)
go func() {
defer h.asyncWG.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in goAsync goroutine: %v\n%s", r, debug.Stack())
}
}()
fn()
}()
}
@@ -151,6 +157,11 @@ func globalGoAsync(fn func()) {
globalAsync.Add(1)
go func() {
defer globalAsync.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in globalGoAsync goroutine: %v", r)
}
}()
fn()
}()
}
@@ -199,6 +199,11 @@ func (s *Scheduler) Start(ctx context.Context) {
// entry/exit — those are kept as redundant signals but this pulse is the
// one that guarantees liveness freshness regardless of tick state.
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in scheduler heartbeat goroutine: %v", r)
}
}()
pulseTicker := time.NewTicker(10 * time.Second)
defer pulseTicker.Stop()
for {
@@ -638,6 +643,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
summary := s.extractResponseSummary(respBody)
if summary != "" {
go func(wsID, text string) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in broadcast summary goroutine: %v", r)
}
}()
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer postCancel()
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)