8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
156 lines
3.6 KiB
Go
156 lines
3.6 KiB
Go
package ws
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"sync"
|
|
|
|
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// AccessChecker reports whether the workspace identified by callerID is
// allowed to communicate with the workspace identified by targetID.
type AccessChecker func(callerID string, targetID string) bool
|
|
|
|
type Client struct {
|
|
Conn *websocket.Conn
|
|
WorkspaceID string // empty for canvas clients
|
|
Send chan []byte
|
|
}
|
|
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
clients map[*Client]bool
|
|
Register chan *Client
|
|
Unregister chan *Client
|
|
canCommunicate AccessChecker
|
|
done chan struct{} // closed once on shutdown
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
func NewHub(canCommunicate AccessChecker) *Hub {
|
|
return &Hub{
|
|
clients: make(map[*Client]bool),
|
|
Register: make(chan *Client),
|
|
Unregister: make(chan *Client),
|
|
canCommunicate: canCommunicate,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Run() {
|
|
for {
|
|
select {
|
|
case client := <-h.Register:
|
|
h.mu.Lock()
|
|
h.clients[client] = true
|
|
h.mu.Unlock()
|
|
log.Printf("WebSocket client connected (workspace=%q)", client.WorkspaceID)
|
|
|
|
case client := <-h.Unregister:
|
|
h.mu.Lock()
|
|
if _, ok := h.clients[client]; ok {
|
|
delete(h.clients, client)
|
|
close(client.Send)
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
case <-h.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// safeSend sends data to a client channel without panicking on a closed channel.
|
|
// Returns false if the channel was closed (i.e. client just disconnected).
|
|
func safeSend(client *Client, data []byte) (sent bool) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// Channel was closed between RLock check and send — client disconnected
|
|
sent = false
|
|
}
|
|
}()
|
|
select {
|
|
case client.Send <- data:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Broadcast sends a WSMessage to all appropriate clients.
|
|
func (h *Hub) Broadcast(msg models.WSMessage) {
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
log.Printf("WS: marshal error: %v", err)
|
|
return
|
|
}
|
|
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
for client := range h.clients {
|
|
// Canvas clients get everything
|
|
if client.WorkspaceID == "" {
|
|
if !safeSend(client, data) {
|
|
log.Printf("WS: dropped message to canvas client (buffer full or closed)")
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Workspace clients: filter by CanCommunicate
|
|
if msg.WorkspaceID != "" && h.canCommunicate != nil && h.canCommunicate(client.WorkspaceID, msg.WorkspaceID) {
|
|
if !safeSend(client, data) {
|
|
log.Printf("WS: dropped message to workspace %s (buffer full or closed)", client.WorkspaceID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WritePump reads from client.Send and writes to the WebSocket.
|
|
func WritePump(client *Client) {
|
|
defer client.Conn.Close()
|
|
for msg := range client.Send {
|
|
if err := client.Conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close disconnects all WebSocket clients gracefully. Safe to call multiple times.
|
|
func (h *Hub) Close() {
|
|
h.closeOnce.Do(func() {
|
|
close(h.done) // signal Run() to exit
|
|
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
count := len(h.clients)
|
|
for client := range h.clients {
|
|
close(client.Send)
|
|
if client.Conn != nil {
|
|
client.Conn.Close()
|
|
}
|
|
delete(h.clients, client)
|
|
}
|
|
log.Printf("WebSocket hub closed (%d clients disconnected)", count)
|
|
})
|
|
}
|
|
|
|
// ReadPump reads from WebSocket (keeps connection alive, discards messages).
|
|
func ReadPump(client *Client, hub *Hub) {
|
|
defer func() {
|
|
// Guard against sending to Unregister after hub is closed
|
|
select {
|
|
case hub.Unregister <- client:
|
|
case <-hub.done:
|
|
}
|
|
client.Conn.Close()
|
|
}()
|
|
for {
|
|
_, _, err := client.Conn.ReadMessage()
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|