Files
hongming 8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
chore(go-module): #1760 rename Go module to git.moleculesai.app/molecule-ai/molecule-core/workspace-server (#1816)
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
2026-05-24 23:37:18 +00:00

102 lines
3.2 KiB
Go

package handlers
import (
"log"
"net/http"
"os"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/metrics"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/ws"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/wsauth"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// In production, validate against CORS_ORIGINS. In dev, allow all.
origins := os.Getenv("CORS_ORIGINS")
if origins == "" {
return true // dev mode — no restriction
}
origin := r.Header.Get("Origin")
for _, allowed := range strings.Split(origins, ",") {
if strings.EqualFold(strings.TrimSpace(allowed), origin) {
return true
}
}
return false
},
}
type SocketHandler struct {
hub *ws.Hub
}
func NewSocketHandler(hub *ws.Hub) *SocketHandler {
return &SocketHandler{hub: hub}
}
// HandleConnect handles WebSocket upgrade at GET /ws.
// Canvas clients connect without X-Workspace-ID — they receive all events.
// Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate.
//
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
// remain unauthenticated. Pre-token workspaces are grandfathered through.
func (h *SocketHandler) HandleConnect(c *gin.Context) {
workspaceID := c.GetHeader("X-Workspace-ID")
// Authenticate workspace agents (not canvas browser clients).
if workspaceID != "" {
ctx := c.Request.Context()
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
if err != nil {
log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
return
}
if hasLive {
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
return
}
if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
return
}
}
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
return
}
client := &ws.Client{
Conn: conn,
WorkspaceID: workspaceID,
Send: make(chan []byte, 256),
}
h.hub.Register <- client
metrics.TrackWSConnect()
// Wrap WritePump and ReadPump so the gauge is decremented exactly once
// when the client's write goroutine exits (WritePump owns conn lifetime).
// goAsync-exempt (RFC internal#524 Layer 2.2): WebSocket pumps live
// for the duration of the client connection (minutes-hours), not a
// single request. Wrapping them in globalGoAsync would block every
// test's t.Cleanup until every connected WS client disconnects. No
// db.DB access in either pump.
go func() {
ws.WritePump(client)
metrics.TrackWSDisconnect()
}()
go ws.ReadPump(client, h.hub)
}