fix(rowserr): add missing rows.Err() checks in memory, schedules, and audit #2092

Closed
core-be wants to merge 3 commits from fix/rowserr-memory-schedules-audit into staging
5 changed files with 12 additions and 17 deletions
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
+3 -5
View File
@@ -206,11 +206,6 @@ func (h *AuditHandler) Query(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "scan failed"})
return
}
if err := rows.Err(); err != nil {
log.Printf("audit: rows error for workspace %s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "scan failed"})
return
}
// Chain verification (inline when AUDIT_LEDGER_SALT is set) ------------
// Paginated views cannot verify chain integrity — earlier events are absent
@@ -252,6 +247,9 @@ func scanAuditRows(rows *sql.Rows) ([]auditEventRow, error) {
}
result = append(result, ev)
}
if err := rows.Err(); err != nil {
return nil, err
}
return result, nil
}
@@ -54,6 +54,9 @@ func (h *MemoryHandler) List(c *gin.Context) {
entry.Value = json.RawMessage(value)
entries = append(entries, entry)
}
if err := rows.Err(); err != nil {
log.Printf("Memory.List: rows error: %v", err)
}
c.JSON(http.StatusOK, entries)
}
@@ -325,6 +325,9 @@ func (h *ScheduleHandler) History(c *gin.Context) {
e.Request = json.RawMessage(reqStr)
entries = append(entries, e)
}
if err := rows.Err(); err != nil {
log.Printf("Schedules.History: rows error: %v", err)
}
c.JSON(http.StatusOK, entries)
}
@@ -87,15 +87,15 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) {
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
SELECT id, parent_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
SELECT w.id, w.parent_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
SELECT id AS root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)