fix(handlers,channels,scheduler): add panic recovery to 10 goroutines #2044
@@ -271,6 +271,11 @@ func (m *Manager) Reload(ctx context.Context) {
|
||||
ch.Config["_channel_id"] = ch.ID
|
||||
|
||||
go func(a ChannelAdapter, c ChannelRow, pCtx context.Context) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in channel polling goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
if err := a.StartPolling(pCtx, c.Config, m.onInboundMessage); err != nil {
|
||||
log.Printf("Channels: polling error for %s/%s: %v", c.ChannelType, truncID(c.ID), err)
|
||||
}
|
||||
@@ -354,6 +359,11 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
typingCtx, typingCancel := context.WithCancel(fireCtx)
|
||||
defer typingCancel()
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in typing indicator goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
typer.SendTyping(ch.Config, msg.ChatID)
|
||||
ticker := time.NewTicker(4 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -113,6 +114,11 @@ func (h *WorkspaceHandler) goAsync(fn func()) {
|
||||
h.asyncWG.Add(1)
|
||||
go func() {
|
||||
defer h.asyncWG.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in goAsync goroutine: %v\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
@@ -151,6 +157,11 @@ func globalGoAsync(fn func()) {
|
||||
globalAsync.Add(1)
|
||||
go func() {
|
||||
defer globalAsync.Done()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in globalGoAsync goroutine: %v\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -199,6 +199,11 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
// entry/exit — those are kept as redundant signals but this pulse is the
|
||||
// one that guarantees liveness freshness regardless of tick state.
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in scheduler heartbeat goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
pulseTicker := time.NewTicker(10 * time.Second)
|
||||
defer pulseTicker.Stop()
|
||||
for {
|
||||
@@ -638,6 +643,11 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
summary := s.extractResponseSummary(respBody)
|
||||
if summary != "" {
|
||||
go func(wsID, text string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC recovered in broadcast summary goroutine: %v", r)
|
||||
}
|
||||
}()
|
||||
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer postCancel()
|
||||
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)
|
||||
|
||||
Reference in New Issue
Block a user