core-be 6597e2408f
fix(handlers): forward-port RFC#524 Layer 1 — convert bare-go sites to goAsync/globalGoAsync
RFC internal#524 Layer 1 deliverable 2: extend the canonical db.DB
race-fix primitive (69d9b4e3, already on main via the 0e13a801
staging-promote) to the ~25 sibling bare-`go` sites that 69d9b4e3 left
untouched. Without this, a detached goroutine such as
SecretsHandler.Set's restartFunc, a2a_proxy's
extractAndUpsertTokenUsage, or a delegation goroutine can still race the
db.DB swap performed by a later test's setupTestDB t.Cleanup. That is
exactly the data-race class that 69d9b4e3 fixed for the WorkspaceHandler
path.

What changed
============

- workspace.go: add package-level `globalAsync` sync.WaitGroup +
  `globalGoAsync(fn)` helper + `waitGlobalAsyncForTest()` drain. Same
  shape as h.goAsync but reachable from sibling handlers that don't
  carry a *WorkspaceHandler.
- handlers_test.go: drainTestAsync now drains globalAsync alongside the
  per-handler asyncWGs.
- Converted bare-`go` → tracked goroutine at 27 call sites:
    secrets.go (7)            — restartFunc fan-out + restartAllAffected
    templates.go (6)          — h.wh.RestartByID after file/template ops
    template_import.go (3)    — h.wh.RestartByID after Import/ReplaceFiles
    plugins_install.go (2)    — restartFunc after uninstall (both paths)
    plugins_install_pipeline.go (2) — restartFunc after install
    admin_plugin_drift.go (1) — restartFunc on drift apply
    registry.go (1)           — drainQueue on heartbeat capacity
    a2a_proxy.go (1)          — extractAndUpsertTokenUsage (db.DB INSERT)
    delegation.go (1)         — executeDelegation (DB-touching pipeline)
    mcp_tools.go (1)          — async MCP delegate (db.DB read+write)
    channels.go (1)           — async HandleInbound webhook delivery
    org_import.go (1)         — provisionWorkspaceAuto fan-out
- Annotated 6 connection/lifecycle-scoped goroutines with
  `goAsync-exempt` (RFC Layer 2.2 contract):
    a2a_proxy.go applyIdleTimeout — SSE idle-timer, no db.DB access
    socket.go (2)              — WebSocket Read/WritePump, conn-lifetime
    terminal.go (3)             — PTY <-> WS bridges, conn-lifetime
    eic_tunnel_pool.go (group)  — pool janitor + cleanup closures
- rfc524_layer1_async_drain_test.go: new regression test asserting
  drainTestAsync waits for BOTH per-handler asyncWG AND the package-level
  globalAsync — fails fast if either drain side is dropped.
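
A minimal sketch of the tracked-goroutine pattern described above, assuming
a plain sync.WaitGroup sits behind the helper. The names globalAsync,
globalGoAsync and waitGlobalAsyncForTest come from this commit message; the
function bodies and the runTracked driver are illustrative assumptions, not
the code in workspace.go.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// globalAsync tracks detached goroutines started by handlers that do not
// carry a *WorkspaceHandler. The name follows the commit message; the
// implementation here is an assumption.
var globalAsync sync.WaitGroup

// globalGoAsync runs fn in a new goroutine and registers it with
// globalAsync so a test can wait for it before tearing down shared state.
func globalGoAsync(fn func()) {
	globalAsync.Add(1)
	go func() {
		defer globalAsync.Done()
		fn()
	}()
}

// waitGlobalAsyncForTest blocks until every tracked goroutine has returned.
func waitGlobalAsyncForTest() {
	globalAsync.Wait()
}

// runTracked is a hypothetical driver: it starts n tracked goroutines,
// drains them, and reports how many finished before the drain returned.
func runTracked(n int) int32 {
	var done int32
	for i := 0; i < n; i++ {
		globalGoAsync(func() {
			time.Sleep(5 * time.Millisecond) // stand-in for a db.DB write
			atomic.AddInt32(&done, 1)
		})
	}
	waitGlobalAsyncForTest()
	return atomic.LoadInt32(&done)
}

func main() {
	fmt.Println(runTracked(3)) // prints 3
}
```

Because the drain returns only after every tracked goroutine has called
Done, a test cleanup that drains first and swaps db.DB afterwards cannot
overlap with a goroutine that is still running.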

Verification
============

- `go vet ./internal/handlers/`           : clean
- `go test -race -count=1  ./internal/handlers/`  : ok 28.6s
- `go test -race -count=10 ./internal/handlers/`  : ok 4m15s (RFC Layer 5
                                                    nightly target)
- `go test -race -shuffle=on -count=1 ...`         : ok 26.6s

The 4 `TestExecuteDelegation_*` tests were already un-Skipped on main
(via the staging→main backsync); Layer 1.3 of the RFC is therefore
already satisfied. Verified passing under -race in this run.

Layer 1 of RFC internal#524 is now complete on main. Layers 2-5 stay
as separate PRs per the RFC sequencing.

Refs
====
- RFC internal#524 (5-layer roadmap)
- molecule-core commit 69d9b4e3 (canonical fix on staging, promoted to main via 0e13a801)
- molecule-core#664, #774 (continue-on-error masks)
- task #240 (no staging→main auto-promotion — why the gap existed)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 03:40:48 +00:00

102 lines
3.2 KiB
Go

package handlers

import (
	"log"
	"net/http"
	"os"
	"strings"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// In production, validate against CORS_ORIGINS. In dev, allow all.
		origins := os.Getenv("CORS_ORIGINS")
		if origins == "" {
			return true // dev mode: no restriction
		}
		origin := r.Header.Get("Origin")
		for _, allowed := range strings.Split(origins, ",") {
			if strings.EqualFold(strings.TrimSpace(allowed), origin) {
				return true
			}
		}
		return false
	},
}

type SocketHandler struct {
	hub *ws.Hub
}

func NewSocketHandler(hub *ws.Hub) *SocketHandler {
	return &SocketHandler{hub: hub}
}

// HandleConnect handles WebSocket upgrade at GET /ws.
// Canvas clients connect without X-Workspace-ID; they receive all events.
// Workspace agents send X-Workspace-ID; events are filtered by CanCommunicate.
//
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
// remain unauthenticated. Pre-token workspaces are grandfathered through.
func (h *SocketHandler) HandleConnect(c *gin.Context) {
	workspaceID := c.GetHeader("X-Workspace-ID")

	// Authenticate workspace agents (not canvas browser clients).
	if workspaceID != "" {
		ctx := c.Request.Context()
		hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
		if err != nil {
			log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
			return
		}
		if hasLive {
			tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
			if tok == "" {
				c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
				return
			}
			if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
				c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
				return
			}
		}
	}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Printf("WebSocket upgrade error: %v", err)
		return
	}

	client := &ws.Client{
		Conn:        conn,
		WorkspaceID: workspaceID,
		Send:        make(chan []byte, 256),
	}

	h.hub.Register <- client
	metrics.TrackWSConnect()

	// Wrap WritePump and ReadPump so the gauge is decremented exactly once
	// when the client's write goroutine exits (WritePump owns conn lifetime).
	// goAsync-exempt (RFC internal#524 Layer 2.2): WebSocket pumps live
	// for the duration of the client connection (minutes-hours), not a
	// single request. Wrapping them in globalGoAsync would block every
	// test's t.Cleanup until every connected WS client disconnects. No
	// db.DB access in either pump.
	go func() {
		ws.WritePump(client)
		metrics.TrackWSDisconnect()
	}()
	go ws.ReadPump(client, h.hub)
}
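
The origin check inside CheckOrigin above can be isolated as a pure function, which makes its behavior easier to exercise without an HTTP request or environment variable. The sketch below is an assumption-labeled extraction: originAllowed is a hypothetical helper that does not exist in this file, and the example.com origins are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// originAllowed reports whether origin matches an entry in the
// comma-separated allowlist, ignoring case and surrounding whitespace.
// An empty allowlist permits every origin, mirroring the dev-mode branch
// of CheckOrigin. Hypothetical helper, not part of socket.go.
func originAllowed(allowlist, origin string) bool {
	if allowlist == "" {
		return true
	}
	for _, allowed := range strings.Split(allowlist, ",") {
		if strings.EqualFold(strings.TrimSpace(allowed), origin) {
			return true
		}
	}
	return false
}

func main() {
	list := "https://app.example.com, https://canvas.example.com"
	fmt.Println(originAllowed("", "https://anything.example"))     // true
	fmt.Println(originAllowed(list, "https://CANVAS.example.com")) // true
	fmt.Println(originAllowed(list, "https://evil.example"))       // false
	fmt.Println(originAllowed(list, ""))                           // false
}
```

One consequence of this matching rule: when the allowlist is non-empty, a request with no Origin header is accepted only if the list itself contains an empty entry (for example, a trailing comma).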