add basic twilio signature checking and tests

This commit is contained in:
Mariano Nicolini 2026-04-11 15:11:42 -03:00 committed by Teknium
parent cc4b1f0007
commit c22bffc92e
2 changed files with 267 additions and 5 deletions

View File

@ -10,6 +10,8 @@ Shares credentials with the optional telephony skill — same env vars:
Gateway-specific env vars:
- SMS_WEBHOOK_PORT (default 8080)
- SMS_WEBHOOK_HOST (default 0.0.0.0)
- SMS_WEBHOOK_URL (public URL for Twilio signature validation)
- SMS_ALLOWED_USERS (comma-separated E.164 phone numbers)
- SMS_ALLOW_ALL_USERS (true/false)
- SMS_HOME_CHANNEL (phone number for cron delivery)
@ -17,6 +19,8 @@ Gateway-specific env vars:
import asyncio
import base64
import hashlib
import hmac
import logging
import os
import urllib.parse
@ -28,6 +32,7 @@ from gateway.platforms.base import (
MessageEvent,
MessageType,
SendResult,
is_network_accessible,
)
from gateway.platforms.helpers import redact_phone, strip_markdown
@ -36,6 +41,7 @@ logger = logging.getLogger(__name__)
TWILIO_API_BASE = "https://api.twilio.com/2010-04-01/Accounts"
MAX_SMS_LENGTH = 1600 # ~10 SMS segments
DEFAULT_WEBHOOK_PORT = 8080
DEFAULT_WEBHOOK_HOST = "0.0.0.0"
def check_sms_requirements() -> bool:
@ -65,6 +71,8 @@ class SmsAdapter(BasePlatformAdapter):
self._webhook_port: int = int(
os.getenv("SMS_WEBHOOK_PORT", str(DEFAULT_WEBHOOK_PORT))
)
self._webhook_host: str = os.getenv("SMS_WEBHOOK_HOST", DEFAULT_WEBHOOK_HOST)
self._webhook_url: str = os.getenv("SMS_WEBHOOK_URL", "").strip()
self._runner = None
self._http_session: Optional["aiohttp.ClientSession"] = None
@ -86,13 +94,21 @@ class SmsAdapter(BasePlatformAdapter):
logger.error("[sms] TWILIO_PHONE_NUMBER not set — cannot send replies")
return False
if not self._webhook_url:
logger.warning(
"[sms] SMS_WEBHOOK_URL not set — Twilio signature validation is "
"DISABLED. Any client that can reach port %d can inject messages. "
"Set SMS_WEBHOOK_URL to enable signature validation.",
self._webhook_port,
)
app = web.Application()
app.router.add_post("/webhooks/twilio", self._handle_webhook)
app.router.add_get("/health", lambda _: web.Response(text="ok"))
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, "0.0.0.0", self._webhook_port)
site = web.TCPSite(self._runner, self._webhook_host, self._webhook_port)
await site.start()
self._http_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
@ -100,7 +116,8 @@ class SmsAdapter(BasePlatformAdapter):
self._running = True
logger.info(
"[sms] Twilio webhook server listening on port %d, from: %s",
"[sms] Twilio webhook server listening on %s:%d, from: %s",
self._webhook_host,
self._webhook_port,
redact_phone(self._from_number),
)
@ -182,6 +199,28 @@ class SmsAdapter(BasePlatformAdapter):
"""Strip markdown — SMS renders it as literal characters."""
return strip_markdown(content)
# ------------------------------------------------------------------
# Twilio signature validation
# ------------------------------------------------------------------
def _validate_twilio_signature(
self, url: str, post_params: dict, signature: str,
) -> bool:
"""Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64).
Algorithm: https://www.twilio.com/docs/usage/security#validating-requests
"""
data_to_sign = url
for key in sorted(post_params.keys()):
data_to_sign += key + post_params[key]
mac = hmac.new(
self._auth_token.encode("utf-8"),
data_to_sign.encode("utf-8"),
hashlib.sha1,
)
computed = base64.b64encode(mac.digest()).decode("utf-8")
return hmac.compare_digest(computed, signature)
# ------------------------------------------------------------------
# Twilio webhook handler
# ------------------------------------------------------------------
@ -192,7 +231,7 @@ class SmsAdapter(BasePlatformAdapter):
try:
raw = await request.read()
# Twilio sends form-encoded data, not JSON
form = urllib.parse.parse_qs(raw.decode("utf-8"))
form = urllib.parse.parse_qs(raw.decode("utf-8"), keep_blank_values=True)
except Exception as e:
logger.error("[sms] webhook parse error: %s", e)
return web.Response(
@ -201,6 +240,27 @@ class SmsAdapter(BasePlatformAdapter):
status=400,
)
# Validate Twilio request signature when SMS_WEBHOOK_URL is configured
if self._webhook_url:
twilio_sig = request.headers.get("X-Twilio-Signature", "")
if not twilio_sig:
logger.warning("[sms] Rejected: missing X-Twilio-Signature header")
return web.Response(
text='<?xml version="1.0" encoding="UTF-8"?><Response></Response>',
content_type="application/xml",
status=403,
)
flat_params = {k: v[0] for k, v in form.items() if v}
if not self._validate_twilio_signature(
self._webhook_url, flat_params, twilio_sig
):
logger.warning("[sms] Rejected: invalid Twilio signature")
return web.Response(
text='<?xml version="1.0" encoding="UTF-8"?><Response></Response>',
content_type="application/xml",
status=403,
)
# Extract fields (parse_qs returns lists)
from_number = (form.get("From", [""]))[0].strip()
to_number = (form.get("To", [""]))[0].strip()

