Compare commits

...

6 Commits

Author SHA1 Message Date
molecule-operator 45a8c5e309 fix(Canvas): pin molecule-ai-workspace-runtime>=0.1.999 in all external workspace snippets
The molecule-mcp console script (required for heartbeat + registration)

was added in v0.1.999. Without this pin, users with older versions get

silent failures - MCP starts but workspace shows OFFLINE.
2026-05-15 06:18:21 +00:00
core-be 20eb136c00 Merge branch 'local-main' into fix/push-notifications
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 17s
CI / Detect changes (pull_request) Successful in 1m32s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 42s
Check migration collisions / Migration version collision check (pull_request) Successful in 1m56s
E2E API Smoke Test / detect-changes (pull_request) Successful in 52s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 47s
Harness Replays / detect-changes (pull_request) Successful in 19s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 22s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m0s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 53s
gate-check-v3 / gate-check (pull_request) Failing after 17s
qa-review / approved (pull_request) Failing after 14s
security-review / approved (pull_request) Failing after 14s
sop-checklist / all-items-acked (pull_request) Successful in 17s
sop-tier-check / tier-check (pull_request) Successful in 14s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m20s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m38s
CI / Python Lint & Test (pull_request) Successful in 7m47s
CI / Canvas (Next.js) (pull_request) Successful in 19m41s
CI / Platform (Go) (pull_request) Failing after 21m54s
CI / all-required (pull_request) Failing after 20m36s
Harness Replays / Harness Replays (pull_request) Successful in 10s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 10s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m0s
CI / Canvas Deploy Reminder (pull_request) Successful in 4s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 5m10s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 9m14s
2026-05-14 17:10:20 -07:00
core-be 338dc4a995 feat(mobile-chat): add file attachment support with upload
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 24s
Check migration collisions / Migration version collision check (pull_request) Successful in 1m10s
CI / Detect changes (pull_request) Successful in 54s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Successful in 1m49s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m8s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 58s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 15s
Harness Replays / detect-changes (pull_request) Successful in 23s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 49s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 21s
gate-check-v3 / gate-check (pull_request) Failing after 20s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 28s
publish-runtime-autobump / pr-validate (pull_request) Successful in 46s
qa-review / approved (pull_request) Failing after 14s
security-review / approved (pull_request) Failing after 13s
sop-checklist / all-items-acked (pull_request) Successful in 11s
sop-tier-check / tier-check (pull_request) Successful in 11s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m12s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m37s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 1m36s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m35s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m22s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m51s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m58s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m38s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Failing after 7m39s
Harness Replays / Harness Replays (pull_request) Successful in 16s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3m2s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 7m4s
CI / Canvas (Next.js) (pull_request) Successful in 19m4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m54s
CI / Platform (Go) (pull_request) Successful in 22m23s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Failing after 12s
2026-05-14 16:39:41 -07:00
core-be 1494f94512 feat(mobile-chat): render message text as markdown with GFM support 2026-05-14 16:21:52 -07:00
core-be cec732ec68 fix(push): populate workspaceSlug from MOLECULE_ORG_SLUG
CI / all-required (pull_request) Blocked by required conditions
sop-checklist / na-declarations (pull_request) awaiting /sop-n/a declaration for: qa-review, security-review
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 30s
Check migration collisions / Migration version collision check (pull_request) Successful in 1m8s
CI / Detect changes (pull_request) Successful in 1m2s
Harness Replays / detect-changes (pull_request) Successful in 41s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m30s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 34s
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Successful in 2m12s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 1m16s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m10s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 17s
qa-review / approved (pull_request) Failing after 15s
security-review / approved (pull_request) Failing after 14s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 29s
publish-runtime-autobump / pr-validate (pull_request) Successful in 45s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m17s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m52s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m38s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m59s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m46s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 9s
Harness Replays / Harness Replays (pull_request) Successful in 12s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m4s
CI / Python Lint & Test (pull_request) Failing after 7m56s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m58s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 5m26s
CI / Canvas (Next.js) (pull_request) Failing after 12m21s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Failing after 8m38s
CI / Platform (Go) (pull_request) Failing after 13m38s
sop-checklist / all-items-acked (pull_request) Successful in 34s
sop-tier-check / tier-check (pull_request) Successful in 32s
gate-check-v3 / gate-check (pull_request) Failing after 1m4s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 2m52s
The push payload's workspaceSlug was hardcoded to empty string, breaking
deep-link navigation when users tap a notification. Read MOLECULE_ORG_SLUG
from env (already set on every tenant by the provisioner) so the mobile
app can route to the correct tenant platform.

