feat(gateway/signal): add support for multiple images sending

Adds a new `send_multiple_images` method to the ``BasePlatformAdapter``
that implements the default "One image per message" loop and allows for
platform-specific overriding.

Implements such an override for the Signal adapter, batching images
and trying (best-effort) to work around rate-limits for voluminous
batches using a specific scheduler.

Also implements batching + rate-limit handling in the `send_message`
tool.

New tests added for the Signal adapter, its rate-limit scheduler and the
`send_message` tool
This commit is contained in:
Maxence Groine 2026-04-30 12:11:07 +02:00 committed by Teknium
parent 411f586c67
commit 04ea895ffb
9 changed files with 2010 additions and 84 deletions

View File

@ -1505,7 +1505,64 @@ class BasePlatformAdapter(ABC):
Default is a no-op for platforms with one-shot typing indicators.
"""
pass
async def send_multiple_images(
self,
chat_id: str,
images: List[Tuple[str, str]],
metadata: Optional[Dict[str, Any]] = None,
human_delay: float = 0.0,
) -> None:
"""Send a batch of images.
Accepts ``http(s)://``, ``file://`` URIs in the first tuple
element.
Default implementation sends each item individually,
routing animated GIFs through ``send_animation`` and local
files through ``send_image_file``.
Override in subclasses to bundle into a single native API call
(e.g. Signal's multi-attachment RPC)
"""
from urllib.parse import unquote as _unquote
for image_url, alt_text in images:
if human_delay > 0:
await asyncio.sleep(human_delay)
try:
logger.info(
"[%s] Sending image: %s (alt=%s)",
self.name,
safe_url_for_log(image_url),
alt_text[:30] if alt_text else "",
)
if image_url.startswith("file://"):
img_result = await self.send_image_file(
chat_id=chat_id,
image_path=_unquote(image_url[7:]),
caption=alt_text if alt_text else None,
metadata=metadata,
)
elif self._is_animation_url(image_url):
img_result = await self.send_animation(
chat_id=chat_id,
animation_url=image_url,
caption=alt_text if alt_text else None,
metadata=metadata,
)
else:
img_result = await self.send_image(
chat_id=chat_id,
image_url=image_url,
caption=alt_text if alt_text else None,
metadata=metadata,
)
if not img_result.success:
logger.error("[%s] Failed to send image: %s", self.name, img_result.error)
except Exception as img_err:
logger.error("[%s] Error sending image: %s", self.name, img_err, exc_info=True)
async def send_image(
self,
chat_id: str,
@ -2587,41 +2644,52 @@ class BasePlatformAdapter(ABC):
# Send extracted images as native attachments
if images:
logger.info("[%s] Extracted %d image(s) to send as attachments", self.name, len(images))
for image_url, alt_text in images:
if human_delay > 0:
await asyncio.sleep(human_delay)
try:
logger.info(
"[%s] Sending image: %s (alt=%s)",
self.name,
safe_url_for_log(image_url),
alt_text[:30] if alt_text else "",
await self.send_multiple_images(
chat_id=event.source.chat_id,
images=images,
metadata=_thread_metadata,
human_delay=human_delay,
)
# Route animated GIFs through send_animation for proper playback
if self._is_animation_url(image_url):
img_result = await self.send_animation(
chat_id=event.source.chat_id,
animation_url=image_url,
caption=alt_text if alt_text else None,
metadata=_thread_metadata,
)
else:
img_result = await self.send_image(
chat_id=event.source.chat_id,
image_url=image_url,
caption=alt_text if alt_text else None,
metadata=_thread_metadata,
)
if not img_result.success:
logger.error("[%s] Failed to send image: %s", self.name, img_result.error)
except Exception as img_err:
logger.error("[%s] Error sending image: %s", self.name, img_err, exc_info=True)
except Exception as batch_err:
logger.warning("[%s] Error batching images: %s", self.name, batch_err, exc_info=True)
# Send extracted media files — route by file type
_VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}
_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
# Partition images out of media_files + local_files so they
# can be sent as a single batch (Signal RPC)
from urllib.parse import quote as _quote
_image_paths: list = []
_non_image_media: list = []
for media_path, is_voice in media_files:
_ext = Path(media_path).suffix.lower()
if _ext in _IMAGE_EXTS and not is_voice:
_image_paths.append(media_path)
else:
_non_image_media.append((media_path, is_voice))
_non_image_local: list = []
for file_path in local_files:
if Path(file_path).suffix.lower() in _IMAGE_EXTS:
_image_paths.append(file_path)
else:
_non_image_local.append(file_path)
if _image_paths:
try:
_batch = [(f"file://{_quote(p)}", "") for p in _image_paths]
await self.send_multiple_images(
chat_id=event.source.chat_id,
images=_batch,
metadata=_thread_metadata,
human_delay=human_delay,
)
except Exception as batch_err:
logger.warning("[%s] Error batching images: %s", self.name, batch_err, exc_info=True)
for media_path, is_voice in _non_image_media:
if human_delay > 0:
await asyncio.sleep(human_delay)
try:
@ -2638,12 +2706,6 @@ class BasePlatformAdapter(ABC):
video_path=media_path,
metadata=_thread_metadata,
)
elif ext in _IMAGE_EXTS:
media_result = await self.send_image_file(
chat_id=event.source.chat_id,
image_path=media_path,
metadata=_thread_metadata,
)
else:
media_result = await self.send_document(
chat_id=event.source.chat_id,
@ -2656,19 +2718,13 @@ class BasePlatformAdapter(ABC):
except Exception as media_err:
logger.warning("[%s] Error sending media: %s", self.name, media_err)
# Send auto-detected local files as native attachments
for file_path in local_files:
# Send auto-detected local non-image files as native attachments
for file_path in _non_image_local:
if human_delay > 0:
await asyncio.sleep(human_delay)
try:
ext = Path(file_path).suffix.lower()
if ext in _IMAGE_EXTS:
await self.send_image_file(
chat_id=event.source.chat_id,
image_path=file_path,
metadata=_thread_metadata,
)
elif ext in _VIDEO_EXTS:
if ext in _VIDEO_EXTS:
await self.send_video(
chat_id=event.source.chat_id,
video_path=file_path,

View File

@ -21,7 +21,7 @@ import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote, unquote
import httpx
@ -39,6 +39,17 @@ from gateway.platforms.base import (
cache_image_from_url,
)
from gateway.platforms.helpers import redact_phone
from gateway.platforms.signal_rate_limit import (
SIGNAL_BATCH_PACING_NOTICE_THRESHOLD,
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
SignalRateLimitError,
_extract_retry_after_seconds,
_format_wait,
_is_signal_rate_limit_error,
_signal_send_timeout,
get_scheduler,
)
logger = logging.getLogger(__name__)
@ -53,6 +64,7 @@ SSE_RETRY_DELAY_MAX = 60.0
HEALTH_CHECK_INTERVAL = 30.0 # seconds between health checks
HEALTH_CHECK_STALE_THRESHOLD = 120.0 # seconds without SSE activity before concern
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -674,6 +686,8 @@ class SignalAdapter(BasePlatformAdapter):
rpc_id: str = None,
*,
log_failures: bool = True,
raise_on_rate_limit: bool = False,
timeout: float = 30.0,
) -> Any:
"""Send a JSON-RPC 2.0 request to signal-cli daemon.
@ -682,6 +696,11 @@ class SignalAdapter(BasePlatformAdapter):
repeated NETWORK_FAILURE spam for unreachable recipients while
still preserving visibility for the first occurrence and for
unrelated RPCs.
When ``raise_on_rate_limit=True``, a Signal ``[429]`` /
``RateLimitException`` response raises ``SignalRateLimitError``
instead of being swallowed lets callers (multi-attachment send)
opt into backoff-retry without changing default behaviour.
"""
if not self.client:
logger.warning("Signal: RPC called but client not connected")
@ -701,20 +720,28 @@ class SignalAdapter(BasePlatformAdapter):
resp = await self.client.post(
f"{self.http_url}/api/v1/rpc",
json=payload,
timeout=30.0,
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
if "error" in data:
err = data["error"]
if raise_on_rate_limit:
if _is_signal_rate_limit_error(err):
err_msg = str(err.get("message", "")) if isinstance(err, dict) else str(err)
retry_after = _extract_retry_after_seconds(err)
raise SignalRateLimitError(err_msg, retry_after=retry_after)
if log_failures:
logger.warning("Signal RPC error (%s): %s", method, data["error"])
logger.warning("Signal RPC error (%s): %s", method, err)
else:
logger.debug("Signal RPC error (%s): %s", method, data["error"])
logger.debug("Signal RPC error (%s): %s", method, err)
return None
return data.get("result")
except SignalRateLimitError:
raise
except Exception as e:
if log_failures:
logger.warning("Signal RPC %s failed: %s", method, e)
@ -978,6 +1005,178 @@ class SignalAdapter(BasePlatformAdapter):
self._typing_failures.pop(chat_id, None)
self._typing_skip_until.pop(chat_id, None)
async def send_multiple_images(
self,
chat_id: str,
images: List[Tuple[str, str]],
metadata: Optional[Dict[str, Any]] = None,
human_delay: float = 0.0,
) -> None:
"""Send a batch of images via chunked Signal RPC calls.
Per-image alt texts are dropped Signal's send RPC only carries
one shared message body. Bad images (download failure, missing
file, oversize) are skipped with a warning so one bad URL
doesn't lose the rest of the batch. ``human_delay`` is ignored:
the rate-limit scheduler handles inter-batch pacing.
"""
if not images:
return
scheduler = get_scheduler()
logger.info(
"Signal send_multiple_images: received %d image(s) for %s"
"scheduler state: %s",
len(images), chat_id[:30], scheduler.state(),
)
await self._stop_typing_indicator(chat_id)
attachments: List[str] = []
skipped_download = 0
skipped_missing = 0
skipped_oversize = 0
for image_url, _alt_text in images:
if image_url.startswith("file://"):
file_path = unquote(image_url[7:])
else:
try:
file_path = await cache_image_from_url(image_url)
except Exception as e:
logger.warning("Signal: failed to download image %s: %s", image_url, e)
skipped_download += 1
continue
if not file_path or not Path(file_path).exists():
logger.warning("Signal: image file not found for %s", image_url)
skipped_missing += 1
continue
file_size = Path(file_path).stat().st_size
if file_size > SIGNAL_MAX_ATTACHMENT_SIZE:
logger.warning(
"Signal: image too large (%d bytes), skipping %s", file_size, image_url
)
skipped_oversize += 1
continue
attachments.append(file_path)
if not attachments:
logger.error(
"Signal: no valid images in batch of %d "
"(download=%d missing=%d oversize=%d)",
len(images), skipped_download, skipped_missing, skipped_oversize,
)
return
logger.info(
"Signal send_multiple_images: %d/%d images valid, sending in chunks",
len(attachments), len(images),
)
base_params: Dict[str, Any] = {
"account": self.account,
"message": "",
}
if chat_id.startswith("group:"):
base_params["groupId"] = chat_id[6:]
else:
base_params["recipient"] = [await self._resolve_recipient(chat_id)]
att_batches = [
attachments[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG]
for i in range(0, len(attachments), SIGNAL_MAX_ATTACHMENTS_PER_MSG)
]
for idx, att_batch in enumerate(att_batches):
n = len(att_batch)
estimated = scheduler.estimate_wait(n)
logger.debug(
"Signal batch %d/%d: %d attachments, estimated wait=%.1fs",
idx + 1, len(att_batches), n, estimated,
)
if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD:
await self._notify_batch_pacing(
chat_id, idx + 1, len(att_batches), estimated
)
params = dict(base_params, attachments=att_batch)
send_timeout = _signal_send_timeout(n)
for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1):
await scheduler.acquire(n)
try:
_rpc_t0 = time.monotonic()
result = await self._rpc(
"send", params, raise_on_rate_limit=True, timeout=send_timeout,
)
_rpc_duration = time.monotonic() - _rpc_t0
if result is not None:
self._track_sent_timestamp(result)
await scheduler.report_rpc_duration(_rpc_duration, n)
logger.info(
"Signal batch %d/%d: %d attachments sent in %.1fs "
"(attempt %d/%d)",
idx + 1, len(att_batches), n, _rpc_duration,
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
)
else:
# Assume the server didn't accept the batch, don't deduce tokens
logger.error(
"Signal: RPC send failed for batch %d/%d (%d attachments, "
"attempt %d/%d, rpc_duration=%.1fs)",
idx + 1, len(att_batches), n,
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
_rpc_duration,
)
# Retry transient (non-rate-limit) failures once
if attempt < SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
backoff = 2.0 ** attempt
logger.info(
"Signal: retrying batch %d/%d after %.1fs backoff",
idx + 1, len(att_batches), backoff,
)
await asyncio.sleep(backoff)
continue
break
except SignalRateLimitError as e:
scheduler.feedback(e.retry_after, n)
if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
logger.error(
"Signal: rate-limit retries exhausted on batch %d/%d "
"(%d attachments lost, server retry_after=%s)",
idx + 1, len(att_batches), n,
f"{e.retry_after:.0f}s" if e.retry_after else "unknown",
)
break
logger.warning(
"Signal: rate-limited on batch %d/%d "
"(attempt %d/%d, server retry_after=%s); "
"scheduler will pace the retry",
idx + 1, len(att_batches),
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
f"{e.retry_after:.0f}s" if e.retry_after else "unknown",
)
async def _notify_batch_pacing(
self,
chat_id: str,
next_batch_idx: int,
total_batches: int,
wait_s: float,
) -> None:
"""Inform the user when an inter-batch pacing wait crosses the
notice threshold. Best-effort; logs and continues on failure."""
try:
await self.send(
chat_id,
f"(More images coming — pausing ~{_format_wait(wait_s)} "
f"for Signal rate limit, batch {next_batch_idx}/{total_batches}.)",
)
except Exception as e:
logger.warning("Signal: failed to send pacing notice: %s", e)
async def send_image(
self,
chat_id: str,

View File

@ -0,0 +1,369 @@
"""
Signal attachment rate-limit scheduler.
Process-wide token-bucket simulator that mirrors the per-account
attachment rate limit signal-cli/Signal-Server enforce. Producers
(``SignalAdapter.send_multiple_images`` and the ``send_message`` tool's
Signal path) call ``acquire(n)`` before an attachment send; on a 429
they call ``feedback(retry_after, n)`` so the model recalibrates from
the server's authoritative hint.
The scheduler serializes concurrent calls through an ``asyncio.Lock``,
giving FIFO fairness across agent sessions sharing one signal-cli
daemon.
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from typing import Any, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SIGNAL_MAX_ATTACHMENTS_PER_MSG = 32 # per-message attachment cap (source: Signal-{Android,Desktop} source code)
SIGNAL_RATE_LIMIT_BUCKET_CAPACITY = 50 # server-side token-bucket capacity for attachments rate limiting
SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER = 4 # fallback token refill interval for signal-cli < v0.14.3
SIGNAL_RATE_LIMIT_MAX_ATTEMPTS = 2 # initial attempt + 1 retry
SIGNAL_BATCH_PACING_NOTICE_THRESHOLD = 10.0 # if estimated waiting time > 10s, notify the user about the delay
SIGNAL_RPC_ERROR_RATELIMIT = -5 # signal-cli (v0.14.3+) JSON-RPC error code for RateLimitException
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class SignalRateLimitError(Exception):
"""
Raised by ``SignalAdapter._rpc`` for rate-limit responses when the
caller has opted in via ``raise_on_rate_limit=True``.
Carries the server-supplied per-token Retry-After (in seconds) on
signal-cli v0.14.3
``retry_after`` is None when the version doesn't expose it.
"""
def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
super().__init__(message)
self.retry_after = retry_after
class SignalSchedulerError(Exception):
pass
# ---------------------------------------------------------------------------
# Detection helpers — used to fish a 429 out of signal-cli's various error
# shapes (typed code, [429] substring, libsignal-net RetryLaterException
# leaked through AttachmentInvalidException).
# ---------------------------------------------------------------------------
# "Retry after 4 seconds" / "retry after 4 second" — libsignal-net's
# RetryLaterException string form, surfaced when 429s hit during
# attachment upload (signal-cli wraps these as AttachmentInvalidException
# rather than RateLimitException, so the typed path doesn't fire).
_RETRY_AFTER_RE = re.compile(r"Retry after (\d+(?:\.\d+)?)\s*second", re.IGNORECASE)
def _extract_retry_after_seconds(err: Any) -> Optional[float]:
"""Pull the per-token Retry-After window from a signal-cli rate-limit error.
Tries two sources, in order:
1. ``error.data.response.results[*].retryAfterSeconds`` the
structured field signal-cli v0.14.3 surfaces for plain
RateLimitException.
2. ``"Retry after N seconds"`` parsed out of the message covers
libsignal-net's RetryLaterException that gets wrapped as
AttachmentInvalidException during attachment upload, where the
structured field stays null.
Returns None when neither yields a value.
"""
msg = ""
if isinstance(err, dict):
data = err.get("data") or {}
response = data.get("response") or {}
results = response.get("results") or []
candidates = [
r.get("retryAfterSeconds") for r in results
if isinstance(r, dict) and r.get("retryAfterSeconds")
]
if candidates:
return float(max(candidates))
msg = str(err.get("message", ""))
else:
msg = str(err)
match = _RETRY_AFTER_RE.search(msg)
return float(match.group(1)) if match else None
def _is_signal_rate_limit_error(err: Any) -> bool:
"""True if a signal-cli RPC error reflects a rate-limit failure.
Matches three layers:
- typed ``RATELIMIT_ERROR`` code (signal-cli v0.14.3, plain
RateLimitException)
- legacy ``[429] / RateLimitException`` substrings
- libsignal-net's ``RetryLaterException`` / ``Retry after N seconds``
surfaced inside ``AttachmentInvalidException`` when the rate
limit is hit during attachment upload signal-cli never re-tags
these as RateLimitException, so substring is the only signal.
"""
if isinstance(err, dict) and err.get("code") == SIGNAL_RPC_ERROR_RATELIMIT:
return True
message = (
str(err.get("message", ""))
if isinstance(err, dict)
else str(err)
)
msg_lower = message.lower()
return (
"[429]" in message
or "ratelimit" in msg_lower
or "retrylaterexception" in msg_lower
or "retry after" in msg_lower
)
# ---------------------------------------------------------------------------
# Misc helpers
# ---------------------------------------------------------------------------
def _format_wait(seconds: float) -> str:
"""Human-friendly wait label for user-facing pacing notices."""
s = max(0.0, seconds)
if s < 90:
return f"{int(round(s))}s"
return f"{max(1, int(round(s / 60)))} min"
def _signal_send_timeout(num_attachments: int) -> float:
"""HTTP timeout for a Signal ``send`` RPC.
signal-cli uploads attachments serially during the call, so the
server-side time scales with batch size. Default 30s is fine for
text-only sends but truncates large attachment batches mid-upload
we then log a phantom failure even though signal-cli completes the
send a few seconds later. Scale at 5s/attachment with a 60s floor.
"""
if num_attachments <= 0:
return 30.0
return max(60.0, 5.0 * num_attachments)
# ---------------------------------------------------------------------------
# Scheduler
# ---------------------------------------------------------------------------
class SignalAttachmentScheduler:
"""Process-wide token-bucket simulator for Signal attachment sends.
The bucket holds up to ``capacity`` tokens (default 50, matching
Signal's server-side rate-limit bucket size). Each attachment consumes one
token. Tokens refill at ``refill_rate`` tokens/second, calibrated
from the per-token Retry-After hint we get from the server when a
429 fires. Until we've observed one, we use the documented default
(1 token / 4 seconds).
Concurrent ``acquire(n)`` calls serialize through an
``asyncio.Lock`` natural FIFO across agent sessions hitting the
same daemon.
"""
def __init__(
self,
capacity: float = float(SIGNAL_RATE_LIMIT_BUCKET_CAPACITY),
default_retry_after: float = float(SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER),
) -> None:
self.capacity = float(capacity)
self.tokens = float(capacity)
self.refill_rate = 1.0 / float(default_retry_after)
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _refill(self) -> None:
now = time.monotonic()
elapsed = now - self.last_refill
if elapsed > 0 and self.tokens < self.capacity:
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def estimate_wait(self, n: int) -> float:
"""Best-effort estimate of the seconds until ``n`` tokens would
be available. Used to decide whether to emit a user-facing
pacing notice *before* committing to an ``acquire`` that may
block silently. Lock-free; small races vs. concurrent acquires
are benign for an informational notice.
"""
now = time.monotonic()
elapsed = now - self.last_refill
projected = self.tokens
if elapsed > 0 and projected < self.capacity:
projected = min(self.capacity, projected + elapsed * self.refill_rate)
deficit = n - projected
if deficit <= 0:
return 0.0
return deficit / self.refill_rate
async def acquire(self, n: int) -> float:
"""Block until at least ``n`` tokens are available, return the
seconds slept.
Does **not** deduct tokens the bucket is a read-only model of
server-side capacity. Call ``report_rpc_duration()`` after the
RPC to synchronise the model with the server timeline.
Not perfect in case lots of coroutines try to acquire for big
uploads (``report_rpc_duration`` will take a long time to get hit)
but this is just a simulation. Signal server is ground truth and
will raise rate-limit exceptions triggering requeues.
The lock is released during ``asyncio.sleep`` so other callers
can interleave. A retry loop re-checks after each sleep in
case the deadline was pessimistic.
"""
if n <= 0:
return 0.0
if n > self.capacity:
raise SignalSchedulerError(
f"Signal scheduler was called requesting {n} tokens "
f"(max is {self.capacity})",
)
total_slept = 0.0
first_pass = True
while True:
async with self._lock:
self._refill()
if self.tokens >= n:
if not first_pass or total_slept > 0:
logger.debug(
"Signal scheduler: tokens sufficient for %d "
"(remaining=%.1f, total_slept=%.1fs)",
n, self.tokens, total_slept,
)
return total_slept
deficit = n - self.tokens
wait = deficit / self.refill_rate
if first_pass:
logger.info(
"Signal scheduler: pausing %.1fs for %d tokens "
"(available=%.1f, deficit=%.1f, refill=%.4f/s ≈ %.1fs/token)",
wait, n, self.tokens, deficit,
self.refill_rate, 1.0 / self.refill_rate,
)
first_pass = False
await asyncio.sleep(wait)
total_slept += wait
async def report_rpc_duration(self, rpc_duration: float, n_attachments: int) -> None:
"""Record an attachment-send RPC that just completed.
Deducts ``n_attachments`` tokens without crediting refill during
the upload window. Signal's server checks the bucket at RPC start
and does *not* refill during request processing refill resumes
after the response. Crediting upload-time refill causes cumulative
drift that eventually triggers 429s.
Advances ``last_refill`` so the next ``acquire`` / ``_refill``
starts counting from this point.
"""
if n_attachments <= 0:
return
async with self._lock:
now = time.monotonic()
token_before = self.tokens
self.tokens = max(0.0, token_before - float(n_attachments))
self.last_refill = now
logger.log(
logging.INFO if rpc_duration > 10 and n_attachments > 5 else logging.DEBUG,
"Signal scheduler: RPC for %d att took %.1fs — "
"tokens %.1f%.1f (deducted=%d, no upload refill credited, refill=%.4fs⁻¹)",
n_attachments, rpc_duration,
token_before, self.tokens,
n_attachments, self.refill_rate,
)
def feedback(self, retry_after: Optional[float], n_attempted: int) -> None:
"""Apply server feedback after a 429.
``retry_after`` is the per-*token* refill window the server
reports (None when signal-cli is older than v0.14.3 and didn't
surface it).
When present we calibrate ``refill_rate`` from it:
the server is authoritative.
"""
if retry_after and retry_after > 0:
new_rate = 1.0 / float(retry_after)
if new_rate != self.refill_rate:
logger.info(
"Signal scheduler: calibrating refill_rate to %.4f tokens/sec "
"(server retry_after=%.1fs per token)",
new_rate, retry_after,
)
self.refill_rate = new_rate
self.tokens = 0.0
self.last_refill = time.monotonic()
def state(self) -> dict:
"""Return current scheduler state for diagnostic logging (read-only).
Does not advance ``last_refill`` safe to call from logging paths
without perturbing the bucket.
"""
now = time.monotonic()
elapsed = now - self.last_refill
projected = self.tokens
if elapsed > 0 and projected < self.capacity:
projected = min(self.capacity, projected + elapsed * self.refill_rate)
return {
"tokens": round(projected, 1),
"capacity": int(self.capacity),
"refill_rate": round(self.refill_rate, 4),
"refill_seconds_per_token": round(1.0 / self.refill_rate, 1) if self.refill_rate > 0 else float("inf"),
}
# ---------------------------------------------------------------------------
# Process-wide singleton
# ---------------------------------------------------------------------------
_scheduler: Optional[SignalAttachmentScheduler] = None
def get_scheduler() -> SignalAttachmentScheduler:
"""Return the process-wide scheduler, creating it on first access."""
global _scheduler
if _scheduler is None:
_scheduler = SignalAttachmentScheduler()
logger.info(
"Signal scheduler: created (capacity=%d tokens, refill=%.4f/s ≈ %.1fs/token)",
int(_scheduler.capacity),
_scheduler.refill_rate,
1.0 / _scheduler.refill_rate,
)
return _scheduler
def _reset_scheduler() -> None:
"""Drop the cached scheduler so the next ``get_scheduler`` call
builds a fresh one. Test-only never call from production paths."""
global _scheduler
_scheduler = None

View File

@ -7186,6 +7186,7 @@ class GatewayRunner:
that the normal _process_message_background path would have caught.
"""
from pathlib import Path
from urllib.parse import quote as _quote
try:
media_files, _ = adapter.extract_media(response)
@ -7199,7 +7200,36 @@ class GatewayRunner:
_VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}
_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
# Partition out images so they can be sent as a single batch
# (e.g. Signal's multi-attachment RPC)
image_paths: list = []
non_image_media: list = []
for media_path, is_voice in media_files:
ext = Path(media_path).suffix.lower()
if ext in _IMAGE_EXTS and not is_voice:
image_paths.append(media_path)
else:
non_image_media.append((media_path, is_voice))
non_image_local: list = []
for file_path in local_files:
if Path(file_path).suffix.lower() in _IMAGE_EXTS:
image_paths.append(file_path)
else:
non_image_local.append(file_path)
if image_paths:
try:
images = [(f"file://{_quote(p)}", "") for p in image_paths]
await adapter.send_multiple_images(
chat_id=event.source.chat_id,
images=images,
metadata=_thread_meta,
)
except Exception as e:
logger.warning("[%s] Post-stream image batch delivery failed: %s", adapter.name, e)
for media_path, is_voice in non_image_media:
try:
ext = Path(media_path).suffix.lower()
if should_send_media_as_audio(event.source.platform, ext, is_voice=is_voice):
@ -7214,12 +7244,6 @@ class GatewayRunner:
video_path=media_path,
metadata=_thread_meta,
)
elif ext in _IMAGE_EXTS:
await adapter.send_image_file(
chat_id=event.source.chat_id,
image_path=media_path,
metadata=_thread_meta,
)
else:
await adapter.send_document(
chat_id=event.source.chat_id,
@ -7229,13 +7253,13 @@ class GatewayRunner:
except Exception as e:
logger.warning("[%s] Post-stream media delivery failed: %s", adapter.name, e)
for file_path in local_files:
for file_path in non_image_local:
try:
ext = Path(file_path).suffix.lower()
if ext in _IMAGE_EXTS:
await adapter.send_image_file(
if ext in _VIDEO_EXTS:
await adapter.send_video(
chat_id=event.source.chat_id,
image_path=file_path,
video_path=file_path,
metadata=_thread_meta,
)
else:

View File

@ -1,4 +1,5 @@
"""Tests for Signal messenger platform adapter."""
import asyncio
import base64
import json
import pytest
@ -9,6 +10,16 @@ from urllib.parse import quote
from gateway.config import Platform, PlatformConfig
@pytest.fixture(autouse=True)
def _reset_signal_scheduler():
"""The attachment scheduler is process-wide; drop it between tests
so a fresh token bucket greets each case."""
from gateway.platforms.signal_rate_limit import _reset_scheduler
_reset_scheduler()
yield
_reset_scheduler()
# ---------------------------------------------------------------------------
# Shared Helpers
# ---------------------------------------------------------------------------
@ -1102,3 +1113,539 @@ class TestSignalQuoteExtraction:
event = captured["event"]
assert event.reply_to_message_id == "123"
assert event.reply_to_text is None
# ---------------------------------------------------------------------------
# _rpc rate-limit detection
# ---------------------------------------------------------------------------
class _FakeHttpResponse:
"""Minimal stand-in for httpx.Response — only what _rpc touches."""
def __init__(self, json_data):
self._json = json_data
def raise_for_status(self):
return None
def json(self):
return self._json
def _install_fake_client(adapter, json_data):
"""Replace adapter.client.post with an async fn returning json_data."""
from types import SimpleNamespace
async def _post(url, json=None, timeout=None):
return _FakeHttpResponse(json_data)
adapter.client = SimpleNamespace(post=_post)
class TestSignalRpcRateLimit:
"""_rpc opt-in 429 detection and SignalRateLimitError propagation."""
@pytest.mark.asyncio
async def test_raises_on_429_when_opted_in(self, monkeypatch):
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {"message": "Failed to send: [429] Rate Limited"},
})
with pytest.raises(SignalRateLimitError):
await adapter._rpc("send", {}, raise_on_rate_limit=True)
@pytest.mark.asyncio
async def test_raises_on_rate_limit_exception_substring(self, monkeypatch):
"""Some signal-cli builds emit 'RateLimitException' without a literal [429]."""
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {"message": "RateLimitException occurred"},
})
with pytest.raises(SignalRateLimitError):
await adapter._rpc("send", {}, raise_on_rate_limit=True)
@pytest.mark.asyncio
async def test_default_swallows_rate_limit_returns_none(self, monkeypatch):
"""Without opt-in, 429 stays swallowed — preserves backwards compat."""
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {"message": "[429] Rate Limited"},
})
result = await adapter._rpc("send", {})
assert result is None
@pytest.mark.asyncio
async def test_non_rate_limit_error_does_not_raise_when_opted_in(self, monkeypatch):
"""Opt-in only escalates 429s; other errors still return None."""
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {"message": "Recipient unknown (UntrustedIdentityException)"},
})
result = await adapter._rpc("send", {}, raise_on_rate_limit=True)
assert result is None
@pytest.mark.asyncio
async def test_raises_with_retry_after_from_v0_14_3_payload(self, monkeypatch):
"""signal-cli ≥ v0.14.3 surfaces server Retry-After under
``error.data.response.results[*].retryAfterSeconds`` _rpc
carries that value through SignalRateLimitError.retry_after."""
from gateway.platforms.signal_rate_limit import (
SignalRateLimitError, SIGNAL_RPC_ERROR_RATELIMIT,
)
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {
"code": SIGNAL_RPC_ERROR_RATELIMIT,
"message": "Failed to send message due to rate limiting",
"data": {
"response": {
"timestamp": 0,
"results": [
{"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 90},
],
}
},
},
})
with pytest.raises(SignalRateLimitError) as exc_info:
await adapter._rpc("send", {}, raise_on_rate_limit=True)
assert exc_info.value.retry_after == 90.0
@pytest.mark.asyncio
async def test_raises_with_retry_after_none_for_old_signal_cli(self, monkeypatch):
"""Older signal-cli builds emit only the substring; retry_after=None."""
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {"message": "Failed: [429] Rate Limited"},
})
with pytest.raises(SignalRateLimitError) as exc_info:
await adapter._rpc("send", {}, raise_on_rate_limit=True)
assert exc_info.value.retry_after is None
@pytest.mark.asyncio
async def test_raises_on_retry_later_inside_attachment_invalid(self, monkeypatch):
"""Production case: 429 during attachment upload surfaces as
AttachmentInvalidException UnexpectedErrorException (code
-32603), with the libsignal-net 'Retry after N seconds'
message embedded. _rpc must still detect this as rate-limit
AND parse the seconds out of the message."""
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
_install_fake_client(adapter, {
"error": {
"code": -32603,
"message": (
"Failed to send message: /home/max/sync/Memes/fengshui.jpeg: "
"org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds "
"(AttachmentInvalidException) (UnexpectedErrorException)"
),
"data": None,
},
})
with pytest.raises(SignalRateLimitError) as exc_info:
await adapter._rpc("send", {}, raise_on_rate_limit=True)
assert exc_info.value.retry_after == 4.0
# ---------------------------------------------------------------------------
# send_multiple_images — chunking, pacing, rate-limit retry
# ---------------------------------------------------------------------------
def _make_image_files(tmp_path, count, prefix="img"):
"""Materialize `count` tiny PNG files and return file:// URIs for them."""
uris = []
for i in range(count):
p = tmp_path / f"{prefix}_{i}.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 32)
uris.append((f"file://{p}", ""))
return uris
def _stub_rpc_responses(responses):
"""Build an _rpc replacement that pops a response per call.
Each entry in `responses` is either:
* a return value (dict / None) returned to the caller, or
* an Exception subclass instance raised.
Captures (params, kwargs) per call for inspection.
"""
captured = []
queue = list(responses)
async def mock_rpc(method, params, rpc_id=None, **kwargs):
captured.append({"method": method, "params": dict(params), "kwargs": kwargs})
await asyncio.sleep(0)
if not queue:
raise AssertionError("Unexpected extra _rpc call")
item = queue.pop(0)
if isinstance(item, BaseException):
raise item
return item
return mock_rpc, captured
def _patch_scheduler_sleep(monkeypatch, capture: list):
"""Capture sleeps inside the scheduler so tests don't actually wait.
Zero-second sleeps (e.g. event-loop yields from mock RPCs) are
delegated to the real asyncio.sleep so they don't pollute the
capture list."""
_real_sleep = asyncio.sleep
offset = [0.0]
async def fake_sleep(seconds):
if seconds > 0:
capture.append(seconds)
offset[0] += seconds
else:
await _real_sleep(0)
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.asyncio.sleep", fake_sleep
)
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset[0]
)
class TestSignalSendMultipleImages:
@pytest.mark.asyncio
async def test_empty_list_is_noop(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
await adapter.send_multiple_images(chat_id="+155****4567", images=[])
assert captured == []
adapter._stop_typing_indicator.assert_not_awaited()
@pytest.mark.asyncio
async def test_all_bad_files_no_rpc(self, monkeypatch, tmp_path):
"""If every image is missing/invalid, no RPC fires."""
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
await adapter.send_multiple_images(
chat_id="+155****4567",
images=[(f"file://{tmp_path}/missing_a.png", ""),
(f"file://{tmp_path}/missing_b.png", "")],
)
assert captured == []
@pytest.mark.asyncio
async def test_single_batch_under_limit(self, monkeypatch, tmp_path):
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
images = _make_image_files(tmp_path, 5)
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
assert len(captured) == 1
params = captured[0]["params"]
assert params["recipient"] == ["+155****4567"]
assert params["message"] == ""
assert len(params["attachments"]) == 5
# raise_on_rate_limit must be opted into so the retry loop sees 429s
assert captured[0]["kwargs"].get("raise_on_rate_limit") is True
@pytest.mark.asyncio
async def test_skips_bad_images_in_mixed_batch(self, monkeypatch, tmp_path):
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
good = _make_image_files(tmp_path, 2, prefix="ok")
bad = [(f"file://{tmp_path}/missing.png", "")]
await adapter.send_multiple_images(
chat_id="+155****4567", images=good[:1] + bad + good[1:]
)
assert len(captured) == 1
assert len(captured[0]["params"]["attachments"]) == 2
@pytest.mark.asyncio
async def test_429_calibrates_scheduler_then_retries(self, monkeypatch, tmp_path):
"""Server says retry_after=27 per token. After feedback, the
scheduler's refill_rate becomes 1/27. Re-acquiring n=3 tokens
therefore waits 3 × 27 = 81s pulled from the server's
authoritative rate, not a `× 32` defensive multiplier."""
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([
SignalRateLimitError("Failed: rate limit", retry_after=27.0),
{"timestamp": 99},
])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
images = _make_image_files(tmp_path, 3)
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
assert len(captured) == 2 # initial 429 + retry success
assert sleep_calls == [pytest.approx(3 * 27.0, abs=1.0)]
@pytest.mark.asyncio
async def test_429_without_retry_after_uses_default_rate(
self, monkeypatch, tmp_path
):
"""signal-cli < v0.14.3 doesn't surface Retry-After. The
scheduler keeps its default refill rate (1 token / 4s), so a
retry of n=3 waits 12s."""
from gateway.platforms.signal_rate_limit import (
SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER,
SignalRateLimitError,
)
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([
SignalRateLimitError("[429] Rate Limited", retry_after=None),
{"timestamp": 99},
])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
await adapter.send_multiple_images(
chat_id="+155****4567",
images=_make_image_files(tmp_path, 3),
)
assert len(captured) == 2
assert sleep_calls == [
pytest.approx(3 * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, abs=1.0)
]
@pytest.mark.asyncio
async def test_rate_limit_exhaust_continues_to_next_batch(
self, monkeypatch, tmp_path
):
"""Both attempts on batch 0 fail; batch 1 still gets a chance.
The scheduler's natural pacing on the next acquire stands in for
the old explicit cooldown."""
from gateway.platforms.signal import SignalRateLimitError
adapter = _make_signal_adapter(monkeypatch)
responses = [
SignalRateLimitError("[429]", retry_after=4.0),
SignalRateLimitError("[429]", retry_after=4.0),
{"timestamp": 7},
]
mock_rpc, captured = _stub_rpc_responses(responses)
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
images = _make_image_files(tmp_path, 33) # forces 2 batches
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
# 2 attempts on batch 0 + 1 on batch 1
assert len(captured) == 3
@pytest.mark.asyncio
async def test_full_batch_emits_pacing_notice_for_followup(
self, monkeypatch, tmp_path
):
"""Two full batches of 32. Batch 1 needs 14 more tokens than the
18 remaining after batch 0, so the scheduler sleeps 56s
crossing the 10s user-facing pacing-notice threshold."""
from gateway.platforms.signal import SIGNAL_MAX_ATTACHMENTS_PER_MSG
from gateway.platforms.signal_rate_limit import (
SIGNAL_RATE_LIMIT_BUCKET_CAPACITY,
SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER
)
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([
{"timestamp": 1}, {"timestamp": 2},
])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
adapter._notify_batch_pacing = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
images = _make_image_files(tmp_path, 64)
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
assert len(captured) == 2
assert len(captured[0]["params"]["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG
assert len(captured[1]["params"]["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG
assert len(sleep_calls) == 1
# Batch 1 deficit: 32 - (50 - 32) = 14 tokens × 4s = 56s
expected_wait = (
SIGNAL_MAX_ATTACHMENTS_PER_MSG
- (SIGNAL_RATE_LIMIT_BUCKET_CAPACITY - SIGNAL_MAX_ATTACHMENTS_PER_MSG)
) * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER
assert sleep_calls[0] == pytest.approx(expected_wait, abs=1.0)
adapter._notify_batch_pacing.assert_awaited_once()
@pytest.mark.asyncio
async def test_short_followup_wait_skips_pacing_notice(
self, monkeypatch, tmp_path
):
"""Batch 1 only needs 1 token but 18 remain after batch 0
(50 capacity 32 batch 0). No wait, no pacing notice."""
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([
{"timestamp": 1}, {"timestamp": 2},
])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
adapter._notify_batch_pacing = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
images = _make_image_files(tmp_path, 33)
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
assert len(captured) == 2
assert len(sleep_calls) == 0
adapter._notify_batch_pacing.assert_not_awaited()
@pytest.mark.asyncio
async def test_single_batch_send_does_not_pace(self, monkeypatch, tmp_path):
"""A single-batch send (≤32 attachments) leaves the scheduler
with tokens to spare no follow-up acquire, no sleep."""
adapter = _make_signal_adapter(monkeypatch)
mock_rpc, captured = _stub_rpc_responses([{"timestamp": 1}])
adapter._rpc = mock_rpc
adapter._stop_typing_indicator = AsyncMock()
sleep_calls: list = []
_patch_scheduler_sleep(monkeypatch, sleep_calls)
images = _make_image_files(tmp_path, 10)
await adapter.send_multiple_images(chat_id="+155****4567", images=images)
assert len(captured) == 1
assert sleep_calls == []
class TestSignalRateLimitDetection:
"""Coverage for the typed-code + substring detection helpers."""
def test_detect_typed_code(self):
from gateway.platforms.signal_rate_limit import (
_is_signal_rate_limit_error,
SIGNAL_RPC_ERROR_RATELIMIT,
)
err = {"code": SIGNAL_RPC_ERROR_RATELIMIT, "message": "any text"}
assert _is_signal_rate_limit_error(err) is True
def test_detect_substring_fallback(self):
from gateway.platforms.signal import _is_signal_rate_limit_error
err = {"code": -32603, "message": "Failed: [429] Rate Limited (RateLimitException) (UnexpectedErrorException)"}
assert _is_signal_rate_limit_error(err) is True
def test_detect_non_rate_limit(self):
from gateway.platforms.signal import _is_signal_rate_limit_error
err = {"code": -32603, "message": "UntrustedIdentityException"}
assert _is_signal_rate_limit_error(err) is False
def test_extract_retry_after_from_results(self):
from gateway.platforms.signal import _extract_retry_after_seconds
err = {
"code": -5,
"message": "Failed to send message due to rate limiting",
"data": {
"response": {
"timestamp": 0,
"results": [
{"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 30},
{"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 45},
],
}
},
}
assert _extract_retry_after_seconds(err) == 45.0
def test_extract_retry_after_missing(self):
"""Old signal-cli builds don't expose retryAfterSeconds — return None."""
from gateway.platforms.signal import _extract_retry_after_seconds
err = {"code": -32603, "message": "[429] Rate Limited"}
assert _extract_retry_after_seconds(err) is None
def test_detect_retry_later_exception_substring(self):
"""libsignal-net's RetryLaterException leaks through as
AttachmentInvalidException UnexpectedErrorException when the
rate-limit fires inside attachment upload. Detect it by substring."""
from gateway.platforms.signal import _is_signal_rate_limit_error
err = {
"code": -32603,
"message": (
"Failed to send message: /home/max/sync/Memes/fengshui.jpeg: "
"org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds "
"(AttachmentInvalidException) (UnexpectedErrorException)"
),
}
assert _is_signal_rate_limit_error(err) is True
def test_extract_retry_after_parses_message_string(self):
"""When the structured field is missing, parse the seconds out
of the human 'Retry after N seconds' substring."""
from gateway.platforms.signal import _extract_retry_after_seconds
err = {
"code": -32603,
"message": (
"Failed to send message: /home/max/sync/Memes/fengshui.jpeg: "
"org.signal.libsignal.net.RetryLaterException: Retry after 4 seconds "
"(AttachmentInvalidException) (UnexpectedErrorException)"
),
}
assert _extract_retry_after_seconds(err) == 4.0
class TestSignalSendTimeout:
"""Timeout scaling for batched attachment sends."""
def test_zero_attachments_uses_default(self):
from gateway.platforms.signal import _signal_send_timeout
assert _signal_send_timeout(0) == 30.0
def test_floor_at_60s(self):
from gateway.platforms.signal import _signal_send_timeout
# Few attachments (would be 5×N=5s) should still get 60s floor.
assert _signal_send_timeout(1) == 60.0
assert _signal_send_timeout(5) == 60.0
def test_scales_with_batch_size(self):
from gateway.platforms.signal import _signal_send_timeout
# 32 attachments × 5s = 160s; ought to comfortably outlast a
# serial upload of an attachment-heavy batch.
assert _signal_send_timeout(32) == 160.0

View File

@ -0,0 +1,233 @@
"""Tests for the SignalAttachmentScheduler token-bucket simulator."""
import asyncio
import time
import pytest
from gateway.platforms.signal_rate_limit import (
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
SIGNAL_RATE_LIMIT_BUCKET_CAPACITY,
SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER,
SignalAttachmentScheduler,
get_scheduler,
_reset_scheduler,
)
@pytest.fixture(autouse=True)
def _reset_signal_scheduler():
"""Drop the process-wide scheduler so each test gets a clean bucket."""
_reset_scheduler()
yield
_reset_scheduler()
def _patch_sleep_and_time(monkeypatch, capture: list):
"""Replace asyncio.sleep inside the scheduler module so tests don't
actually wait and advances time.monotonic to simulate time passing.
Captures the requested duration per call."""
offset = 0.0
async def _fake_sleep(seconds):
capture.append(seconds)
nonlocal offset
offset += seconds
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.asyncio.sleep", _fake_sleep
)
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset
)
class TestSchedulerInitialState:
def test_default_capacity_matches_signal_cap(self):
s = SignalAttachmentScheduler()
assert s.capacity == SIGNAL_RATE_LIMIT_BUCKET_CAPACITY
def test_default_refill_rate_from_default_retry_after(self):
s = SignalAttachmentScheduler()
assert s.refill_rate == pytest.approx(1.0 / SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER)
def test_starts_full(self):
s = SignalAttachmentScheduler()
assert s.tokens == s.capacity
class TestEstimateWait:
def test_zero_when_bucket_has_enough(self):
s = SignalAttachmentScheduler()
assert s.estimate_wait(10) == 0.0
assert s.estimate_wait(int(s.capacity)) == 0.0
def test_proportional_to_deficit_when_empty(self, monkeypatch):
"""Freeze monotonic so estimate_wait doesn't see fractional refill."""
s = SignalAttachmentScheduler()
s.tokens = 0.0
frozen = s.last_refill
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.time.monotonic", lambda: frozen
)
# 32 tokens at 0.25 tokens/sec = 128s
assert s.estimate_wait(32) == pytest.approx(32 / s.refill_rate)
assert s.estimate_wait(1) == pytest.approx(1 / s.refill_rate)
class TestAcquire:
@pytest.mark.asyncio
async def test_acquire_zero_is_noop(self, monkeypatch):
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
original = s.tokens
wait = await s.acquire(0)
assert wait == 0.0
assert sleeps == []
assert s.tokens == original
@pytest.mark.asyncio
async def test_acquire_within_capacity_no_sleep(self, monkeypatch):
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
wait = await s.acquire(10)
await s.report_rpc_duration(0.001, 10) # actually deduct tokens
assert wait == 0.0
assert sleeps == []
assert s.tokens == s.capacity - 10
@pytest.mark.asyncio
async def test_acquire_when_empty_sleeps_for_deficit(self, monkeypatch):
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
s.tokens = 0.0
wait = await s.acquire(32)
await s.report_rpc_duration(1e-12, 32)
# 32 tokens at default 0.25 tokens/sec = 128s
expected = 32 / s.refill_rate
assert wait == pytest.approx(expected)
assert sleeps == [pytest.approx(expected)]
# After sleep+acquire+rpc call, the bucket is empty again.
assert s.tokens == pytest.approx(0.0)
@pytest.mark.asyncio
async def test_back_to_back_acquires_drain_then_wait(self, monkeypatch):
"""Two sequential acquires of capacity each: first immediate,
second waits a full refill window."""
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
await s.acquire(int(s.capacity))
await s.report_rpc_duration(1e-12, int(s.capacity))
assert sleeps == [] # first batch had a full bucket
await s.acquire(int(s.capacity))
await s.report_rpc_duration(1e-12, int(s.capacity))
# Second batch: no time elapsed (mocked sleep doesn't advance
# monotonic), tokens still 0 → wait the full capacity / rate.
assert sleeps == [pytest.approx(s.capacity / s.refill_rate)]
@pytest.mark.asyncio
async def test_acquire_more_tokens_than_capacity(self, monkeypatch):
s = SignalAttachmentScheduler()
with pytest.raises(Exception):
await s.acquire(int(s.capacity) + 1)
class TestFeedback:
def test_calibrates_refill_rate_from_retry_after(self):
s = SignalAttachmentScheduler()
original = s.refill_rate
s.feedback(retry_after=42.0, n_attempted=1)
assert s.refill_rate == pytest.approx(1.0 / 42.0)
assert s.refill_rate != original
def test_none_retry_after_leaves_rate(self):
s = SignalAttachmentScheduler()
original = s.refill_rate
s.feedback(retry_after=None, n_attempted=5)
assert s.refill_rate == original
def test_zeros_tokens(self):
s = SignalAttachmentScheduler()
assert s.tokens > 0
s.feedback(retry_after=4.0, n_attempted=1)
assert s.tokens == 0.0
@pytest.mark.asyncio
async def test_acquire_after_feedback_uses_calibrated_rate(self, monkeypatch):
"""signal-cli ≥v0.14.3: server says 'retry_after=42 for one
token' → next acquire(1) waits 42s. Drops the old defensive
``retry_after * 32`` heuristic in favor of the server's
authoritative per-token value."""
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
# Initial acquire empties enough; 429 fires.
await s.acquire(1)
s.feedback(retry_after=42.0, n_attempted=1)
# Re-acquire: bucket empty, calibrated rate = 1/42.
await s.acquire(1)
assert sleeps == [pytest.approx(42.0)]
class TestRefillClamping:
def test_refill_does_not_exceed_capacity(self, monkeypatch):
"""Even after a long elapsed window, refill clamps at capacity."""
s = SignalAttachmentScheduler()
s.tokens = 0.0
# Pretend a year passed.
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.time.monotonic",
lambda: s.last_refill + 365 * 24 * 3600,
)
s._refill()
assert s.tokens == s.capacity
class TestFifoAcquire:
@pytest.mark.asyncio
async def test_concurrent_acquires_serialize(self, monkeypatch):
"""Two coroutines acquiring full capacity each: the second waits
in the lock queue until the first finishes its bucket math + sleep.
Demonstrates the FIFO fairness across sessions."""
sleeps: list = []
_patch_sleep_and_time(monkeypatch, sleeps)
s = SignalAttachmentScheduler()
results: list = []
async def worker(label: str):
wait = await s.acquire(int(s.capacity))
await s.report_rpc_duration(1e-12, int(s.capacity))
results.append((label, wait))
# Launch in order; FIFO means A finishes first, then B.
await asyncio.gather(worker("A"), worker("B"))
assert [r[0] for r in results] == ["A", "B"]
# A had a full bucket (no wait). B waited a full refill.
assert results[0][1] == 0.0
assert results[1][1] == pytest.approx(s.capacity / s.refill_rate)
class TestSingleton:
def test_get_scheduler_returns_same_instance(self):
s1 = get_scheduler()
s2 = get_scheduler()
assert s1 is s2
def test_reset_scheduler_yields_new_instance(self):
s1 = get_scheduler()
_reset_scheduler()
s2 = get_scheduler()
assert s1 is not s2

View File

@ -8,12 +8,25 @@ from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def _reset_signal_scheduler():
"""Drop the process-wide attachment scheduler so each test gets a
fresh token bucket."""
from gateway.platforms.signal_rate_limit import _reset_scheduler
_reset_scheduler()
yield
_reset_scheduler()
from gateway.config import Platform
from tools.send_message_tool import (
_derive_forum_thread_name,
_parse_target_ref,
_send_discord,
_send_matrix_via_adapter,
_send_signal,
_send_telegram,
_send_to_platform,
send_message_tool,
@ -1621,3 +1634,361 @@ class TestForumProbeCache:
assert result2["success"] is True
# Only one session opened (thread creation) — no probe session this time
# (verified by not raising from our side_effect exhaustion)
# ---------------------------------------------------------------------------
# _send_signal — chunking + 429 retry (mirrors gateway adapter behavior)
# ---------------------------------------------------------------------------
class _FakeSignalHttp:
"""Stand-in for httpx.AsyncClient used as an async context manager.
Pops a response from the queue per `post` call. Each entry is either
a dict (returned from .json()) or an exception instance (raised).
Captures (url, payload) per call.
"""
def __init__(self, responses):
self.responses = list(responses)
self.calls = []
def __call__(self, *_a, **_kw):
return self
async def __aenter__(self):
return self
async def __aexit__(self, *_a):
return False
async def post(self, url, json=None):
self.calls.append({"url": url, "payload": json})
if not self.responses:
raise AssertionError("Unexpected extra POST")
item = self.responses.pop(0)
if isinstance(item, BaseException):
raise item
resp = SimpleNamespace(
raise_for_status=lambda: None,
json=lambda data=item: data,
)
return resp
def _install_signal_http(monkeypatch, fake):
"""Patch httpx.AsyncClient at the module level so the lazy import in
_send_signal picks it up.
"""
import httpx
monkeypatch.setattr(httpx, "AsyncClient", fake)
def _patch_sendmsg_sleep_and_time(monkeypatch, capture: list):
"""Mock asyncio.sleep + time.monotonic in the signal_rate_limit
module so the scheduler's acquire loop sees synthetic time advancing
during sleep calls, and report_rpc_duration sees the same clock.
Zero-second sleeps (event-loop yields from fake HTTP posts) are
delegated to the real asyncio.sleep so they don't pollute the
capture list.
"""
import asyncio as _aio
_real_sleep = _aio.sleep
offset = [0.0]
async def fake_sleep(seconds):
if seconds > 0:
capture.append(seconds)
offset[0] += seconds
else:
await _real_sleep(0)
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.asyncio.sleep", fake_sleep
)
monkeypatch.setattr(
"gateway.platforms.signal_rate_limit.time.monotonic", lambda: offset[0]
)
class TestSendSignalChunking:
def test_text_only_single_rpc(self, monkeypatch):
fake = _FakeSignalHttp([{"result": {"timestamp": 1}}])
_install_signal_http(monkeypatch, fake)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"hello",
)
)
assert result == {"success": True, "platform": "signal", "chat_id": "+15557654321"}
assert len(fake.calls) == 1
params = fake.calls[0]["payload"]["params"]
assert params["message"] == "hello"
assert "attachments" not in params
def test_chunks_attachments_above_max(self, tmp_path, monkeypatch):
"""33 attachments → 2 batches; text only on first batch. Batch 1
only needs 1 token and 18 remain after batch 0, so no sleep."""
from gateway.platforms.signal_rate_limit import (
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
)
paths = []
for i in range(33):
p = tmp_path / f"img_{i}.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
paths.append((str(p), False))
fake = _FakeSignalHttp([
{"result": {"timestamp": 1}}, # batch 0
{"result": {"timestamp": 2}}, # batch 1
])
_install_signal_http(monkeypatch, fake)
sleep_calls = []
_patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"Caption goes here",
media_files=paths,
)
)
assert result["success"] is True
assert len(fake.calls) == 2
assert len(sleep_calls) == 0
first = fake.calls[0]["payload"]["params"]
assert first["message"] == "Caption goes here"
assert len(first["attachments"]) == SIGNAL_MAX_ATTACHMENTS_PER_MSG
second = fake.calls[1]["payload"]["params"]
assert second["message"] == "" # caption only on batch 0
assert len(second["attachments"]) == 33 - SIGNAL_MAX_ATTACHMENTS_PER_MSG
def test_full_followup_batch_emits_pacing_notice(self, tmp_path, monkeypatch):
"""64 attachments → 2 full batches. Batch 1 needs 14 more tokens
than the 18 remaining after batch 0 56s wait crossing the 10s
notice threshold."""
from gateway.platforms.signal_rate_limit import (
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
SIGNAL_RATE_LIMIT_BUCKET_CAPACITY,
SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER,
)
paths = []
for i in range(64):
p = tmp_path / f"img_{i}.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
paths.append((str(p), False))
fake = _FakeSignalHttp([
{"result": {"timestamp": 1}}, # batch 0
{"result": {"timestamp": 99}}, # pacing notice
{"result": {"timestamp": 2}}, # batch 1
])
_install_signal_http(monkeypatch, fake)
sleep_calls = []
_patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"",
media_files=paths,
)
)
assert result["success"] is True
assert len(fake.calls) == 3
notice = fake.calls[1]["payload"]["params"]
assert "More images coming" in notice["message"]
assert "attachments" not in notice
# Batch 1 deficit: 32 - (50 - 32) = 14 tokens × 4s = 56s
expected = (
SIGNAL_MAX_ATTACHMENTS_PER_MSG
- (SIGNAL_RATE_LIMIT_BUCKET_CAPACITY - SIGNAL_MAX_ATTACHMENTS_PER_MSG)
) * SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER
assert sleep_calls == [pytest.approx(expected, abs=1.0)]
def test_429_with_retry_after_drives_exact_backoff(self, tmp_path, monkeypatch):
"""signal-cli ≥ v0.14.3 surfaces Retry-After under
error.data.response.results[*].retryAfterSeconds. The scheduler
calibrates its refill rate from that value; the retry of n=1
sleeps the per-token interval."""
from gateway.platforms.signal_rate_limit import SIGNAL_RPC_ERROR_RATELIMIT
p = tmp_path / "img.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
fake = _FakeSignalHttp([
{
"error": {
"code": SIGNAL_RPC_ERROR_RATELIMIT,
"message": "Failed to send message due to rate limiting",
"data": {
"response": {
"timestamp": 0,
"results": [
{"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 42},
],
}
},
}
},
{"result": {"timestamp": 7}},
])
_install_signal_http(monkeypatch, fake)
sleep_calls = []
_patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"",
media_files=[(str(p), False)],
)
)
assert result["success"] is True
assert len(fake.calls) == 2 # initial + retry
assert sleep_calls == [pytest.approx(42.0, abs=1.0)]
def test_429_without_retry_after_falls_back_to_default(self, tmp_path, monkeypatch):
"""Older signal-cli (< v0.14.3) doesn't surface Retry-After.
The scheduler keeps its default rate (1 token / 4s)."""
from gateway.platforms.signal_rate_limit import SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER
p = tmp_path / "img.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
fake = _FakeSignalHttp([
{"error": {"message": "Failed: [429] Rate Limited"}},
{"result": {"timestamp": 7}},
])
_install_signal_http(monkeypatch, fake)
sleep_calls = []
_patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"",
media_files=[(str(p), False)],
)
)
assert result["success"] is True
assert sleep_calls == [pytest.approx(SIGNAL_RATE_LIMIT_DEFAULT_RETRY_AFTER, abs=1.0)]
def test_429_retry_exhaust_continues_to_next_batch(self, tmp_path, monkeypatch):
"""Both attempts on batch 0 fail; batch 1 still gets a chance.
The scheduler's natural pacing (no more cooldown gate) lets the
second batch through after its acquire wait."""
from gateway.platforms.signal_rate_limit import SIGNAL_RPC_ERROR_RATELIMIT
paths = []
for i in range(33): # forces 2 batches
p = tmp_path / f"img_{i}.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
paths.append((str(p), False))
rate_limit_err = {
"error": {
"code": SIGNAL_RPC_ERROR_RATELIMIT,
"message": "Failed to send message due to rate limiting",
"data": {
"response": {
"timestamp": 0,
"results": [
{"type": "RATE_LIMIT_FAILURE", "retryAfterSeconds": 4},
],
}
},
}
}
fake = _FakeSignalHttp([
rate_limit_err, # batch 0, attempt 1
rate_limit_err, # batch 0, attempt 2 (exhaust)
{"result": {"timestamp": 9}}, # batch 1 succeeds
])
_install_signal_http(monkeypatch, fake)
sleep_calls = []
_patch_sendmsg_sleep_and_time(monkeypatch, sleep_calls)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"many",
media_files=paths,
)
)
# Partial success: batch 0 lost but batch 1 went through.
assert result["success"] is True
assert "warnings" in result
assert any("rate-limited" in w for w in result["warnings"])
# 2 attempts on batch 0 + 1 successful batch 1 = 3 calls
assert len(fake.calls) == 3
def test_non_rate_limit_error_returns_immediately(self, tmp_path, monkeypatch):
"""A non-429 RPC error should not retry — it returns an error result."""
p = tmp_path / "img.png"
p.write_bytes(b"\x89PNG" + b"\x00" * 16)
fake = _FakeSignalHttp([
{"error": {"message": "UntrustedIdentityException"}},
])
_install_signal_http(monkeypatch, fake)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"",
media_files=[(str(p), False)],
)
)
assert "error" in result
assert "UntrustedIdentityException" in result["error"]
assert len(fake.calls) == 1 # no retry on non-429
def test_skipped_missing_files_reported_in_warnings(self, tmp_path, monkeypatch):
good = tmp_path / "ok.png"
good.write_bytes(b"\x89PNG" + b"\x00" * 16)
fake = _FakeSignalHttp([{"result": {"timestamp": 1}}])
_install_signal_http(monkeypatch, fake)
result = asyncio.run(
_send_signal(
{"http_url": "http://localhost:8080", "account": "+15551234567"},
"+15557654321",
"msg",
media_files=[(str(good), False), (str(tmp_path / "missing.png"), False)],
)
)
assert result["success"] is True
assert "warnings" in result
# Only the existing file made it into the RPC
params = fake.calls[0]["payload"]["params"]
assert len(params["attachments"]) == 1