View File

@ -1,11 +1,14 @@
"""Tests for SMS (Twilio) platform integration.
Covers config loading, format/truncate, echo prevention,
requirements check, and toolset verification.
requirements check, toolset verification, and Twilio signature validation.
"""
import base64
import hashlib
import hmac
import os
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -213,3 +216,202 @@ class TestSmsToolset:
from tools.cronjob_tools import CRONJOB_SCHEMA
deliver_desc = CRONJOB_SCHEMA["parameters"]["properties"]["deliver"]["description"]
assert "sms" in deliver_desc.lower()
# ── Webhook host configuration ─────────────────────────────────────
class TestWebhookHostConfig:
"""Verify SMS_WEBHOOK_HOST env var and default."""
def test_default_host_is_all_interfaces(self):
from gateway.platforms.sms import DEFAULT_WEBHOOK_HOST
assert DEFAULT_WEBHOOK_HOST == "0.0.0.0"
def test_host_from_env(self):
from gateway.platforms.sms import SmsAdapter
env = {
"TWILIO_ACCOUNT_SID": "ACtest",
"TWILIO_AUTH_TOKEN": "tok",
"TWILIO_PHONE_NUMBER": "+15550001111",
"SMS_WEBHOOK_HOST": "127.0.0.1",
}
with patch.dict(os.environ, env):
pc = PlatformConfig(enabled=True, api_key="tok")
adapter = SmsAdapter(pc)
assert adapter._webhook_host == "127.0.0.1"
def test_webhook_url_from_env(self):
from gateway.platforms.sms import SmsAdapter
env = {
"TWILIO_ACCOUNT_SID": "ACtest",
"TWILIO_AUTH_TOKEN": "tok",
"TWILIO_PHONE_NUMBER": "+15550001111",
"SMS_WEBHOOK_URL": "https://example.com/webhooks/twilio",
}
with patch.dict(os.environ, env):
pc = PlatformConfig(enabled=True, api_key="tok")
adapter = SmsAdapter(pc)
assert adapter._webhook_url == "https://example.com/webhooks/twilio"
def test_webhook_url_stripped(self):
from gateway.platforms.sms import SmsAdapter
env = {
"TWILIO_ACCOUNT_SID": "ACtest",
"TWILIO_AUTH_TOKEN": "tok",
"TWILIO_PHONE_NUMBER": "+15550001111",
"SMS_WEBHOOK_URL": " https://example.com/webhooks/twilio ",
}
with patch.dict(os.environ, env):
pc = PlatformConfig(enabled=True, api_key="tok")
adapter = SmsAdapter(pc)
assert adapter._webhook_url == "https://example.com/webhooks/twilio"
# ── Twilio signature validation ────────────────────────────────────
def _compute_twilio_signature(auth_token, url, params):
"""Reference implementation of Twilio's signature algorithm."""
data_to_sign = url
for key in sorted(params.keys()):
data_to_sign += key + params[key]
mac = hmac.new(
auth_token.encode("utf-8"),
data_to_sign.encode("utf-8"),
hashlib.sha1,
)
return base64.b64encode(mac.digest()).decode("utf-8")
class TestTwilioSignatureValidation:
"""Unit tests for SmsAdapter._validate_twilio_signature."""
def _make_adapter(self, auth_token="test_token_secret"):
from gateway.platforms.sms import SmsAdapter
env = {
"TWILIO_ACCOUNT_SID": "ACtest",
"TWILIO_AUTH_TOKEN": auth_token,
"TWILIO_PHONE_NUMBER": "+15550001111",
}
with patch.dict(os.environ, env):
pc = PlatformConfig(enabled=True, api_key=auth_token)
adapter = SmsAdapter(pc)
return adapter
def test_valid_signature_accepted(self):
adapter = self._make_adapter()
url = "https://example.com/webhooks/twilio"
params = {"From": "+15551234567", "Body": "hello", "To": "+15550001111"}
sig = _compute_twilio_signature("test_token_secret", url, params)
assert adapter._validate_twilio_signature(url, params, sig) is True
def test_invalid_signature_rejected(self):
adapter = self._make_adapter()
url = "https://example.com/webhooks/twilio"
params = {"From": "+15551234567", "Body": "hello"}
assert adapter._validate_twilio_signature(url, params, "badsig") is False
def test_wrong_token_rejected(self):
adapter = self._make_adapter(auth_token="correct_token")
url = "https://example.com/webhooks/twilio"
params = {"From": "+15551234567", "Body": "hello"}
sig = _compute_twilio_signature("wrong_token", url, params)
assert adapter._validate_twilio_signature(url, params, sig) is False
def test_params_sorted_by_key(self):
"""Signature must be computed with params sorted alphabetically."""
adapter = self._make_adapter()
url = "https://example.com/webhooks/twilio"
params = {"Zebra": "last", "Alpha": "first", "Middle": "mid"}
sig = _compute_twilio_signature("test_token_secret", url, params)
assert adapter._validate_twilio_signature(url, params, sig) is True
def test_empty_param_values_included(self):
"""Blank values must be included in signature computation."""
adapter = self._make_adapter()
url = "https://example.com/webhooks/twilio"
params = {"From": "+15551234567", "Body": "", "SmsStatus": "received"}
sig = _compute_twilio_signature("test_token_secret", url, params)
assert adapter._validate_twilio_signature(url, params, sig) is True
def test_url_matters(self):
"""Different URLs produce different signatures."""
adapter = self._make_adapter()
params = {"Body": "hello"}
sig = _compute_twilio_signature(
"test_token_secret", "https://a.com/webhooks/twilio", params
)
assert adapter._validate_twilio_signature(
"https://b.com/webhooks/twilio", params, sig
) is False
# ── Webhook signature enforcement (handler-level) ──────────────────
class TestWebhookSignatureEnforcement:
"""Integration tests for signature validation in _handle_webhook."""
def _make_adapter(self, webhook_url=""):
from gateway.platforms.sms import SmsAdapter
env = {
"TWILIO_ACCOUNT_SID": "ACtest",
"TWILIO_AUTH_TOKEN": "test_token_secret",
"TWILIO_PHONE_NUMBER": "+15550001111",
"SMS_WEBHOOK_URL": webhook_url,
}
with patch.dict(os.environ, env):
pc = PlatformConfig(enabled=True, api_key="test_token_secret")
adapter = SmsAdapter(pc)
adapter._message_handler = AsyncMock()
return adapter
def _mock_request(self, body, headers=None):
request = MagicMock()
request.read = AsyncMock(return_value=body)
request.headers = headers or {}
return request
@pytest.mark.asyncio
async def test_no_webhook_url_skips_validation(self):
"""Without SMS_WEBHOOK_URL, all requests are accepted."""
adapter = self._make_adapter(webhook_url="")
body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123"
request = self._mock_request(body)
resp = await adapter._handle_webhook(request)
assert resp.status == 200
@pytest.mark.asyncio
async def test_missing_signature_returns_403(self):
adapter = self._make_adapter(webhook_url="https://example.com/webhooks/twilio")
body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123"
request = self._mock_request(body, headers={})
resp = await adapter._handle_webhook(request)
assert resp.status == 403
@pytest.mark.asyncio
async def test_invalid_signature_returns_403(self):
adapter = self._make_adapter(webhook_url="https://example.com/webhooks/twilio")
body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123"
request = self._mock_request(body, headers={"X-Twilio-Signature": "invalid"})
resp = await adapter._handle_webhook(request)
assert resp.status == 403
@pytest.mark.asyncio
async def test_valid_signature_returns_200(self):
webhook_url = "https://example.com/webhooks/twilio"
adapter = self._make_adapter(webhook_url=webhook_url)
params = {
"From": "+15551234567",
"To": "+15550001111",
"Body": "hello",
"MessageSid": "SM123",
}
sig = _compute_twilio_signature("test_token_secret", webhook_url, params)
body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123"
request = self._mock_request(body, headers={"X-Twilio-Signature": sig})
resp = await adapter._handle_webhook(request)
assert resp.status == 200