molecule-core/workspace-server/internal/handlers/admin_queue.go
Molecule AI Infra-Runtime-BE a1b803ca7a fix(admin/a2a_queue): add drop-stale endpoint for post-incident queue cleanup
Issue #1947: after incidents, PM agents inherit hour-old TASK-priority
queue items from ICs that were correctly reporting "X is broken" while
X was actually broken. Once X is fixed those items are stale noise —
PMs spend ~5 min each writing "thanks, the issue is resolved".

Adds:
- DropStaleQueueItems() in a2a_queue.go: UPDATE ... SET status='dropped'
  for queued items older than maxAgeMinutes. Uses FOR UPDATE SKIP LOCKED
  to stay concurrency-safe with concurrent drain calls.
- AdminQueueHandler in admin_queue.go: POST /admin/a2a-queue/drop-stale
  (AdminAuth, ?max_age_minutes=N, &workspace_id=<id>). Returns {dropped: N}.
- admin_queue_test.go: HTTP-level tests for param validation and response shape.
- Router registration for the new endpoint.
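
The commit does not show a2a_queue.go, so the following is only a sketch of what an UPDATE with FOR UPDATE SKIP LOCKED could look like. The table name (a2a_queue), the column names (id, status, created_at, workspace_id), the 'queued' status value, the PostgreSQL dialect, and the helper names buildDropStaleQuery and dropStale are all assumptions made for illustration.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// buildDropStaleQuery returns the UPDATE statement and its arguments.
// Table and column names are hypothetical; the real schema is not shown
// in this commit.
func buildDropStaleQuery(workspaceID string, maxAgeMinutes int) (string, []any) {
	args := []any{maxAgeMinutes}
	scope := ""
	if workspaceID != "" {
		// An empty workspaceID means "all workspaces", so the filter is
		// only added when a workspace is given.
		args = append(args, workspaceID)
		scope = "\n      AND workspace_id = $2"
	}
	// The subquery locks candidate rows and skips any row that a concurrent
	// transaction (for example a drain call) already holds, instead of
	// blocking on it.
	query := `UPDATE a2a_queue
SET status = 'dropped'
WHERE id IN (
    SELECT id FROM a2a_queue
    WHERE status = 'queued'
      AND created_at < NOW() - ($1::int * INTERVAL '1 minute')` + scope + `
    FOR UPDATE SKIP LOCKED
)`
	return query, args
}

// dropStale executes the statement and reports how many rows were updated.
func dropStale(ctx context.Context, conn *sql.DB, workspaceID string, maxAgeMinutes int) (int64, error) {
	query, args := buildDropStaleQuery(workspaceID, maxAgeMinutes)
	res, err := conn.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("drop stale queue items: %w", err)
	}
	return res.RowsAffected()
}

func main() {
	// Print the statement for an unscoped cleanup of items older than 2 hours.
	query, args := buildDropStaleQuery("", 120)
	fmt.Println(query)
	fmt.Println(args)
}
```

Rows that a drain call has locked at that moment are left untouched, so a second invocation may be needed to catch them if they are still queued afterwards.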

Usage during incident recovery:
  curl -X POST '/admin/a2a-queue/drop-stale?max_age_minutes=120'
  # scoped to one workspace (quote the URL so the shell does not interpret & and <>):
  curl -X POST '/admin/a2a-queue/drop-stale?max_age_minutes=120&workspace_id=<uuid>'

Closes #1947.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 02:08:35 +00:00

package handlers

import (
	"log"
	"net/http"
	"strconv"

	"github.com/gin-gonic/gin"
)

// AdminQueueHandler serves POST /admin/a2a-queue/drop-stale, an ops tool for
// post-incident queue cleanup. It marks queued items older than the given
// maximum age as 'dropped', so PM agents do not spend cycles on stale
// post-incident TASK-priority messages.
//
//	POST /admin/a2a-queue/drop-stale
//	  ?max_age_minutes=N  (default 60)
//	  &workspace_id=<id>  (optional; empty = all workspaces)
//
// Returns JSON { "dropped": <count> } on success, 400 if max_age_minutes is
// not a positive integer, and 500 if the update fails.
type AdminQueueHandler struct{}

// NewAdminQueueHandler returns a handler with no state of its own.
func NewAdminQueueHandler() *AdminQueueHandler {
	return &AdminQueueHandler{}
}

// DropStale validates the query parameters, marks stale queued items as
// dropped, and responds with the number of items affected.
func (h *AdminQueueHandler) DropStale(c *gin.Context) {
	// Reject anything that is not a positive integer number of minutes.
	maxAgeStr := c.DefaultQuery("max_age_minutes", "60")
	maxAge, err := strconv.Atoi(maxAgeStr)
	if err != nil || maxAge < 1 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "max_age_minutes must be a positive integer"})
		return
	}

	// An empty workspace_id applies the cleanup to all workspaces.
	workspaceID := c.Query("workspace_id")

	count, err := DropStaleQueueItems(c.Request.Context(), workspaceID, maxAge)
	if err != nil {
		log.Printf("AdminQueueHandler.DropStale: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to drop stale items"})
		return
	}

	log.Printf("AdminQueueHandler.DropStale: dropped %d items (workspace_id=%s, max_age=%dm)",
		count, workspaceID, maxAge)
	c.JSON(http.StatusOK, gin.H{"dropped": count})
}