fix(mcp-tools): log scanPeers errors instead of silently dropping them #1713

Merged
hongming merged 5 commits from fix/mcp-tools-scanpeers-err into main 2026-05-23 10:15:14 +00:00
8 changed files with 53 additions and 6 deletions
@@ -156,6 +156,9 @@ func (m *Manager) PausePollersForToken(workspaceID, botToken string) func() {
}
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels: pause-pollers rows.Err: %v", err)
}
m.mu.Unlock()
if len(pausedIDs) == 0 {
@@ -216,6 +219,9 @@ func (m *Manager) Reload(ctx context.Context) {
}
desired[ch.ID] = ch
}
if err := rows.Err(); err != nil {
log.Printf("Channels: reload rows.Err: %v", err)
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -473,6 +479,9 @@ func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID,
}
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels: broadcast rows.Err: %v", err)
}
}
// FetchWorkspaceChannelContext returns recent Slack channel messages formatted
@@ -104,6 +104,9 @@ func (h *ChannelHandler) List(c *gin.Context) {
}
result = append(result, entry)
}
if err := rows.Err(); err != nil {
log.Printf("Channels: list rows.Err: %v", err)
}
c.JSON(http.StatusOK, result)
}
@@ -514,6 +517,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
candidates = append(candidates, row)
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels: telegram webhook rows.Err: %v", err)
}
if targetSlug != "" {
// [slug] routing — match against config username (lowercased)
@@ -393,6 +393,9 @@ func queryPeerMaps(query string, args ...interface{}) ([]map[string]interface{},
result = append(result, peer)
}
if err := rows.Err(); err != nil {
log.Printf("queryPeerMaps rows.Err: %v", err)
}
return result, nil
}
@@ -95,14 +95,18 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
cols+` FROM workspaces w WHERE w.parent_id = $1 AND w.id != $2 AND w.status != 'removed'`,
parentID.String, workspaceID)
if err == nil {
_ = scanPeers(rows)
if scanErr := scanPeers(rows); scanErr != nil {
log.Printf("MCP toolListPeers: sibling scan error: %v", scanErr)
}
}
} else {
rows, err := h.database.QueryContext(ctx,
cols+` FROM workspaces w WHERE w.parent_id IS NULL AND w.id != $1 AND w.status != 'removed'`,
workspaceID)
if err == nil {
_ = scanPeers(rows)
if scanErr := scanPeers(rows); scanErr != nil {
log.Printf("MCP toolListPeers: sibling scan error: %v", scanErr)
}
}
}
@@ -112,7 +116,9 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
cols+` FROM workspaces w WHERE w.parent_id = $1 AND w.status != 'removed'`,
workspaceID)
if err == nil {
_ = scanPeers(rows)
if scanErr := scanPeers(rows); scanErr != nil {
log.Printf("MCP toolListPeers: children scan error: %v", scanErr)
}
}
}
@@ -122,7 +128,9 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
cols+` FROM workspaces w WHERE w.id = $1 AND w.status != 'removed'`,
parentID.String)
if err == nil {
_ = scanPeers(rows)
if scanErr := scanPeers(rows); scanErr != nil {
log.Printf("MCP toolListPeers: parent scan error: %v", scanErr)
}
}
}
@@ -133,24 +133,30 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// message bus.
keySet := map[string]struct{}{}
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("loadRestartContextData: global_secrets rows.Err: %v", err)
}
}
if rows, err := db.DB.QueryContext(ctx,
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("loadRestartContextData: workspace_secrets rows.Err: %v", err)
}
}
for k := range keySet {
d.EnvKeys = append(d.EnvKeys, k)
@@ -858,6 +858,9 @@ func (h *WorkspaceHandler) Pause(c *gin.Context) {
toPause = append(toPause, struct{ id, name string }{cid, cname})
}
}
if err := rows.Err(); err != nil {
log.Printf("Pause: descendant query rows.Err: %v", err)
}
}
// Stop containers and mark all as paused. StopWorkspaceAuto routes
@@ -939,6 +942,9 @@ func (h *WorkspaceHandler) Resume(c *gin.Context) {
toResume = append(toResume, ws)
}
}
if err := rows.Err(); err != nil {
log.Printf("Resume: descendant query rows.Err: %v", err)
}
}
// Re-provision all
@@ -93,6 +93,9 @@ func sweepOnlineWorkspaces(ctx context.Context, checker ContainerChecker, onOffl
ids = append(ids, id)
}
}
if err := rows.Err(); err != nil {
log.Printf("Health sweep: rows error: %v", err)
}
for _, id := range ids {
running, err := checker.IsRunning(ctx, id)
@@ -159,6 +162,9 @@ func sweepStaleRemoteWorkspaces(ctx context.Context, onOffline OfflineHandler) {
ids = append(ids, id)
}
}
if err := rows.Err(); err != nil {
log.Printf("Health sweep: rows error: %v", err)
}
for _, id := range ids {
// External workspaces flip to 'awaiting_agent' (re-registrable
@@ -166,6 +166,9 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
ids = append(ids, c)
}
}
if err := rows.Err(); err != nil {
log.Printf("Provision-timeout sweep: rows error: %v", err)
}
for _, c := range ids {
timeout := provisioningTimeoutFor(c.runtime, lookup)