View File

@ -1054,25 +1054,33 @@ async def _send_signal(extra, chat_id, message, media_files=None):
"""Send via signal-cli JSON-RPC API.
Supports both text-only and text-with-attachments (images/audio/documents).
Attachments are sent as an 'attachments' array in the JSON-RPC params.
Multi-attachment sends are chunked into batches of
SIGNAL_MAX_ATTACHMENTS_PER_MSG and metered by the process-wide
SignalAttachmentScheduler same bucket the gateway adapter uses, so
sends from this tool and inbound-driven replies share rate-limit state.
"""
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}
from gateway.platforms.signal_rate_limit import (
SIGNAL_BATCH_PACING_NOTICE_THRESHOLD,
SIGNAL_MAX_ATTACHMENTS_PER_MSG,
SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
_extract_retry_after_seconds,
_format_wait,
_is_signal_rate_limit_error,
_signal_send_timeout,
get_scheduler,
)
try:
http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
account = extra.get("account", "")
if not account:
return {"error": "Signal account not configured"}
params = {"account": account, "message": message}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
# Add attachments if media_files are present
valid_media = media_files or []
attachment_paths = []
for media_path, _is_voice in valid_media:
@ -1081,28 +1089,144 @@ async def _send_signal(extra, chat_id, message, media_files=None):
else:
logger.warning("Signal media file not found, skipping: %s", media_path)
# Chunk attachments. With no attachments we still emit one batch
# (text only). With attachments, the text rides on batch #0 so the
# caption isn't repeated across every chunk.
if attachment_paths:
params["attachments"] = attachment_paths
att_batches = [
attachment_paths[i:i + SIGNAL_MAX_ATTACHMENTS_PER_MSG]
for i in range(0, len(attachment_paths), SIGNAL_MAX_ATTACHMENTS_PER_MSG)
]
else:
att_batches = [[]]
payload = {
"jsonrpc": "2.0",
"method": "send",
"params": params,
"id": f"send_{int(time.time() * 1000)}",
}
async def _post(batch_attachments, batch_message):
params = {"account": account, "message": batch_message}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
if batch_attachments:
params["attachments"] = batch_attachments
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
resp.raise_for_status()
data = resp.json()
if "error" in data:
return _error(f"Signal RPC error: {data['error']}")
payload = {
"jsonrpc": "2.0",
"method": "send",
"params": params,
"id": f"send_{int(time.time() * 1000)}",
}
timeout = _signal_send_timeout(len(batch_attachments) if batch_attachments else 0)
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
resp.raise_for_status()
return resp.json()
# Return warning for any skipped media files
result = {"success": True, "platform": "signal", "chat_id": chat_id}
if len(attachment_paths) < len(valid_media):
result["warnings"] = [f"Some media files were skipped (not found on disk)"]
return result
async def _send_inline_notice(text: str) -> None:
"""Best-effort one-shot RPC for a user-facing pacing notice."""
notice_params = {"account": account, "message": text}
if chat_id.startswith("group:"):
notice_params["groupId"] = chat_id[6:]
else:
notice_params["recipient"] = [chat_id]
try:
async with httpx.AsyncClient(timeout=30.0) as _client:
await _client.post(
f"{http_url}/api/v1/rpc",
json={
"jsonrpc": "2.0",
"method": "send",
"params": notice_params,
"id": f"notice_{int(time.time() * 1000)}",
},
)
except Exception as _e:
logger.warning("Signal: inline notice failed: %s", _e)
scheduler = get_scheduler()
logger.info(
"send_message Signal: scheduler state=%s, %d attachment(s) in %d batch(es)",
scheduler.state(), len(attachment_paths), len(att_batches),
)
failed_batches: list[int] = []
for idx, att_batch in enumerate(att_batches):
n = len(att_batch)
if n > 0:
estimated = scheduler.estimate_wait(n)
if estimated >= SIGNAL_BATCH_PACING_NOTICE_THRESHOLD:
await _send_inline_notice(
f"(More images coming — pausing ~{_format_wait(estimated)} "
f"for Signal rate limit, batch {idx + 1}/{len(att_batches)}.)"
)
batch_message = message if idx == 0 else ""
for attempt in range(1, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS + 1):
try:
await scheduler.acquire(n)
_rpc_t0 = time.monotonic()
data = await _post(att_batch, batch_message)
_rpc_duration = time.monotonic() - _rpc_t0
if "error" not in data:
await scheduler.report_rpc_duration(_rpc_duration, n)
break
err = data["error"]
if not _is_signal_rate_limit_error(err):
return _error(f"Signal RPC error on batch {idx + 1}/{len(att_batches)}: {err}")
server_retry_after = _extract_retry_after_seconds(err)
scheduler.feedback(server_retry_after, n)
if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
failed_batches.append(idx + 1)
logger.error(
"Signal: rate-limit retries exhausted on batch %d/%d "
"(%d attachments lost, server retry_after=%s)",
idx + 1, len(att_batches), n,
f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
)
break
logger.warning(
"Signal: rate-limited on batch %d/%d "
"(attempt %d/%d, server retry_after=%s); "
"scheduler will pace the retry",
idx + 1, len(att_batches),
attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS,
f"{server_retry_after:.0f}s" if server_retry_after else "unknown",
)
except Exception as e:
if attempt >= SIGNAL_RATE_LIMIT_MAX_ATTEMPTS:
failed_batches.append(idx + 1)
logger.error(
"Signal: send error on batch %d/%d after %d attempts: %s",
idx + 1, len(att_batches), attempt, str(e)
)
break
logger.warning(
"Signal: transient error on batch %d/%d (attempt %d/%d): %s; will retry",
idx + 1, len(att_batches), attempt, SIGNAL_RATE_LIMIT_MAX_ATTEMPTS, str(e)
)
warnings = []
if len(attachment_paths) < len(valid_media):
warnings.append("Some media files were skipped (not found on disk)")
if failed_batches:
warnings.append(
f"Signal rate-limited {len(failed_batches)} batch(es) "
f"(#{', #'.join(str(b) for b in failed_batches)})"
)
if failed_batches and len(failed_batches) == len(att_batches):
return _error(
f"Signal: every batch ({len(att_batches)}) hit rate limit; "
f"no attachments delivered"
)
result = {"success": True, "platform": "signal", "chat_id": chat_id}
if warnings:
result["warnings"] = warnings
return result
except Exception as e:
return _error(f"Signal send failed: {e}")

View File

@ -159,7 +159,7 @@ The adapter supports sending and receiving media in both directions.
The agent can send media files via `MEDIA:` tags in responses. The following delivery methods are supported:
- **Images**`send_image_file` sends PNG, JPEG, GIF, WebP as native Signal attachments
- **Images**`send_multiple_images` and `send_image_file` send PNG, JPEG, GIF, WebP as native Signal attachments
- **Voice**`send_voice` sends audio files (OGG, MP3, WAV, M4A, AAC) as attachments
- **Video**`send_video` sends MP4 video files
- **Documents**`send_document` sends any file type (PDF, ZIP, etc.)
@ -167,6 +167,9 @@ The agent can send media files via `MEDIA:` tags in responses. The following del
All outgoing media goes through Signal's standard attachment API. Unlike some platforms, Signal does not distinguish between voice messages and file attachments at the protocol level.
Attachment size limit: **100 MB** (both directions).
:::warning
**Signal servers will rate-limit attachment uploads**, the adapter uses a scheduler for multiple image sending that batches images in groups of 32 and throttles uploads to match the Signal server policy.
:::
### Native Formatting, Reply Quotes, and Reactions