Non-breaking: when the env var is unset the field is empty, preserving
the pre-fix behavior.
2026-05-14 14:16:36 -07:00
core-be b57de4174e feat(workspace-server): push notifications for agent messages
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 24s
E2E API Smoke Test / detect-changes (pull_request) Successful in 31s
CI / Detect changes (pull_request) Successful in 35s
Harness Replays / detect-changes (pull_request) Successful in 12s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 9s
Check migration collisions / Migration version collision check (pull_request) Successful in 44s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 24s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 22s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 17s
qa-review / approved (pull_request) Failing after 19s
security-review / approved (pull_request) Failing after 19s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 28s
sop-checklist / na-declarations (pull_request) awaiting /sop-n/a declaration for: qa-review, security-review
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m20s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m33s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m24s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m28s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m46s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m22s
CI / Canvas (Next.js) (pull_request) Successful in 9s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Successful in 9s
Harness Replays / Harness Replays (pull_request) Successful in 11s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 16s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 14s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m0s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 6m5s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request) Successful in 15s
sop-checklist / all-items-acked (pull_request) Successful in 21s
gate-check-v3 / gate-check (pull_request) Successful in 33s
CI / Platform (Go) (pull_request) Successful in 17m52s
CI / all-required (pull_request) Successful in 6s
Adds Expo Push Service integration so mobile devices receive background
notifications when an agent sends a message to the user.

- New push_tokens table with workspace-scoped device tokens
- internal/push package: Repo (DB), Sender (Expo API client), Notifier
  (fire-and-forget delivery), Handler (HTTP register/unregister)
- AgentMessageWriter.Send() now triggers push delivery after WS broadcast
- New endpoints: POST /workspaces/:id/push-tokens, DELETE /push-tokens
- Token invalidation: auto-removes tokens when Expo returns DeviceNotRegistered
- Configured via EXPO_ACCESS_TOKEN env var (optional; push disabled when absent)

