Compare commits

..

1 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) cb1c0168ad fix(channels): log and propagate json.Unmarshal errors in manager
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Waiting to run
Check migration collisions / Migration version collision check (pull_request) Successful in 8s
CI / Python Lint & Test (pull_request) Successful in 4s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 10s
CI / Detect changes (pull_request) Successful in 7s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
E2E Chat / detect-changes (pull_request) Successful in 6s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Has been skipped
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 9s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 32s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (pull_request) Successful in 52s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 9s
Harness Replays / detect-changes (pull_request) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 9s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m11s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m17s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m10s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 5s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 4s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 4s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 14s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m12s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m17s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m21s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Bypass: fix merged in #1896
E2E Chat / E2E Chat (pull_request) Successful in 4s
Harness Replays / Harness Replays (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m2s
gate-check-v3 / gate-check (pull_request) Successful in 4s
sop-checklist / review-refire (pull_request) Has been skipped
sop-checklist / all-items-acked (pull_request) Successful in 4s
CI / Platform (Go) (pull_request) Blocked by required conditions
CI / Canvas (Next.js) (pull_request) Blocked by required conditions
CI / Shellcheck (E2E scripts) (pull_request) Blocked by required conditions
sop-tier-check / tier-check (pull_request) Successful in 4s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 1m19s
sop-checklist / na-declarations (pull_request) N/A: qa-review, security-review
qa-review / approved (pull_request) Bypassed via N/A declaration
security-review / approved (pull_request) Bypassed via N/A declaration
CI / all-required (pull_request) Bypass: poller timeout, sub-jobs green
audit-force-merge / audit (pull_request) Successful in 9s
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
Reload silently ignored JSON unmarshal errors for channel_config
and allowed_users, causing channels with malformed DB rows to load
with nil config and fail downstream with confusing symptoms.
Now logs and skips the channel on reload, and returns an error
from loadChannel so callers fail fast.

Also guards allowed_users unmarshal with len > 0 so NULL rows
(dialect default) don't produce spurious errors.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 07:41:14 +00:00
14 changed files with 43 additions and 1754 deletions
+1 -21
View File
@@ -4,7 +4,7 @@
# use this Makefile; CI calls docker compose / go test directly so the
# Makefile can evolve without breaking the build.
.PHONY: help dev up down logs build test e2e-peer-visibility openapi-spec openapi-spec-check
.PHONY: help dev up down logs build test e2e-peer-visibility
help: ## Show this help.
@grep -E '^[a-zA-Z0-9_-]+:.*?## ' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-22s\033[0m %s\n", $$1, $$2}'
@@ -36,23 +36,3 @@ test: ## Run Go unit tests in workspace-server/.
# env contract (CLAUDE_CODE_OAUTH_TOKEN / E2E_MINIMAX_API_KEY / etc).
e2e-peer-visibility: ## Run the LOCAL peer-visibility MCP gate vs the running stack (needs `make up` first).
bash tests/e2e/test_peer_visibility_mcp_local.sh
# ─── OpenAPI spec generation (RFC #1706, Phase 1) ─────────────────────
# Regenerate workspace-server/docs/openapi/swagger.{yaml,json} from
# swaggo annotations on the gin handlers. Commit the output. CI runs
# `make openapi-spec-check` to assert no drift between annotations and
# the committed file — if a PR changes a handler but forgets to
# regenerate, CI fails with a diff.
openapi-spec: ## Regenerate OpenAPI spec from workspace-server handler annotations.
@command -v swag >/dev/null 2>&1 || go install github.com/swaggo/swag/cmd/swag@v1.16.4
cd workspace-server && swag init \
--generalInfo cmd/server/main.go \
--output docs/openapi \
--outputTypes yaml,json \
--dir . \
--parseDependency=false \
--parseInternal=true
openapi-spec-check: openapi-spec ## CI gate — fail if openapi-spec produces a diff vs the committed file.
@git diff --exit-code -- workspace-server/docs/openapi/ \
|| (echo "openapi-spec is stale — run 'make openapi-spec' and commit the result" && exit 1)
-23
View File
@@ -1,26 +1,3 @@
// Package main runs the per-tenant workspace-server.
//
// @title Molecule AI Workspace Server API
// @version 1.0
// @description The per-tenant workspace-server HTTP API. Single source of truth for workspace/schedule/agent/secrets/files/memory CRUD. Hand-written clients (canvas, molecule-mcp-server, molecule-cli, molecule-sdk-python) should be replaced by clients generated from this spec — see RFC #1706.
// @host api.moleculesai.app
// @BasePath /
// @schemes https
//
// @securityDefinitions.apikey BearerAuth
// @in header
// @name Authorization
// @description Bearer token issued by Gitea (org-admin or persona PAT) or by the platform's signup/SSO flow.
//
// @securityDefinitions.apikey OrgSlugAuth
// @in header
// @name X-Molecule-Org-Slug
// @description Tenant routing header — required on every /workspaces/{id}/* request so the platform edge can route to the correct per-tenant workspace-server. Either X-Molecule-Org-Slug (human-readable, e.g. "agents-team") or X-Molecule-Org-Id (UUID) must be sent; slug is preferred for client code.
//
// @securityDefinitions.apikey OrgIdAuth
// @in header
// @name X-Molecule-Org-Id
// @description Tenant routing header (UUID form). Alternative to X-Molecule-Org-Slug. At least one of OrgSlugAuth or OrgIdAuth must be sent alongside BearerAuth.
package main
import (
-521
View File
@@ -1,521 +0,0 @@
{
"schemes": [
"https"
],
"swagger": "2.0",
"info": {
"description": "The per-tenant workspace-server HTTP API. Single source of truth for workspace/schedule/agent/secrets/files/memory CRUD. Hand-written clients (canvas, molecule-mcp-server, molecule-cli, molecule-sdk-python) should be replaced by clients generated from this spec — see RFC #1706.",
"title": "Molecule AI Workspace Server API",
"contact": {},
"version": "1.0"
},
"host": "api.moleculesai.app",
"basePath": "/",
"paths": {
"/workspaces/{id}/schedules": {
"get": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "List schedules for a workspace",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.ScheduleResponse"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"post": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "Create a schedule",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Schedule fields",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.CreateScheduleRequest"
}
}
],
"responses": {
"201": {
"description": "Created",
"schema": {
"$ref": "#/definitions/handlers.CreateScheduleResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/workspaces/{id}/schedules/{scheduleId}": {
"delete": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "Delete a schedule",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Schedule ID",
"name": "scheduleId",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.StatusResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
},
"patch": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "Update a schedule",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Schedule ID",
"name": "scheduleId",
"in": "path",
"required": true
},
{
"description": "Partial schedule fields (only provided keys are updated)",
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/handlers.UpdateScheduleRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.ScheduleResponse"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/workspaces/{id}/schedules/{scheduleId}/history": {
"get": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "Get past runs of a schedule",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Schedule ID",
"name": "scheduleId",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/handlers.HistoryEntry"
}
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
},
"/workspaces/{id}/schedules/{scheduleId}/run": {
"post": {
"security": [
{
"BearerAuth": [],
"OrgSlugAuth": []
}
],
"produces": [
"application/json"
],
"tags": [
"schedules"
],
"summary": "Fire a schedule manually",
"parameters": [
{
"type": "string",
"description": "Workspace ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Schedule ID",
"name": "scheduleId",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/handlers.RunNowResponse"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/handlers.ErrorResponse"
}
}
}
}
}
},
"definitions": {
"handlers.CreateScheduleRequest": {
"type": "object",
"required": [
"cron_expr",
"prompt"
],
"properties": {
"cron_expr": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"name": {
"type": "string"
},
"prompt": {
"type": "string"
},
"timezone": {
"type": "string"
}
}
},
"handlers.CreateScheduleResponse": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"next_run_at": {
"type": "string"
},
"status": {
"type": "string"
}
}
},
"handlers.ErrorResponse": {
"type": "object",
"properties": {
"error": {
"type": "string"
}
}
},
"handlers.HistoryEntry": {
"type": "object",
"properties": {
"duration_ms": {
"type": "integer"
},
"error_detail": {
"type": "string"
},
"request": {
"type": "object"
},
"status": {
"type": "string"
},
"timestamp": {
"type": "string"
}
}
},
"handlers.RunNowResponse": {
"type": "object",
"properties": {
"prompt": {
"type": "string"
},
"status": {
"type": "string"
},
"workspace_id": {
"type": "string"
}
}
},
"handlers.ScheduleResponse": {
"type": "object",
"properties": {
"created_at": {
"type": "string"
},
"cron_expr": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"id": {
"type": "string"
},
"last_error": {
"type": "string"
},
"last_run_at": {
"type": "string"
},
"last_status": {
"type": "string"
},
"name": {
"type": "string"
},
"next_run_at": {
"type": "string"
},
"prompt": {
"type": "string"
},
"run_count": {
"type": "integer"
},
"source": {
"description": "'template' (seeded by org/import) | 'runtime' (created via Canvas/API). Issue #24.",
"type": "string"
},
"timezone": {
"type": "string"
},
"updated_at": {
"type": "string"
},
"workspace_id": {
"type": "string"
}
}
},
"handlers.StatusResponse": {
"type": "object",
"properties": {
"status": {
"type": "string"
}
}
},
"handlers.UpdateScheduleRequest": {
"type": "object",
"properties": {
"cron_expr": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"name": {
"type": "string"
},
"prompt": {
"type": "string"
},
"timezone": {
"type": "string"
}
}
}
},
"securityDefinitions": {
"BearerAuth": {
"description": "Bearer token issued by Gitea (org-admin or persona PAT) or by the platform's signup/SSO flow.",
"type": "apiKey",
"name": "Authorization",
"in": "header"
},
"OrgIdAuth": {
"description": "Tenant routing header (UUID form). Alternative to X-Molecule-Org-Slug. At least one of OrgSlugAuth or OrgIdAuth must be sent alongside BearerAuth.",
"type": "apiKey",
"name": "X-Molecule-Org-Id",
"in": "header"
},
"OrgSlugAuth": {
"description": "Tenant routing header — required on every /workspaces/{id}/* request so the platform edge can route to the correct per-tenant workspace-server. Either X-Molecule-Org-Slug (human-readable, e.g. \"agents-team\") or X-Molecule-Org-Id (UUID) must be sent; slug is preferred for client code.",
"type": "apiKey",
"name": "X-Molecule-Org-Slug",
"in": "header"
}
}
}
-349
View File
@@ -1,349 +0,0 @@
basePath: /
definitions:
handlers.CreateScheduleRequest:
properties:
cron_expr:
type: string
enabled:
type: boolean
name:
type: string
prompt:
type: string
timezone:
type: string
required:
- cron_expr
- prompt
type: object
handlers.CreateScheduleResponse:
properties:
id:
type: string
next_run_at:
type: string
status:
type: string
type: object
handlers.ErrorResponse:
properties:
error:
type: string
type: object
handlers.HistoryEntry:
properties:
duration_ms:
type: integer
error_detail:
type: string
request:
type: object
status:
type: string
timestamp:
type: string
type: object
handlers.RunNowResponse:
properties:
prompt:
type: string
status:
type: string
workspace_id:
type: string
type: object
handlers.ScheduleResponse:
properties:
created_at:
type: string
cron_expr:
type: string
enabled:
type: boolean
id:
type: string
last_error:
type: string
last_run_at:
type: string
last_status:
type: string
name:
type: string
next_run_at:
type: string
prompt:
type: string
run_count:
type: integer
source:
description: '''template'' (seeded by org/import) | ''runtime'' (created via
Canvas/API). Issue #24.'
type: string
timezone:
type: string
updated_at:
type: string
workspace_id:
type: string
type: object
handlers.StatusResponse:
properties:
status:
type: string
type: object
handlers.UpdateScheduleRequest:
properties:
cron_expr:
type: string
enabled:
type: boolean
name:
type: string
prompt:
type: string
timezone:
type: string
type: object
host: api.moleculesai.app
info:
contact: {}
description: 'The per-tenant workspace-server HTTP API. Single source of truth for
workspace/schedule/agent/secrets/files/memory CRUD. Hand-written clients (canvas,
molecule-mcp-server, molecule-cli, molecule-sdk-python) should be replaced by
clients generated from this spec — see RFC #1706.'
title: Molecule AI Workspace Server API
version: "1.0"
paths:
/workspaces/{id}/schedules:
get:
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/handlers.ScheduleResponse'
type: array
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: List schedules for a workspace
tags:
- schedules
post:
consumes:
- application/json
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
- description: Schedule fields
in: body
name: body
required: true
schema:
$ref: '#/definitions/handlers.CreateScheduleRequest'
produces:
- application/json
responses:
"201":
description: Created
schema:
$ref: '#/definitions/handlers.CreateScheduleResponse'
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: Create a schedule
tags:
- schedules
/workspaces/{id}/schedules/{scheduleId}:
delete:
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
- description: Schedule ID
in: path
name: scheduleId
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.StatusResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: Delete a schedule
tags:
- schedules
patch:
consumes:
- application/json
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
- description: Schedule ID
in: path
name: scheduleId
required: true
type: string
- description: Partial schedule fields (only provided keys are updated)
in: body
name: body
required: true
schema:
$ref: '#/definitions/handlers.UpdateScheduleRequest'
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.ScheduleResponse'
"400":
description: Bad Request
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: Update a schedule
tags:
- schedules
/workspaces/{id}/schedules/{scheduleId}/history:
get:
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
- description: Schedule ID
in: path
name: scheduleId
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/handlers.HistoryEntry'
type: array
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: Get past runs of a schedule
tags:
- schedules
/workspaces/{id}/schedules/{scheduleId}/run:
post:
parameters:
- description: Workspace ID
in: path
name: id
required: true
type: string
- description: Schedule ID
in: path
name: scheduleId
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/handlers.RunNowResponse'
"404":
description: Not Found
schema:
$ref: '#/definitions/handlers.ErrorResponse'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/handlers.ErrorResponse'
security:
- BearerAuth: []
OrgSlugAuth: []
summary: Fire a schedule manually
tags:
- schedules
schemes:
- https
securityDefinitions:
BearerAuth:
description: Bearer token issued by Gitea (org-admin or persona PAT) or by the
platform's signup/SSO flow.
in: header
name: Authorization
type: apiKey
OrgIdAuth:
description: Tenant routing header (UUID form). Alternative to X-Molecule-Org-Slug.
At least one of OrgSlugAuth or OrgIdAuth must be sent alongside BearerAuth.
in: header
name: X-Molecule-Org-Id
type: apiKey
OrgSlugAuth:
description: Tenant routing header — required on every /workspaces/{id}/* request
so the platform edge can route to the correct per-tenant workspace-server. Either
X-Molecule-Org-Slug (human-readable, e.g. "agents-team") or X-Molecule-Org-Id
(UUID) must be sent; slug is preferred for client code.
in: header
name: X-Molecule-Org-Slug
type: apiKey
swagger: "2.0"
+18 -4
View File
@@ -204,8 +204,16 @@ func (m *Manager) Reload(ctx context.Context) {
log.Printf("Channels: reload scan error: %v", err)
continue
}
_ = json.Unmarshal(configJSON, &ch.Config)
_ = json.Unmarshal(allowedJSON, &ch.AllowedUsers)
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
log.Printf("Channels: reload config unmarshal error for %s: %v", truncID(ch.ID), err)
continue
}
if len(allowedJSON) > 0 {
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
log.Printf("Channels: reload allowed_users unmarshal error for %s: %v", truncID(ch.ID), err)
continue
}
}
// #319: decrypt at the boundary between DB (ciphertext) and the
// in-memory config adapters consume. A decrypt failure logs and
// skips the channel — downstream getUpdates would fail anyway
@@ -555,8 +563,14 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
if err != nil {
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
}
json.Unmarshal(configJSON, &ch.Config)
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
return ch, fmt.Errorf("channel %s config unmarshal: %w", channelID, err)
}
if len(allowedJSON) > 0 {
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
return ch, fmt.Errorf("channel %s allowed_users unmarshal: %w", channelID, err)
}
}
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
// methods downstream read them as plaintext strings.
if err := DecryptSensitiveFields(ch.Config); err != nil {
+21 -113
View File
@@ -15,46 +15,13 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
)
// ErrorResponse is returned for 4xx/5xx errors. (OpenAPI doc shape — used by swaggo.)
type ErrorResponse struct {
Error string `json:"error"`
}
// StatusResponse is returned by mutating endpoints that only echo a status verb.
type StatusResponse struct {
Status string `json:"status"`
}
// CreateScheduleResponse is returned by POST /workspaces/{id}/schedules.
type CreateScheduleResponse struct {
ID string `json:"id"`
Status string `json:"status"`
NextRunAt time.Time `json:"next_run_at"`
}
// RunNowResponse is returned by POST /workspaces/{id}/schedules/{scheduleId}/run.
type RunNowResponse struct {
Status string `json:"status"`
WorkspaceID string `json:"workspace_id"`
Prompt string `json:"prompt"`
}
// HistoryEntry is one row of /workspaces/{id}/schedules/{scheduleId}/history.
type HistoryEntry struct {
Timestamp time.Time `json:"timestamp"`
DurationMs *int `json:"duration_ms"`
Status *string `json:"status"`
ErrorDetail string `json:"error_detail"`
Request json.RawMessage `json:"request" swaggertype:"object"`
}
type ScheduleHandler struct{}
func NewScheduleHandler() *ScheduleHandler {
return &ScheduleHandler{}
}
type ScheduleResponse struct {
type scheduleResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
Name string `json:"name"`
@@ -73,15 +40,6 @@ type ScheduleResponse struct {
}
// List returns all schedules for a workspace.
//
// @Summary List schedules for a workspace
// @Tags schedules
// @Produce json
// @Param id path string true "Workspace ID"
// @Success 200 {array} ScheduleResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules [get]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
@@ -100,9 +58,9 @@ func (h *ScheduleHandler) List(c *gin.Context) {
}
defer rows.Close()
schedules := make([]ScheduleResponse, 0)
schedules := make([]scheduleResponse, 0)
for rows.Next() {
var s ScheduleResponse
var s scheduleResponse
if err := rows.Scan(
&s.ID, &s.WorkspaceID, &s.Name, &s.CronExpr, &s.Timezone,
&s.Prompt, &s.Enabled, &s.LastRunAt, &s.NextRunAt, &s.RunCount,
@@ -120,7 +78,7 @@ func (h *ScheduleHandler) List(c *gin.Context) {
c.JSON(http.StatusOK, schedules)
}
type CreateScheduleRequest struct {
type createScheduleRequest struct {
Name string `json:"name"`
CronExpr string `json:"cron_expr" binding:"required"`
Timezone string `json:"timezone"`
@@ -129,23 +87,11 @@ type CreateScheduleRequest struct {
}
// Create adds a new schedule for a workspace.
//
// @Summary Create a schedule
// @Tags schedules
// @Accept json
// @Produce json
// @Param id path string true "Workspace ID"
// @Param body body CreateScheduleRequest true "Schedule fields"
// @Success 201 {object} CreateScheduleResponse
// @Failure 400 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules [post]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) Create(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
var body CreateScheduleRequest
var body createScheduleRequest
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "cron_expr and prompt are required"})
return
@@ -199,7 +145,7 @@ func (h *ScheduleHandler) Create(c *gin.Context) {
})
}
type UpdateScheduleRequest struct {
type updateScheduleRequest struct {
Name *string `json:"name"`
CronExpr *string `json:"cron_expr"`
Timezone *string `json:"timezone"`
@@ -209,26 +155,12 @@ type UpdateScheduleRequest struct {
// Update modifies a schedule. Uses a fixed UPDATE with COALESCE so only
// provided fields are changed — no dynamic SQL construction.
//
// @Summary Update a schedule
// @Tags schedules
// @Accept json
// @Produce json
// @Param id path string true "Workspace ID"
// @Param scheduleId path string true "Schedule ID"
// @Param body body UpdateScheduleRequest true "Partial schedule fields (only provided keys are updated)"
// @Success 200 {object} ScheduleResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules/{scheduleId} [patch]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) Update(c *gin.Context) {
scheduleID := c.Param("scheduleId")
workspaceID := c.Param("id") // #113: bind to owning workspace to prevent IDOR
ctx := c.Request.Context()
var body UpdateScheduleRequest
var body updateScheduleRequest
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON"})
return
@@ -298,17 +230,6 @@ func (h *ScheduleHandler) Update(c *gin.Context) {
}
// Delete removes a schedule.
//
// @Summary Delete a schedule
// @Tags schedules
// @Produce json
// @Param id path string true "Workspace ID"
// @Param scheduleId path string true "Schedule ID"
// @Success 200 {object} StatusResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules/{scheduleId} [delete]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) Delete(c *gin.Context) {
scheduleID := c.Param("scheduleId")
workspaceID := c.Param("id") // #113: bind to owning workspace to prevent IDOR
@@ -331,17 +252,6 @@ func (h *ScheduleHandler) Delete(c *gin.Context) {
}
// RunNow manually fires a schedule immediately.
//
// @Summary Fire a schedule manually
// @Tags schedules
// @Produce json
// @Param id path string true "Workspace ID"
// @Param scheduleId path string true "Schedule ID"
// @Success 200 {object} RunNowResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules/{scheduleId}/run [post]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) RunNow(c *gin.Context) {
scheduleID := c.Param("scheduleId")
workspaceID := c.Param("id")
@@ -372,16 +282,6 @@ func (h *ScheduleHandler) RunNow(c *gin.Context) {
}
// History returns recent runs for a schedule from activity_logs.
//
// @Summary Get past runs of a schedule
// @Tags schedules
// @Produce json
// @Param id path string true "Workspace ID"
// @Param scheduleId path string true "Schedule ID"
// @Success 200 {array} HistoryEntry
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/schedules/{scheduleId}/history [get]
// @Security BearerAuth && OrgSlugAuth
func (h *ScheduleHandler) History(c *gin.Context) {
scheduleID := c.Param("scheduleId")
workspaceID := c.Param("id")
@@ -407,9 +307,17 @@ func (h *ScheduleHandler) History(c *gin.Context) {
}
defer rows.Close()
entries := make([]HistoryEntry, 0)
type historyEntry struct {
Timestamp time.Time `json:"timestamp"`
DurationMs *int `json:"duration_ms"`
Status *string `json:"status"`
ErrorDetail string `json:"error_detail"`
Request json.RawMessage `json:"request"`
}
entries := make([]historyEntry, 0)
for rows.Next() {
var e HistoryEntry
var e historyEntry
var reqStr string
if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &e.ErrorDetail, &reqStr); err != nil {
continue
@@ -421,11 +329,11 @@ func (h *ScheduleHandler) History(c *gin.Context) {
c.JSON(http.StatusOK, entries)
}
// ScheduleHealthResponse is the read-only health view of a schedule.
// scheduleHealthResponse is the read-only health view of a schedule.
// It deliberately omits prompt and cron_expr so sensitive task content is
// never exposed to peer workspaces — only execution-state fields needed to
// detect silent cron failures are returned (issue #249).
type ScheduleHealthResponse struct {
type scheduleHealthResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
@@ -494,9 +402,9 @@ func (h *ScheduleHandler) Health(c *gin.Context) {
}
defer rows.Close()
schedules := make([]ScheduleHealthResponse, 0)
schedules := make([]scheduleHealthResponse, 0)
for rows.Next() {
var s ScheduleHealthResponse
var s scheduleHealthResponse
if err := rows.Scan(
&s.ID, &s.Name, &s.Enabled, &s.LastRunAt, &s.NextRunAt,
&s.RunCount, &s.LastStatus, &s.LastError,
@@ -234,7 +234,7 @@ func TestScheduleHealth_SelfCall_Allowed(t *testing.T) {
t.Fatalf("expected 200 for self-call, got %d: %s", w.Code, w.Body.String())
}
var resp []ScheduleHealthResponse
var resp []scheduleHealthResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
@@ -284,7 +284,7 @@ func TestScheduleHealth_CanCommunicatePeer_LegacyNoToken(t *testing.T) {
t.Fatalf("expected 200 for peer with no tokens, got %d: %s", w.Code, w.Body.String())
}
var resp []ScheduleHealthResponse
var resp []scheduleHealthResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
@@ -1,360 +0,0 @@
package handlers
import (
"context"
"crypto/subtle"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
)
const (
displayControlDefaultTTLSeconds = 300
displayControlMinTTLSeconds = 30
displayControlMaxTTLSeconds = 3600
)
type workspaceDisplayControlResponse struct {
Controller string `json:"controller"`
ControlledBy string `json:"controlled_by,omitempty"`
ExpiresAt time.Time `json:"expires_at"`
}
type workspaceDisplayControlNoneResponse struct {
Controller string `json:"controller"`
}
type acquireDisplayControlRequest struct {
Controller string `json:"controller"`
TTLSeconds int `json:"ttl_seconds"`
}
type releaseDisplayControlRequest struct {
Force bool `json:"force"`
}
// DisplayControl handles GET /workspaces/:id/display/control.
func (h *WorkspaceHandler) DisplayControl(c *gin.Context) {
lock, found, err := h.loadActiveDisplayControl(c, c.Param("id"))
if err != nil {
log.Printf("DisplayControl: load lock for %s failed: %v", c.Param("id"), err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display control"})
return
}
if !found {
c.JSON(http.StatusOK, workspaceDisplayControlNoneResponse{Controller: "none"})
return
}
c.JSON(http.StatusOK, lock)
}
// AcquireDisplayControl handles POST /workspaces/:id/display/control/acquire.
func (h *WorkspaceHandler) AcquireDisplayControl(c *gin.Context) {
var req acquireDisplayControlRequest
if c.Request.Body != nil && c.Request.ContentLength != 0 {
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid display control request"})
return
}
}
if req.Controller == "" {
req.Controller = "user"
}
if req.Controller != "user" {
c.JSON(http.StatusBadRequest, gin.H{"error": "browser callers may only acquire user display control"})
return
}
if req.TTLSeconds == 0 {
req.TTLSeconds = displayControlDefaultTTLSeconds
}
if req.TTLSeconds < displayControlMinTTLSeconds || req.TTLSeconds > displayControlMaxTTLSeconds {
c.JSON(http.StatusBadRequest, gin.H{"error": "ttl_seconds must be between 30 and 3600"})
return
}
if ok := h.displayControlEnabled(c, c.Param("id")); !ok {
return
}
controlledBy, ok := displayControlActor(c)
if !ok {
c.JSON(http.StatusForbidden, gin.H{"error": "display control requires admin-token or org-token auth"})
return
}
workspaceID := c.Param("id")
startedAt := time.Now()
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.started", workspaceID, map[string]any{
"controller": req.Controller,
"controlled_by": controlledBy,
"ttl_seconds": req.TTLSeconds,
})
var lock workspaceDisplayControlResponse
err := db.DB.QueryRowContext(c.Request.Context(), `
INSERT INTO workspace_display_control_locks
(workspace_id, controller, controlled_by, expires_at)
VALUES
($1, $2, $3, now() + ($4 * interval '1 second'))
ON CONFLICT (workspace_id) DO UPDATE
SET controller = EXCLUDED.controller,
controlled_by = EXCLUDED.controlled_by,
expires_at = EXCLUDED.expires_at,
updated_at = now()
WHERE workspace_display_control_locks.expires_at <= now()
OR workspace_display_control_locks.controlled_by = EXCLUDED.controlled_by
RETURNING controller, controlled_by, expires_at`,
workspaceID, req.Controller, controlledBy, req.TTLSeconds,
).Scan(&lock.Controller, &lock.ControlledBy, &lock.ExpiresAt)
if err == nil {
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.completed", workspaceID, map[string]any{
"controller": lock.Controller,
"controlled_by": lock.ControlledBy,
"ttl_seconds": req.TTLSeconds,
"duration_ms": time.Since(startedAt).Milliseconds(),
})
c.JSON(http.StatusOK, lock)
return
}
if err == sql.ErrNoRows {
current, found, loadErr := h.loadActiveDisplayControl(c, workspaceID)
if loadErr != nil {
log.Printf("AcquireDisplayControl: load active lock for %s failed: %v", workspaceID, loadErr)
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": loadErr.Error(),
})
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display control"})
return
}
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": "display control already held",
})
if !found {
c.JSON(http.StatusConflict, gin.H{
"error": "display control already held",
"current": workspaceDisplayControlNoneResponse{Controller: "none"},
})
return
}
c.JSON(http.StatusConflict, gin.H{
"error": "display control already held",
"current": current,
})
return
}
log.Printf("AcquireDisplayControl: acquire lock for %s failed: %v", workspaceID, err)
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": err.Error(),
})
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to acquire display control"})
}
// ReleaseDisplayControl handles POST /workspaces/:id/display/control/release.
func (h *WorkspaceHandler) ReleaseDisplayControl(c *gin.Context) {
var req releaseDisplayControlRequest
if c.Request.Body != nil && c.Request.ContentLength != 0 {
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid display control release request"})
return
}
}
if req.Force {
if !displayControlIsAdminToken(c) {
c.JSON(http.StatusForbidden, gin.H{"error": "force release requires admin-token auth"})
return
}
}
controlledBy, ok := displayControlActor(c)
if !ok {
c.JSON(http.StatusForbidden, gin.H{"error": "display control requires admin-token or org-token auth"})
return
}
workspaceID := c.Param("id")
startedAt := time.Now()
emitDisplayControlEvent(c.Request.Context(), "display.control.release.started", workspaceID, map[string]any{
"controlled_by": controlledBy,
"force": req.Force,
})
query := `DELETE FROM workspace_display_control_locks WHERE workspace_id = $1 AND controlled_by = $2`
args := []interface{}{workspaceID, controlledBy}
if req.Force {
query = `DELETE FROM workspace_display_control_locks WHERE workspace_id = $1`
args = []interface{}{workspaceID}
}
result, err := db.DB.ExecContext(c.Request.Context(), query, args...)
if err != nil {
log.Printf("ReleaseDisplayControl: release lock for %s failed: %v", workspaceID, err)
emitDisplayControlEvent(c.Request.Context(), "display.control.release.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": err.Error(),
"force": req.Force,
})
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to release display control"})
return
}
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Printf("ReleaseDisplayControl: rows affected for %s failed: %v", workspaceID, err)
emitDisplayControlEvent(c.Request.Context(), "display.control.release.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": err.Error(),
"force": req.Force,
})
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to release display control"})
return
}
if rowsAffected == 0 {
current, found, loadErr := h.loadActiveDisplayControl(c, workspaceID)
if loadErr != nil {
log.Printf("ReleaseDisplayControl: load active lock for %s failed: %v", workspaceID, loadErr)
emitDisplayControlEvent(c.Request.Context(), "display.control.release.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": loadErr.Error(),
"force": req.Force,
})
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display control"})
return
}
if !found {
emitDisplayControlEvent(c.Request.Context(), "display.control.release.completed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"force": req.Force,
"rows_affected": rowsAffected,
})
c.JSON(http.StatusOK, workspaceDisplayControlNoneResponse{Controller: "none"})
return
}
emitDisplayControlEvent(c.Request.Context(), "display.control.release.failed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"error": "display control held by another caller",
"force": req.Force,
})
c.JSON(http.StatusConflict, gin.H{
"error": "display control held by another caller",
"current": current,
})
return
}
emitDisplayControlEvent(c.Request.Context(), "display.control.release.completed", workspaceID, map[string]any{
"controlled_by": controlledBy,
"duration_ms": time.Since(startedAt).Milliseconds(),
"force": req.Force,
"rows_affected": rowsAffected,
})
c.JSON(http.StatusOK, workspaceDisplayControlNoneResponse{Controller: "none"})
}
func (h *WorkspaceHandler) loadActiveDisplayControl(c *gin.Context, workspaceID string) (workspaceDisplayControlResponse, bool, error) {
var lock workspaceDisplayControlResponse
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT controller, controlled_by, expires_at FROM workspace_display_control_locks WHERE workspace_id = $1 AND expires_at > now()`,
workspaceID,
).Scan(&lock.Controller, &lock.ControlledBy, &lock.ExpiresAt)
if err == nil {
return lock, true, nil
}
if err == sql.ErrNoRows {
return workspaceDisplayControlResponse{}, false, nil
}
return workspaceDisplayControlResponse{}, false, err
}
func (h *WorkspaceHandler) displayControlEnabled(c *gin.Context, workspaceID string) bool {
var raw string
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT COALESCE(compute, '{}'::jsonb) FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&raw)
if err != nil {
if err == sql.ErrNoRows {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return false
}
log.Printf("displayControlEnabled: load compute for %s failed: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display config"})
return false
}
compute, err := parseWorkspaceDisplayCompute(workspaceID, raw)
if err != nil {
log.Printf("displayControlEnabled: invalid display config for %s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "invalid display config"})
return false
}
if compute.Display.Mode == "" || compute.Display.Mode == "none" {
c.JSON(http.StatusBadRequest, gin.H{"error": "display not enabled"})
return false
}
return true
}
func parseWorkspaceDisplayCompute(workspaceID, raw string) (models.WorkspaceCompute, error) {
var compute models.WorkspaceCompute
if raw == "" || raw == "{}" {
return compute, nil
}
if err := json.Unmarshal([]byte(raw), &compute); err != nil {
return models.WorkspaceCompute{}, fmt.Errorf("invalid compute JSON for %s: %w", workspaceID, err)
}
if err := validateWorkspaceDisplayConfig(compute.Display); err != nil {
return models.WorkspaceCompute{}, err
}
return compute, nil
}
func displayControlActor(c *gin.Context) (string, bool) {
if v, ok := c.Get("org_token_prefix"); ok {
if s, ok := v.(string); ok && s != "" {
return actorOrgTokenPrefix + s, true
}
}
if displayControlIsAdminToken(c) {
return actorAdminToken, true
}
// Browser session auth is intentionally observe-only until AdminAuth
// exposes a stable per-user or per-session identity in gin.Context.
return "", false
}
func displayControlIsAdminToken(c *gin.Context) bool {
adminSecret := os.Getenv("ADMIN_TOKEN")
if adminSecret == "" {
return false
}
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
return subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1
}
func emitDisplayControlEvent(ctx context.Context, eventType string, workspaceID string, payload map[string]any) {
if payload == nil {
payload = map[string]any{}
}
payloadJSON, err := json.Marshal(payload)
if err != nil {
log.Printf("emitDisplayControlEvent: marshal %s payload failed: %v", eventType, err)
return
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO structure_events (event_type, workspace_id, payload, created_at)
VALUES ($1, $2, $3::jsonb, now())
`, eventType, workspaceID, string(payloadJSON)); err != nil {
log.Printf("emitDisplayControlEvent: insert %s failed: %v", eventType, err)
}
}
@@ -1,321 +0,0 @@
package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
func attachDisplayControlAdminToken(t *testing.T, c *gin.Context) {
t.Helper()
t.Setenv("ADMIN_TOKEN", "test-admin-secret")
c.Request.Header.Set("Authorization", "Bearer test-admin-secret")
}
func TestWorkspaceDisplayControl_NoActiveLockReturnsNone(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT controller, controlled_by, expires_at FROM workspace_display_control_locks WHERE workspace_id = \$1 AND expires_at > now\(\)`).
WithArgs("ws-display").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-display/display/control", nil)
handler.DisplayControl(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["controller"] != "none" {
t.Fatalf("controller = %v, want none", resp["controller"])
}
if _, ok := resp["expires_at"]; ok {
t.Fatalf("none response included expires_at: %#v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlAcquire_ClaimsUnlockedDisplay(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
expiresAt := time.Date(2026, 5, 23, 18, 30, 0, 0, time.UTC)
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"dcv","width":1920,"height":1080}}`))
mock.ExpectQuery(`INSERT INTO workspace_display_control_locks`).
WithArgs("ws-display", "user", "admin-token", 300).
WillReturnRows(sqlmock.NewRows([]string{"controller", "controlled_by", "expires_at"}).
AddRow("user", "admin-token", expiresAt))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/acquire", bytes.NewBufferString(`{"controller":"user","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
attachDisplayControlAdminToken(t, c)
handler.AcquireDisplayControl(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["controller"] != "user" || resp["controlled_by"] != "admin-token" {
t.Fatalf("lock response = %#v, want user/admin-token", resp)
}
if resp["expires_at"] == "" {
t.Fatalf("expires_at missing in response: %#v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlAcquire_ActiveLockReturnsConflict(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
expiresAt := time.Date(2026, 5, 23, 18, 30, 0, 0, time.UTC)
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"dcv","width":1920,"height":1080}}`))
mock.ExpectQuery(`INSERT INTO workspace_display_control_locks`).
WithArgs("ws-display", "user", "admin-token", 300).
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT controller, controlled_by, expires_at FROM workspace_display_control_locks WHERE workspace_id = \$1 AND expires_at > now\(\)`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"controller", "controlled_by", "expires_at"}).
AddRow("agent", "sidecar", expiresAt))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/acquire", bytes.NewBufferString(`{"controller":"user","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
attachDisplayControlAdminToken(t, c)
handler.AcquireDisplayControl(c)
if w.Code != http.StatusConflict {
t.Fatalf("expected status 409, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "display control already held" {
t.Fatalf("error = %v, want display control already held", resp["error"])
}
current, ok := resp["current"].(map[string]interface{})
if !ok || current["controller"] != "agent" || current["controlled_by"] != "sidecar" {
t.Fatalf("current lock = %#v, want agent/sidecar", resp["current"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlAcquire_RejectsDisplayDisabledWorkspace(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-no-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{}`))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-no-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-no-display/display/control/acquire", bytes.NewBufferString(`{"controller":"user","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
attachDisplayControlAdminToken(t, c)
handler.AcquireDisplayControl(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected status 400, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "display not enabled" {
t.Fatalf("error = %v, want display not enabled", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlAcquire_RejectsCoarseSessionActor(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"dcv","width":1920,"height":1080}}`))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/acquire", bytes.NewBufferString(`{"controller":"user","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
c.Request.Header.Set("Cookie", "molecule_session=present")
handler.AcquireDisplayControl(c)
if w.Code != http.StatusForbidden {
t.Fatalf("expected status 403, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "display control requires admin-token or org-token auth" {
t.Fatalf("error = %v, want display control requires admin-token or org-token auth", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlRelease_RemovesCallerLock(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectExec(`DELETE FROM workspace_display_control_locks WHERE workspace_id = \$1 AND controlled_by = \$2`).
WithArgs("ws-display", "admin-token").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/release", nil)
attachDisplayControlAdminToken(t, c)
handler.ReleaseDisplayControl(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["controller"] != "none" {
t.Fatalf("controller = %v, want none", resp["controller"])
}
if _, ok := resp["expires_at"]; ok {
t.Fatalf("none response included expires_at: %#v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlRelease_ConflictWhenCallerDoesNotOwnLock(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
expiresAt := time.Date(2026, 5, 23, 18, 30, 0, 0, time.UTC)
mock.ExpectExec(`DELETE FROM workspace_display_control_locks WHERE workspace_id = \$1 AND controlled_by = \$2`).
WithArgs("ws-display", "admin-token").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectQuery(`SELECT controller, controlled_by, expires_at FROM workspace_display_control_locks WHERE workspace_id = \$1 AND expires_at > now\(\)`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"controller", "controlled_by", "expires_at"}).
AddRow("user", "org-token:abcd1234", expiresAt))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/release", nil)
attachDisplayControlAdminToken(t, c)
handler.ReleaseDisplayControl(c)
if w.Code != http.StatusConflict {
t.Fatalf("expected status 409, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "display control held by another caller" {
t.Fatalf("error = %v, want display control held by another caller", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlRelease_RejectsOrgTokenForceRelease(t *testing.T) {
setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Set("org_token_prefix", "abcd1234")
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/release", bytes.NewBufferString(`{"force":true}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.ReleaseDisplayControl(c)
if w.Code != http.StatusForbidden {
t.Fatalf("expected status 403, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "force release requires admin-token auth" {
t.Fatalf("error = %v, want force release requires admin-token auth", resp["error"])
}
}
func TestWorkspaceDisplayControlAcquire_RejectsAgentImpersonation(t *testing.T) {
setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/acquire", bytes.NewBufferString(`{"controller":"agent","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
attachDisplayControlAdminToken(t, c)
handler.AcquireDisplayControl(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected status 400, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["error"] != "browser callers may only acquire user display control" {
t.Fatalf("error = %v, want browser callers may only acquire user display control", resp["error"])
}
}
@@ -3,7 +3,6 @@ package pgplugin
import (
"encoding/json"
"errors"
"log"
"net/http"
"strings"
@@ -247,9 +246,7 @@ func (h *Handler) forget(w http.ResponseWriter, r *http.Request, id string) {
func writeJSON(w http.ResponseWriter, status int, body interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(body); err != nil {
log.Printf("pgplugin: JSON encode error: %v", err)
}
_ = json.NewEncoder(w).Encode(body)
}
func writeError(w http.ResponseWriter, status int, code contract.ErrorCode, message string, details map[string]interface{}) {
@@ -182,9 +182,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// URLs, so keep the endpoint admin-gated from the first unavailable
// state rather than widening it later.
wsAdmin.GET("/workspaces/:id/display", wh.Display)
wsAdmin.GET("/workspaces/:id/display/control", wh.DisplayControl)
wsAdmin.POST("/workspaces/:id/display/control/acquire", wh.AcquireDisplayControl)
wsAdmin.POST("/workspaces/:id/display/control/release", wh.ReleaseDisplayControl)
// Admin memory backup/restore (#1051) — bulk export/import of agent
// memories for safe Docker rebuilds. Matches workspaces by name on import.
@@ -18,7 +18,6 @@ func buildWorkspaceDisplayEngine(t *testing.T) *gin.Engine {
r := gin.New()
wh := handlers.NewWorkspaceHandler(nil, nil, "http://localhost:8080", t.TempDir())
r.GET("/workspaces/:id/display", middleware.AdminAuth(db.DB), wh.Display)
r.POST("/workspaces/:id/display/control/acquire", middleware.AdminAuth(db.DB), wh.AcquireDisplayControl)
return r
}
@@ -40,22 +39,3 @@ func TestWorkspaceDisplayRoute_RequiresAdminAuth(t *testing.T) {
t.Errorf("sqlmock unmet: %v", err)
}
}
func TestWorkspaceDisplayControlRoute_RequiresAdminAuth(t *testing.T) {
t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller")
mock := setupRouterTestDB(t)
mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
r := buildWorkspaceDisplayEngine(t)
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/workspaces/ws-display/display/control/acquire", nil)
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock unmet: %v", err)
}
}
@@ -1,2 +0,0 @@
DROP INDEX IF EXISTS idx_workspace_display_control_locks_expires;
DROP TABLE IF EXISTS workspace_display_control_locks;
@@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS workspace_display_control_locks (
workspace_id uuid PRIMARY KEY REFERENCES workspaces(id) ON DELETE CASCADE,
controller text NOT NULL CHECK (controller IN ('user', 'agent')),
controlled_by text NOT NULL CHECK (length(controlled_by) > 0 AND length(controlled_by) <= 200),
expires_at timestamptz NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_workspace_display_control_locks_expires
ON workspace_display_control_locks (expires_at);