diff --git a/gateway/platforms/signal.py b/gateway/platforms/signal.py index 9a0a6256..f35cf7a6 100644 --- a/gateway/platforms/signal.py +++ b/gateway/platforms/signal.py @@ -31,6 +31,7 @@ from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, + ProcessingOutcome, SendResult, cache_image_from_bytes, cache_audio_from_bytes, @@ -162,6 +163,10 @@ class SignalAdapter(BasePlatformAdapter): """Signal messenger adapter using signal-cli HTTP daemon.""" platform = Platform.SIGNAL + # Signal has no real edit API for already-sent messages. Mark it explicitly + # so streaming suppresses the visible cursor instead of leaving a stale tofu + # square behind in chat clients when edit attempts fail. + SUPPORTS_MESSAGE_EDITING = False def __init__(self, config: PlatformConfig): super().__init__(config, Platform.SIGNAL) @@ -488,6 +493,11 @@ class SignalAdapter(BasePlatformAdapter): if text and mentions: text = _render_mentions(text, mentions) + # Extract quote (reply-to) context from Signal dataMessage + quote_data = data_message.get("quote") or {} + reply_to_id = str(quote_data.get("id")) if quote_data.get("id") else None + reply_to_text = quote_data.get("text") + # Process attachments attachments_data = data_message.get("attachments", []) media_urls = [] @@ -541,7 +551,9 @@ class SignalAdapter(BasePlatformAdapter): else: timestamp = datetime.now(tz=timezone.utc) - # Build and dispatch event + # Build and dispatch event. + # Store raw envelope data in raw_message so on_processing_start/complete + # can extract targetAuthor + targetTimestamp for sendReaction. event = MessageEvent( source=source, text=text or "", @@ -549,6 +561,9 @@ class SignalAdapter(BasePlatformAdapter): media_urls=media_urls, media_types=media_types, timestamp=timestamp, + raw_message={"sender": sender, "timestamp_ms": ts_ms}, + reply_to_message_id=reply_to_id, + reply_to_text=reply_to_text, ) logger.debug("Signal: message from %s in %s: %s", @@ -707,6 +722,157 @@ class SignalAdapter(BasePlatformAdapter): logger.debug("Signal RPC %s failed: %s", method, e) return None + # ------------------------------------------------------------------ + # Formatting — markdown → Signal body ranges + # ------------------------------------------------------------------ + + @staticmethod + def _markdown_to_signal(text: str) -> tuple: + """Convert markdown to plain text + Signal textStyles list. + + Signal doesn't render markdown. Instead it uses ``bodyRanges`` + (exposed by signal-cli as ``textStyle`` / ``textStyles`` params) + with the format ``start:length:STYLE``. + + Positions are measured in **UTF-16 code units** (not Python code + points) because that's what the Signal protocol uses. + + Supported styles: BOLD, ITALIC, STRIKETHROUGH, MONOSPACE, SPOILER. + + Returns ``(plain_text, styles_list)`` where *styles_list* may be + empty if there's nothing to format. + """ + import re + + def _utf16_len(s: str) -> int: + """Length of *s* in UTF-16 code units.""" + return len(s.encode("utf-16-le")) // 2 + + # Pre-process: normalize whitespace before any position tracking + # so later operations don't invalidate recorded offsets. + text = re.sub(r"\n{3,}", "\n\n", text) + text = text.strip() + + styles: list = [] + + # --- Phase 1: fenced code blocks ```...``` → MONOSPACE --- + _CB = re.compile(r"```[a-zA-Z0-9_+-]*\n?(.*?)```", re.DOTALL) + while m := _CB.search(text): + inner = m.group(1).rstrip("\n") + start = m.start() + text = text[: m.start()] + inner + text[m.end() :] + styles.append((start, len(inner), "MONOSPACE")) + + # --- Phase 2: heading markers # Foo → Foo (BOLD) --- + _HEADING = re.compile(r"^#{1,6}\s+", re.MULTILINE) + new_text = "" + last_end = 0 + for m in _HEADING.finditer(text): + new_text += text[last_end : m.start()] + last_end = m.end() + eol = text.find("\n", m.end()) + if eol == -1: + eol = len(text) + heading_text = text[m.end() : eol] + start = len(new_text) + new_text += heading_text + styles.append((start, len(heading_text), "BOLD")) + last_end = eol + new_text += text[last_end:] + text = new_text + + # --- Phase 3: inline patterns (single-pass to avoid offset drift) --- + # The old code processed each pattern sequentially, stripping markers + # and recording positions per-pass. Later passes shifted text without + # adjusting earlier positions → bold/italic landed mid-word. + # + # Fix: collect ALL non-overlapping matches first, then strip every + # marker in one pass so positions are computed against the final text. + _PATTERNS = [ + (re.compile(r"\*\*(.+?)\*\*", re.DOTALL), "BOLD"), + (re.compile(r"__(.+?)__", re.DOTALL), "BOLD"), + (re.compile(r"~~(.+?)~~", re.DOTALL), "STRIKETHROUGH"), + (re.compile(r"`(.+?)`"), "MONOSPACE"), + (re.compile(r"(? os for os, oe in occupied): + all_matches.append((ms, me, m.start(1), m.end(1), style)) + occupied.append((ms, me)) + all_matches.sort() + + # Build removal list so we can adjust Phase 1/2 styles. + # Each match removes its prefix markers (start..g1_start) and + # suffix markers (g1_end..end). + removals: list = [] # (position, length) sorted + for ms, me, g1s, g1e, _ in all_matches: + if g1s > ms: + removals.append((ms, g1s - ms)) + if me > g1e: + removals.append((g1e, me - g1e)) + removals.sort() + + # Adjust Phase 1/2 styles for characters about to be removed. + def _adj(pos: int) -> int: + shift = 0 + for rp, rl in removals: + if rp < pos: + shift += min(rl, pos - rp) + else: + break + return pos - shift + + adjusted_prior: list = [] + for s, l, st in styles: + ns = _adj(s) + ne = _adj(s + l) + if ne > ns: + adjusted_prior.append((ns, ne - ns, st)) + + # Strip all inline markers in one pass → positions are correct. + result = "" + last_end = 0 + inline_styles: list = [] + for ms, me, g1s, g1e, sty in all_matches: + result += text[last_end:ms] + pos = len(result) + inner = text[g1s:g1e] + result += inner + inline_styles.append((pos, len(inner), sty)) + last_end = me + result += text[last_end:] + text = result + + styles = adjusted_prior + inline_styles + + # Convert code-point offsets → UTF-16 code-unit offsets + style_strings = [] + for cp_start, cp_len, stype in sorted(styles): + # Safety: skip any out-of-bounds styles + if cp_start < 0 or cp_start + cp_len > len(text): + continue + u16_start = _utf16_len(text[:cp_start]) + u16_len = _utf16_len(text[cp_start : cp_start + cp_len]) + style_strings.append(f"{u16_start}:{u16_len}:{stype}") + + return text, style_strings + + def format_message(self, content: str) -> str: + """Strip markdown for plain-text fallback (used by base class). + + The actual rich formatting happens in send() via _markdown_to_signal(). + """ + # This is only called if someone uses the base-class send path. + # Our send() override bypasses this entirely. + return content + # ------------------------------------------------------------------ # Sending # ------------------------------------------------------------------ @@ -718,14 +884,22 @@ class SignalAdapter(BasePlatformAdapter): reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ) -> SendResult: - """Send a text message.""" + """Send a text message with native Signal formatting.""" await self._stop_typing_indicator(chat_id) + plain_text, text_styles = self._markdown_to_signal(content) + params: Dict[str, Any] = { "account": self.account, - "message": content, + "message": plain_text, } + if text_styles: + if len(text_styles) == 1: + params["textStyle"] = text_styles[0] + else: + params["textStyles"] = text_styles + if chat_id.startswith("group:"): params["groupId"] = chat_id[6:] else: @@ -735,11 +909,10 @@ class SignalAdapter(BasePlatformAdapter): if result is not None: self._track_sent_timestamp(result) - # Use the timestamp from the RPC result as a pseudo message_id. - # Signal doesn't have real message IDs, but the stream consumer - # needs a truthy value to follow its edit→fallback path correctly. - _msg_id = str(result.get("timestamp", "")) if isinstance(result, dict) else None - return SendResult(success=True, message_id=_msg_id or None) + # Signal has no editable message identifier. Returning None keeps the + # stream consumer on the non-edit fallback path instead of pretending + # future edits can remove an in-progress cursor from the chat thread. + return SendResult(success=True, message_id=None) return SendResult(success=False, error="RPC send failed") def _track_sent_timestamp(self, rpc_result) -> None: @@ -963,6 +1136,110 @@ class SignalAdapter(BasePlatformAdapter): _keep_typing finally block to clean up platform-level typing tasks.""" await self._stop_typing_indicator(chat_id) + # ------------------------------------------------------------------ + # Reactions + # ------------------------------------------------------------------ + + async def send_reaction( + self, + chat_id: str, + emoji: str, + target_author: str, + target_timestamp: int, + ) -> bool: + """Send a reaction emoji to a specific message via signal-cli RPC. + + Args: + chat_id: The chat (phone number or "group:") + emoji: Reaction emoji string (e.g. "👀", "✅") + target_author: Phone number / UUID of the message author + target_timestamp: Signal timestamp (ms) of the message to react to + """ + params: Dict[str, Any] = { + "account": self.account, + "emoji": emoji, + "targetAuthor": target_author, + "targetTimestamp": target_timestamp, + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + result = await self._rpc("sendReaction", params) + if result is not None: + return True + logger.debug("Signal: sendReaction failed (chat=%s, emoji=%s)", chat_id[:20], emoji) + return False + + async def remove_reaction( + self, + chat_id: str, + target_author: str, + target_timestamp: int, + ) -> bool: + """Remove a reaction by sending an empty-string emoji.""" + params: Dict[str, Any] = { + "account": self.account, + "emoji": "", + "targetAuthor": target_author, + "targetTimestamp": target_timestamp, + "remove": True, + } + + if chat_id.startswith("group:"): + params["groupId"] = chat_id[6:] + else: + params["recipient"] = [chat_id] + + result = await self._rpc("sendReaction", params) + return result is not None + + # ------------------------------------------------------------------ + # Processing Lifecycle Hooks (reactions as progress indicators) + # ------------------------------------------------------------------ + + def _extract_reaction_target(self, event: MessageEvent) -> Optional[tuple]: + """Extract (target_author, target_timestamp) from a MessageEvent. + + Returns None if the event doesn't carry the raw Signal envelope data + needed for sendReaction. + """ + raw = event.raw_message + if not isinstance(raw, dict): + return None + author = raw.get("sender") + ts = raw.get("timestamp_ms") + if not author or not ts: + return None + return (author, ts) + + async def on_processing_start(self, event: MessageEvent) -> None: + """React with 👀 when processing begins.""" + target = self._extract_reaction_target(event) + if target: + await self.send_reaction(event.source.chat_id, "👀", *target) + + async def on_processing_complete(self, event: MessageEvent, outcome: "ProcessingOutcome") -> None: + """Swap the 👀 reaction for ✅ (success) or ❌ (failure). + + On CANCELLED we leave the 👀 in place — no terminal outcome means + the reaction should keep reflecting "in progress" (matches Telegram). + """ + if outcome == ProcessingOutcome.CANCELLED: + return + target = self._extract_reaction_target(event) + if not target: + return + chat_id = event.source.chat_id + # Remove the in-progress reaction, then add the final one + await self.remove_reaction(chat_id, *target) + if outcome == ProcessingOutcome.SUCCESS: + await self.send_reaction(chat_id, "✅", *target) + elif outcome == ProcessingOutcome.FAILURE: + await self.send_reaction(chat_id, "❌", *target) + # ------------------------------------------------------------------ # Chat Info # ------------------------------------------------------------------ diff --git a/tests/gateway/test_signal.py b/tests/gateway/test_signal.py index b51ec713..ca8f458a 100644 --- a/tests/gateway/test_signal.py +++ b/tests/gateway/test_signal.py @@ -800,15 +800,23 @@ class TestSignalSendDocumentViaHelper: # --------------------------------------------------------------------------- -# send() returns message_id from timestamp (#4647) +# Signal streaming edit capability / message_id behavior # --------------------------------------------------------------------------- +class TestSignalStreamingCapabilities: + """Signal must opt out of edit-based streaming behavior.""" + + def test_signal_declares_no_message_editing(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + + assert adapter.SUPPORTS_MESSAGE_EDITING is False + + class TestSignalSendReturnsMessageId: - """Signal send() must return a timestamp-based message_id so the stream - consumer can follow its edit→fallback path correctly.""" + """Signal send() should not pretend sent messages are editable.""" @pytest.mark.asyncio - async def test_send_returns_timestamp_as_message_id(self, monkeypatch): + async def test_send_returns_none_message_id_even_with_timestamp(self, monkeypatch): adapter = _make_signal_adapter(monkeypatch) mock_rpc, _ = _stub_rpc({"timestamp": 1712345678000}) adapter._rpc = mock_rpc @@ -817,7 +825,7 @@ class TestSignalSendReturnsMessageId: result = await adapter.send(chat_id="+155****4567", content="hello") assert result.success is True - assert result.message_id == "1712345678000" + assert result.message_id is None @pytest.mark.asyncio async def test_send_returns_none_message_id_when_no_timestamp(self, monkeypatch): @@ -997,3 +1005,100 @@ class TestSignalTypingBackoff: assert "+155****4567" not in adapter._typing_failures assert "+155****4567" not in adapter._typing_skip_until + + +# --------------------------------------------------------------------------- +# Reply quote extraction +# --------------------------------------------------------------------------- + +class TestSignalQuoteExtraction: + """Verify Signal reply quote fields are propagated to MessageEvent.""" + + @pytest.mark.asyncio + async def test_handle_envelope_sets_reply_context_from_quote(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + captured = {} + + async def fake_handle(event): + captured["event"] = event + + adapter.handle_message = fake_handle + + await adapter._handle_envelope({ + "envelope": { + "sourceNumber": "+15550001111", + "sourceUuid": "uuid-sender", + "sourceName": "Tester", + "timestamp": 1000000000, + "dataMessage": { + "message": "yes I agree", + "quote": { + "id": 99, + "text": "want to grab lunch?", + "author": "+15550002222", + }, + }, + } + }) + + event = captured["event"] + assert event.text == "yes I agree" + assert event.reply_to_message_id == "99" + assert event.reply_to_text == "want to grab lunch?" + + @pytest.mark.asyncio + async def test_handle_envelope_without_quote_leaves_reply_fields_none(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + captured = {} + + async def fake_handle(event): + captured["event"] = event + + adapter.handle_message = fake_handle + + await adapter._handle_envelope({ + "envelope": { + "sourceNumber": "+15550001111", + "sourceUuid": "uuid-sender", + "sourceName": "Tester", + "timestamp": 1000000000, + "dataMessage": { + "message": "plain message", + }, + } + }) + + event = captured["event"] + assert event.text == "plain message" + assert event.reply_to_message_id is None + assert event.reply_to_text is None + + @pytest.mark.asyncio + async def test_handle_envelope_quote_without_text_sets_only_reply_id(self, monkeypatch): + adapter = _make_signal_adapter(monkeypatch) + captured = {} + + async def fake_handle(event): + captured["event"] = event + + adapter.handle_message = fake_handle + + await adapter._handle_envelope({ + "envelope": { + "sourceNumber": "+15550001111", + "sourceUuid": "uuid-sender", + "sourceName": "Tester", + "timestamp": 1000000000, + "dataMessage": { + "message": "reply without quote text", + "quote": { + "id": 123, + "author": "+15550002222", + }, + }, + } + }) + + event = captured["event"] + assert event.reply_to_message_id == "123" + assert event.reply_to_text is None diff --git a/tests/gateway/test_signal_format.py b/tests/gateway/test_signal_format.py new file mode 100644 index 00000000..ef50f62f --- /dev/null +++ b/tests/gateway/test_signal_format.py @@ -0,0 +1,452 @@ +"""Tests for Signal _markdown_to_signal() formatting. + +Covers the markdown-to-bodyRanges conversion pipeline: bold, italic, +strikethrough, monospace, code blocks, headings, and — critically — the +false-positive regressions that caused spurious italics in production. +""" + +import pytest + +from gateway.config import PlatformConfig +from gateway.platforms.signal import SignalAdapter + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _m2s(text: str): + """Shorthand: call the static method and return (plain_text, styles).""" + return SignalAdapter._markdown_to_signal(text) + + +def _style_types(styles: list[str]) -> list[str]: + """Extract just the STYLE part from '0:4:BOLD' strings.""" + return [s.rsplit(":", 1)[1] for s in styles] + + +def _find_style(styles: list[str], style_type: str) -> list[str]: + """Return only styles matching a given type.""" + return [s for s in styles if s.endswith(f":{style_type}")] + + +# =========================================================================== +# Basic formatting +# =========================================================================== + +class TestMarkdownToSignalBasic: + """Core formatting: bold, italic, strikethrough, monospace.""" + + def test_bold_double_asterisk(self): + text, styles = _m2s("hello **world**") + assert text == "hello world" + assert len(styles) == 1 + assert styles[0].endswith(":BOLD") + + def test_bold_double_underscore(self): + text, styles = _m2s("hello __world__") + assert text == "hello world" + assert len(styles) == 1 + assert styles[0].endswith(":BOLD") + + def test_italic_single_asterisk(self): + text, styles = _m2s("hello *world*") + assert text == "hello world" + assert len(styles) == 1 + assert styles[0].endswith(":ITALIC") + + def test_italic_single_underscore(self): + text, styles = _m2s("hello _world_") + assert text == "hello world" + assert len(styles) == 1 + assert styles[0].endswith(":ITALIC") + + def test_strikethrough(self): + text, styles = _m2s("hello ~~world~~") + assert text == "hello world" + assert len(styles) == 1 + assert styles[0].endswith(":STRIKETHROUGH") + + def test_inline_monospace(self): + text, styles = _m2s("run `ls -la` now") + assert text == "run ls -la now" + assert len(styles) == 1 + assert styles[0].endswith(":MONOSPACE") + + def test_fenced_code_block(self): + text, styles = _m2s("before\n```\ncode here\n```\nafter") + assert "code here" in text + assert "```" not in text + assert any(s.endswith(":MONOSPACE") for s in styles) + + def test_heading_becomes_bold(self): + text, styles = _m2s("## Section Title") + assert text == "Section Title" + assert len(styles) == 1 + assert styles[0].endswith(":BOLD") + + def test_multiple_styles(self): + text, styles = _m2s("**bold** and *italic*") + assert text == "bold and italic" + types = _style_types(styles) + assert "BOLD" in types + assert "ITALIC" in types + + def test_plain_text_no_styles(self): + text, styles = _m2s("just plain text") + assert text == "just plain text" + assert styles == [] + + def test_empty_string(self): + text, styles = _m2s("") + assert text == "" + assert styles == [] + + +# =========================================================================== +# Italic false-positive regressions +# =========================================================================== + +class TestItalicFalsePositives: + """Regressions from signal-italic-false-positive-fix.md and + signal-italic-bullet-list-fix.md.""" + + # --- snake_case (original fix) --- + + def test_snake_case_not_italic(self): + """snake_case identifiers must NOT be italicized.""" + text, styles = _m2s("the config_file is ready") + assert text == "the config_file is ready" + assert _find_style(styles, "ITALIC") == [] + + def test_multiple_snake_case(self): + text, styles = _m2s("set OPENAI_API_KEY and ANTHROPIC_API_KEY") + assert _find_style(styles, "ITALIC") == [] + + def test_snake_case_path(self): + text, styles = _m2s("/tools/delegate_tool.py") + assert _find_style(styles, "ITALIC") == [] + + def test_snake_case_between_words(self): + """file_path and error_code — underscores between words.""" + text, styles = _m2s("file_path and error_code") + assert _find_style(styles, "ITALIC") == [] + + # --- Bullet lists (second fix) --- + + def test_bullet_list_not_italic(self): + """* item lines must NOT be treated as italic delimiters.""" + md = "* item one\n* item two\n* item three" + text, styles = _m2s(md) + assert _find_style(styles, "ITALIC") == [] + + def test_bullet_list_with_content_before(self): + md = "Here are things:\n\n* first thing\n* second thing" + text, styles = _m2s(md) + assert _find_style(styles, "ITALIC") == [] + + def test_bullet_list_file_paths(self): + """Real-world case that triggered the bug.""" + md = ( + "* tools/delegate_tool.py — delegation\n" + "* tools/file_tools.py — file operations\n" + "* tools/web_tools.py — web operations" + ) + text, styles = _m2s(md) + assert _find_style(styles, "ITALIC") == [] + + def test_bullet_with_italic_inside(self): + """Italic *inside* a bullet item should still work.""" + md = "* this has *emphasis* inside\n* plain item" + text, styles = _m2s(md) + italic_styles = _find_style(styles, "ITALIC") + assert len(italic_styles) == 1 + # The italic should cover "emphasis", not the whole bullet + assert "emphasis" in text + + # --- Cross-line spans (DOTALL removal) --- + + def test_star_italic_no_cross_line(self): + """*foo\\nbar* must NOT match as italic (no DOTALL).""" + text, styles = _m2s("*foo\nbar*") + assert _find_style(styles, "ITALIC") == [] + + def test_underscore_italic_no_cross_line(self): + """_foo\\nbar_ must NOT match as italic (no DOTALL).""" + text, styles = _m2s("_foo\nbar_") + assert _find_style(styles, "ITALIC") == [] + + def test_star_italic_multiline_response(self): + """Multi-paragraph response with * should not false-positive.""" + md = ( + "I checked the following files:\n\n" + "* tools/delegate_tool.py — sub-agent delegation\n" + "* tools/file_tools.py — file read/write/search\n" + "* tools/web_tools.py — web search/extract\n\n" + "Everything looks good." + ) + text, styles = _m2s(md) + assert _find_style(styles, "ITALIC") == [] + + # --- Legitimate italic still works --- + + def test_star_italic_still_works(self): + text, styles = _m2s("this is *italic* text") + assert text == "this is italic text" + assert len(_find_style(styles, "ITALIC")) == 1 + + def test_underscore_italic_still_works(self): + text, styles = _m2s("this is _italic_ text") + assert text == "this is italic text" + assert len(_find_style(styles, "ITALIC")) == 1 + + def test_multiple_italic_same_line(self): + text, styles = _m2s("*foo* and *bar* ok") + assert text == "foo and bar ok" + assert len(_find_style(styles, "ITALIC")) == 2 + + def test_italic_single_word(self): + text, styles = _m2s("*word*") + assert text == "word" + assert len(_find_style(styles, "ITALIC")) == 1 + + def test_italic_multi_word(self): + text, styles = _m2s("*several words here*") + assert text == "several words here" + assert len(_find_style(styles, "ITALIC")) == 1 + + +# =========================================================================== +# Style position accuracy +# =========================================================================== + +class TestStylePositions: + """Verify that start:length positions map to the correct text.""" + + def _extract(self, text: str, style_str: str) -> str: + """Given 'start:length:STYLE', extract the substring from text.""" + # Positions are UTF-16 code units; for ASCII they match code points + parts = style_str.split(":") + start, length = int(parts[0]), int(parts[1]) + # Encode to UTF-16-LE, slice, decode back + encoded = text.encode("utf-16-le") + extracted = encoded[start * 2 : (start + length) * 2] + return extracted.decode("utf-16-le") + + def test_bold_position(self): + text, styles = _m2s("hello **world** end") + assert len(styles) == 1 + assert self._extract(text, styles[0]) == "world" + + def test_italic_position(self): + text, styles = _m2s("hello *world* end") + assert len(styles) == 1 + assert self._extract(text, styles[0]) == "world" + + def test_multiple_styles_positions(self): + text, styles = _m2s("**bold** then *italic*") + assert len(styles) == 2 + extracted = {self._extract(text, s) for s in styles} + assert extracted == {"bold", "italic"} + + def test_emoji_utf16_offset(self): + """Emoji (multi-byte UTF-16) before a styled span.""" + text, styles = _m2s("👋 **hello**") + assert text == "👋 hello" + assert len(styles) == 1 + assert self._extract(text, styles[0]) == "hello" + + +# =========================================================================== +# Edge cases +# =========================================================================== + +class TestEdgeCases: + """Tricky inputs that have caused issues or could regress.""" + + def test_bold_inside_bullet(self): + """Bold inside a bullet list item.""" + md = "* **important** item\n* normal item" + text, styles = _m2s(md) + assert len(_find_style(styles, "BOLD")) == 1 + assert _find_style(styles, "ITALIC") == [] + + def test_code_span_with_underscores(self): + """`snake_case_var` — backtick takes priority over underscore.""" + text, styles = _m2s("use `my_var_name` here") + assert text == "use my_var_name here" + types = _style_types(styles) + assert "MONOSPACE" in types + assert "ITALIC" not in types + + def test_bold_and_italic_nested(self): + """***bold+italic*** — bold captured, not italic (bold pattern first).""" + text, styles = _m2s("***word***") + # ** matches bold around *word*, or *** is ambiguous; + # either way there should be no false italic of the whole string + assert "word" in text + + def test_lone_asterisk(self): + """A single * with no pair should not cause issues.""" + text, styles = _m2s("5 * 3 = 15") + # Should not crash; any italic match would be a false positive + assert "5" in text and "15" in text + + def test_lone_underscore(self): + """A single _ with no pair.""" + text, styles = _m2s("this _ that") + assert text == "this _ that" + + def test_consecutive_underscored_words(self): + """_foo and _bar (leading underscores, no closers).""" + text, styles = _m2s("call _init and _setup") + assert _find_style(styles, "ITALIC") == [] + + def test_mixed_formatting_no_bleed(self): + """Multiple format types don't bleed into each other.""" + md = "**bold** and `code` and *italic* and ~~strike~~" + text, styles = _m2s(md) + assert text == "bold and code and italic and strike" + types = _style_types(styles) + assert sorted(types) == ["BOLD", "ITALIC", "MONOSPACE", "STRIKETHROUGH"] + + +# =========================================================================== +# signal-markdown-strip-patch: core conversion pipeline +# =========================================================================== + +class TestMarkdownStripPatch: + """Tests for the original signal-markdown-strip-patch. + + Covers: fenced code blocks with language tags, links preserved, + headings converted to bold, multiple headings, UTF-16 correctness + for multi-byte characters, and marker stripping completeness. + """ + + def test_fenced_code_block_with_language_tag(self): + """```python\\ncode\\n``` — language tag is stripped, content is MONOSPACE.""" + text, styles = _m2s("```python\nprint('hello')\n```") + assert "```" not in text + assert "python" not in text # language tag stripped + assert "print('hello')" in text + assert any(s.endswith(":MONOSPACE") for s in styles) + + def test_fenced_code_block_multiline(self): + """Multi-line code blocks preserve all lines.""" + md = "```\nline1\nline2\nline3\n```" + text, styles = _m2s(md) + assert "line1" in text + assert "line2" in text + assert "line3" in text + assert "```" not in text + + def test_links_preserved(self): + """[text](url) links are kept as-is — Signal auto-linkifies.""" + md = "Check [this link](https://example.com) for details" + text, styles = _m2s(md) + # Links should pass through — either as markdown or just preserved + assert "https://example.com" in text + + def test_heading_h1(self): + """# H1 becomes bold text.""" + text, styles = _m2s("# Main Title") + assert text == "Main Title" + assert len(styles) == 1 + assert styles[0].endswith(":BOLD") + + def test_heading_h3(self): + """### H3 becomes bold text.""" + text, styles = _m2s("### Sub Section") + assert text == "Sub Section" + assert len(styles) == 1 + assert styles[0].endswith(":BOLD") + + def test_multiple_headings(self): + """Multiple headings each become separate bold spans.""" + md = "## First\n\nSome text\n\n## Second" + text, styles = _m2s(md) + assert "First" in text + assert "Second" in text + assert "##" not in text + bold_styles = _find_style(styles, "BOLD") + assert len(bold_styles) == 2 + + def test_no_raw_markdown_markers_in_output(self): + """All markdown syntax is stripped from plain text output.""" + md = "**bold** and *italic* and ~~struck~~ and `code` and ## heading" + text, styles = _m2s(md) + assert "**" not in text + assert "~~" not in text + assert "`" not in text + # ## at end might remain if not at line start — that's ok + # The important thing is styled markers are stripped + + def test_utf16_surrogate_pair_emoji(self): + """Emoji requiring UTF-16 surrogate pairs don't corrupt offsets.""" + # 🎉 is U+1F389 — requires surrogate pair (2 UTF-16 code units) + text, styles = _m2s("🎉🎉 **test**") + assert "test" in text + assert len(styles) == 1 + # Verify the style position is correct + parts = styles[0].split(":") + start, length = int(parts[0]), int(parts[1]) + # 🎉🎉 = 4 UTF-16 code units + space = 5, then "test" = 4 + assert start == 5 + assert length == 4 + + def test_consecutive_newlines_collapsed(self): + """3+ consecutive newlines are collapsed to 2.""" + text, styles = _m2s("first\n\n\n\n\nsecond") + assert "\n\n\n" not in text + assert "first" in text + assert "second" in text + + def test_empty_bold_not_crash(self): + """**** (empty bold) should not crash.""" + text, styles = _m2s("before **** after") + # Should not raise — exact output doesn't matter much + assert "before" in text + + +# =========================================================================== +# signal-streaming-patch: SUPPORTS_MESSAGE_EDITING and send() behavior +# =========================================================================== + +class TestSignalStreamingPatch: + """Tests for signal-streaming-patch: cursor suppression and edit support. + + These verify the adapter-level properties that prevent the streaming + cursor from leaking into Signal messages. + """ + + def test_signal_does_not_support_editing(self, monkeypatch): + """SignalAdapter.SUPPORTS_MESSAGE_EDITING must be False.""" + monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "") + from gateway.platforms.signal import SignalAdapter + assert SignalAdapter.SUPPORTS_MESSAGE_EDITING is False + + @pytest.mark.asyncio + async def test_send_returns_no_message_id(self, monkeypatch): + """send() returns message_id=None so stream consumer uses no-edit path.""" + monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "") + from gateway.platforms.signal import SignalAdapter + from gateway.config import PlatformConfig + + config = PlatformConfig(enabled=True) + config.extra = { + "http_url": "http://localhost:8080", + "account": "+15551234567", + } + adapter = SignalAdapter(config) + + # Mock the RPC call + async def mock_rpc(method, params, rpc_id=None): + return {"timestamp": 1234567890} + + adapter._rpc = mock_rpc + + result = await adapter.send( + chat_id="+15559876543", + content="Hello", + ) + assert result.message_id is None