fix(registry,crud,restart): defer rows.Close() to prevent cursor leaks #2037

Closed
core-be wants to merge 2 commits from fix/defer-rows-close-audit into staging
4 changed files with 10 additions and 15 deletions
@@ -407,15 +407,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e
// matching (the wsauth errors are typed for the invalid case).
var errInvalidCallerToken = errors.New("missing caller auth token")
// canvasUserMessage holds the extracted user message extracted from an
// A2A canvas request body for broadcasting to other sessions.
type canvasUserMessage struct {
Message string `json:"message,omitempty"`
Parts []map[string]interface{} `json:"parts,omitempty"`
MessageID string `json:"messageId,omitempty"`
Attachments []map[string]interface{} `json:"attachments,omitempty"`
}
// extractCanvasUserMessage parses an A2A JSON-RPC request body and extracts
// the user-authored text and attachments from a canvas-initiated message/send.
// Returns nil when the body is not a canvas user message (empty, malformed,
@@ -133,24 +133,30 @@ func loadRestartContextData(ctx context.Context, workspaceID string) restartCont
// message bus.
keySet := map[string]struct{}{}
if rows, err := db.DB.QueryContext(ctx, `SELECT key FROM global_secrets`); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("restart-context: global_secrets rows error: %v", err)
}
}
if rows, err := db.DB.QueryContext(ctx,
`SELECT key FROM workspace_secrets WHERE workspace_id = $1`, workspaceID,
); err == nil {
defer rows.Close()
for rows.Next() {
var k string
if rows.Scan(&k) == nil {
keySet[k] = struct{}{}
}
}
rows.Close()
if err := rows.Err(); err != nil {
log.Printf("restart-context: workspace_secrets rows error: %v", err)
}
}
for k := range keySet {
d.EnvKeys = append(d.EnvKeys, k)
@@ -435,13 +435,13 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri
if err != nil {
return nil, nil, fmt.Errorf("descendant query: %w", err)
}
defer descRows.Close()
for descRows.Next() {
var descID string
if descRows.Scan(&descID) == nil {
descendantIDs = append(descendantIDs, descID)
}
}
descRows.Close()
allIDs := append([]string{id}, descendantIDs...)
@@ -258,6 +258,7 @@ func sweepLabeledOrphansWithoutRows(ctx context.Context, reaper OrphanReaper) {
log.Printf("Orphan sweeper: wiped-DB reverse-lookup failed: %v — skipping wiped-DB pass", err)
return
}
defer knownRows.Close()
known := make(map[string]struct{}, len(managedLikes))
for knownRows.Next() {
var lk string
@@ -267,9 +268,6 @@ func sweepLabeledOrphansWithoutRows(ctx context.Context, reaper OrphanReaper) {
}
known[lk] = struct{}{}
}
if cerr := knownRows.Close(); cerr != nil {
log.Printf("Orphan sweeper: wiped-DB rows close failed: %v", cerr)
}
if iterErr := knownRows.Err(); iterErr != nil {
log.Printf("Orphan sweeper: wiped-DB rows iteration failed: %v", iterErr)
return