fix(rowserr): add missing rows.Err() checks after iteration loops #2091

Closed
core-be wants to merge 3 commits from fix/rowserr-checks-events-channels-manager into staging
5 changed files with 24 additions and 12 deletions
@@ -156,6 +156,9 @@ func (m *Manager) PausePollersForToken(workspaceID, botToken string) func() {
}
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels: pause-pollers rows error: %v", err)
}
m.mu.Unlock()
if len(pausedIDs) == 0 {
@@ -216,6 +219,9 @@ func (m *Manager) Reload(ctx context.Context) {
}
desired[ch.ID] = ch
}
if err := rows.Err(); err != nil {
log.Printf("Channels: reload rows error: %v", err)
}
m.mu.Lock()
defer m.mu.Unlock()
@@ -473,6 +479,9 @@ func (m *Manager) BroadcastToWorkspaceChannels(ctx context.Context, workspaceID,
}
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels: broadcast rows error: %v", err)
}
}
// FetchWorkspaceChannelContext returns recent Slack channel messages formatted
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -104,6 +104,9 @@ func (h *ChannelHandler) List(c *gin.Context) {
}
result = append(result, entry)
}
if err := rows.Err(); err != nil {
log.Printf("Channels.List: rows error: %v", err)
}
c.JSON(http.StatusOK, result)
}
@@ -514,6 +517,9 @@ func (h *ChannelHandler) Webhook(c *gin.Context) {
candidates = append(candidates, row)
}
}
if err := rows.Err(); err != nil {
log.Printf("Channels.TelegramWebhook: rows error: %v", err)
}
if targetSlug != "" {
// [slug] routing — match against config username (lowercased)
@@ -49,6 +49,9 @@ func (h *EventsHandler) List(c *gin.Context) {
"created_at": createdAt,
})
}
if err := rows.Err(); err != nil {
log.Printf("Events.List: rows error: %v", err)
}
c.JSON(http.StatusOK, events)
}
@@ -87,5 +90,8 @@ func (h *EventsHandler) ListByWorkspace(c *gin.Context) {
"created_at": createdAt,
})
}
if err := rows.Err(); err != nil {
log.Printf("Events.ListByWorkspace: rows error: %v", err)
}
c.JSON(http.StatusOK, events)
}
@@ -87,15 +87,15 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) {
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
SELECT id, parent_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
SELECT w.id, w.parent_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
SELECT id AS root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)