feat(gateway/signal): native formatting, reply quotes, and reactions

Three Signal adapter improvements that depend on the no-edit-mode
plumbing from the previous commit.

1. Native formatting (markdown -> Signal bodyRanges)
   Signal renders markdown as literal characters (**bold**, `code`, #
   heading), which looks broken. Added _markdown_to_signal(text) that
   strips markdown syntax and emits Signal-native bodyRanges as
   start:length:STYLE entries. Offsets are computed in UTF-16 code
   units so non-BMP emoji stay aligned. Supports BOLD, ITALIC, STRIKE,
   MONO, and headings mapped to BOLD. Fenced code and inline code are
   handled; link syntax is unwrapped to visible text + URL.

   Includes edge-case fixes reported previously:
   - Bullet lists ("* item") no longer misidentified as italics
   - URLs containing underscores no longer italicized around the dot

2. Reply-quote context
   Parses dataMessage.quote on inbound messages and populates
   MessageEvent.raw_message with sender + timestamp_ms. This lets the
   gateway's existing [Replying to: "..."] injector (gateway/run.py)
   work on Signal, matching Telegram/Matrix behavior.

3. Processing reactions
   Overrides on_processing_start -> hourglass and on_processing_complete
   -> checkmark via the sendReaction JSON-RPC using targetAuthor and
   targetTimestamp pulled from raw_message. Uses the ProcessingOutcome
   enum introduced in the previous commit.

Also sets SUPPORTS_MESSAGE_EDITING = False on SignalAdapter so the
no-edit streaming path activates.

Tests: 40+ new tests in tests/gateway/test_signal_format.py covering
markdown conversion, UTF-16 offset correctness with non-BMP emoji,
bullet-list and URL false-positive regressions, reply-quote extraction,
and reaction payload shape. Regression extensions to test_signal.py.
This commit is contained in:
exiao 2026-04-17 00:20:22 -04:00 committed by Teknium
parent ed170f4333
commit 23f5fc6765
3 changed files with 847 additions and 13 deletions

View File

