fix(workspace-server): prevent time.After goroutine leaks in long-running loops #1939
@@ -482,12 +482,14 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
if apiErr.Code == 429 {
|
||||
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
|
||||
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
|
||||
timer := time.NewTimer(retryAfter)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-time.After(retryAfter):
|
||||
continue
|
||||
case <-timer.C:
|
||||
}
|
||||
continue
|
||||
}
|
||||
if apiErr.Code == 401 {
|
||||
invalidateBot(token)
|
||||
@@ -495,12 +497,14 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
}
|
||||
}
|
||||
log.Printf("Channels: Telegram poll error: %v", err)
|
||||
timer := time.NewTimer(telegramPollInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return nil
|
||||
case <-time.After(telegramPollInterval):
|
||||
continue
|
||||
case <-timer.C:
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, update := range updates {
|
||||
|
||||
@@ -177,10 +177,12 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
|
||||
).Scan(&status); err == nil && status == "online" {
|
||||
return true
|
||||
}
|
||||
timer := time.NewTimer(restartContextOnlinePollInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return false
|
||||
case <-time.After(restartContextOnlinePollInterval):
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
return false
|
||||
@@ -213,10 +215,12 @@ func waitForFreshHeartbeat(ctx context.Context, workspaceID string, restartStart
|
||||
lastHB.Valid && lastHB.Time.After(restartStartTs) {
|
||||
return true
|
||||
}
|
||||
timer := time.NewTimer(restartContextOnlinePollInterval)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return false
|
||||
case <-time.After(restartContextOnlinePollInterval):
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -763,12 +763,14 @@ func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID,
|
||||
}
|
||||
// Sleep with ctx awareness so a cancelled ctx exits early instead
|
||||
// of stalling the goroutine through the remaining backoff.
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
log.Printf("%s: cpProv.Stop(%s) abandoned mid-retry: ctx cancelled (last_err=%v)",
|
||||
source, workspaceID, lastErr)
|
||||
return ctx.Err()
|
||||
case <-time.After(delay):
|
||||
case <-timer.C:
|
||||
}
|
||||
delay *= 2
|
||||
}
|
||||
|
||||
@@ -60,10 +60,12 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
|
||||
}
|
||||
|
||||
// Panic → back off and restart.
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
case <-timer.C:
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
|
||||
Reference in New Issue
Block a user