Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 45a8c5e309 | |||
| 20eb136c00 | |||
| 338dc4a995 | |||
| 1494f94512 | |||
| cec732ec68 | |||
| b57de4174e |
@@ -6,9 +6,13 @@
|
||||
// attachments, no A2A topology overlay, no conversation tracing.
|
||||
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import ReactMarkdown from "react-markdown";
|
||||
import remarkGfm from "remark-gfm";
|
||||
|
||||
import { api } from "@/lib/api";
|
||||
import { useCanvasStore } from "@/store/canvas";
|
||||
import { uploadChatFiles } from "@/components/tabs/chat/uploads";
|
||||
import type { ChatAttachment } from "@/components/tabs/chat/types";
|
||||
|
||||
import { toMobileAgent } from "./components";
|
||||
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette";
|
||||
@@ -19,6 +23,7 @@ interface ChatMessage {
|
||||
role: "user" | "agent" | "system";
|
||||
text: string;
|
||||
ts: string;
|
||||
attachments?: ChatAttachment[];
|
||||
}
|
||||
|
||||
const formatStoredTimestamp = (iso: string): string => {
|
||||
@@ -39,6 +44,171 @@ interface A2AResponseShape {
|
||||
const formatTime = (date: Date) =>
|
||||
date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" });
|
||||
|
||||
function MarkdownBubble({
|
||||
children,
|
||||
dark,
|
||||
accent,
|
||||
}: {
|
||||
children: string;
|
||||
dark: boolean;
|
||||
accent: string;
|
||||
}) {
|
||||
const codeBg = dark ? "rgba(255,255,255,0.08)" : "rgba(0,0,0,0.06)";
|
||||
const codeBlockBg = dark ? "#1a1a1a" : "#f5f5f0";
|
||||
const linkColor = accent;
|
||||
const quoteBorder = dark ? "rgba(255,250,240,0.15)" : "rgba(40,30,20,0.15)";
|
||||
|
||||
return (
|
||||
<ReactMarkdown
|
||||
remarkPlugins={[remarkGfm]}
|
||||
components={{
|
||||
p: ({ children }) => (
|
||||
<div style={{ margin: "2px 0", lineHeight: "inherit" }}>{children}</div>
|
||||
),
|
||||
a: ({ href, children }) => (
|
||||
<a
|
||||
href={href}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
style={{ color: linkColor, textDecoration: "underline" }}
|
||||
>
|
||||
{children}
|
||||
</a>
|
||||
),
|
||||
pre: ({ children }) => (
|
||||
<pre
|
||||
style={{
|
||||
background: codeBlockBg,
|
||||
padding: "8px 10px",
|
||||
borderRadius: 8,
|
||||
overflow: "auto",
|
||||
fontSize: 12,
|
||||
lineHeight: 1.5,
|
||||
fontFamily: MOBILE_FONT_MONO,
|
||||
margin: "4px 0",
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</pre>
|
||||
),
|
||||
code: ({ children, className }) => {
|
||||
const isBlock = className != null && String(className).length > 0;
|
||||
if (isBlock) {
|
||||
return (
|
||||
<code style={{ fontFamily: MOBILE_FONT_MONO, fontSize: 12 }}>
|
||||
{children}
|
||||
</code>
|
||||
);
|
||||
}
|
||||
return (
|
||||
<code
|
||||
style={{
|
||||
background: codeBg,
|
||||
padding: "1px 4px",
|
||||
borderRadius: 4,
|
||||
fontSize: 13,
|
||||
fontFamily: MOBILE_FONT_MONO,
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</code>
|
||||
);
|
||||
},
|
||||
ul: ({ children }) => (
|
||||
<ul style={{ margin: "4px 0", paddingLeft: 18, listStyle: "disc" }}>
|
||||
{children}
|
||||
</ul>
|
||||
),
|
||||
ol: ({ children }) => (
|
||||
<ol style={{ margin: "4px 0", paddingLeft: 18, listStyle: "decimal" }}>
|
||||
{children}
|
||||
</ol>
|
||||
),
|
||||
li: ({ children }) => <li style={{ margin: "2px 0" }}>{children}</li>,
|
||||
strong: ({ children }) => (
|
||||
<strong style={{ fontWeight: 600 }}>{children}</strong>
|
||||
),
|
||||
em: ({ children }) => <em style={{ fontStyle: "italic" }}>{children}</em>,
|
||||
h1: ({ children }) => (
|
||||
<div style={{ fontSize: 16, fontWeight: 700, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
h2: ({ children }) => (
|
||||
<div style={{ fontSize: 15, fontWeight: 700, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
h3: ({ children }) => (
|
||||
<div style={{ fontSize: 14, fontWeight: 700, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
h4: ({ children }) => (
|
||||
<div style={{ fontSize: 14, fontWeight: 600, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
h5: ({ children }) => (
|
||||
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
h6: ({ children }) => (
|
||||
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
|
||||
),
|
||||
blockquote: ({ children }) => (
|
||||
<blockquote
|
||||
style={{
|
||||
borderLeft: `2px solid ${quoteBorder}`,
|
||||
margin: "4px 0",
|
||||
paddingLeft: 8,
|
||||
opacity: 0.85,
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</blockquote>
|
||||
),
|
||||
hr: () => (
|
||||
<hr
|
||||
style={{
|
||||
border: "none",
|
||||
borderTop: `0.5px solid ${quoteBorder}`,
|
||||
margin: "6px 0",
|
||||
}}
|
||||
/>
|
||||
),
|
||||
table: ({ children }) => (
|
||||
<table
|
||||
style={{
|
||||
borderCollapse: "collapse",
|
||||
fontSize: 13,
|
||||
margin: "4px 0",
|
||||
width: "100%",
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</table>
|
||||
),
|
||||
thead: ({ children }) => <thead style={{ fontWeight: 600 }}>{children}</thead>,
|
||||
th: ({ children }) => (
|
||||
<th
|
||||
style={{
|
||||
border: `0.5px solid ${quoteBorder}`,
|
||||
padding: "4px 6px",
|
||||
textAlign: "left",
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</th>
|
||||
),
|
||||
td: ({ children }) => (
|
||||
<td
|
||||
style={{
|
||||
border: `0.5px solid ${quoteBorder}`,
|
||||
padding: "4px 6px",
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</td>
|
||||
),
|
||||
}}
|
||||
>
|
||||
{children}
|
||||
</ReactMarkdown>
|
||||
);
|
||||
}
|
||||
|
||||
export function MobileChat({
|
||||
agentId,
|
||||
dark,
|
||||
@@ -54,6 +224,7 @@ export function MobileChat({
|
||||
const [draft, setDraft] = useState("");
|
||||
const [tab, setTab] = useState<SubTab>("my");
|
||||
const [sending, setSending] = useState(false);
|
||||
const [uploading, setUploading] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [historyLoading, setHistoryLoading] = useState(true);
|
||||
const [historyError, setHistoryError] = useState<string | null>(null);
|
||||
@@ -64,6 +235,8 @@ export function MobileChat({
|
||||
// double-send race a stale `sending` lets through.
|
||||
const sendInFlightRef = useRef(false);
|
||||
const composerRef = useRef<HTMLTextAreaElement>(null);
|
||||
const fileInputRef = useRef<HTMLInputElement>(null);
|
||||
const [pendingFiles, setPendingFiles] = useState<File[]>([]);
|
||||
|
||||
// Auto-grow the textarea: reset height to 'auto' so the scrollHeight
|
||||
// shrinks when the user deletes text, then size to scrollHeight up to
|
||||
@@ -171,30 +344,82 @@ export function MobileChat({
|
||||
const a = toMobileAgent(node);
|
||||
const reachable = a.status === "online" || a.status === "degraded";
|
||||
|
||||
const onFilesPicked = (fileList: FileList | null) => {
|
||||
if (!fileList) return;
|
||||
const picked = Array.from(fileList);
|
||||
setPendingFiles((prev) => {
|
||||
const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`));
|
||||
return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))];
|
||||
});
|
||||
if (fileInputRef.current) fileInputRef.current.value = "";
|
||||
};
|
||||
|
||||
const removePendingFile = (index: number) =>
|
||||
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
|
||||
|
||||
const send = async () => {
|
||||
const text = draft.trim();
|
||||
if (!text || sending || !reachable) return;
|
||||
if ((!text && pendingFiles.length === 0) || sending || !reachable) return;
|
||||
if (sendInFlightRef.current) return;
|
||||
sendInFlightRef.current = true;
|
||||
setDraft("");
|
||||
setError(null);
|
||||
setSending(true);
|
||||
|
||||
let uploaded: ChatAttachment[] = [];
|
||||
if (pendingFiles.length > 0) {
|
||||
setUploading(true);
|
||||
try {
|
||||
uploaded = await uploadChatFiles(agentId, pendingFiles);
|
||||
} catch (e) {
|
||||
setError(e instanceof Error ? e.message : "Upload failed");
|
||||
setSending(false);
|
||||
setUploading(false);
|
||||
sendInFlightRef.current = false;
|
||||
return;
|
||||
} finally {
|
||||
setUploading(false);
|
||||
}
|
||||
setPendingFiles([]);
|
||||
}
|
||||
|
||||
const myMsg: ChatMessage = {
|
||||
id: crypto.randomUUID(),
|
||||
role: "user",
|
||||
text,
|
||||
ts: formatTime(new Date()),
|
||||
attachments: uploaded.length > 0 ? uploaded : undefined,
|
||||
};
|
||||
setMessages((m) => [...m, myMsg]);
|
||||
|
||||
try {
|
||||
const parts: Array<
|
||||
| { kind: "text"; text: string }
|
||||
| {
|
||||
kind: "file";
|
||||
file: { name: string; mimeType?: string; uri: string; size?: number };
|
||||
}
|
||||
> = [];
|
||||
if (text) parts.push({ kind: "text", text });
|
||||
for (const att of uploaded) {
|
||||
parts.push({
|
||||
kind: "file",
|
||||
file: {
|
||||
name: att.name,
|
||||
mimeType: att.mimeType,
|
||||
uri: att.uri,
|
||||
size: att.size,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const res = await api.post<A2AResponseShape>(`/workspaces/${agentId}/a2a`, {
|
||||
method: "message/send",
|
||||
params: {
|
||||
message: {
|
||||
role: "user",
|
||||
messageId: crypto.randomUUID(),
|
||||
parts: [{ kind: "text", text }],
|
||||
parts,
|
||||
},
|
||||
},
|
||||
});
|
||||
@@ -402,7 +627,9 @@ export function MobileChat({
|
||||
overflowWrap: "anywhere",
|
||||
}}
|
||||
>
|
||||
{m.text}
|
||||
<MarkdownBubble dark={dark} accent={p.accent}>
|
||||
{m.text}
|
||||
</MarkdownBubble>
|
||||
<div
|
||||
style={{
|
||||
fontSize: 10,
|
||||
@@ -460,6 +687,60 @@ export function MobileChat({
|
||||
backdropFilter: "blur(14px)",
|
||||
}}
|
||||
>
|
||||
{pendingFiles.length > 0 && (
|
||||
<div
|
||||
style={{
|
||||
display: "flex",
|
||||
flexWrap: "wrap",
|
||||
gap: 6,
|
||||
marginBottom: 8,
|
||||
paddingLeft: 2,
|
||||
}}
|
||||
>
|
||||
{pendingFiles.map((f, i) => (
|
||||
<div
|
||||
key={`${f.name}:${f.size}`}
|
||||
style={{
|
||||
display: "flex",
|
||||
alignItems: "center",
|
||||
gap: 4,
|
||||
padding: "3px 8px",
|
||||
borderRadius: 10,
|
||||
background: dark ? "#2a2823" : "#ece9e0",
|
||||
fontSize: 12,
|
||||
color: p.text2,
|
||||
maxWidth: "100%",
|
||||
}}
|
||||
>
|
||||
<span
|
||||
style={{
|
||||
overflow: "hidden",
|
||||
textOverflow: "ellipsis",
|
||||
whiteSpace: "nowrap",
|
||||
}}
|
||||
>
|
||||
{f.name}
|
||||
</span>
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => removePendingFile(i)}
|
||||
aria-label={`Remove ${f.name}`}
|
||||
style={{
|
||||
border: "none",
|
||||
background: "transparent",
|
||||
color: p.text3,
|
||||
cursor: "pointer",
|
||||
fontSize: 12,
|
||||
padding: 0,
|
||||
lineHeight: 1,
|
||||
}}
|
||||
>
|
||||
✕
|
||||
</button>
|
||||
</div>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
<div
|
||||
style={{
|
||||
display: "flex",
|
||||
@@ -471,21 +752,32 @@ export function MobileChat({
|
||||
padding: "6px 6px 6px 12px",
|
||||
}}
|
||||
>
|
||||
<input
|
||||
ref={fileInputRef}
|
||||
type="file"
|
||||
multiple
|
||||
style={{ display: "none" }}
|
||||
onChange={(e) => onFilesPicked(e.target.files)}
|
||||
aria-hidden="true"
|
||||
/>
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => fileInputRef.current?.click()}
|
||||
disabled={!reachable || sending || uploading}
|
||||
aria-label="Attach"
|
||||
style={{
|
||||
width: 32,
|
||||
height: 32,
|
||||
borderRadius: 999,
|
||||
border: "none",
|
||||
cursor: "pointer",
|
||||
cursor: reachable && !sending && !uploading ? "pointer" : "not-allowed",
|
||||
background: "transparent",
|
||||
color: p.text3,
|
||||
flexShrink: 0,
|
||||
display: "flex",
|
||||
alignItems: "center",
|
||||
justifyContent: "center",
|
||||
opacity: !reachable || sending || uploading ? 0.4 : 1,
|
||||
}}
|
||||
>
|
||||
{Icons.attach({ size: 16 })}
|
||||
@@ -531,28 +823,32 @@ export function MobileChat({
|
||||
<button
|
||||
type="button"
|
||||
onClick={send}
|
||||
disabled={!draft.trim() || !reachable || sending}
|
||||
disabled={(!draft.trim() && pendingFiles.length === 0) || !reachable || sending || uploading}
|
||||
aria-label="Send"
|
||||
style={{
|
||||
width: 36,
|
||||
height: 36,
|
||||
borderRadius: 999,
|
||||
border: "none",
|
||||
cursor: draft.trim() && !sending ? "pointer" : "not-allowed",
|
||||
cursor: (draft.trim() || pendingFiles.length > 0) && !sending && !uploading ? "pointer" : "not-allowed",
|
||||
flexShrink: 0,
|
||||
background:
|
||||
draft.trim() && reachable && !sending
|
||||
(draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading
|
||||
? p.accent
|
||||
: dark
|
||||
? "#2a2823"
|
||||
: "#ece9e0",
|
||||
color: draft.trim() && reachable && !sending ? "#fff" : p.text3,
|
||||
color: (draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading ? "#fff" : p.text3,
|
||||
display: "flex",
|
||||
alignItems: "center",
|
||||
justifyContent: "center",
|
||||
}}
|
||||
>
|
||||
{Icons.send({ size: 16 })}
|
||||
{uploading ? (
|
||||
<span style={{ fontSize: 10, fontWeight: 600 }}>↑</span>
|
||||
) : (
|
||||
Icons.send({ size: 16 })
|
||||
)}
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -14,16 +14,18 @@ import (
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type ActivityHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
notifier *push.Notifier
|
||||
}
|
||||
|
||||
func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
|
||||
return &ActivityHandler{broadcaster: b}
|
||||
func NewActivityHandler(b *events.Broadcaster, notifier *push.Notifier) *ActivityHandler {
|
||||
return &ActivityHandler{broadcaster: b, notifier: notifier}
|
||||
}
|
||||
|
||||
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
|
||||
@@ -476,7 +478,7 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
|
||||
for _, a := range body.Attachments {
|
||||
attachments = append(attachments, AgentMessageAttachment(a))
|
||||
}
|
||||
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
|
||||
writer := NewAgentMessageWriter(db.DB, h.broadcaster, h.notifier)
|
||||
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
|
||||
if errors.Is(err, ErrWorkspaceNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -69,7 +69,7 @@ func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -101,7 +101,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -137,7 +137,7 @@ func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestActivityHandler_SinceSecs_Accepted(t *testing.T) {
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -70,7 +70,7 @@ func TestActivityHandler_SinceSecs_ClampedAt30Days(t *testing.T) {
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -106,7 +106,7 @@ func TestActivityHandler_SinceSecs_InvalidRejected(t *testing.T) {
|
||||
// No DB call expected; bad input must be caught before the query.
|
||||
setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -142,7 +142,7 @@ func TestActivityHandler_SinceSecs_Omitted(t *testing.T) {
|
||||
WillReturnRows(newActivityRows())
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
@@ -22,7 +22,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
rows := sqlmock.NewRows([]string{
|
||||
"kind", "id", "workspace_id", "label", "content", "method", "status", "request_body", "response_body", "created_at",
|
||||
@@ -68,7 +68,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
|
||||
func TestActivityList_SourceCanvas(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
// Expect query with "source_id IS NULL"
|
||||
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NULL`).
|
||||
@@ -97,7 +97,7 @@ func TestActivityList_SourceCanvas(t *testing.T) {
|
||||
func TestActivityList_SourceAgent(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
// Expect query with "source_id IS NOT NULL"
|
||||
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NOT NULL`).
|
||||
@@ -126,7 +126,7 @@ func TestActivityList_SourceAgent(t *testing.T) {
|
||||
func TestActivityList_SourceInvalid(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -142,7 +142,7 @@ func TestActivityList_SourceInvalid(t *testing.T) {
|
||||
func TestActivityList_SourceWithType(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
// Both type and source filters
|
||||
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NULL`).
|
||||
@@ -181,7 +181,7 @@ const testPeerUUID = "11111111-2222-3333-4444-555555555555"
|
||||
func TestActivityList_PeerIDFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
// peer_id binds twice in the query (source_id OR target_id) but is
|
||||
// added to args once — sqlmock matches positional args, so the
|
||||
@@ -220,7 +220,7 @@ func TestActivityList_PeerIDComposesWithType(t *testing.T) {
|
||||
// of the builder can't silently rearrange placeholders.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
mock.ExpectQuery(
|
||||
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
|
||||
@@ -258,7 +258,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
|
||||
// otherwise interpolate the value into the URL or another query.
|
||||
gin.SetMode(gin.TestMode)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
for _, bad := range []string{
|
||||
"not-a-uuid",
|
||||
@@ -292,7 +292,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
|
||||
func TestActivityList_BeforeTSFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
|
||||
mock.ExpectQuery(
|
||||
@@ -328,7 +328,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
|
||||
// can't silently drop one filter or reorder placeholders.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
|
||||
mock.ExpectQuery(
|
||||
@@ -363,7 +363,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
|
||||
func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
for _, bad := range []string{
|
||||
"yesterday",
|
||||
@@ -400,7 +400,7 @@ func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
@@ -426,7 +426,7 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) {
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
@@ -478,7 +478,7 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
@@ -527,7 +527,7 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
@@ -593,7 +593,7 @@ func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) {
|
||||
// only if the handler unexpectedly queries.
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -647,7 +647,7 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
|
||||
WillReturnError(fmt.Errorf("simulated db hiccup"))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
@@ -44,6 +44,7 @@ import (
|
||||
"log"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil"
|
||||
)
|
||||
|
||||
@@ -76,12 +77,14 @@ type AgentMessageAttachment struct {
|
||||
type AgentMessageWriter struct {
|
||||
db *sql.DB
|
||||
broadcaster events.EventEmitter
|
||||
notifier *push.Notifier
|
||||
}
|
||||
|
||||
// NewAgentMessageWriter binds the writer to the platform's DB pool +
|
||||
// WebSocket broadcaster.
|
||||
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
|
||||
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
|
||||
// WebSocket broadcaster. notifier may be nil if push notifications are
|
||||
// not configured.
|
||||
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter, notifier *push.Notifier) *AgentMessageWriter {
|
||||
return &AgentMessageWriter{db: db, broadcaster: broadcaster, notifier: notifier}
|
||||
}
|
||||
|
||||
// Send delivers a single agent → user message. Look up + broadcast +
|
||||
@@ -132,7 +135,12 @@ func (w *AgentMessageWriter) Send(
|
||||
}
|
||||
w.broadcaster.BroadcastOnly(workspaceID, string(events.EventAgentMessage), broadcastPayload)
|
||||
|
||||
// 3. Persist for chat-history hydration. response_body shape MUST stay
|
||||
// 3. Send push notifications to mobile devices.
|
||||
if w.notifier != nil {
|
||||
w.notifier.NotifyAgentMessage(ctx, workspaceID, wsName, message)
|
||||
}
|
||||
|
||||
// 4. Persist for chat-history hydration. response_body shape MUST stay
|
||||
// in sync with extractResponseText + extractFilesFromTask in
|
||||
// canvas/src/components/tabs/chat/historyHydration.ts:
|
||||
// - extractResponseText reads body.result (string) → renders text
|
||||
|
||||
@@ -86,7 +86,7 @@ func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType strin
|
||||
// path: workspace lookup, broadcast, INSERT, return nil.
|
||||
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-1").
|
||||
@@ -114,7 +114,7 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
|
||||
// Drift here = chips disappear on chat reload.
|
||||
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-att").
|
||||
@@ -171,7 +171,7 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
|
||||
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
w := NewAgentMessageWriter(db.DB, emitter, nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-missing").
|
||||
@@ -200,7 +200,7 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
|
||||
// broadcast.
|
||||
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-dbfail").
|
||||
@@ -221,7 +221,7 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
|
||||
// table doesn't carry multi-KB summaries that bloat list queries.
|
||||
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-trunc").
|
||||
@@ -261,7 +261,7 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
|
||||
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
w := NewAgentMessageWriter(db.DB, emitter, nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-bc").
|
||||
@@ -312,7 +312,7 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
|
||||
// real incidents in alerting.
|
||||
func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
transientErr := errors.New("connection refused")
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
@@ -344,7 +344,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
|
||||
// coverage. Now it does.
|
||||
func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
|
||||
|
||||
// 200-rune CJK message — exceeds the 80-rune cap, would have hit
|
||||
// the byte-slice bug.
|
||||
@@ -393,7 +393,7 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
|
||||
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
w := NewAgentMessageWriter(db.DB, emitter, nil)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-noatt").
|
||||
|
||||
@@ -225,7 +225,7 @@ const externalUniversalMcpTemplate = `# Universal MCP — standalone register +
|
||||
# python3.12 / etc.) or use a 3.11+ venv.
|
||||
|
||||
# 1. Install the workspace runtime wheel:
|
||||
pip install molecule-ai-workspace-runtime
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
|
||||
# 2. Wire molecule-mcp into your agent's MCP config. Claude Code:
|
||||
claude mcp add molecule -s user -- env \
|
||||
@@ -336,7 +336,7 @@ const externalHermesChannelTemplate = `# Hermes channel — bridges this workspa
|
||||
# also supported via the plugin's dual-mode fallback.
|
||||
#
|
||||
# 1. Install the runtime + plugin:
|
||||
pip install molecule-ai-workspace-runtime
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
pip install 'git+https://git.moleculesai.app/molecule-ai/hermes-channel-molecule.git'
|
||||
|
||||
# 2. Export the workspace credentials:
|
||||
@@ -407,7 +407,7 @@ const externalCodexTemplate = `# Codex external setup — outbound tools (MCP) +
|
||||
|
||||
# 1. Install codex CLI, the workspace runtime, and the bridge daemon:
|
||||
npm install -g @openai/codex@latest
|
||||
pip install molecule-ai-workspace-runtime
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
pip install codex-channel-molecule
|
||||
|
||||
# 2. Wire the molecule MCP server into codex's config.toml — this is
|
||||
@@ -499,7 +499,7 @@ const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat
|
||||
# No public URL needed; runs behind NAT in poll mode.
|
||||
|
||||
# 1. Install the workspace runtime wheel (provides HTTP client):
|
||||
pip install molecule-ai-workspace-runtime
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
|
||||
# 2. Save credentials and the bridge script:
|
||||
mkdir -p ~/.molecule-ai/kimi-workspace
|
||||
@@ -647,7 +647,7 @@ const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path.
|
||||
|
||||
# 1. Install openclaw CLI + the workspace runtime wheel:
|
||||
npm install -g openclaw@latest
|
||||
pip install molecule-ai-workspace-runtime
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
|
||||
# 2. Onboard openclaw against your model provider (one-time setup).
|
||||
# --non-interactive needs an explicit --provider + --model so it
|
||||
|
||||
@@ -646,7 +646,7 @@ func TestActivityHandler_List(t *testing.T) {
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -695,7 +695,7 @@ func TestActivityHandler_ListByType(t *testing.T) {
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -723,7 +723,7 @@ func TestActivityHandler_Report(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
// Expect the INSERT into activity_logs
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
@@ -752,7 +752,7 @@ func TestActivityHandler_Report_InvalidType(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -980,7 +980,7 @@ func TestActivityHandler_ListEmpty(t *testing.T) {
|
||||
WillReturnRows(sqlmock.NewRows(columns))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1014,7 +1014,7 @@ func TestActivityHandler_ListCustomLimit(t *testing.T) {
|
||||
WillReturnRows(sqlmock.NewRows(columns))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1047,7 +1047,7 @@ func TestActivityHandler_ListMaxLimit(t *testing.T) {
|
||||
WillReturnRows(sqlmock.NewRows(columns))
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1075,7 +1075,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@@ -1106,7 +1106,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
|
||||
func TestActivityHandler_ReportMissingBody(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1179,7 +1179,7 @@ func TestActivityHandler_Report_SourceIDSpoofRejected(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -1202,7 +1202,7 @@ func TestActivityHandler_Report_MatchingSourceIDAccepted(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@@ -1232,7 +1232,7 @@ func TestActivityHandler_Report_SourceIDLogInjection(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
handler := NewActivityHandler(broadcaster, nil)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
@@ -84,6 +85,7 @@ type mcpTool struct {
|
||||
type MCPHandler struct {
|
||||
database *sql.DB
|
||||
broadcaster *events.Broadcaster
|
||||
notifier *push.Notifier
|
||||
|
||||
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
|
||||
// every v2 tool calls memoryV2Available() first and returns a
|
||||
@@ -94,8 +96,9 @@ type MCPHandler struct {
|
||||
|
||||
// NewMCPHandler wires the handler to db and broadcaster.
|
||||
// Pass db.DB and the platform broadcaster at router-setup time.
|
||||
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandler {
|
||||
return &MCPHandler{database: database, broadcaster: broadcaster}
|
||||
// notifier may be nil if push notifications are not configured.
|
||||
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster, notifier *push.Notifier) *MCPHandler {
|
||||
return &MCPHandler{database: database, broadcaster: broadcaster, notifier: notifier}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
|
||||
t.Helper()
|
||||
mock := setupTestDB(t)
|
||||
h := NewMCPHandler(db.DB, newTestBroadcaster())
|
||||
h := NewMCPHandler(db.DB, newTestBroadcaster(), nil)
|
||||
return h, mock
|
||||
}
|
||||
|
||||
|
||||
@@ -392,7 +392,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
// (the tool args don't accept them); pass nil. If a future tool
|
||||
// schema adds an attachments arg, build []AgentMessageAttachment
|
||||
// and pass through.
|
||||
writer := NewAgentMessageWriter(h.database, h.broadcaster)
|
||||
writer := NewAgentMessageWriter(h.database, h.broadcaster, h.notifier)
|
||||
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
|
||||
if errors.Is(err, ErrWorkspaceNotFound) {
|
||||
return "", fmt.Errorf("workspace not found")
|
||||
|
||||
@@ -207,7 +207,7 @@ func setupSwapEnv(t *testing.T) (*handlers.MCPHandler, *flatPlugin, sqlmock.Sqlm
|
||||
resolver := namespace.New(db)
|
||||
|
||||
// MCPHandler needs a real *sql.DB; pass the sqlmock-backed one.
|
||||
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
|
||||
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
|
||||
return h, plugin, mock
|
||||
}
|
||||
|
||||
@@ -430,7 +430,7 @@ func TestE2E_PluginUnreachable_AgentSeesClearError(t *testing.T) {
|
||||
db, _, _ := sqlmock.New()
|
||||
defer db.Close()
|
||||
resolver := namespace.New(db)
|
||||
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
|
||||
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
|
||||
|
||||
_, err := h.Dispatch(context.Background(), "root-1", "commit_memory_v2", map[string]interface{}{
|
||||
"content": "x",
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Handler exposes HTTP endpoints for push-token management.
|
||||
type Handler struct {
|
||||
repo *Repo
|
||||
}
|
||||
|
||||
// NewHandler creates a push-token HTTP handler.
|
||||
func NewHandler(repo *Repo) *Handler {
|
||||
return &Handler{repo: repo}
|
||||
}
|
||||
|
||||
// RegisterRoutes mounts push-token routes on the given router group.
|
||||
func (h *Handler) RegisterRoutes(rg *gin.RouterGroup) {
|
||||
rg.POST("/push-tokens", h.Create)
|
||||
rg.DELETE("/push-tokens", h.Delete)
|
||||
}
|
||||
|
||||
// Create handles POST /push-tokens.
|
||||
// Body: { "token": "ExponentPushToken[xxx]", "platform": "ios" | "android" }
|
||||
func (h *Handler) Create(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
if _, err := uuid.Parse(workspaceID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Token string `json:"token" binding:"required"`
|
||||
Platform string `json:"platform" binding:"required,oneof=ios android"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.repo.SaveToken(c.Request.Context(), workspaceID, body.Token, body.Platform); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save token"})
|
||||
return
|
||||
}
|
||||
|
||||
c.Status(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Delete handles DELETE /push-tokens.
|
||||
// Body: { "token": "ExponentPushToken[xxx]" }
|
||||
func (h *Handler) Delete(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
if _, err := uuid.Parse(workspaceID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Token string `json:"token" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.repo.DeleteToken(c.Request.Context(), workspaceID, body.Token); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete token"})
|
||||
return
|
||||
}
|
||||
|
||||
c.Status(http.StatusNoContent)
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Notifier sends push notifications for agent messages.
|
||||
type Notifier struct {
|
||||
repo *Repo
|
||||
sender *Sender
|
||||
}
|
||||
|
||||
// NewNotifier creates a Notifier.
|
||||
func NewNotifier(db *sql.DB, sender *Sender) *Notifier {
|
||||
return &Notifier{
|
||||
repo: NewRepo(db),
|
||||
sender: sender,
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyAgentMessage sends a push notification to all registered devices for a
|
||||
// workspace when an agent sends a message. It runs asynchronously (fire-and-
|
||||
// forget) so the caller's WebSocket broadcast is never blocked.
|
||||
func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspaceName, message string) {
|
||||
if n == nil || n.sender == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Capture values for the goroutine.
|
||||
wsID := workspaceID
|
||||
wsName := workspaceName
|
||||
msg := message
|
||||
|
||||
go func() {
|
||||
// Use a fresh context with timeout so a slow Expo API doesn't
|
||||
// leak the caller's context deadline.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
tokens, err := n.repo.GetTokens(ctx, wsID)
|
||||
if err != nil {
|
||||
log.Printf("push: failed to get tokens for workspace %s: %v", wsID, err)
|
||||
return
|
||||
}
|
||||
if len(tokens) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Expo accepts batches of up to ~100 messages; we cap lower to stay
|
||||
// well under the limit.
|
||||
const batchSize = 50
|
||||
for i := 0; i < len(tokens); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(tokens) {
|
||||
end = len(tokens)
|
||||
}
|
||||
|
||||
batch := tokens[i:end]
|
||||
messages := make([]Message, 0, len(batch))
|
||||
for _, t := range batch {
|
||||
messages = append(messages, Message{
|
||||
To: t.Token,
|
||||
Title: wsName,
|
||||
Body: truncate(msg, 100),
|
||||
Data: map[string]string{
|
||||
"type": "agent_message",
|
||||
"workspaceId": wsID,
|
||||
"workspaceSlug": os.Getenv("MOLECULE_ORG_SLUG"),
|
||||
},
|
||||
Sound: "default",
|
||||
Priority: "high",
|
||||
})
|
||||
}
|
||||
|
||||
results, err := n.sender.Send(ctx, messages)
|
||||
if err != nil {
|
||||
log.Printf("push: send failed for workspace %s: %v", wsID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove invalid tokens.
|
||||
for j, r := range results {
|
||||
if ShouldRemoveToken(r) {
|
||||
if delErr := n.repo.DeleteToken(ctx, wsID, batch[j].Token); delErr != nil {
|
||||
log.Printf("push: failed to delete invalid token for workspace %s: %v", wsID, delErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func truncate(s string, max int) string {
|
||||
if len(s) <= max {
|
||||
return s
|
||||
}
|
||||
return s[:max] + "…"
|
||||
}
|
||||
@@ -0,0 +1,159 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSenderSend(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
expoResponse := map[string]interface{}{
|
||||
"data": []map[string]interface{}{
|
||||
{"status": "ok", "id": "abc123"},
|
||||
{"status": "error", "message": "Invalid token", "details": map[string]string{"error": "DeviceNotRegistered"}},
|
||||
},
|
||||
}
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "POST", r.Method)
|
||||
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
||||
|
||||
var msgs []Message
|
||||
require.NoError(t, json.NewDecoder(r.Body).Decode(&msgs))
|
||||
assert.Len(t, msgs, 2)
|
||||
assert.Equal(t, "ExponentPushToken[test1]", msgs[0].To)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(expoResponse)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
sender := NewSender("")
|
||||
sender.apiURL = server.URL
|
||||
|
||||
results, err := sender.Send(context.Background(), []Message{
|
||||
{To: "ExponentPushToken[test1]", Title: "Test", Body: "Hello"},
|
||||
{To: "ExponentPushToken[test2]", Title: "Test", Body: "World"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, results, 2)
|
||||
assert.Equal(t, "ok", results[0].Status)
|
||||
assert.Equal(t, "error", results[1].Status)
|
||||
assert.True(t, ShouldRemoveToken(results[1]))
|
||||
}
|
||||
|
||||
func TestSenderSendEmpty(t *testing.T) {
|
||||
sender := NewSender("")
|
||||
results, err := sender.Send(context.Background(), nil)
|
||||
require.NoError(t, err)
|
||||
assert.Nil(t, results)
|
||||
}
|
||||
|
||||
func TestHandlerCreate(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
db, mock, err := sqlmock.New()
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
mock.ExpectExec("INSERT INTO push_tokens").
|
||||
WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
repo := NewRepo(db)
|
||||
handler := NewHandler(repo)
|
||||
|
||||
router := gin.New()
|
||||
group := router.Group("/workspaces/:id")
|
||||
handler.RegisterRoutes(group)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
|
||||
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusNoContent, w.Code)
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestHandlerCreateInvalidPlatform(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
db, _, err := sqlmock.New()
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
handler := NewHandler(NewRepo(db))
|
||||
|
||||
router := gin.New()
|
||||
group := router.Group("/workspaces/:id")
|
||||
handler.RegisterRoutes(group)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
body := `{"token":"ExponentPushToken[abc]","platform":"windows"}`
|
||||
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusBadRequest, w.Code)
|
||||
}
|
||||
|
||||
func TestHandlerDelete(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
db, mock, err := sqlmock.New()
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
mock.ExpectExec("DELETE FROM push_tokens").
|
||||
WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
repo := NewRepo(db)
|
||||
handler := NewHandler(repo)
|
||||
|
||||
router := gin.New()
|
||||
group := router.Group("/workspaces/:id")
|
||||
handler.RegisterRoutes(group)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
body := `{"token":"ExponentPushToken[del]"}`
|
||||
req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
assert.Equal(t, http.StatusNoContent, w.Code)
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
|
||||
func TestRepoGetTokens(t *testing.T) {
|
||||
db, mock, err := sqlmock.New()
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}).
|
||||
AddRow("1", "ws-1", "ExponentPushToken[a]", "ios", "2026-01-01T00:00:00Z").
|
||||
AddRow("2", "ws-1", "ExponentPushToken[b]", "android", "2026-01-01T00:00:00Z"))
|
||||
|
||||
repo := NewRepo(db)
|
||||
tokens, err := repo.GetTokens(context.Background(), "ws-1")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, tokens, 2)
|
||||
assert.Equal(t, "ExponentPushToken[a]", tokens[0].Token)
|
||||
assert.Equal(t, "ios", tokens[0].Platform)
|
||||
assert.Equal(t, "ExponentPushToken[b]", tokens[1].Token)
|
||||
require.NoError(t, mock.ExpectationsWereMet())
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Token is one registered push token for a workspace.
|
||||
type Token struct {
|
||||
ID string
|
||||
WorkspaceID string
|
||||
Token string
|
||||
Platform string
|
||||
CreatedAt string
|
||||
}
|
||||
|
||||
// Repo reads and writes push tokens in Postgres.
|
||||
type Repo struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewRepo creates a token repository backed by db.
|
||||
func NewRepo(db *sql.DB) *Repo {
|
||||
return &Repo{db: db}
|
||||
}
|
||||
|
||||
// SaveToken registers a push token for a workspace. If the same token already
|
||||
// exists for the workspace, it updates the timestamp.
|
||||
func (r *Repo) SaveToken(ctx context.Context, workspaceID, token, platform string) error {
|
||||
_, err := r.db.ExecContext(ctx, `
|
||||
INSERT INTO push_tokens (workspace_id, token, platform)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (workspace_id, token) DO UPDATE
|
||||
SET updated_at = now()
|
||||
`, workspaceID, token, platform)
|
||||
if err != nil {
|
||||
return fmt.Errorf("push_tokens: save: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteToken removes a push token. Returns nil even if the token did not exist.
|
||||
func (r *Repo) DeleteToken(ctx context.Context, workspaceID, token string) error {
|
||||
_, err := r.db.ExecContext(ctx, `
|
||||
DELETE FROM push_tokens
|
||||
WHERE workspace_id = $1 AND token = $2
|
||||
`, workspaceID, token)
|
||||
if err != nil {
|
||||
return fmt.Errorf("push_tokens: delete: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTokens returns all active push tokens for a workspace.
|
||||
func (r *Repo) GetTokens(ctx context.Context, workspaceID string) ([]Token, error) {
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
SELECT id, workspace_id, token, platform, created_at
|
||||
FROM push_tokens
|
||||
WHERE workspace_id = $1
|
||||
`, workspaceID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("push_tokens: list: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var tokens []Token
|
||||
for rows.Next() {
|
||||
var t Token
|
||||
if err := rows.Scan(&t.ID, &t.WorkspaceID, &t.Token, &t.Platform, &t.CreatedAt); err != nil {
|
||||
return nil, fmt.Errorf("push_tokens: scan: %w", err)
|
||||
}
|
||||
tokens = append(tokens, t)
|
||||
}
|
||||
return tokens, rows.Err()
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
package push
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const expoPushAPI = "https://exp.host/--/api/v2/push/send"
|
||||
|
||||
// Message is one Expo push notification.
|
||||
type Message struct {
|
||||
To string `json:"to"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Body string `json:"body,omitempty"`
|
||||
Data map[string]string `json:"data,omitempty"`
|
||||
Sound string `json:"sound,omitempty"`
|
||||
Priority string `json:"priority,omitempty"`
|
||||
}
|
||||
|
||||
// Sender delivers push notifications via the Expo Push Service.
|
||||
type Sender struct {
|
||||
apiURL string
|
||||
httpClient *http.Client
|
||||
expoToken string // optional Expo access token for authenticated requests
|
||||
}
|
||||
|
||||
// NewSender creates a Sender. expoToken may be empty for unauthenticated
|
||||
// requests (sufficient for most use cases).
|
||||
func NewSender(expoToken string) *Sender {
|
||||
return &Sender{
|
||||
apiURL: expoPushAPI,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
},
|
||||
expoToken: expoToken,
|
||||
}
|
||||
}
|
||||
|
||||
// SendResult is the per-recipient status from Expo.
|
||||
type SendResult struct {
|
||||
Status string `json:"status"`
|
||||
ID string `json:"id"`
|
||||
Message string `json:"message,omitempty"`
|
||||
Details struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
} `json:"details,omitempty"`
|
||||
}
|
||||
|
||||
// expoResponse is the wrapper shape returned by the Expo API.
|
||||
type expoResponse struct {
|
||||
Data []SendResult `json:"data"`
|
||||
}
|
||||
|
||||
// Send fires a batch of push messages. It returns a slice of results in the
|
||||
// same order as the input, plus an error only when the HTTP call itself fails.
|
||||
// Callers should inspect each result's Status field for per-message errors
|
||||
// (e.g. "DeviceNotRegistered" → token should be deleted).
|
||||
func (s *Sender) Send(ctx context.Context, messages []Message) ([]SendResult, error) {
|
||||
if len(messages) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
body, err := json.Marshal(messages)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("push: marshal: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.apiURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("push: new request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
req.Header.Set("Accept-Encoding", "gzip, deflate")
|
||||
if s.expoToken != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+s.expoToken)
|
||||
}
|
||||
|
||||
res, err := s.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("push: post: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("push: expo returned %d", res.StatusCode)
|
||||
}
|
||||
|
||||
var resp expoResponse
|
||||
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
|
||||
return nil, fmt.Errorf("push: decode: %w", err)
|
||||
}
|
||||
return resp.Data, nil
|
||||
}
|
||||
|
||||
// ShouldRemoveToken reports whether a SendResult indicates the token is no
|
||||
// longer valid and should be deleted from the database.
|
||||
func ShouldRemoveToken(r SendResult) bool {
|
||||
return r.Status == "error" && r.Details.Error == "DeviceNotRegistered"
|
||||
}
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
|
||||
"github.com/docker/docker/client"
|
||||
@@ -318,13 +319,25 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
|
||||
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
|
||||
{
|
||||
// Push notifications (mobile)
|
||||
var pushNotifier *push.Notifier
|
||||
if expoToken := os.Getenv("EXPO_ACCESS_TOKEN"); expoToken != "" {
|
||||
pushNotifier = push.NewNotifier(db.DB, push.NewSender(expoToken))
|
||||
}
|
||||
|
||||
// Activity Logs
|
||||
acth := handlers.NewActivityHandler(broadcaster)
|
||||
acth := handlers.NewActivityHandler(broadcaster, pushNotifier)
|
||||
wsAuth.GET("/activity", acth.List)
|
||||
wsAuth.GET("/session-search", acth.SessionSearch)
|
||||
wsAuth.POST("/activity", acth.Report)
|
||||
wsAuth.POST("/notify", acth.Notify)
|
||||
|
||||
// Push token registration (mobile)
|
||||
if pushNotifier != nil {
|
||||
pushH := push.NewHandler(push.NewRepo(db.DB))
|
||||
pushH.RegisterRoutes(wsAuth)
|
||||
}
|
||||
|
||||
// Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue
|
||||
// #3026). Server-side rendering of activity_logs rows into
|
||||
// the canonical ChatMessage shape; storage is plugin-shaped
|
||||
@@ -428,7 +441,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// opencode session cannot saturate the platform.
|
||||
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
|
||||
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
|
||||
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
|
||||
mcpH := handlers.NewMCPHandler(db.DB, broadcaster, pushNotifier)
|
||||
if memBundle != nil {
|
||||
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
|
||||
}
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS push_tokens;
|
||||
@@ -0,0 +1,11 @@
|
||||
CREATE TABLE push_tokens (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
|
||||
token TEXT NOT NULL,
|
||||
platform TEXT NOT NULL CHECK (platform IN ('ios', 'android')),
|
||||
created_at TIMESTAMPTZ DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ DEFAULT now(),
|
||||
UNIQUE(workspace_id, token)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_push_tokens_workspace ON push_tokens(workspace_id);
|
||||
Reference in New Issue
Block a user