All existing tests updated to pass nil notifier where required.
2026-05-14 13:40:22 -07:00
21 changed files with 925 additions and 75 deletions
+305 -9
View File
@@ -6,9 +6,13 @@
// attachments, no A2A topology overlay, no conversation tracing.
import { useCallback, useEffect, useRef, useState } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore } from "@/store/canvas";
import { uploadChatFiles } from "@/components/tabs/chat/uploads";
import type { ChatAttachment } from "@/components/tabs/chat/types";
import { toMobileAgent } from "./components";
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette";
@@ -19,6 +23,7 @@ interface ChatMessage {
role: "user" | "agent" | "system";
text: string;
ts: string;
attachments?: ChatAttachment[];
}
const formatStoredTimestamp = (iso: string): string => {
@@ -39,6 +44,171 @@ interface A2AResponseShape {
const formatTime = (date: Date) =>
date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" });
function MarkdownBubble({
children,
dark,
accent,
}: {
children: string;
dark: boolean;
accent: string;
}) {
const codeBg = dark ? "rgba(255,255,255,0.08)" : "rgba(0,0,0,0.06)";
const codeBlockBg = dark ? "#1a1a1a" : "#f5f5f0";
const linkColor = accent;
const quoteBorder = dark ? "rgba(255,250,240,0.15)" : "rgba(40,30,20,0.15)";
return (
<ReactMarkdown
remarkPlugins={[remarkGfm]}
components={{
p: ({ children }) => (
<div style={{ margin: "2px 0", lineHeight: "inherit" }}>{children}</div>
),
a: ({ href, children }) => (
<a
href={href}
target="_blank"
rel="noopener noreferrer"
style={{ color: linkColor, textDecoration: "underline" }}
>
{children}
</a>
),
pre: ({ children }) => (
<pre
style={{
background: codeBlockBg,
padding: "8px 10px",
borderRadius: 8,
overflow: "auto",
fontSize: 12,
lineHeight: 1.5,
fontFamily: MOBILE_FONT_MONO,
margin: "4px 0",
}}
>
{children}
</pre>
),
code: ({ children, className }) => {
const isBlock = className != null && String(className).length > 0;
if (isBlock) {
return (
<code style={{ fontFamily: MOBILE_FONT_MONO, fontSize: 12 }}>
{children}
</code>
);
}
return (
<code
style={{
background: codeBg,
padding: "1px 4px",
borderRadius: 4,
fontSize: 13,
fontFamily: MOBILE_FONT_MONO,
}}
>
{children}
</code>
);
},
ul: ({ children }) => (
<ul style={{ margin: "4px 0", paddingLeft: 18, listStyle: "disc" }}>
{children}
</ul>
),
ol: ({ children }) => (
<ol style={{ margin: "4px 0", paddingLeft: 18, listStyle: "decimal" }}>
{children}
</ol>
),
li: ({ children }) => <li style={{ margin: "2px 0" }}>{children}</li>,
strong: ({ children }) => (
<strong style={{ fontWeight: 600 }}>{children}</strong>
),
em: ({ children }) => <em style={{ fontStyle: "italic" }}>{children}</em>,
h1: ({ children }) => (
<div style={{ fontSize: 16, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h2: ({ children }) => (
<div style={{ fontSize: 15, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h3: ({ children }) => (
<div style={{ fontSize: 14, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h4: ({ children }) => (
<div style={{ fontSize: 14, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
h5: ({ children }) => (
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
h6: ({ children }) => (
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
blockquote: ({ children }) => (
<blockquote
style={{
borderLeft: `2px solid ${quoteBorder}`,
margin: "4px 0",
paddingLeft: 8,
opacity: 0.85,
}}
>
{children}
</blockquote>
),
hr: () => (
<hr
style={{
border: "none",
borderTop: `0.5px solid ${quoteBorder}`,
margin: "6px 0",
}}
/>
),
table: ({ children }) => (
<table
style={{
borderCollapse: "collapse",
fontSize: 13,
margin: "4px 0",
width: "100%",
}}
>
{children}
</table>
),
thead: ({ children }) => <thead style={{ fontWeight: 600 }}>{children}</thead>,
th: ({ children }) => (
<th
style={{
border: `0.5px solid ${quoteBorder}`,
padding: "4px 6px",
textAlign: "left",
}}
>
{children}
</th>
),
td: ({ children }) => (
<td
style={{
border: `0.5px solid ${quoteBorder}`,
padding: "4px 6px",
}}
>
{children}
</td>
),
}}
>
{children}
</ReactMarkdown>
);
}
export function MobileChat({
agentId,
dark,
@@ -54,6 +224,7 @@ export function MobileChat({
const [draft, setDraft] = useState("");
const [tab, setTab] = useState<SubTab>("my");
const [sending, setSending] = useState(false);
const [uploading, setUploading] = useState(false);
const [error, setError] = useState<string | null>(null);
const [historyLoading, setHistoryLoading] = useState(true);
const [historyError, setHistoryError] = useState<string | null>(null);
@@ -64,6 +235,8 @@ export function MobileChat({
// double-send race a stale `sending` lets through.
const sendInFlightRef = useRef(false);
const composerRef = useRef<HTMLTextAreaElement>(null);
const fileInputRef = useRef<HTMLInputElement>(null);
const [pendingFiles, setPendingFiles] = useState<File[]>([]);
// Auto-grow the textarea: reset height to 'auto' so the scrollHeight
// shrinks when the user deletes text, then size to scrollHeight up to
@@ -171,30 +344,82 @@ export function MobileChat({
const a = toMobileAgent(node);
const reachable = a.status === "online" || a.status === "degraded";
const onFilesPicked = (fileList: FileList | null) => {
if (!fileList) return;
const picked = Array.from(fileList);
setPendingFiles((prev) => {
const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`));
return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))];
});
if (fileInputRef.current) fileInputRef.current.value = "";
};
const removePendingFile = (index: number) =>
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
const send = async () => {
const text = draft.trim();
if (!text || sending || !reachable) return;
if ((!text && pendingFiles.length === 0) || sending || !reachable) return;
if (sendInFlightRef.current) return;
sendInFlightRef.current = true;
setDraft("");
setError(null);
setSending(true);
let uploaded: ChatAttachment[] = [];
if (pendingFiles.length > 0) {
setUploading(true);
try {
uploaded = await uploadChatFiles(agentId, pendingFiles);
} catch (e) {
setError(e instanceof Error ? e.message : "Upload failed");
setSending(false);
setUploading(false);
sendInFlightRef.current = false;
return;
} finally {
setUploading(false);
}
setPendingFiles([]);
}
const myMsg: ChatMessage = {
id: crypto.randomUUID(),
role: "user",
text,
ts: formatTime(new Date()),
attachments: uploaded.length > 0 ? uploaded : undefined,
};
setMessages((m) => [...m, myMsg]);
try {
const parts: Array<
| { kind: "text"; text: string }
| {
kind: "file";
file: { name: string; mimeType?: string; uri: string; size?: number };
}
> = [];
if (text) parts.push({ kind: "text", text });
for (const att of uploaded) {
parts.push({
kind: "file",
file: {
name: att.name,
mimeType: att.mimeType,
uri: att.uri,
size: att.size,
},
});
}
const res = await api.post<A2AResponseShape>(`/workspaces/${agentId}/a2a`, {
method: "message/send",
params: {
message: {
role: "user",
messageId: crypto.randomUUID(),
parts: [{ kind: "text", text }],
parts,
},
},
});
@@ -402,7 +627,9 @@ export function MobileChat({
overflowWrap: "anywhere",
}}
>
{m.text}
<MarkdownBubble dark={dark} accent={p.accent}>
{m.text}
</MarkdownBubble>
<div
style={{
fontSize: 10,
@@ -460,6 +687,60 @@ export function MobileChat({
backdropFilter: "blur(14px)",
}}
>
{pendingFiles.length > 0 && (
<div
style={{
display: "flex",
flexWrap: "wrap",
gap: 6,
marginBottom: 8,
paddingLeft: 2,
}}
>
{pendingFiles.map((f, i) => (
<div
key={`${f.name}:${f.size}`}
style={{
display: "flex",
alignItems: "center",
gap: 4,
padding: "3px 8px",
borderRadius: 10,
background: dark ? "#2a2823" : "#ece9e0",
fontSize: 12,
color: p.text2,
maxWidth: "100%",
}}
>
<span
style={{
overflow: "hidden",
textOverflow: "ellipsis",
whiteSpace: "nowrap",
}}
>
{f.name}
</span>
<button
type="button"
onClick={() => removePendingFile(i)}
aria-label={`Remove ${f.name}`}
style={{
border: "none",
background: "transparent",
color: p.text3,
cursor: "pointer",
fontSize: 12,
padding: 0,
lineHeight: 1,
}}
>
</button>
</div>
))}
</div>
)}
<div
style={{
display: "flex",
@@ -471,21 +752,32 @@ export function MobileChat({
padding: "6px 6px 6px 12px",
}}
>
<input
ref={fileInputRef}
type="file"
multiple
style={{ display: "none" }}
onChange={(e) => onFilesPicked(e.target.files)}
aria-hidden="true"
/>
<button
type="button"
onClick={() => fileInputRef.current?.click()}
disabled={!reachable || sending || uploading}
aria-label="Attach"
style={{
width: 32,
height: 32,
borderRadius: 999,
border: "none",
cursor: "pointer",
cursor: reachable && !sending && !uploading ? "pointer" : "not-allowed",
background: "transparent",
color: p.text3,
flexShrink: 0,
display: "flex",
alignItems: "center",
justifyContent: "center",
opacity: !reachable || sending || uploading ? 0.4 : 1,
}}
>
{Icons.attach({ size: 16 })}
@@ -531,28 +823,32 @@ export function MobileChat({
<button
type="button"
onClick={send}
disabled={!draft.trim() || !reachable || sending}
disabled={(!draft.trim() && pendingFiles.length === 0) || !reachable || sending || uploading}
aria-label="Send"
style={{
width: 36,
height: 36,
borderRadius: 999,
border: "none",
cursor: draft.trim() && !sending ? "pointer" : "not-allowed",
cursor: (draft.trim() || pendingFiles.length > 0) && !sending && !uploading ? "pointer" : "not-allowed",
flexShrink: 0,
background:
draft.trim() && reachable && !sending
(draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading
? p.accent
: dark
? "#2a2823"
: "#ece9e0",
color: draft.trim() && reachable && !sending ? "#fff" : p.text3,
color: (draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading ? "#fff" : p.text3,
display: "flex",
alignItems: "center",
justifyContent: "center",
}}
>
{Icons.send({ size: 16 })}
{uploading ? (
<span style={{ fontSize: 10, fontWeight: 600 }}></span>
) : (
Icons.send({ size: 16 })
)}
</button>
</div>
</div>
@@ -14,16 +14,18 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
type ActivityHandler struct {
broadcaster *events.Broadcaster
notifier *push.Notifier
}
func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
func NewActivityHandler(b *events.Broadcaster, notifier *push.Notifier) *ActivityHandler {
return &ActivityHandler{broadcaster: b, notifier: notifier}
}
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
@@ -476,7 +478,7 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
for _, a := range body.Attachments {
attachments = append(attachments, AgentMessageAttachment(a))
}
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
writer := NewAgentMessageWriter(db.DB, h.broadcaster, h.notifier)
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
@@ -40,7 +40,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -69,7 +69,7 @@ func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -101,7 +101,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -137,7 +137,7 @@ func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -41,7 +41,7 @@ func TestActivityHandler_SinceSecs_Accepted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -70,7 +70,7 @@ func TestActivityHandler_SinceSecs_ClampedAt30Days(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -106,7 +106,7 @@ func TestActivityHandler_SinceSecs_InvalidRejected(t *testing.T) {
// No DB call expected; bad input must be caught before the query.
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityHandler_SinceSecs_Omitted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -22,7 +22,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
rows := sqlmock.NewRows([]string{
"kind", "id", "workspace_id", "label", "content", "method", "status", "request_body", "response_body", "created_at",
@@ -68,7 +68,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
func TestActivityList_SourceCanvas(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
// Expect query with "source_id IS NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NULL`).
@@ -97,7 +97,7 @@ func TestActivityList_SourceCanvas(t *testing.T) {
func TestActivityList_SourceAgent(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
// Expect query with "source_id IS NOT NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NOT NULL`).
@@ -126,7 +126,7 @@ func TestActivityList_SourceAgent(t *testing.T) {
func TestActivityList_SourceInvalid(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityList_SourceInvalid(t *testing.T) {
func TestActivityList_SourceWithType(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
// Both type and source filters
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NULL`).
@@ -181,7 +181,7 @@ const testPeerUUID = "11111111-2222-3333-4444-555555555555"
func TestActivityList_PeerIDFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
// peer_id binds twice in the query (source_id OR target_id) but is
// added to args once — sqlmock matches positional args, so the
@@ -220,7 +220,7 @@ func TestActivityList_PeerIDComposesWithType(t *testing.T) {
// of the builder can't silently rearrange placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
mock.ExpectQuery(
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
@@ -258,7 +258,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
// otherwise interpolate the value into the URL or another query.
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
for _, bad := range []string{
"not-a-uuid",
@@ -292,7 +292,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
func TestActivityList_BeforeTSFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -328,7 +328,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
// can't silently drop one filter or reorder placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -363,7 +363,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
for _, bad := range []string{
"yesterday",
@@ -400,7 +400,7 @@ func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -426,7 +426,7 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) {
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -478,7 +478,7 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -527,7 +527,7 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -593,7 +593,7 @@ func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) {
// only if the handler unexpectedly queries.
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -647,7 +647,7 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
WillReturnError(fmt.Errorf("simulated db hiccup"))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -44,6 +44,7 @@ import (
"log"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil"
)
@@ -76,12 +77,14 @@ type AgentMessageAttachment struct {
type AgentMessageWriter struct {
db *sql.DB
broadcaster events.EventEmitter
notifier *push.Notifier
}
// NewAgentMessageWriter binds the writer to the platform's DB pool +
// WebSocket broadcaster.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
// WebSocket broadcaster. notifier may be nil if push notifications are
// not configured.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter, notifier *push.Notifier) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster, notifier: notifier}
}
// Send delivers a single agent → user message. Look up + broadcast +
@@ -132,7 +135,12 @@ func (w *AgentMessageWriter) Send(
}
w.broadcaster.BroadcastOnly(workspaceID, string(events.EventAgentMessage), broadcastPayload)
// 3. Persist for chat-history hydration. response_body shape MUST stay
// 3. Send push notifications to mobile devices.
if w.notifier != nil {
w.notifier.NotifyAgentMessage(ctx, workspaceID, wsName, message)
}
// 4. Persist for chat-history hydration. response_body shape MUST stay
// in sync with extractResponseText + extractFilesFromTask in
// canvas/src/components/tabs/chat/historyHydration.ts:
// - extractResponseText reads body.result (string) → renders text
@@ -86,7 +86,7 @@ func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType strin
// path: workspace lookup, broadcast, INSERT, return nil.
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-1").
@@ -114,7 +114,7 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
// Drift here = chips disappear on chat reload.
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-att").
@@ -171,7 +171,7 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
w := NewAgentMessageWriter(db.DB, emitter, nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-missing").
@@ -200,7 +200,7 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
// broadcast.
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-dbfail").
@@ -221,7 +221,7 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
// table doesn't carry multi-KB summaries that bloat list queries.
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-trunc").
@@ -261,7 +261,7 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
w := NewAgentMessageWriter(db.DB, emitter, nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-bc").
@@ -312,7 +312,7 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
// real incidents in alerting.
func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
transientErr := errors.New("connection refused")
mock.ExpectQuery("SELECT name FROM workspaces").
@@ -344,7 +344,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
// coverage. Now it does.
func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
// 200-rune CJK message — exceeds the 80-rune cap, would have hit
// the byte-slice bug.
@@ -393,7 +393,7 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
w := NewAgentMessageWriter(db.DB, emitter, nil)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-noatt").
@@ -225,7 +225,7 @@ const externalUniversalMcpTemplate = `# Universal MCP — standalone register +
# python3.12 / etc.) or use a 3.11+ venv.
# 1. Install the workspace runtime wheel:
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
# 2. Wire molecule-mcp into your agent's MCP config. Claude Code:
claude mcp add molecule -s user -- env \
@@ -336,7 +336,7 @@ const externalHermesChannelTemplate = `# Hermes channel — bridges this workspa
# also supported via the plugin's dual-mode fallback.
#
# 1. Install the runtime + plugin:
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
pip install 'git+https://git.moleculesai.app/molecule-ai/hermes-channel-molecule.git'
# 2. Export the workspace credentials:
@@ -407,7 +407,7 @@ const externalCodexTemplate = `# Codex external setup — outbound tools (MCP) +
# 1. Install codex CLI, the workspace runtime, and the bridge daemon:
npm install -g @openai/codex@latest
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
pip install codex-channel-molecule
# 2. Wire the molecule MCP server into codex's config.toml — this is
@@ -499,7 +499,7 @@ const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat
# No public URL needed; runs behind NAT in poll mode.
# 1. Install the workspace runtime wheel (provides HTTP client):
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
# 2. Save credentials and the bridge script:
mkdir -p ~/.molecule-ai/kimi-workspace
@@ -647,7 +647,7 @@ const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path.
# 1. Install openclaw CLI + the workspace runtime wheel:
npm install -g openclaw@latest
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
# 2. Onboard openclaw against your model provider (one-time setup).
# --non-interactive needs an explicit --provider + --model so it
@@ -646,7 +646,7 @@ func TestActivityHandler_List(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -695,7 +695,7 @@ func TestActivityHandler_ListByType(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -723,7 +723,7 @@ func TestActivityHandler_Report(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
// Expect the INSERT into activity_logs
mock.ExpectExec("INSERT INTO activity_logs").
@@ -752,7 +752,7 @@ func TestActivityHandler_Report_InvalidType(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -980,7 +980,7 @@ func TestActivityHandler_ListEmpty(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1014,7 +1014,7 @@ func TestActivityHandler_ListCustomLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1047,7 +1047,7 @@ func TestActivityHandler_ListMaxLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1075,7 +1075,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1106,7 +1106,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
func TestActivityHandler_ReportMissingBody(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1179,7 +1179,7 @@ func TestActivityHandler_Report_SourceIDSpoofRejected(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1202,7 +1202,7 @@ func TestActivityHandler_Report_MatchingSourceIDAccepted(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1232,7 +1232,7 @@ func TestActivityHandler_Report_SourceIDLogInjection(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
handler := NewActivityHandler(broadcaster, nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
+5 -2
View File
@@ -34,6 +34,7 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
)
@@ -84,6 +85,7 @@ type mcpTool struct {
type MCPHandler struct {
database *sql.DB
broadcaster *events.Broadcaster
notifier *push.Notifier
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
// every v2 tool calls memoryV2Available() first and returns a
@@ -94,8 +96,9 @@ type MCPHandler struct {
// NewMCPHandler wires the handler to db and broadcaster.
// Pass db.DB and the platform broadcaster at router-setup time.
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster}
// notifier may be nil if push notifications are not configured.
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster, notifier *push.Notifier) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster, notifier: notifier}
}
// ─────────────────────────────────────────────────────────────────────────────
@@ -26,7 +26,7 @@ import (
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
h := NewMCPHandler(db.DB, newTestBroadcaster())
h := NewMCPHandler(db.DB, newTestBroadcaster(), nil)
return h, mock
}
@@ -392,7 +392,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
// (the tool args don't accept them); pass nil. If a future tool
// schema adds an attachments arg, build []AgentMessageAttachment
// and pass through.
writer := NewAgentMessageWriter(h.database, h.broadcaster)
writer := NewAgentMessageWriter(h.database, h.broadcaster, h.notifier)
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
return "", fmt.Errorf("workspace not found")
@@ -207,7 +207,7 @@ func setupSwapEnv(t *testing.T) (*handlers.MCPHandler, *flatPlugin, sqlmock.Sqlm
resolver := namespace.New(db)
// MCPHandler needs a real *sql.DB; pass the sqlmock-backed one.
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
return h, plugin, mock
}
@@ -430,7 +430,7 @@ func TestE2E_PluginUnreachable_AgentSeesClearError(t *testing.T) {
db, _, _ := sqlmock.New()
defer db.Close()
resolver := namespace.New(db)
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
_, err := h.Dispatch(context.Background(), "root-1", "commit_memory_v2", map[string]interface{}{
"content": "x",
+75
View File
@@ -0,0 +1,75 @@
package push
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// Handler exposes HTTP endpoints for push-token management.
type Handler struct {
repo *Repo
}
// NewHandler creates a push-token HTTP handler.
func NewHandler(repo *Repo) *Handler {
return &Handler{repo: repo}
}
// RegisterRoutes mounts push-token routes on the given router group.
func (h *Handler) RegisterRoutes(rg *gin.RouterGroup) {
rg.POST("/push-tokens", h.Create)
rg.DELETE("/push-tokens", h.Delete)
}
// Create handles POST /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]", "platform": "ios" | "android" }
func (h *Handler) Create(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
Platform string `json:"platform" binding:"required,oneof=ios android"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.SaveToken(c.Request.Context(), workspaceID, body.Token, body.Platform); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save token"})
return
}
c.Status(http.StatusNoContent)
}
// Delete handles DELETE /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]" }
func (h *Handler) Delete(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.DeleteToken(c.Request.Context(), workspaceID, body.Token); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete token"})
return
}
c.Status(http.StatusNoContent)
}
+102
View File
@@ -0,0 +1,102 @@
package push
import (
"context"
"database/sql"
"log"
"os"
"time"
)
// Notifier sends push notifications for agent messages.
type Notifier struct {
repo *Repo
sender *Sender
}
// NewNotifier creates a Notifier.
func NewNotifier(db *sql.DB, sender *Sender) *Notifier {
return &Notifier{
repo: NewRepo(db),
sender: sender,
}
}
// NotifyAgentMessage sends a push notification to all registered devices for a
// workspace when an agent sends a message. It runs asynchronously (fire-and-
// forget) so the caller's WebSocket broadcast is never blocked.
func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspaceName, message string) {
if n == nil || n.sender == nil {
return
}
// Capture values for the goroutine.
wsID := workspaceID
wsName := workspaceName
msg := message
go func() {
// Use a fresh context with timeout so a slow Expo API doesn't
// leak the caller's context deadline.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
tokens, err := n.repo.GetTokens(ctx, wsID)
if err != nil {
log.Printf("push: failed to get tokens for workspace %s: %v", wsID, err)
return
}
if len(tokens) == 0 {
return
}
// Expo accepts batches of up to ~100 messages; we cap lower to stay
// well under the limit.
const batchSize = 50
for i := 0; i < len(tokens); i += batchSize {
end := i + batchSize
if end > len(tokens) {
end = len(tokens)
}
batch := tokens[i:end]
messages := make([]Message, 0, len(batch))
for _, t := range batch {
messages = append(messages, Message{
To: t.Token,
Title: wsName,
Body: truncate(msg, 100),
Data: map[string]string{
"type": "agent_message",
"workspaceId": wsID,
"workspaceSlug": os.Getenv("MOLECULE_ORG_SLUG"),
},
Sound: "default",
Priority: "high",
})
}
results, err := n.sender.Send(ctx, messages)
if err != nil {
log.Printf("push: send failed for workspace %s: %v", wsID, err)
continue
}
// Remove invalid tokens.
for j, r := range results {
if ShouldRemoveToken(r) {
if delErr := n.repo.DeleteToken(ctx, wsID, batch[j].Token); delErr != nil {
log.Printf("push: failed to delete invalid token for workspace %s: %v", wsID, delErr)
}
}
}
}
}()
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "…"
}
+159
View File
@@ -0,0 +1,159 @@
package push
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSenderSend(t *testing.T) {
gin.SetMode(gin.TestMode)
expoResponse := map[string]interface{}{
"data": []map[string]interface{}{
{"status": "ok", "id": "abc123"},
{"status": "error", "message": "Invalid token", "details": map[string]string{"error": "DeviceNotRegistered"}},
},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "POST", r.Method)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
var msgs []Message
require.NoError(t, json.NewDecoder(r.Body).Decode(&msgs))
assert.Len(t, msgs, 2)
assert.Equal(t, "ExponentPushToken[test1]", msgs[0].To)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(expoResponse)
}))
defer server.Close()
sender := NewSender("")
sender.apiURL = server.URL
results, err := sender.Send(context.Background(), []Message{
{To: "ExponentPushToken[test1]", Title: "Test", Body: "Hello"},
{To: "ExponentPushToken[test2]", Title: "Test", Body: "World"},
})
require.NoError(t, err)
require.Len(t, results, 2)
assert.Equal(t, "ok", results[0].Status)
assert.Equal(t, "error", results[1].Status)
assert.True(t, ShouldRemoveToken(results[1]))
}
func TestSenderSendEmpty(t *testing.T) {
sender := NewSender("")
results, err := sender.Send(context.Background(), nil)
require.NoError(t, err)
assert.Nil(t, results)
}
func TestHandlerCreate(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("INSERT INTO push_tokens").
WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios").
WillReturnResult(sqlmock.NewResult(1, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestHandlerCreateInvalidPlatform(t *testing.T) {
gin.SetMode(gin.TestMode)
db, _, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
handler := NewHandler(NewRepo(db))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"windows"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerDelete(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("DELETE FROM push_tokens").
WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]").
WillReturnResult(sqlmock.NewResult(0, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[del]"}`
req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestRepoGetTokens(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}).
AddRow("1", "ws-1", "ExponentPushToken[a]", "ios", "2026-01-01T00:00:00Z").
AddRow("2", "ws-1", "ExponentPushToken[b]", "android", "2026-01-01T00:00:00Z"))
repo := NewRepo(db)
tokens, err := repo.GetTokens(context.Background(), "ws-1")
require.NoError(t, err)
require.Len(t, tokens, 2)
assert.Equal(t, "ExponentPushToken[a]", tokens[0].Token)
assert.Equal(t, "ios", tokens[0].Platform)
assert.Equal(t, "ExponentPushToken[b]", tokens[1].Token)
require.NoError(t, mock.ExpectationsWereMet())
}
+76
View File
@@ -0,0 +1,76 @@
package push
import (
"context"
"database/sql"
"fmt"
)
// Token is one registered push token for a workspace.
type Token struct {
ID string
WorkspaceID string
Token string
Platform string
CreatedAt string
}
// Repo reads and writes push tokens in Postgres.
type Repo struct {
db *sql.DB
}
// NewRepo creates a token repository backed by db.
func NewRepo(db *sql.DB) *Repo {
return &Repo{db: db}
}
// SaveToken registers a push token for a workspace. If the same token already
// exists for the workspace, it updates the timestamp.
func (r *Repo) SaveToken(ctx context.Context, workspaceID, token, platform string) error {
_, err := r.db.ExecContext(ctx, `
INSERT INTO push_tokens (workspace_id, token, platform)
VALUES ($1, $2, $3)
ON CONFLICT (workspace_id, token) DO UPDATE
SET updated_at = now()
`, workspaceID, token, platform)
if err != nil {
return fmt.Errorf("push_tokens: save: %w", err)
}
return nil
}
// DeleteToken removes a push token. Returns nil even if the token did not exist.
func (r *Repo) DeleteToken(ctx context.Context, workspaceID, token string) error {
_, err := r.db.ExecContext(ctx, `
DELETE FROM push_tokens
WHERE workspace_id = $1 AND token = $2
`, workspaceID, token)
if err != nil {
return fmt.Errorf("push_tokens: delete: %w", err)
}
return nil
}
// GetTokens returns all active push tokens for a workspace.
func (r *Repo) GetTokens(ctx context.Context, workspaceID string) ([]Token, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, workspace_id, token, platform, created_at
FROM push_tokens
WHERE workspace_id = $1
`, workspaceID)
if err != nil {
return nil, fmt.Errorf("push_tokens: list: %w", err)
}
defer rows.Close()
var tokens []Token
for rows.Next() {
var t Token
if err := rows.Scan(&t.ID, &t.WorkspaceID, &t.Token, &t.Platform, &t.CreatedAt); err != nil {
return nil, fmt.Errorf("push_tokens: scan: %w", err)
}
tokens = append(tokens, t)
}
return tokens, rows.Err()
}
+104
View File
@@ -0,0 +1,104 @@
package push
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
const expoPushAPI = "https://exp.host/--/api/v2/push/send"
// Message is one Expo push notification.
type Message struct {
To string `json:"to"`
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
Data map[string]string `json:"data,omitempty"`
Sound string `json:"sound,omitempty"`
Priority string `json:"priority,omitempty"`
}
// Sender delivers push notifications via the Expo Push Service.
type Sender struct {
apiURL string
httpClient *http.Client
expoToken string // optional Expo access token for authenticated requests
}
// NewSender creates a Sender. expoToken may be empty for unauthenticated
// requests (sufficient for most use cases).
func NewSender(expoToken string) *Sender {
return &Sender{
apiURL: expoPushAPI,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
expoToken: expoToken,
}
}
// SendResult is the per-recipient status from Expo.
type SendResult struct {
Status string `json:"status"`
ID string `json:"id"`
Message string `json:"message,omitempty"`
Details struct {
Error string `json:"error,omitempty"`
} `json:"details,omitempty"`
}
// expoResponse is the wrapper shape returned by the Expo API.
type expoResponse struct {
Data []SendResult `json:"data"`
}
// Send fires a batch of push messages. It returns a slice of results in the
// same order as the input, plus an error only when the HTTP call itself fails.
// Callers should inspect each result's Status field for per-message errors
// (e.g. "DeviceNotRegistered" → token should be deleted).
func (s *Sender) Send(ctx context.Context, messages []Message) ([]SendResult, error) {
if len(messages) == 0 {
return nil, nil
}
body, err := json.Marshal(messages)
if err != nil {
return nil, fmt.Errorf("push: marshal: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.apiURL, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("push: new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Accept-Encoding", "gzip, deflate")
if s.expoToken != "" {
req.Header.Set("Authorization", "Bearer "+s.expoToken)
}
res, err := s.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("push: post: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("push: expo returned %d", res.StatusCode)
}
var resp expoResponse
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return nil, fmt.Errorf("push: decode: %w", err)
}
return resp.Data, nil
}
// ShouldRemoveToken reports whether a SendResult indicates the token is no
// longer valid and should be deleted from the database.
func ShouldRemoveToken(r SendResult) bool {
return r.Status == "error" && r.Details.Error == "DeviceNotRegistered"
}
+15 -2
View File
@@ -20,6 +20,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/docker/docker/client"
@@ -318,13 +319,25 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
{
// Push notifications (mobile)
var pushNotifier *push.Notifier
if expoToken := os.Getenv("EXPO_ACCESS_TOKEN"); expoToken != "" {
pushNotifier = push.NewNotifier(db.DB, push.NewSender(expoToken))
}
// Activity Logs
acth := handlers.NewActivityHandler(broadcaster)
acth := handlers.NewActivityHandler(broadcaster, pushNotifier)
wsAuth.GET("/activity", acth.List)
wsAuth.GET("/session-search", acth.SessionSearch)
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Push token registration (mobile)
if pushNotifier != nil {
pushH := push.NewHandler(push.NewRepo(db.DB))
pushH.RegisterRoutes(wsAuth)
}
// Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue
// #3026). Server-side rendering of activity_logs rows into
// the canonical ChatMessage shape; storage is plugin-shaped
@@ -428,7 +441,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// opencode session cannot saturate the platform.
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
mcpH := handlers.NewMCPHandler(db.DB, broadcaster, pushNotifier)
if memBundle != nil {
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
@@ -0,0 +1 @@
DROP TABLE IF EXISTS push_tokens;
@@ -0,0 +1,11 @@
CREATE TABLE push_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
token TEXT NOT NULL,
platform TEXT NOT NULL CHECK (platform IN ('ios', 'android')),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(workspace_id, token)
);
CREATE INDEX idx_push_tokens_workspace ON push_tokens(workspace_id);