6597e2408f
RFC internal#524 Layer 1 deliverable 2: extend the canonical db.DB race-fix primitive (69d9b4e3, already on main via the 0e13a801 staging-promote) to the ~25 sibling bare-`go` sites that 69d9b4e3 left untouched. Without this, a SecretsHandler.Set's detached restartFunc, or a2a_proxy's extractAndUpsertTokenUsage, or a delegation goroutine still races a later test's setupTestDB t.Cleanup db.DB swap: exactly the data-race class that 69d9b4e3 fixed for the WorkspaceHandler path.

What changed
============

- workspace.go: add package-level `globalAsync` sync.WaitGroup + `globalGoAsync(fn)` helper + `waitGlobalAsyncForTest()` drain. Same shape as h.goAsync but reachable from sibling handlers that don't carry a *WorkspaceHandler.
- handlers_test.go: drainTestAsync now drains globalAsync alongside the per-handler asyncWGs.
- Converted bare-`go` → tracked goroutine at 27 call sites:
    secrets.go (7)                  - restartFunc fan-out + restartAllAffected
    templates.go (6)                - h.wh.RestartByID after file/template ops
    template_import.go (3)          - h.wh.RestartByID after Import/ReplaceFiles
    plugins_install.go (2)          - restartFunc after uninstall (both paths)
    plugins_install_pipeline.go (2) - restartFunc after install
    admin_plugin_drift.go (1)       - restartFunc on drift apply
    registry.go (1)                 - drainQueue on heartbeat capacity
    a2a_proxy.go (1)                - extractAndUpsertTokenUsage (db.DB INSERT)
    delegation.go (1)               - executeDelegation (DB-touching pipeline)
    mcp_tools.go (1)                - async MCP delegate (db.DB read+write)
    channels.go (1)                 - async HandleInbound webhook delivery
    org_import.go (1)               - provisionWorkspaceAuto fan-out
- Annotated 6 connection/lifecycle-scoped goroutines with `goAsync-exempt` (RFC Layer 2.2 contract):
    a2a_proxy.go applyIdleTimeout   - SSE idle-timer, no db.DB access
    socket.go (2)                   - WebSocket Read/WritePump, conn-lifetime
    terminal.go (3)                 - PTY <-> WS bridges, conn-lifetime
    eic_tunnel_pool.go (group)      - pool janitor + cleanup closures
- rfc524_layer1_async_drain_test.go: new regression test asserting drainTestAsync waits for BOTH per-handler asyncWG AND the package-level globalAsync; fails fast if either drain side is dropped.

Verification
============

- `go vet ./internal/handlers/` : clean
- `go test -race -count=1 ./internal/handlers/` : ok 28.6s
- `go test -race -count=10 ./internal/handlers/` : ok 4m15s (RFC Layer 5 nightly target)
- `go test -race -shuffle=on -count=1 ...` : ok 26.6s

The 4 `TestExecuteDelegation_*` tests were already un-Skipped on main (via the staging→main backsync); Layer 1.3 of the RFC is therefore already satisfied. Verified passing under -race in this run.

Layer 1 of RFC internal#524 is now complete on main. Layers 2-5 stay as separate PRs per the RFC sequencing.

Refs
====

- RFC internal#524 (5-layer roadmap)
- molecule-core commit 69d9b4e3 (canonical fix on staging, promoted to main via 0e13a801)
- molecule-core#664, #774 (continue-on-error masks)
- task #240 (no staging→main auto-promotion; why the gap existed)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
102 lines
3.2 KiB
Go
package handlers

import (
	"log"
	"net/http"
	"os"
	"strings"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// In production, validate against CORS_ORIGINS. In dev, allow all.
		origins := os.Getenv("CORS_ORIGINS")
		if origins == "" {
			return true // dev mode - no restriction
		}
		origin := r.Header.Get("Origin")
		for _, allowed := range strings.Split(origins, ",") {
			if strings.EqualFold(strings.TrimSpace(allowed), origin) {
				return true
			}
		}
		return false
	},
}

type SocketHandler struct {
	hub *ws.Hub
}

func NewSocketHandler(hub *ws.Hub) *SocketHandler {
	return &SocketHandler{hub: hub}
}

// HandleConnect handles WebSocket upgrade at GET /ws.
// Canvas clients connect without X-Workspace-ID - they receive all events.
// Workspace agents send X-Workspace-ID - events are filtered by CanCommunicate.
//
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
// remain unauthenticated. Pre-token workspaces are grandfathered through.
func (h *SocketHandler) HandleConnect(c *gin.Context) {
	workspaceID := c.GetHeader("X-Workspace-ID")

	// Authenticate workspace agents (not canvas browser clients).
	if workspaceID != "" {
		ctx := c.Request.Context()
		hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
		if err != nil {
			log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
			return
		}
		if hasLive {
			tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
			if tok == "" {
				c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
				return
			}
			if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
				c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
				return
			}
		}
	}

	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Printf("WebSocket upgrade error: %v", err)
		return
	}

	client := &ws.Client{
		Conn:        conn,
		WorkspaceID: workspaceID,
		Send:        make(chan []byte, 256),
	}

	h.hub.Register <- client
	metrics.TrackWSConnect()

	// Wrap WritePump and ReadPump so the gauge is decremented exactly once
	// when the client's write goroutine exits (WritePump owns conn lifetime).
	// goAsync-exempt (RFC internal#524 Layer 2.2): WebSocket pumps live
	// for the duration of the client connection (minutes-hours), not a
	// single request. Wrapping them in globalGoAsync would block every
	// test's t.Cleanup until every connected WS client disconnects. No
	// db.DB access in either pump.
	go func() {
		ws.WritePump(client)
		metrics.TrackWSDisconnect()
	}()
	go ws.ReadPump(client, h.hub)
}
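The origin check inside `upgrader.CheckOrigin` can be exercised on its own by lifting the matching rule into a pure function. The sketch below does that under stated assumptions: `originAllowed` is a hypothetical helper that does not exist in this file, and it takes the allow-list as a parameter rather than reading `CORS_ORIGINS` from the environment. The comparison logic mirrors the handler above.

```go
package main

import (
	"fmt"
	"strings"
)

// originAllowed is a hypothetical, testable restatement of the CheckOrigin
// rule: an empty allow-list permits everything (dev mode); otherwise the
// Origin header must match one comma-separated entry, ignoring case and
// surrounding whitespace.
func originAllowed(origins, origin string) bool {
	if origins == "" {
		return true
	}
	for _, allowed := range strings.Split(origins, ",") {
		if strings.EqualFold(strings.TrimSpace(allowed), origin) {
			return true
		}
	}
	return false
}

func main() {
	list := "https://a.example, https://b.example"
	fmt.Println(originAllowed("", "https://anything.example"))
	fmt.Println(originAllowed(list, "HTTPS://B.EXAMPLE"))
	fmt.Println(originAllowed(list, "https://evil.example"))
}
```

One consequence of this rule that may be worth checking in deployment config: a trailing comma in the list produces an empty entry, and an empty entry matches a request that sends no Origin header at all.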