@ -31,6 +31,7 @@ from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
ProcessingOutcome,
SendResult,
cache_image_from_bytes,
cache_audio_from_bytes,
@ -162,6 +163,10 @@ class SignalAdapter(BasePlatformAdapter):
"""Signal messenger adapter using signal-cli HTTP daemon."""
platform = Platform.SIGNAL
# Signal has no real edit API for already-sent messages. Mark it explicitly
# so streaming suppresses the visible cursor instead of leaving a stale tofu
# square behind in chat clients when edit attempts fail.
SUPPORTS_MESSAGE_EDITING = False
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.SIGNAL)
@ -488,6 +493,11 @@ class SignalAdapter(BasePlatformAdapter):
if text and mentions:
text = _render_mentions(text, mentions)
# Extract quote (reply-to) context from Signal dataMessage
quote_data = data_message.get("quote") or {}
reply_to_id = str(quote_data.get("id")) if quote_data.get("id") else None
reply_to_text = quote_data.get("text")
# Process attachments
attachments_data = data_message.get("attachments", [])
media_urls = []
@ -541,7 +551,9 @@ class SignalAdapter(BasePlatformAdapter):
else:
timestamp = datetime.now(tz=timezone.utc)
# Build and dispatch event
# Build and dispatch event.
# Store raw envelope data in raw_message so on_processing_start/complete
# can extract targetAuthor + targetTimestamp for sendReaction.
event = MessageEvent(
source=source,
text=text or "",
@ -549,6 +561,9 @@ class SignalAdapter(BasePlatformAdapter):
media_urls=media_urls,
media_types=media_types,
timestamp=timestamp,
raw_message={"sender": sender, "timestamp_ms": ts_ms},
reply_to_message_id=reply_to_id,
reply_to_text=reply_to_text,
)
logger.debug("Signal: message from %s in %s: %s",
@ -707,6 +722,157 @@ class SignalAdapter(BasePlatformAdapter):
logger.debug("Signal RPC %s failed: %s", method, e)
return None
# ------------------------------------------------------------------
# Formatting — markdown → Signal body ranges
# ------------------------------------------------------------------
@staticmethod
def _markdown_to_signal(text: str) -> tuple:
"""Convert markdown to plain text + Signal textStyles list.
Signal doesn't render markdown. Instead it uses ``bodyRanges``
(exposed by signal-cli as ``textStyle`` / ``textStyles`` params)
with the format ``start:length:STYLE``.
Positions are measured in **UTF-16 code units** (not Python code
points) because that's what the Signal protocol uses.
Supported styles: BOLD, ITALIC, STRIKETHROUGH, MONOSPACE, SPOILER.
Returns ``(plain_text, styles_list)`` where *styles_list* may be
empty if there's nothing to format.
"""
import re
def _utf16_len(s: str) -> int:
"""Length of *s* in UTF-16 code units."""
return len(s.encode("utf-16-le")) // 2
# Pre-process: normalize whitespace before any position tracking
# so later operations don't invalidate recorded offsets.
text = re.sub(r"\n{3,}", "\n\n", text)
text = text.strip()
styles: list = []
# --- Phase 1: fenced code blocks ```...``` → MONOSPACE ---
_CB = re.compile(r"```[a-zA-Z0-9_+-]*\n?(.*?)```", re.DOTALL)
while m := _CB.search(text):
inner = m.group(1).rstrip("\n")
start = m.start()
text = text[: m.start()] + inner + text[m.end() :]
styles.append((start, len(inner), "MONOSPACE"))
# --- Phase 2: heading markers # Foo → Foo (BOLD) ---
_HEADING = re.compile(r"^#{1,6}\s+", re.MULTILINE)
new_text = ""
last_end = 0
for m in _HEADING.finditer(text):
new_text += text[last_end : m.start()]
last_end = m.end()
eol = text.find("\n", m.end())
if eol == -1:
eol = len(text)
heading_text = text[m.end() : eol]
start = len(new_text)
new_text += heading_text
styles.append((start, len(heading_text), "BOLD"))
last_end = eol
new_text += text[last_end:]
text = new_text
# --- Phase 3: inline patterns (single-pass to avoid offset drift) ---
# The old code processed each pattern sequentially, stripping markers
# and recording positions per-pass. Later passes shifted text without
# adjusting earlier positions → bold/italic landed mid-word.
#
# Fix: collect ALL non-overlapping matches first, then strip every
# marker in one pass so positions are computed against the final text.
_PATTERNS = [
(re.compile(r"\*\*(.+?)\*\*", re.DOTALL), "BOLD"),
(re.compile(r"__(.+?)__", re.DOTALL), "BOLD"),
(re.compile(r"~~(.+?)~~", re.DOTALL), "STRIKETHROUGH"),
(re.compile(r"`(.+?)`"), "MONOSPACE"),
(re.compile(r"(?<!\*)\*(?!\*| )(.+?)(?<!\*)\*(?!\*)"), "ITALIC"),
(re.compile(r"(?<!\w)_(?!_)(.+?)(?<!_)_(?!\w)"), "ITALIC"),
]
# Collect all non-overlapping matches (earlier patterns win ties).
all_matches: list = [] # (start, end, g1_start, g1_end, style)
occupied: list = [] # (start, end) intervals already claimed
for pat, style in _PATTERNS:
for m in pat.finditer(text):
ms, me = m.start(), m.end()
if not any(ms < oe and me > os for os, oe in occupied):
all_matches.append((ms, me, m.start(1), m.end(1), style))
occupied.append((ms, me))
all_matches.sort()
# Build removal list so we can adjust Phase 1/2 styles.
# Each match removes its prefix markers (start..g1_start) and
# suffix markers (g1_end..end).
removals: list = [] # (position, length) sorted
for ms, me, g1s, g1e, _ in all_matches:
if g1s > ms:
removals.append((ms, g1s - ms))
if me > g1e:
removals.append((g1e, me - g1e))
removals.sort()
# Adjust Phase 1/2 styles for characters about to be removed.
def _adj(pos: int) -> int:
shift = 0
for rp, rl in removals:
if rp < pos:
shift += min(rl, pos - rp)
else:
break
return pos - shift
adjusted_prior: list = []
for s, l, st in styles:
ns = _adj(s)
ne = _adj(s + l)
if ne > ns:
adjusted_prior.append((ns, ne - ns, st))
# Strip all inline markers in one pass → positions are correct.
result = ""
last_end = 0
inline_styles: list = []
for ms, me, g1s, g1e, sty in all_matches:
result += text[last_end:ms]
pos = len(result)
inner = text[g1s:g1e]
result += inner
inline_styles.append((pos, len(inner), sty))
last_end = me
result += text[last_end:]
text = result
styles = adjusted_prior + inline_styles
# Convert code-point offsets → UTF-16 code-unit offsets
style_strings = []
for cp_start, cp_len, stype in sorted(styles):
# Safety: skip any out-of-bounds styles
if cp_start < 0 or cp_start + cp_len > len(text):
continue
u16_start = _utf16_len(text[:cp_start])
u16_len = _utf16_len(text[cp_start : cp_start + cp_len])
style_strings.append(f"{u16_start}:{u16_len}:{stype}")
return text, style_strings
def format_message(self, content: str) -> str:
"""Strip markdown for plain-text fallback (used by base class).
The actual rich formatting happens in send() via _markdown_to_signal().
"""
# This is only called if someone uses the base-class send path.
# Our send() override bypasses this entirely.
return content
# ------------------------------------------------------------------
# Sending
# ------------------------------------------------------------------
@ -718,14 +884,22 @@ class SignalAdapter(BasePlatformAdapter):
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a text message."""
"""Send a text message with native Signal formatting."""
await self._stop_typing_indicator(chat_id)
plain_text, text_styles = self._markdown_to_signal(content)
params: Dict[str, Any] = {
"account": self.account,
"message": content,
"message": plain_text,
}
if text_styles:
if len(text_styles) == 1:
params["textStyle"] = text_styles[0]
else:
params["textStyles"] = text_styles
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
@ -735,11 +909,10 @@ class SignalAdapter(BasePlatformAdapter):
if result is not None:
self._track_sent_timestamp(result)
# Use the timestamp from the RPC result as a pseudo message_id.
# Signal doesn't have real message IDs, but the stream consumer
# needs a truthy value to follow its edit→fallback path correctly.
_msg_id = str(result.get("timestamp", "")) if isinstance(result, dict) else None
return SendResult(success=True, message_id=_msg_id or None)
# Signal has no editable message identifier. Returning None keeps the
# stream consumer on the non-edit fallback path instead of pretending
# future edits can remove an in-progress cursor from the chat thread.
return SendResult(success=True, message_id=None)
return SendResult(success=False, error="RPC send failed")
def _track_sent_timestamp(self, rpc_result) -> None:
@ -963,6 +1136,110 @@ class SignalAdapter(BasePlatformAdapter):
_keep_typing finally block to clean up platform-level typing tasks."""
await self._stop_typing_indicator(chat_id)
# ------------------------------------------------------------------
# Reactions
# ------------------------------------------------------------------
async def send_reaction(
self,
chat_id: str,
emoji: str,
target_author: str,
target_timestamp: int,
) -> bool:
"""Send a reaction emoji to a specific message via signal-cli RPC.
Args:
chat_id: The chat (phone number or "group:<id>")
emoji: Reaction emoji string (e.g. "👀", "")
target_author: Phone number / UUID of the message author
target_timestamp: Signal timestamp (ms) of the message to react to
"""
params: Dict[str, Any] = {
"account": self.account,
"emoji": emoji,
"targetAuthor": target_author,
"targetTimestamp": target_timestamp,
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
result = await self._rpc("sendReaction", params)
if result is not None:
return True
logger.debug("Signal: sendReaction failed (chat=%s, emoji=%s)", chat_id[:20], emoji)
return False
async def remove_reaction(
self,
chat_id: str,
target_author: str,
target_timestamp: int,
) -> bool:
"""Remove a reaction by sending an empty-string emoji."""
params: Dict[str, Any] = {
"account": self.account,
"emoji": "",
"targetAuthor": target_author,
"targetTimestamp": target_timestamp,
"remove": True,
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
result = await self._rpc("sendReaction", params)
return result is not None
# ------------------------------------------------------------------
# Processing Lifecycle Hooks (reactions as progress indicators)
# ------------------------------------------------------------------
def _extract_reaction_target(self, event: MessageEvent) -> Optional[tuple]:
"""Extract (target_author, target_timestamp) from a MessageEvent.
Returns None if the event doesn't carry the raw Signal envelope data
needed for sendReaction.
"""
raw = event.raw_message
if not isinstance(raw, dict):
return None
author = raw.get("sender")
ts = raw.get("timestamp_ms")
if not author or not ts:
return None
return (author, ts)
async def on_processing_start(self, event: MessageEvent) -> None:
"""React with 👀 when processing begins."""
target = self._extract_reaction_target(event)
if target:
await self.send_reaction(event.source.chat_id, "👀", *target)
async def on_processing_complete(self, event: MessageEvent, outcome: "ProcessingOutcome") -> None:
"""Swap the 👀 reaction for ✅ (success) or ❌ (failure).
On CANCELLED we leave the 👀 in place no terminal outcome means
the reaction should keep reflecting "in progress" (matches Telegram).
"""
if outcome == ProcessingOutcome.CANCELLED:
return
target = self._extract_reaction_target(event)
if not target:
return
chat_id = event.source.chat_id
# Remove the in-progress reaction, then add the final one
await self.remove_reaction(chat_id, *target)
if outcome == ProcessingOutcome.SUCCESS:
await self.send_reaction(chat_id, "", *target)
elif outcome == ProcessingOutcome.FAILURE:
await self.send_reaction(chat_id, "", *target)
# ------------------------------------------------------------------
# Chat Info
# ------------------------------------------------------------------

View File

@ -800,15 +800,23 @@ class TestSignalSendDocumentViaHelper:
# ---------------------------------------------------------------------------
# send() returns message_id from timestamp (#4647)
# Signal streaming edit capability / message_id behavior
# ---------------------------------------------------------------------------
class TestSignalStreamingCapabilities:
"""Signal must opt out of edit-based streaming behavior."""
def test_signal_declares_no_message_editing(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
assert adapter.SUPPORTS_MESSAGE_EDITING is False
class TestSignalSendReturnsMessageId:
"""Signal send() must return a timestamp-based message_id so the stream
consumer can follow its editfallback path correctly."""
"""Signal send() should not pretend sent messages are editable."""
@pytest.mark.asyncio
async def test_send_returns_timestamp_as_message_id(self, monkeypatch):
async def test_send_returns_none_message_id_even_with_timestamp(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, _ = _stub_rpc({"timestamp": 1712345678000})
adapter._rpc = mock_rpc
@ -817,7 +825,7 @@ class TestSignalSendReturnsMessageId:
result = await adapter.send(chat_id="+155****4567", content="hello")
assert result.success is True
assert result.message_id == "1712345678000"
assert result.message_id is None
@pytest.mark.asyncio
async def test_send_returns_none_message_id_when_no_timestamp(self, monkeypatch):
@ -997,3 +1005,100 @@ class TestSignalTypingBackoff:
assert "+155****4567" not in adapter._typing_failures
assert "+155****4567" not in adapter._typing_skip_until
# ---------------------------------------------------------------------------
# Reply quote extraction
# ---------------------------------------------------------------------------
class TestSignalQuoteExtraction:
"""Verify Signal reply quote fields are propagated to MessageEvent."""
@pytest.mark.asyncio
async def test_handle_envelope_sets_reply_context_from_quote(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
captured = {}
async def fake_handle(event):
captured["event"] = event
adapter.handle_message = fake_handle
await adapter._handle_envelope({
"envelope": {
"sourceNumber": "+15550001111",
"sourceUuid": "uuid-sender",
"sourceName": "Tester",
"timestamp": 1000000000,
"dataMessage": {
"message": "yes I agree",
"quote": {
"id": 99,
"text": "want to grab lunch?",
"author": "+15550002222",
},
},
}
})
event = captured["event"]
assert event.text == "yes I agree"
assert event.reply_to_message_id == "99"
assert event.reply_to_text == "want to grab lunch?"
@pytest.mark.asyncio
async def test_handle_envelope_without_quote_leaves_reply_fields_none(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
captured = {}
async def fake_handle(event):
captured["event"] = event
adapter.handle_message = fake_handle
await adapter._handle_envelope({
"envelope": {
"sourceNumber": "+15550001111",
"sourceUuid": "uuid-sender",
"sourceName": "Tester",
"timestamp": 1000000000,
"dataMessage": {
"message": "plain message",
},
}
})
event = captured["event"]
assert event.text == "plain message"
assert event.reply_to_message_id is None
assert event.reply_to_text is None
@pytest.mark.asyncio
async def test_handle_envelope_quote_without_text_sets_only_reply_id(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
captured = {}
async def fake_handle(event):
captured["event"] = event
adapter.handle_message = fake_handle
await adapter._handle_envelope({
"envelope": {
"sourceNumber": "+15550001111",
"sourceUuid": "uuid-sender",
"sourceName": "Tester",
"timestamp": 1000000000,
"dataMessage": {
"message": "reply without quote text",
"quote": {
"id": 123,
"author": "+15550002222",
},
},
}
})
event = captured["event"]
assert event.reply_to_message_id == "123"
assert event.reply_to_text is None

View File

@ -0,0 +1,452 @@
"""Tests for Signal _markdown_to_signal() formatting.
Covers the markdown-to-bodyRanges conversion pipeline: bold, italic,
strikethrough, monospace, code blocks, headings, and critically the
false-positive regressions that caused spurious italics in production.
"""
import pytest
from gateway.config import PlatformConfig
from gateway.platforms.signal import SignalAdapter
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _m2s(text: str):
"""Shorthand: call the static method and return (plain_text, styles)."""
return SignalAdapter._markdown_to_signal(text)
def _style_types(styles: list[str]) -> list[str]:
"""Extract just the STYLE part from '0:4:BOLD' strings."""
return [s.rsplit(":", 1)[1] for s in styles]
def _find_style(styles: list[str], style_type: str) -> list[str]:
"""Return only styles matching a given type."""
return [s for s in styles if s.endswith(f":{style_type}")]
# ===========================================================================
# Basic formatting
# ===========================================================================
class TestMarkdownToSignalBasic:
"""Core formatting: bold, italic, strikethrough, monospace."""
def test_bold_double_asterisk(self):
text, styles = _m2s("hello **world**")
assert text == "hello world"
assert len(styles) == 1
assert styles[0].endswith(":BOLD")
def test_bold_double_underscore(self):
text, styles = _m2s("hello __world__")
assert text == "hello world"
assert len(styles) == 1
assert styles[0].endswith(":BOLD")
def test_italic_single_asterisk(self):
text, styles = _m2s("hello *world*")
assert text == "hello world"
assert len(styles) == 1
assert styles[0].endswith(":ITALIC")
def test_italic_single_underscore(self):
text, styles = _m2s("hello _world_")
assert text == "hello world"
assert len(styles) == 1
assert styles[0].endswith(":ITALIC")
def test_strikethrough(self):
text, styles = _m2s("hello ~~world~~")
assert text == "hello world"
assert len(styles) == 1
assert styles[0].endswith(":STRIKETHROUGH")
def test_inline_monospace(self):
text, styles = _m2s("run `ls -la` now")
assert text == "run ls -la now"
assert len(styles) == 1
assert styles[0].endswith(":MONOSPACE")
def test_fenced_code_block(self):
text, styles = _m2s("before\n```\ncode here\n```\nafter")
assert "code here" in text
assert "```" not in text
assert any(s.endswith(":MONOSPACE") for s in styles)
def test_heading_becomes_bold(self):
text, styles = _m2s("## Section Title")
assert text == "Section Title"
assert len(styles) == 1
assert styles[0].endswith(":BOLD")
def test_multiple_styles(self):
text, styles = _m2s("**bold** and *italic*")
assert text == "bold and italic"
types = _style_types(styles)
assert "BOLD" in types
assert "ITALIC" in types
def test_plain_text_no_styles(self):
text, styles = _m2s("just plain text")
assert text == "just plain text"
assert styles == []
def test_empty_string(self):
text, styles = _m2s("")
assert text == ""
assert styles == []
# ===========================================================================
# Italic false-positive regressions
# ===========================================================================
class TestItalicFalsePositives:
"""Regressions from signal-italic-false-positive-fix.md and
signal-italic-bullet-list-fix.md."""
# --- snake_case (original fix) ---
def test_snake_case_not_italic(self):
"""snake_case identifiers must NOT be italicized."""
text, styles = _m2s("the config_file is ready")
assert text == "the config_file is ready"
assert _find_style(styles, "ITALIC") == []
def test_multiple_snake_case(self):
text, styles = _m2s("set OPENAI_API_KEY and ANTHROPIC_API_KEY")
assert _find_style(styles, "ITALIC") == []
def test_snake_case_path(self):
text, styles = _m2s("/tools/delegate_tool.py")
assert _find_style(styles, "ITALIC") == []
def test_snake_case_between_words(self):
"""file_path and error_code — underscores between words."""
text, styles = _m2s("file_path and error_code")
assert _find_style(styles, "ITALIC") == []
# --- Bullet lists (second fix) ---
def test_bullet_list_not_italic(self):
"""* item lines must NOT be treated as italic delimiters."""
md = "* item one\n* item two\n* item three"
text, styles = _m2s(md)
assert _find_style(styles, "ITALIC") == []
def test_bullet_list_with_content_before(self):
md = "Here are things:\n\n* first thing\n* second thing"
text, styles = _m2s(md)
assert _find_style(styles, "ITALIC") == []
def test_bullet_list_file_paths(self):
"""Real-world case that triggered the bug."""
md = (
"* tools/delegate_tool.py — delegation\n"
"* tools/file_tools.py — file operations\n"
"* tools/web_tools.py — web operations"
)
text, styles = _m2s(md)
assert _find_style(styles, "ITALIC") == []
def test_bullet_with_italic_inside(self):
"""Italic *inside* a bullet item should still work."""
md = "* this has *emphasis* inside\n* plain item"
text, styles = _m2s(md)
italic_styles = _find_style(styles, "ITALIC")
assert len(italic_styles) == 1
# The italic should cover "emphasis", not the whole bullet
assert "emphasis" in text
# --- Cross-line spans (DOTALL removal) ---
def test_star_italic_no_cross_line(self):
"""*foo\\nbar* must NOT match as italic (no DOTALL)."""
text, styles = _m2s("*foo\nbar*")
assert _find_style(styles, "ITALIC") == []
def test_underscore_italic_no_cross_line(self):
"""_foo\\nbar_ must NOT match as italic (no DOTALL)."""
text, styles = _m2s("_foo\nbar_")
assert _find_style(styles, "ITALIC") == []
def test_star_italic_multiline_response(self):
"""Multi-paragraph response with * should not false-positive."""
md = (
"I checked the following files:\n\n"
"* tools/delegate_tool.py — sub-agent delegation\n"
"* tools/file_tools.py — file read/write/search\n"
"* tools/web_tools.py — web search/extract\n\n"
"Everything looks good."
)
text, styles = _m2s(md)
assert _find_style(styles, "ITALIC") == []
# --- Legitimate italic still works ---
def test_star_italic_still_works(self):
text, styles = _m2s("this is *italic* text")
assert text == "this is italic text"
assert len(_find_style(styles, "ITALIC")) == 1
def test_underscore_italic_still_works(self):
text, styles = _m2s("this is _italic_ text")
assert text == "this is italic text"
assert len(_find_style(styles, "ITALIC")) == 1
def test_multiple_italic_same_line(self):
text, styles = _m2s("*foo* and *bar* ok")
assert text == "foo and bar ok"
assert len(_find_style(styles, "ITALIC")) == 2
def test_italic_single_word(self):
text, styles = _m2s("*word*")
assert text == "word"
assert len(_find_style(styles, "ITALIC")) == 1
def test_italic_multi_word(self):
text, styles = _m2s("*several words here*")
assert text == "several words here"
assert len(_find_style(styles, "ITALIC")) == 1
# ===========================================================================
# Style position accuracy
# ===========================================================================
class TestStylePositions:
"""Verify that start:length positions map to the correct text."""
def _extract(self, text: str, style_str: str) -> str:
"""Given 'start:length:STYLE', extract the substring from text."""
# Positions are UTF-16 code units; for ASCII they match code points
parts = style_str.split(":")
start, length = int(parts[0]), int(parts[1])
# Encode to UTF-16-LE, slice, decode back
encoded = text.encode("utf-16-le")
extracted = encoded[start * 2 : (start + length) * 2]
return extracted.decode("utf-16-le")
def test_bold_position(self):
text, styles = _m2s("hello **world** end")
assert len(styles) == 1
assert self._extract(text, styles[0]) == "world"
def test_italic_position(self):
text, styles = _m2s("hello *world* end")
assert len(styles) == 1
assert self._extract(text, styles[0]) == "world"
def test_multiple_styles_positions(self):
text, styles = _m2s("**bold** then *italic*")
assert len(styles) == 2
extracted = {self._extract(text, s) for s in styles}
assert extracted == {"bold", "italic"}
def test_emoji_utf16_offset(self):
"""Emoji (multi-byte UTF-16) before a styled span."""
text, styles = _m2s("👋 **hello**")
assert text == "👋 hello"
assert len(styles) == 1
assert self._extract(text, styles[0]) == "hello"
# ===========================================================================
# Edge cases
# ===========================================================================
class TestEdgeCases:
"""Tricky inputs that have caused issues or could regress."""
def test_bold_inside_bullet(self):
"""Bold inside a bullet list item."""
md = "* **important** item\n* normal item"
text, styles = _m2s(md)
assert len(_find_style(styles, "BOLD")) == 1
assert _find_style(styles, "ITALIC") == []
def test_code_span_with_underscores(self):
"""`snake_case_var` — backtick takes priority over underscore."""
text, styles = _m2s("use `my_var_name` here")
assert text == "use my_var_name here"
types = _style_types(styles)
assert "MONOSPACE" in types
assert "ITALIC" not in types
def test_bold_and_italic_nested(self):
"""***bold+italic*** — bold captured, not italic (bold pattern first)."""
text, styles = _m2s("***word***")
# ** matches bold around *word*, or *** is ambiguous;
# either way there should be no false italic of the whole string
assert "word" in text
def test_lone_asterisk(self):
"""A single * with no pair should not cause issues."""
text, styles = _m2s("5 * 3 = 15")
# Should not crash; any italic match would be a false positive
assert "5" in text and "15" in text
def test_lone_underscore(self):
"""A single _ with no pair."""
text, styles = _m2s("this _ that")
assert text == "this _ that"
def test_consecutive_underscored_words(self):
"""_foo and _bar (leading underscores, no closers)."""
text, styles = _m2s("call _init and _setup")
assert _find_style(styles, "ITALIC") == []
def test_mixed_formatting_no_bleed(self):
"""Multiple format types don't bleed into each other."""
md = "**bold** and `code` and *italic* and ~~strike~~"
text, styles = _m2s(md)
assert text == "bold and code and italic and strike"
types = _style_types(styles)
assert sorted(types) == ["BOLD", "ITALIC", "MONOSPACE", "STRIKETHROUGH"]
# ===========================================================================
# signal-markdown-strip-patch: core conversion pipeline
# ===========================================================================
class TestMarkdownStripPatch:
"""Tests for the original signal-markdown-strip-patch.
Covers: fenced code blocks with language tags, links preserved,
headings converted to bold, multiple headings, UTF-16 correctness
for multi-byte characters, and marker stripping completeness.
"""
def test_fenced_code_block_with_language_tag(self):
"""```python\\ncode\\n``` — language tag is stripped, content is MONOSPACE."""
text, styles = _m2s("```python\nprint('hello')\n```")
assert "```" not in text
assert "python" not in text # language tag stripped
assert "print('hello')" in text
assert any(s.endswith(":MONOSPACE") for s in styles)
def test_fenced_code_block_multiline(self):
"""Multi-line code blocks preserve all lines."""
md = "```\nline1\nline2\nline3\n```"
text, styles = _m2s(md)
assert "line1" in text
assert "line2" in text
assert "line3" in text
assert "```" not in text
def test_links_preserved(self):
"""[text](url) links are kept as-is — Signal auto-linkifies."""
md = "Check [this link](https://example.com) for details"
text, styles = _m2s(md)
# Links should pass through — either as markdown or just preserved
assert "https://example.com" in text
def test_heading_h1(self):
"""# H1 becomes bold text."""
text, styles = _m2s("# Main Title")
assert text == "Main Title"
assert len(styles) == 1
assert styles[0].endswith(":BOLD")
def test_heading_h3(self):
"""### H3 becomes bold text."""
text, styles = _m2s("### Sub Section")
assert text == "Sub Section"
assert len(styles) == 1
assert styles[0].endswith(":BOLD")
def test_multiple_headings(self):
"""Multiple headings each become separate bold spans."""
md = "## First\n\nSome text\n\n## Second"
text, styles = _m2s(md)
assert "First" in text
assert "Second" in text
assert "##" not in text
bold_styles = _find_style(styles, "BOLD")
assert len(bold_styles) == 2
def test_no_raw_markdown_markers_in_output(self):
"""All markdown syntax is stripped from plain text output."""
md = "**bold** and *italic* and ~~struck~~ and `code` and ## heading"
text, styles = _m2s(md)
assert "**" not in text
assert "~~" not in text
assert "`" not in text
# ## at end might remain if not at line start — that's ok
# The important thing is styled markers are stripped
def test_utf16_surrogate_pair_emoji(self):
"""Emoji requiring UTF-16 surrogate pairs don't corrupt offsets."""
# 🎉 is U+1F389 — requires surrogate pair (2 UTF-16 code units)
text, styles = _m2s("🎉🎉 **test**")
assert "test" in text
assert len(styles) == 1
# Verify the style position is correct
parts = styles[0].split(":")
start, length = int(parts[0]), int(parts[1])
# 🎉🎉 = 4 UTF-16 code units + space = 5, then "test" = 4
assert start == 5
assert length == 4
def test_consecutive_newlines_collapsed(self):
"""3+ consecutive newlines are collapsed to 2."""
text, styles = _m2s("first\n\n\n\n\nsecond")
assert "\n\n\n" not in text
assert "first" in text
assert "second" in text
def test_empty_bold_not_crash(self):
"""**** (empty bold) should not crash."""
text, styles = _m2s("before **** after")
# Should not raise — exact output doesn't matter much
assert "before" in text
# ===========================================================================
# signal-streaming-patch: SUPPORTS_MESSAGE_EDITING and send() behavior
# ===========================================================================
class TestSignalStreamingPatch:
"""Tests for signal-streaming-patch: cursor suppression and edit support.
These verify the adapter-level properties that prevent the streaming
cursor from leaking into Signal messages.
"""
def test_signal_does_not_support_editing(self, monkeypatch):
"""SignalAdapter.SUPPORTS_MESSAGE_EDITING must be False."""
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
from gateway.platforms.signal import SignalAdapter
assert SignalAdapter.SUPPORTS_MESSAGE_EDITING is False
@pytest.mark.asyncio
async def test_send_returns_no_message_id(self, monkeypatch):
"""send() returns message_id=None so stream consumer uses no-edit path."""
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
from gateway.platforms.signal import SignalAdapter
from gateway.config import PlatformConfig
config = PlatformConfig(enabled=True)
config.extra = {
"http_url": "http://localhost:8080",
"account": "+15551234567",
}
adapter = SignalAdapter(config)
# Mock the RPC call
async def mock_rpc(method, params, rpc_id=None):
return {"timestamp": 1234567890}
adapter._rpc = mock_rpc
result = await adapter.send(
chat_id="+15559876543",
content="Hello",
)
assert result.message_id is None