Renames: - platform/ → workspace-server/ (Go module path stays as "platform" for external dep compat — will update after plugin module republish) - workspace-template/ → workspace/ Removed (moved to separate repos or deleted): - PLAN.md — internal roadmap (move to private project board) - HANDOFF.md, AGENTS.md — one-time internal session docs - .claude/ — gitignored entirely (local agent config) - infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy - org-templates/molecule-dev/ → standalone template repo - .mcp-eval/ → molecule-mcp-server repo - test-results/ — ephemeral, gitignored Security scrubbing: - Cloudflare account/zone/KV IDs → placeholders - Real EC2 IPs → <EC2_IP> in all docs - CF token prefix, Neon project ID, Fly app names → redacted - Langfuse dev credentials → parameterized - Personal runner username/machine name → generic Community files: - CONTRIBUTING.md — build, test, branch conventions - CODE_OF_CONDUCT.md — Contributor Covenant 2.1 All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml, README, CLAUDE.md updated for new directory names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
97 lines
3.3 KiB
Python
97 lines
3.3 KiB
Python
"""WebSocket subscriber for platform events.
|
|
|
|
Subscribes to the platform WebSocket with X-Workspace-ID header
|
|
so the workspace only receives events about reachable peers.
|
|
Triggers system prompt rebuild on relevant peer changes.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Events that should trigger a system prompt rebuild
|
|
REBUILD_EVENTS = {
|
|
"WORKSPACE_ONLINE",
|
|
"WORKSPACE_OFFLINE",
|
|
"WORKSPACE_EXPANDED",
|
|
"WORKSPACE_COLLAPSED",
|
|
"WORKSPACE_REMOVED",
|
|
"AGENT_CARD_UPDATED",
|
|
}
|
|
|
|
|
|
class PlatformEventSubscriber:
|
|
"""Subscribes to platform WebSocket for peer events."""
|
|
|
|
def __init__(
|
|
self,
|
|
platform_url: str,
|
|
workspace_id: str,
|
|
on_peer_change=None,
|
|
):
|
|
self.ws_url = platform_url.replace("http://", "ws://").replace("https://", "wss://") + "/ws"
|
|
self.workspace_id = workspace_id
|
|
self.on_peer_change = on_peer_change
|
|
self._running = False
|
|
self._reconnect_delay = 1.0
|
|
|
|
async def start(self):
|
|
"""Connect to platform WebSocket with exponential backoff reconnect."""
|
|
self._running = True
|
|
|
|
while self._running:
|
|
try:
|
|
await self._connect()
|
|
except Exception as e:
|
|
if not self._running:
|
|
break
|
|
logger.warning("WebSocket disconnected: %s. Reconnecting in %.0fs...", e, self._reconnect_delay)
|
|
await asyncio.sleep(self._reconnect_delay)
|
|
self._reconnect_delay = min(self._reconnect_delay * 2, 30.0)
|
|
|
|
async def _connect(self):
|
|
"""Establish WebSocket connection and process events."""
|
|
try:
|
|
import websockets
|
|
except ImportError:
|
|
logger.warning("websockets package not installed, skipping event subscription")
|
|
self._running = False
|
|
return
|
|
|
|
# Fix D (Cycle 5): include bearer token in WebSocket upgrade so the
|
|
# server's new auth check can validate this agent connection.
|
|
# Graceful fallback for workspaces that have no token yet.
|
|
headers = {"X-Workspace-ID": self.workspace_id}
|
|
try:
|
|
from platform_auth import auth_headers as _auth_headers
|
|
headers.update(_auth_headers())
|
|
except Exception:
|
|
pass # No token available — connect unauthenticated (grandfathered)
|
|
logger.info("Connecting to platform WebSocket: %s", self.ws_url)
|
|
|
|
async with websockets.connect(self.ws_url, additional_headers=headers) as ws:
|
|
self._reconnect_delay = 1.0 # Reset on successful connect
|
|
logger.info("Platform WebSocket connected")
|
|
|
|
async for message in ws:
|
|
try:
|
|
event = json.loads(message)
|
|
event_type = event.get("event", "")
|
|
|
|
if event_type in REBUILD_EVENTS:
|
|
logger.info("Peer event: %s for workspace %s",
|
|
event_type, event.get("workspace_id", ""))
|
|
if self.on_peer_change:
|
|
await self.on_peer_change(event)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
except Exception as e:
|
|
logger.warning("Error processing event: %s", e)
|
|
|
|
def stop(self):
|
|
self._running = False
|