Mechanical migration of bare event-name strings in BroadcastOnly / RecordAndBroadcast call sites to the typed constants from internal/events/types.go (RFC #2945 PR-B). Wire format unchanged (both shapes serialize to identical WSMessage.Event literals); pinned by TestAllEventTypes_IsSnapshot in #2965. Migrated (18 files, scope: handlers/, scheduler/, registry/, bundle/, channels/): - handlers/{approvals,a2a_proxy_helpers,a2a_queue,activity,agent, delegation,external_rotate,org_import,registry,workspace, workspace_bootstrap,workspace_crud,workspace_provision_shared, workspace_restart}.go - channels/manager.go (caught by hostile-reviewer pass — initial scope missed channels/, found via grep on the post-migration tree) - scheduler/scheduler.go - registry/provisiontimeout.go - bundle/importer.go Hostile self-review (3 weakest spots, addressed) ------------------------------------------------ 1. Missed call sites — initial scope omitted channels/. Post-migration `grep -rEn 'BroadcastOnly\([^,]+,[^,]*"[A-Z_]+"|RecordAndBroadcast\([^,]+,[^,]*"[A-Z_]+"' internal/` found 2 stragglers in channels/manager.go. Migrated. Final grep on the same pattern returns only the docstring example in types.go (intentional). 2. gofmt drift — auto-import injection produced non-canonical import ordering. `gofmt -w` applied ONLY to the 18 modified files (NOT the whole tree, to avoid sweeping unrelated pre-existing drift into this PR's diff). Three pre-existing un-gofmt'd files in handlers/ (a2a_proxy.go, a2a_proxy_test.go, a2a_queue_test.go) left as-is — they're unchanged by this PR and their drift predates it. 3. Wire format — paranoia check: do the constants serialize to the exact strings consumers (canvas TS, hermes plugin, anything parsing WSMessage.Event) expect? Yes. Pinned by the snapshot test. The migration is name-only; not a single character of wire output changes. Verified - go build ./... clean - go vet ./internal/... clean - gofmt -l on the 5 migrated package dirs: only pre-existing files - Full tests: handlers/, channels/, scheduler/, registry/, events/, bundle/ all green (5 ok, 0 fail) PR-B-2 (canvas TS mirror + cross-language parity gate) remains as the final piece of RFC #2945 PR-B. Tracked separately so this PR stays mechanical + reviewable. Refs RFC #2945, PR #2965 (PR-B types). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
236 lines
7.2 KiB
Go
236 lines
7.2 KiB
Go
package handlers
|
|
|
|
import (
|
|
"database/sql"
|
|
"log"
|
|
"net/http"
|
|
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
type AgentHandler struct {
|
|
broadcaster *events.Broadcaster
|
|
}
|
|
|
|
func NewAgentHandler(b *events.Broadcaster) *AgentHandler {
|
|
return &AgentHandler{broadcaster: b}
|
|
}
|
|
|
|
// Assign handles POST /workspaces/:id/agent
|
|
func (h *AgentHandler) Assign(c *gin.Context) {
|
|
workspaceID := c.Param("id")
|
|
ctx := c.Request.Context()
|
|
|
|
var body struct {
|
|
Model string `json:"model" binding:"required"`
|
|
}
|
|
if err := c.ShouldBindJSON(&body); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
|
return
|
|
}
|
|
|
|
// Check workspace exists
|
|
var status string
|
|
err := db.DB.QueryRowContext(ctx,
|
|
`SELECT status FROM workspaces WHERE id = $1`, workspaceID).Scan(&status)
|
|
if err == sql.ErrNoRows {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
|
return
|
|
}
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
|
return
|
|
}
|
|
|
|
// Check no active agent already assigned
|
|
var existingCount int
|
|
if err := db.DB.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM agents WHERE workspace_id = $1 AND status = 'active'`, workspaceID,
|
|
).Scan(&existingCount); err != nil {
|
|
log.Printf("Agent assign check error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
|
return
|
|
}
|
|
if existingCount > 0 {
|
|
c.JSON(http.StatusConflict, gin.H{"error": "workspace already has an active agent, use PATCH to replace"})
|
|
return
|
|
}
|
|
|
|
// Insert agent
|
|
var agentID string
|
|
err = db.DB.QueryRowContext(ctx,
|
|
`INSERT INTO agents (workspace_id, model) VALUES ($1, $2) RETURNING id`, workspaceID, body.Model,
|
|
).Scan(&agentID)
|
|
if err != nil {
|
|
log.Printf("Assign agent error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to assign agent"})
|
|
return
|
|
}
|
|
|
|
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentAssigned), workspaceID, map[string]interface{}{
|
|
"agent_id": agentID,
|
|
"model": body.Model,
|
|
})
|
|
|
|
c.JSON(http.StatusCreated, gin.H{"agent_id": agentID, "model": body.Model})
|
|
}
|
|
|
|
// Replace handles PATCH /workspaces/:id/agent
|
|
func (h *AgentHandler) Replace(c *gin.Context) {
|
|
workspaceID := c.Param("id")
|
|
ctx := c.Request.Context()
|
|
|
|
var body struct {
|
|
Model string `json:"model" binding:"required"`
|
|
}
|
|
if err := c.ShouldBindJSON(&body); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
|
return
|
|
}
|
|
|
|
// Deactivate current agent
|
|
var oldModel string
|
|
err := db.DB.QueryRowContext(ctx,
|
|
`UPDATE agents SET status = 'replaced', removed_at = now(), removal_reason = 'model_replaced'
|
|
WHERE workspace_id = $1 AND status = 'active' RETURNING model`,
|
|
workspaceID,
|
|
).Scan(&oldModel)
|
|
if err == sql.ErrNoRows {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "no active agent to replace"})
|
|
return
|
|
}
|
|
if err != nil {
|
|
log.Printf("Replace agent error (deactivate): %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to replace agent"})
|
|
return
|
|
}
|
|
|
|
// Insert new agent
|
|
var agentID string
|
|
err = db.DB.QueryRowContext(ctx,
|
|
`INSERT INTO agents (workspace_id, model) VALUES ($1, $2) RETURNING id`, workspaceID, body.Model,
|
|
).Scan(&agentID)
|
|
if err != nil {
|
|
log.Printf("Replace agent error (insert): %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to assign replacement"})
|
|
return
|
|
}
|
|
|
|
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentReplaced), workspaceID, map[string]interface{}{
|
|
"agent_id": agentID,
|
|
"model": body.Model,
|
|
"old_model": oldModel,
|
|
})
|
|
|
|
c.JSON(http.StatusOK, gin.H{"agent_id": agentID, "model": body.Model, "old_model": oldModel})
|
|
}
|
|
|
|
// Remove handles DELETE /workspaces/:id/agent
|
|
func (h *AgentHandler) Remove(c *gin.Context) {
|
|
workspaceID := c.Param("id")
|
|
ctx := c.Request.Context()
|
|
|
|
var agentID, model string
|
|
err := db.DB.QueryRowContext(ctx,
|
|
`UPDATE agents SET status = 'removed', removed_at = now(), removal_reason = 'manual_removal'
|
|
WHERE workspace_id = $1 AND status = 'active' RETURNING id, model`,
|
|
workspaceID,
|
|
).Scan(&agentID, &model)
|
|
if err == sql.ErrNoRows {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "no active agent to remove"})
|
|
return
|
|
}
|
|
if err != nil {
|
|
log.Printf("Remove agent error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to remove agent"})
|
|
return
|
|
}
|
|
|
|
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentRemoved), workspaceID, map[string]interface{}{
|
|
"agent_id": agentID,
|
|
"model": model,
|
|
})
|
|
|
|
c.JSON(http.StatusOK, gin.H{"status": "removed", "agent_id": agentID})
|
|
}
|
|
|
|
// Move handles POST /workspaces/:id/agent/move
|
|
func (h *AgentHandler) Move(c *gin.Context) {
|
|
sourceID := c.Param("id")
|
|
ctx := c.Request.Context()
|
|
|
|
var body struct {
|
|
TargetWorkspaceID string `json:"target_workspace_id" binding:"required"`
|
|
}
|
|
if err := c.ShouldBindJSON(&body); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
|
return
|
|
}
|
|
|
|
// Check target workspace exists
|
|
var targetStatus string
|
|
err := db.DB.QueryRowContext(ctx,
|
|
`SELECT status FROM workspaces WHERE id = $1`, body.TargetWorkspaceID).Scan(&targetStatus)
|
|
if err == sql.ErrNoRows {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "target workspace not found"})
|
|
return
|
|
}
|
|
if err != nil {
|
|
log.Printf("Move agent target lookup error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "target lookup failed"})
|
|
return
|
|
}
|
|
|
|
// Check target doesn't already have an agent
|
|
var targetAgentCount int
|
|
if err := db.DB.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM agents WHERE workspace_id = $1 AND status = 'active'`, body.TargetWorkspaceID,
|
|
).Scan(&targetAgentCount); err != nil {
|
|
log.Printf("Move agent target check error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "target lookup failed"})
|
|
return
|
|
}
|
|
if targetAgentCount > 0 {
|
|
c.JSON(http.StatusConflict, gin.H{"error": "target workspace already has an active agent"})
|
|
return
|
|
}
|
|
|
|
// Move the agent: update workspace_id
|
|
var agentID, model string
|
|
err = db.DB.QueryRowContext(ctx,
|
|
`UPDATE agents SET workspace_id = $2
|
|
WHERE workspace_id = $1 AND status = 'active' RETURNING id, model`,
|
|
sourceID, body.TargetWorkspaceID,
|
|
).Scan(&agentID, &model)
|
|
if err == sql.ErrNoRows {
|
|
c.JSON(http.StatusNotFound, gin.H{"error": "no active agent in source workspace"})
|
|
return
|
|
}
|
|
if err != nil {
|
|
log.Printf("Move agent error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to move agent"})
|
|
return
|
|
}
|
|
|
|
// Broadcast on both workspaces
|
|
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentMoved), sourceID, map[string]interface{}{
|
|
"agent_id": agentID,
|
|
"model": model,
|
|
"target_workspace_id": body.TargetWorkspaceID,
|
|
})
|
|
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventAgentMoved), body.TargetWorkspaceID, map[string]interface{}{
|
|
"agent_id": agentID,
|
|
"model": model,
|
|
"source_workspace_id": sourceID,
|
|
})
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"agent_id": agentID,
|
|
"model": model,
|
|
"from_workspace": sourceID,
|
|
"to_workspace": body.TargetWorkspaceID,
|
|
})
|
|
}
|