Merge pull request #564 from Molecule-AI/feat/issue-549-x-brand-monitor

feat(brand-monitor): X API pay-per-use brand monitor with surge mode → Slack
This commit is contained in:
Hongming Wang 2026-04-16 19:15:12 -07:00 committed by GitHub
commit 28f720ea22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1460 additions and 0 deletions

4
.gitignore vendored
View File

@ -44,6 +44,10 @@ venv/
*.egg-info/
.pytest_cache/
# Brand monitor runtime state (never commit)
brand-monitor/.surge_state.json
brand-monitor/.monitor_state.json
# Docker
*.log

139
brand-monitor/README.md Normal file
View File

@ -0,0 +1,139 @@
# Molecule AI Brand Monitor
A long-running X API v2 poller that posts new brand mentions of **Molecule AI** to Slack `#brand-monitoring`.
Features:
- Smart query filter (from issue #549) suppresses drug-discovery SEO noise
- Deduplication via `since_id` — never posts the same tweet twice
- First run automatically backfills the last 24 hours
- **Surge mode** — 15-min polling for launch days / crisis windows (see below)
- `@here` alert when engagement > 10 or a competitor name appears
- Daily digest at 20:00 UTC
---
## Setup
### 1. Install dependencies
```bash
cd brand-monitor
pip install -r requirements.txt
```
### 2. Set environment variables
| Variable | Required | Description |
|---|---|---|
| `X_BEARER_TOKEN` | ✅ | X API Bearer token (from the Developer Portal) |
| `X_API_KEY` | ✅ | X API key (available for future OAuth use) |
| `X_API_SECRET` | ✅ | X API secret |
| `SLACK_WEBHOOK_URL` | ✅ | Slack incoming webhook URL for `#brand-monitoring` |
| `POLL_INTERVAL_SECONDS` | optional | Ambient polling cadence (default: `1800` = 30 min) |
| `SURGE_DURATION_HOURS` | optional | Surge window length in hours (default: `6`) |
For local development, create a `.env` file (never commit it):
```bash
X_BEARER_TOKEN=AAA...
X_API_KEY=BBB...
X_API_SECRET=CCC...
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
```
> **TODO (DevOps):** Provision `X_BEARER_TOKEN`, `X_API_KEY`, `X_API_SECRET`, and `SLACK_WEBHOOK_URL`
> as workspace secrets. The X Developer App credentials are pending approval — blocked on that before
> the monitor can run in production.
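For local runs with a `.env` file, the variables still have to reach `os.environ` before `monitor` is imported (the module reads `POLL_INTERVAL_SECONDS` and `SURGE_DURATION_HOURS` at import time). A minimal launcher sketch using python-dotenv (already pinned in `requirements.txt`); the `run_local.py` name is just an example:
```python
# run_local.py: hypothetical local-dev launcher using python-dotenv.
# load_dotenv() must run before importing monitor, because monitor.py reads
# its tuning env vars from os.environ at import time.
from dotenv import load_dotenv

load_dotenv()  # copies .env entries into os.environ

from monitor import Monitor

if __name__ == "__main__":
    Monitor().run()
```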
### 3. Run
```bash
python monitor.py
```
The monitor logs to stdout and polls until interrupted (Ctrl-C or process signal).
---
## Polling Cadence
| Mode | Interval | How long |
|---|---|---|
| **Ambient** | 30 min (`POLL_INTERVAL_SECONDS`) | Continuous |
| **Surge** | 15 min (fixed) | `SURGE_DURATION_HOURS` (default 6 h) |
---
## Surge Mode
Surge mode temporarily increases the polling frequency to 15 minutes for a configurable window (default 6 hours). State is persisted in `.surge_state.json` — if the process restarts during a surge window, it picks back up automatically.
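The persisted fields are `active`, `enabled_at`, `expires_at`, and `duration_hours`. A quick way to exercise the state machine from a Python shell (run inside `brand-monitor/` so the relative state-file path resolves):
```python
# Sketch: enable a 6-hour surge window and confirm it is active.
# Writes .surge_state.json in the current directory.
from surge import SurgeState

surge = SurgeState()
surge.enable(duration_hours=6)
print(surge.is_active())  # True until expires_at passes
surge.disable()           # removes the state file again
```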
### Activating manually (Slack slash command)
> **TODO:** Configure the Slack app with a `/surge-monitor` slash command that calls the
> `enable_surge_mode()` Python function (or a thin wrapper HTTP endpoint). The Slack app
> configuration is a separate step; the state machine here is ready.
When the command is wired up:
```
/surge-monitor on # enable for default 6 h
/surge-monitor on 12h # enable for 12 h
/surge-monitor off # deactivate immediately
```
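Until the Slack app exists, a thin wrapper endpoint could look roughly like the sketch below. This is an assumption-laden example: Flask is not in `requirements.txt`, the route and port are arbitrary, and real deployments should verify Slack request signatures.
```python
# surge_webhook.py: hypothetical HTTP wrapper for the /surge-monitor slash command.
# Flask is NOT in requirements.txt; install it separately and add Slack
# signature verification before exposing anything like this.
from flask import Flask, request

from monitor import enable_surge_mode
from surge import SurgeState

app = Flask(__name__)

@app.route("/surge-monitor", methods=["POST"])
def surge_monitor():
    # Slack sends slash-command arguments in the "text" form field,
    # e.g. "on", "on 12h", or "off".
    args = request.form.get("text", "").split()
    if args and args[0] == "off":
        SurgeState().disable()
        return "Surge mode disabled", 200
    hours = int(args[1].rstrip("h")) if len(args) > 1 else None
    enable_surge_mode(duration_hours=hours)  # None falls back to SURGE_DURATION_HOURS
    return "Surge mode enabled", 200

if __name__ == "__main__":
    app.run(port=3000)
```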
### Auto-trigger on `feat:` PR merge
In your CI/CD pipeline (e.g. GitHub Actions), call `enable_surge_mode()` when a PR with a `feat:` prefix is merged:
```python
# In a post-merge CI step:
import sys
sys.path.insert(0, "brand-monitor")
from monitor import enable_surge_mode
enable_surge_mode() # activates for SURGE_DURATION_HOURS
```
Or from the shell:
```bash
python -c "from monitor import enable_surge_mode; enable_surge_mode()"
```
### Deactivation
Surge mode deactivates automatically when its window expires. To force early deactivation:
```python
from surge import SurgeState
SurgeState().disable()
```
---
## Tests
```bash
cd brand-monitor
pip install -r requirements.txt
pytest test_monitor.py -v --cov=. --cov-report=term-missing --cov-fail-under=100
```
All HTTP calls are mocked — no live credentials needed in CI.
---
## Gitignored runtime files
- `.surge_state.json` — surge mode state
- `.monitor_state.json` — polling state (since_id, daily counts)
---
## API Cost Estimate
X API pay-per-use: **$0.005 / tweet read**
| Scenario | Reads/month | Est. cost |
|---|---|---|
| Ambient (30 min), ~5 mentions/day | ~150 | $0.75 |
| Surge (15 min) for 6 h, 10 surge events/month | ~300 extra | $1.50 |
| **Total estimate** | **~450-800** | **$2-4/month** |
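A quick sanity check of the arithmetic (read counts are the rough assumptions from the table, not measurements; the range above adds headroom for busier months):
```python
# Back-of-the-envelope cost check for the table above; all inputs are estimates.
COST_PER_READ = 0.005              # dollars per tweet read (X API pay-per-use)

ambient_reads = 5 * 30             # ~5 mentions/day over a month  -> ~150 reads
surge_reads = 30 * 10              # ~30 extra reads per surge event, ~10 events -> ~300 reads
total_reads = ambient_reads + surge_reads

print(total_reads)                               # 450
print(round(total_reads * COST_PER_READ, 2))     # 2.25 dollars/month at the low end
```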

225
brand-monitor/monitor.py Normal file
View File

@ -0,0 +1,225 @@
"""Brand monitor — main poller entry point.
Entry point:
python monitor.py
Environment variables (all required at startup):
X_BEARER_TOKEN X API Bearer token
X_API_KEY X API key (available for future OAuth use)
X_API_SECRET X API secret
SLACK_WEBHOOK_URL Slack incoming webhook URL
Optional tuning:
POLL_INTERVAL_SECONDS ambient polling cadence in seconds (default: 1800 = 30 min)
SURGE_DURATION_HOURS surge window length in hours (default: 6)
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from slack_client import SlackClient
from surge import SurgeState
from x_client import XClient
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Constants
# ------------------------------------------------------------------
REQUIRED_ENV_VARS = ["X_BEARER_TOKEN", "X_API_KEY", "X_API_SECRET", "SLACK_WEBHOOK_URL"]
DEFAULT_STATE_FILE = ".monitor_state.json"
# Ambient cadence: 30 min per issue spec (configurable via env)
POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", "1800"))
# Surge cadence: fixed at 15 min
SURGE_INTERVAL_SECONDS = 900
# Surge window length (configurable via env)
SURGE_DURATION_HOURS = int(os.environ.get("SURGE_DURATION_HOURS", "6"))
# UTC hour at which the daily digest is sent
DIGEST_HOUR_UTC = 20
# ------------------------------------------------------------------
# Startup validation
# ------------------------------------------------------------------
def validate_env():
"""Raise EnvironmentError if any required env var is absent."""
missing = [v for v in REQUIRED_ENV_VARS if not os.environ.get(v)]
if missing:
raise EnvironmentError(
f"Missing required environment variable(s): {', '.join(missing)}"
)
# ------------------------------------------------------------------
# Surge mode public entry point (callable from CI/CD on feat: PR merge)
# ------------------------------------------------------------------
def enable_surge_mode(duration_hours=None, state_file=None):
"""Enable surge mode. Call this from CI/CD hooks on feat: PR merges.
Args:
duration_hours: Override for surge window length. Defaults to the
SURGE_DURATION_HOURS env var (or 6 h).
state_file: Override path for .surge_state.json (mainly for tests).
"""
hours = duration_hours if duration_hours is not None else SURGE_DURATION_HOURS
kwargs = {}
if state_file is not None:
kwargs["state_file"] = state_file
surge = SurgeState(**kwargs)
surge.enable(hours)
logger.info("enable_surge_mode: activated for %d hour(s)", hours)
# ------------------------------------------------------------------
# Monitor class
# ------------------------------------------------------------------
class Monitor:
"""Cron-style poller: fetches new X mentions and posts them to Slack.
Args:
state_file: Path to the JSON file that persists polling state
(since_id, daily_count, etc.). Defaults to
``.monitor_state.json`` in the current directory.
surge_state_file: Path to the surge state JSON file.
"""
def __init__(self, state_file=DEFAULT_STATE_FILE, surge_state_file=None):
validate_env()
self.x_client = XClient()
self.slack_client = SlackClient()
surge_kwargs = {}
if surge_state_file is not None:
surge_kwargs["state_file"] = surge_state_file
self.surge = SurgeState(**surge_kwargs)
self.state_file = state_file
self.state = self._load_state()
# ------------------------------------------------------------------
# State persistence
# ------------------------------------------------------------------
def _load_state(self):
if os.path.exists(self.state_file):
with open(self.state_file) as fh:
return json.load(fh)
return {}
def _save_state(self):
with open(self.state_file, "w") as fh:
json.dump(self.state, fh, indent=2)
# ------------------------------------------------------------------
# Core poll
# ------------------------------------------------------------------
def run_poll(self):
"""Fetch new tweets and post them to Slack.
On first run (no saved since_id) backfills the last 24 h.
Tracks the newest tweet ID so subsequent runs avoid duplicates.
Returns:
list: tweets posted this cycle (may be empty).
"""
since_id = self.state.get("since_id")
start_time = None
if not since_id:
# First run: backfill last 24 h
start_time = (
datetime.now(timezone.utc) - timedelta(hours=24)
).strftime("%Y-%m-%dT%H:%M:%SZ")
logger.info("First run — backfilling last 24 h (start_time=%s)", start_time)
tweets = self.x_client.search_recent(since_id=since_id, start_time=start_time)
if tweets:
self.slack_client.post_mentions(tweets)
# X API returns tweets newest-first; store the top ID as next since_id
self.state["since_id"] = tweets[0]["id"]
return tweets
# ------------------------------------------------------------------
# Daily digest
# ------------------------------------------------------------------
def _should_send_digest(self):
"""True if it's 20:00 UTC and today's digest hasn't been sent yet."""
now = datetime.now(timezone.utc)
if now.hour != DIGEST_HOUR_UTC:
return False
today = now.strftime("%Y-%m-%d")
return self.state.get("last_digest_date") != today
def run_daily_digest(self):
"""Compile and post the daily summary to Slack, then reset the counter."""
mention_count = self.state.get("daily_count", 0)
self.slack_client.post_digest({"count": mention_count})
self.state["daily_count"] = 0
self.state["last_digest_date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
self._save_state()
logger.info("Daily digest sent (count=%d)", mention_count)
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
def _run_once(self):
"""Execute one full polling cycle.
Returns:
int: seconds to sleep before the next cycle.
"""
self.surge.check_expiry()
tweets = self.run_poll()
# Accumulate daily mention count
self.state["daily_count"] = self.state.get("daily_count", 0) + len(tweets)
self._save_state()
if self._should_send_digest():
self.run_daily_digest()
return self.surge.get_interval(POLL_INTERVAL_SECONDS, SURGE_INTERVAL_SECONDS)
def run(self):
"""Blocking main loop. Runs until interrupted."""
logger.info(
"Brand monitor starting — ambient interval %ds, surge interval %ds",
POLL_INTERVAL_SECONDS,
SURGE_INTERVAL_SECONDS,
)
while True:
try:
interval = self._run_once()
except Exception as exc: # noqa: BLE001
logger.error("Poll cycle failed: %s", exc)
interval = POLL_INTERVAL_SECONDS
logger.debug("Sleeping %ds until next poll", interval)
time.sleep(interval)
# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------
if __name__ == "__main__": # pragma: no cover
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s",
)
monitor = Monitor()
monitor.run()

6
brand-monitor/requirements.txt Normal file
View File

@ -0,0 +1,6 @@
requests==2.33.1
python-dotenv==1.0.1
# Test / dev
pytest==8.3.5
pytest-cov==6.1.0

149
brand-monitor/slack_client.py Normal file
View File

@ -0,0 +1,149 @@
"""Slack webhook client for posting brand mentions and daily digest."""
import os
import logging
import requests
logger = logging.getLogger(__name__)
# Competitor names that auto-trigger @here alert
COMPETITOR_NAMES = [
"openai", "langchain", "langgraph", "autogen", "crewai", "crew ai",
"llamaindex", "dify", "flowise", "n8n", "zapier", "make.com",
]
# Engagement threshold above which @here is triggered
AT_HERE_ENGAGEMENT_THRESHOLD = 10
class SlackClient:
"""Posts brand mention alerts and daily digests to a Slack webhook.
Webhook URL from SLACK_WEBHOOK_URL env var.
"""
def __init__(self):
self.webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
if not self.webhook_url:
raise EnvironmentError("Missing required environment variable: SLACK_WEBHOOK_URL")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _engagement_score(self, tweet):
"""Sum of likes + retweets + replies."""
metrics = tweet.get("public_metrics", {})
return (
metrics.get("like_count", 0)
+ metrics.get("retweet_count", 0)
+ metrics.get("reply_count", 0)
)
def _escape_mrkdwn(self, text: str) -> str:
"""Escape Slack mrkdwn special characters in untrusted content."""
return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
def _should_at_here(self, tweet):
"""Return True if the tweet warrants an @here ping."""
if self._engagement_score(tweet) > AT_HERE_ENGAGEMENT_THRESHOLD:
return True
text = tweet.get("text", "").lower()
return any(comp in text for comp in COMPETITOR_NAMES)
def _format_tweet_block(self, tweet):
"""Format a single tweet as a Slack mrkdwn string."""
tweet_id = tweet.get("id", "")
author_id = tweet.get("author_id", "unknown")
text = tweet.get("text", "").replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
created_at = tweet.get("created_at", "")
metrics = tweet.get("public_metrics", {})
url = f"https://twitter.com/i/web/status/{tweet_id}"
return (
f"*New mention* — <{url}|view>\n"
f">{text}\n"
f"Author: `{author_id}` | "
f"❤️ {metrics.get('like_count', 0)} "
f"🔁 {metrics.get('retweet_count', 0)} "
f"💬 {metrics.get('reply_count', 0)}\n"
f"_Posted: {created_at}_"
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def post_mentions(self, tweets):
"""Bundle and post new brand mentions to Slack.
Multiple tweets are sent in a single webhook payload, not one per tweet.
Args:
tweets: List of tweet dicts from XClient.search_recent().
Returns:
None. No-ops on empty list.
Raises:
requests.HTTPError: On non-2xx Slack response.
"""
if not tweets:
return
has_at_here = any(self._should_at_here(t) for t in tweets)
blocks = []
if has_at_here:
blocks.append(
{"type": "section", "text": {"type": "mrkdwn", "text": "<!here>"}}
)
count = len(tweets)
header = f"*{count} new Molecule AI mention{'s' if count > 1 else ''}* in #brand-monitoring"
blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": header}})
blocks.append({"type": "divider"})
for tweet in tweets:
blocks.append(
{"type": "section", "text": {"type": "mrkdwn", "text": self._format_tweet_block(tweet)}}
)
blocks.append({"type": "divider"})
payload = {"blocks": blocks}
logger.info("Posting %d mention(s) to Slack (at_here=%s)", count, has_at_here)
response = requests.post(self.webhook_url, json=payload, timeout=15)
response.raise_for_status()
def post_digest(self, summary):
"""Post the daily 20:00 UTC mention digest to Slack.
Args:
summary: Dict with keys:
count (int): total mentions today
top_tweets (list, optional): list of high-engagement tweet dicts
Raises:
requests.HTTPError: On non-2xx Slack response.
"""
count = summary.get("count", 0)
top_tweets = summary.get("top_tweets", [])
lines = [
"*📊 Daily Digest — Molecule AI Brand Mentions*",
f"Total mentions today: *{count}*",
]
if top_tweets:
lines.append("\n*Top engagements:*")
for tweet in top_tweets[:3]:
snippet = self._escape_mrkdwn(tweet.get("text", "")[:120])
score = self._engagement_score(tweet)
tweet_id = tweet.get("id", "")
url = f"https://twitter.com/i/web/status/{tweet_id}"
lines.append(f"• <{url}|{snippet}…> _(score: {score})_")
payload = {"text": "\n".join(lines)}
logger.info("Posting daily digest to Slack (count=%d)", count)
response = requests.post(self.webhook_url, json=payload, timeout=15)
response.raise_for_status()

114
brand-monitor/surge.py Normal file
View File

@ -0,0 +1,114 @@
"""Surge mode state machine.
Surge mode increases polling frequency from 30 min to 15 min for a
configurable window (default 6 h). State is persisted in a JSON file so
restarts during an active surge window continue in surge mode.
Activation paths:
1. Manual: call enable_surge_mode() (or the Slack slash command /surge-monitor on)
2. Auto: any PR merged with a 'feat:' prefix calls enable_surge_mode()
"""
import json
import logging
import os
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
DEFAULT_SURGE_FILE = ".surge_state.json"
DEFAULT_SURGE_DURATION_HOURS = 6
class SurgeState:
"""Persist and query surge mode activation.
Args:
state_file: Path to the JSON state file. Defaults to
``.surge_state.json`` in the current directory.
"""
def __init__(self, state_file=DEFAULT_SURGE_FILE):
self.state_file = state_file
# ------------------------------------------------------------------
# State I/O
# ------------------------------------------------------------------
def _load(self):
"""Return parsed state dict, or None if the file doesn't exist."""
if not os.path.exists(self.state_file):
return None
with open(self.state_file) as fh:
return json.load(fh)
def _write(self, state):
with open(self.state_file, "w") as fh:
json.dump(state, fh, indent=2)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def enable(self, duration_hours=DEFAULT_SURGE_DURATION_HOURS):
"""Activate surge mode for *duration_hours* hours.
Writes ``.surge_state.json`` so that restarts re-enter surge mode.
Args:
duration_hours: How long surge mode stays active (default 6 h).
"""
expires_at = (
datetime.now(timezone.utc) + timedelta(hours=duration_hours)
).isoformat()
state = {
"active": True,
"enabled_at": datetime.now(timezone.utc).isoformat(),
"expires_at": expires_at,
"duration_hours": duration_hours,
}
self._write(state)
logger.info("Surge mode enabled for %dh — expires at %s", duration_hours, expires_at)
def disable(self):
"""Deactivate surge mode and remove the state file."""
if os.path.exists(self.state_file):
os.remove(self.state_file)
logger.info("Surge mode disabled")
def is_active(self):
"""Return True if surge mode is currently active (and not expired).
Side effect: auto-disables if the expiry timestamp has passed.
"""
state = self._load()
if not state:
return False
expires_at = datetime.fromisoformat(state["expires_at"])
if datetime.now(timezone.utc) >= expires_at:
logger.info("Surge mode expired — auto-disabling")
self.disable()
return False
return True
def check_expiry(self):
"""Auto-disable surge if its window has elapsed.
Returns:
bool: whether surge mode is still active after the check.
"""
return self.is_active()
def get_interval(self, normal_interval, surge_interval):
"""Return the appropriate polling interval in seconds.
Args:
normal_interval: Seconds to sleep in ambient mode.
surge_interval: Seconds to sleep while surge is active.
Returns:
int: surge_interval if surge is active, else normal_interval.
"""
if self.is_active():
return surge_interval
return normal_interval

758
brand-monitor/test_monitor.py Normal file
View File

@ -0,0 +1,758 @@
"""Full test suite for brand-monitor modules.
Run:
pytest test_monitor.py -v --cov=. --cov-report=term-missing --cov-fail-under=100
All HTTP calls are mocked; no live API calls, no credentials needed.
"""
import json
import os
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, call, patch
import pytest
import requests
# ---------------------------------------------------------------------------
# Shared fixtures / constants
# ---------------------------------------------------------------------------
BASE_ENV = {
"X_BEARER_TOKEN": "test-bearer-token",
"X_API_KEY": "test-api-key",
"X_API_SECRET": "test-api-secret",
"SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/TEST",
}
SAMPLE_TWEET = {
"id": "1111111111",
"text": "Really excited about Molecule AI's agent platform — great SDK!",
"author_id": "9876543210",
"created_at": "2024-01-01T12:00:00Z",
"public_metrics": {
"like_count": 3,
"retweet_count": 1,
"reply_count": 2,
},
}
SAMPLE_TWEET_HIGH_ENGAGEMENT = {
"id": "2222222222",
"text": "Molecule AI multi-agent workflow is incredible",
"author_id": "1111111111",
"created_at": "2024-01-01T13:00:00Z",
"public_metrics": {
"like_count": 50,
"retweet_count": 20,
"reply_count": 15,
},
}
SAMPLE_TWEET_COMPETITOR = {
"id": "3333333333",
"text": "Comparing Molecule AI with langchain for our orchestration workflow",
"author_id": "2222222222",
"created_at": "2024-01-01T14:00:00Z",
"public_metrics": {
"like_count": 0,
"retweet_count": 0,
"reply_count": 0,
},
}
# ===========================================================================
# x_client tests
# ===========================================================================
class TestXClient:
def test_init_missing_token_raises(self):
from x_client import XClient
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"):
XClient()
def test_init_success(self):
from x_client import XClient
with patch.dict(os.environ, {"X_BEARER_TOKEN": "my-token"}):
client = XClient()
assert client.bearer_token == "my-token"
def _make_client(self):
from x_client import XClient
with patch.dict(os.environ, {"X_BEARER_TOKEN": "tok"}):
return XClient()
def test_search_recent_returns_tweets(self):
from x_client import SEARCH_QUERY, SEARCH_URL
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
mock_resp.json.return_value = {"data": [SAMPLE_TWEET]}
with patch("x_client.requests.get", return_value=mock_resp) as mock_get:
result = client.search_recent()
assert result == [SAMPLE_TWEET]
# Verify URL, auth header and query string
args, kwargs = mock_get.call_args
assert args[0] == SEARCH_URL
assert kwargs["headers"]["Authorization"] == "Bearer tok"
assert kwargs["params"]["query"] == SEARCH_QUERY
def test_search_recent_no_data_key_returns_empty_list(self):
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
mock_resp.json.return_value = {"meta": {"result_count": 0}}
with patch("x_client.requests.get", return_value=mock_resp):
result = client.search_recent()
assert result == []
def test_search_recent_with_since_id_adds_param(self):
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
mock_resp.json.return_value = {"data": [SAMPLE_TWEET]}
with patch("x_client.requests.get", return_value=mock_resp) as mock_get:
client.search_recent(since_id="9999")
params = mock_get.call_args.kwargs["params"]
assert params["since_id"] == "9999"
def test_search_recent_with_start_time_adds_param(self):
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
mock_resp.json.return_value = {"data": []}
with patch("x_client.requests.get", return_value=mock_resp) as mock_get:
client.search_recent(start_time="2024-01-01T00:00:00Z")
params = mock_get.call_args.kwargs["params"]
assert params["start_time"] == "2024-01-01T00:00:00Z"
def test_search_recent_no_since_id_no_start_time_omits_params(self):
"""Neither since_id nor start_time in params when not provided."""
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
mock_resp.json.return_value = {"data": []}
with patch("x_client.requests.get", return_value=mock_resp) as mock_get:
client.search_recent()
params = mock_get.call_args.kwargs["params"]
assert "since_id" not in params
assert "start_time" not in params
def test_search_recent_http_error_propagates(self):
client = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.side_effect = requests.HTTPError("403 Forbidden")
with patch("x_client.requests.get", return_value=mock_resp):
with pytest.raises(requests.HTTPError):
client.search_recent()
# ===========================================================================
# slack_client tests
# ===========================================================================
class TestSlackClient:
def _make_client(self):
from slack_client import SlackClient
with patch.dict(os.environ, {"SLACK_WEBHOOK_URL": "https://hooks.slack.com/test"}):
return SlackClient()
def test_init_missing_webhook_raises(self):
from slack_client import SlackClient
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(EnvironmentError, match="SLACK_WEBHOOK_URL"):
SlackClient()
def test_init_success(self):
c = self._make_client()
assert c.webhook_url == "https://hooks.slack.com/test"
def test_engagement_score_sums_correctly(self):
c = self._make_client()
tweet = {"public_metrics": {"like_count": 5, "retweet_count": 3, "reply_count": 2}}
assert c._engagement_score(tweet) == 10
def test_engagement_score_missing_metrics_returns_zero(self):
c = self._make_client()
assert c._engagement_score({}) == 0
def test_should_at_here_high_engagement_returns_true(self):
c = self._make_client()
assert c._should_at_here(SAMPLE_TWEET_HIGH_ENGAGEMENT) is True
def test_should_at_here_competitor_name_returns_true(self):
c = self._make_client()
# SAMPLE_TWEET_COMPETITOR contains "langchain" — engagement is 0
assert c._should_at_here(SAMPLE_TWEET_COMPETITOR) is True
def test_should_at_here_normal_tweet_returns_false(self):
c = self._make_client()
# SAMPLE_TWEET: engagement=6 (<=10), no competitor
assert c._should_at_here(SAMPLE_TWEET) is False
def test_post_mentions_empty_list_is_noop(self):
c = self._make_client()
with patch("slack_client.requests.post") as mock_post:
c.post_mentions([])
mock_post.assert_not_called()
def test_post_mentions_single_tweet_no_at_here(self):
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_mentions([SAMPLE_TWEET])
mock_post.assert_called_once()
payload = mock_post.call_args.kwargs["json"]
section_texts = [
b["text"]["text"]
for b in payload["blocks"]
if b.get("type") == "section"
]
# No @here for normal engagement tweet
assert not any("<!here>" in t for t in section_texts)
# Header mentions "1 new … mention"
assert any("1 new" in t for t in section_texts)
def test_post_mentions_multiple_tweets_with_at_here(self):
"""High-engagement tweet triggers @here; both tweets appear in payload."""
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_mentions([SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET])
payload = mock_post.call_args.kwargs["json"]
section_texts = [
b["text"]["text"]
for b in payload["blocks"]
if b.get("type") == "section"
]
assert any("<!here>" in t for t in section_texts)
assert any("2 new" in t for t in section_texts)
def test_post_mentions_html_escaping_in_tweet_text(self):
"""< > & in tweet text are escaped to prevent Slack mrkdwn injection."""
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
tweet = {**SAMPLE_TWEET, "text": "X < Y & Z > W"}
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_mentions([tweet])
raw = str(mock_post.call_args.kwargs["json"])
assert "&lt;" in raw
assert "&gt;" in raw
assert "&amp;" in raw
def test_post_mentions_http_error_propagates(self):
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.side_effect = requests.HTTPError("500")
with patch("slack_client.requests.post", return_value=mock_resp):
with pytest.raises(requests.HTTPError):
c.post_mentions([SAMPLE_TWEET])
def test_post_digest_count_only_no_top_tweets(self):
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_digest({"count": 42})
text = mock_post.call_args.kwargs["json"]["text"]
assert "42" in text
assert "Top engagements" not in text
def test_post_digest_with_top_tweets_included(self):
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_digest({"count": 10, "top_tweets": [SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET]})
text = mock_post.call_args.kwargs["json"]["text"]
assert "Top engagements" in text
def test_post_digest_mrkdwn_escaping_in_snippet(self):
"""< > & in top-tweet snippets are escaped to prevent mrkdwn injection."""
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.return_value = None
malicious_tweet = {**SAMPLE_TWEET, "text": "X < Y & Z > W <!channel>"}
with patch("slack_client.requests.post", return_value=mock_resp) as mock_post:
c.post_digest({"count": 1, "top_tweets": [malicious_tweet]})
text = mock_post.call_args.kwargs["json"]["text"]
assert "&lt;" in text
assert "&gt;" in text
assert "&amp;" in text
assert "<!channel>" not in text
assert "<" not in text.split("twitter.com")[1] # no raw < after the URL
def test_post_digest_http_error_propagates(self):
c = self._make_client()
mock_resp = MagicMock()
mock_resp.raise_for_status.side_effect = requests.HTTPError("500")
with patch("slack_client.requests.post", return_value=mock_resp):
with pytest.raises(requests.HTTPError):
c.post_digest({"count": 1})
# ===========================================================================
# surge tests
# ===========================================================================
class TestSurgeState:
def _make_surge(self, tmp_path):
from surge import SurgeState
return SurgeState(state_file=str(tmp_path / ".surge_state.json"))
def test_init_default_state_file(self):
from surge import DEFAULT_SURGE_FILE, SurgeState
s = SurgeState()
assert s.state_file == DEFAULT_SURGE_FILE
def test_init_custom_state_file(self, tmp_path):
s = self._make_surge(tmp_path)
assert ".surge_state.json" in s.state_file
def test_enable_writes_state_file_with_correct_fields(self, tmp_path):
s = self._make_surge(tmp_path)
s.enable(duration_hours=3)
state = json.loads(open(s.state_file).read())
assert state["active"] is True
assert state["duration_hours"] == 3
assert "expires_at" in state
assert "enabled_at" in state
def test_enable_default_duration(self, tmp_path):
from surge import DEFAULT_SURGE_DURATION_HOURS
s = self._make_surge(tmp_path)
s.enable()
state = json.loads(open(s.state_file).read())
assert state["duration_hours"] == DEFAULT_SURGE_DURATION_HOURS
def test_disable_removes_file(self, tmp_path):
s = self._make_surge(tmp_path)
s.enable()
assert os.path.exists(s.state_file)
s.disable()
assert not os.path.exists(s.state_file)
def test_disable_no_file_does_not_raise(self, tmp_path):
s = self._make_surge(tmp_path)
# File doesn't exist — should be silent
s.disable()
def test_is_active_no_file_returns_false(self, tmp_path):
s = self._make_surge(tmp_path)
assert s.is_active() is False
def test_is_active_not_expired_returns_true(self, tmp_path):
s = self._make_surge(tmp_path)
s.enable(duration_hours=6)
assert s.is_active() is True
def test_is_active_expired_auto_disables_returns_false(self, tmp_path):
s = self._make_surge(tmp_path)
# Write an already-expired state
past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
json.dump({"active": True, "expires_at": past, "duration_hours": 1}, open(s.state_file, "w"))
assert s.is_active() is False
assert not os.path.exists(s.state_file)
def test_check_expiry_returns_true_when_active(self, tmp_path):
s = self._make_surge(tmp_path)
s.enable(duration_hours=6)
assert s.check_expiry() is True
def test_check_expiry_returns_false_when_expired(self, tmp_path):
s = self._make_surge(tmp_path)
past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
json.dump({"active": True, "expires_at": past, "duration_hours": 1}, open(s.state_file, "w"))
assert s.check_expiry() is False
def test_get_interval_surge_active_returns_surge_interval(self, tmp_path):
s = self._make_surge(tmp_path)
s.enable(duration_hours=6)
assert s.get_interval(1800, 900) == 900
def test_get_interval_surge_inactive_returns_normal_interval(self, tmp_path):
s = self._make_surge(tmp_path)
assert s.get_interval(1800, 900) == 1800
# ===========================================================================
# monitor — validate_env tests
# ===========================================================================
class TestValidateEnv:
def test_all_vars_present_passes(self):
from monitor import validate_env
with patch.dict(os.environ, BASE_ENV, clear=False):
validate_env() # must not raise
def test_single_missing_var_raises_with_name(self):
from monitor import validate_env
env = {k: v for k, v in BASE_ENV.items() if k != "X_BEARER_TOKEN"}
with patch.dict(os.environ, env, clear=True):
with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"):
validate_env()
def test_multiple_missing_vars_raises_with_all_names(self):
from monitor import validate_env
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(EnvironmentError) as exc_info:
validate_env()
msg = str(exc_info.value)
assert "X_BEARER_TOKEN" in msg
assert "SLACK_WEBHOOK_URL" in msg
# ===========================================================================
# monitor — enable_surge_mode tests
# ===========================================================================
class TestEnableSurgeMode:
def test_default_duration_uses_env_default(self, tmp_path):
from monitor import SURGE_DURATION_HOURS, enable_surge_mode
sf = str(tmp_path / ".surge.json")
enable_surge_mode(state_file=sf)
state = json.loads(open(sf).read())
assert state["duration_hours"] == SURGE_DURATION_HOURS
def test_custom_duration_overrides_default(self, tmp_path):
from monitor import enable_surge_mode
sf = str(tmp_path / ".surge.json")
enable_surge_mode(duration_hours=12, state_file=sf)
state = json.loads(open(sf).read())
assert state["duration_hours"] == 12
def test_no_state_file_override_uses_default_path(self):
"""When state_file=None, SurgeState() is constructed with no kwargs."""
from monitor import enable_surge_mode
with patch("monitor.SurgeState") as MockSurge:
mock_instance = MagicMock()
MockSurge.return_value = mock_instance
enable_surge_mode(duration_hours=3)
MockSurge.assert_called_once_with()
mock_instance.enable.assert_called_once_with(3)
# ===========================================================================
# monitor — Monitor class tests
# ===========================================================================
class TestMonitor:
"""Tests for the Monitor class."""
# ------------------------------------------------------------------
# Constructor helpers
# ------------------------------------------------------------------
def _make_monitor(self, tmp_path, state_data=None):
"""Build a Monitor with temp files and mocked HTTP clients."""
from monitor import Monitor
state_file = str(tmp_path / "monitor_state.json")
surge_file = str(tmp_path / "surge_state.json")
if state_data is not None:
json.dump(state_data, open(state_file, "w"))
with patch.dict(os.environ, BASE_ENV, clear=False):
with patch("monitor.XClient"), patch("monitor.SlackClient"):
m = Monitor(state_file=state_file, surge_state_file=surge_file)
return m
# ------------------------------------------------------------------
# __init__
# ------------------------------------------------------------------
def test_init_success_with_empty_state(self, tmp_path):
m = self._make_monitor(tmp_path)
assert m.state == {}
def test_init_loads_existing_state_file(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"since_id": "abc"})
assert m.state["since_id"] == "abc"
def test_init_missing_env_raises(self, tmp_path):
from monitor import Monitor
sf = str(tmp_path / "st.json")
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(EnvironmentError):
Monitor(state_file=sf)
def test_init_surge_state_file_none_uses_default(self, tmp_path):
"""surge_state_file=None → SurgeState constructed with no kwargs."""
from monitor import Monitor
sf = str(tmp_path / "st.json")
with patch.dict(os.environ, BASE_ENV, clear=False):
with patch("monitor.XClient"), patch("monitor.SlackClient"):
with patch("monitor.SurgeState") as MockSurge:
Monitor(state_file=sf) # surge_state_file defaults to None
MockSurge.assert_called_once_with()
def test_init_surge_state_file_provided_passes_kwarg(self, tmp_path):
"""surge_state_file provided → SurgeState(state_file=...) is called."""
from monitor import Monitor
sf = str(tmp_path / "st.json")
surge_sf = str(tmp_path / "surge.json")
with patch.dict(os.environ, BASE_ENV, clear=False):
with patch("monitor.XClient"), patch("monitor.SlackClient"):
with patch("monitor.SurgeState") as MockSurge:
Monitor(state_file=sf, surge_state_file=surge_sf)
MockSurge.assert_called_once_with(state_file=surge_sf)
# ------------------------------------------------------------------
# _load_state / _save_state
# ------------------------------------------------------------------
def test_load_state_no_file_returns_empty_dict(self, tmp_path):
m = self._make_monitor(tmp_path)
assert m._load_state() == {}
def test_load_state_existing_file_returns_contents(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"since_id": "XYZ"})
assert m._load_state()["since_id"] == "XYZ"
def test_save_state_persists_to_disk(self, tmp_path):
m = self._make_monitor(tmp_path)
m.state["since_id"] = "saved"
m._save_state()
on_disk = json.loads(open(m.state_file).read())
assert on_disk["since_id"] == "saved"
# ------------------------------------------------------------------
# run_poll
# ------------------------------------------------------------------
def test_run_poll_first_run_uses_start_time_backfill(self, tmp_path):
"""No since_id → search_recent called with start_time set, since_id=None."""
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = [SAMPLE_TWEET]
tweets = m.run_poll()
kw = m.x_client.search_recent.call_args.kwargs
assert kw["since_id"] is None
assert kw["start_time"] is not None # 24h backfill
assert tweets == [SAMPLE_TWEET]
assert m.state["since_id"] == SAMPLE_TWEET["id"]
def test_run_poll_subsequent_run_passes_since_id(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"since_id": "prev_tweet_id"})
m.x_client.search_recent.return_value = [SAMPLE_TWEET]
m.run_poll()
kw = m.x_client.search_recent.call_args.kwargs
assert kw["since_id"] == "prev_tweet_id"
def test_run_poll_no_tweets_does_not_post_to_slack(self, tmp_path):
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = []
tweets = m.run_poll()
m.slack_client.post_mentions.assert_not_called()
assert "since_id" not in m.state
assert tweets == []
def test_run_poll_no_tweets_preserves_existing_since_id(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"since_id": "old_id"})
m.x_client.search_recent.return_value = []
m.run_poll()
assert m.state["since_id"] == "old_id"
def test_run_poll_new_tweets_posts_to_slack_and_updates_since_id(self, tmp_path):
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = [SAMPLE_TWEET]
m.run_poll()
m.slack_client.post_mentions.assert_called_once_with([SAMPLE_TWEET])
assert m.state["since_id"] == SAMPLE_TWEET["id"]
# ------------------------------------------------------------------
# _should_send_digest
# ------------------------------------------------------------------
def test_should_send_digest_wrong_hour_returns_false(self, tmp_path):
m = self._make_monitor(tmp_path)
fake_now = datetime(2024, 1, 1, 15, 0, 0, tzinfo=timezone.utc) # 15:00 UTC
with patch("monitor.datetime") as mock_dt:
mock_dt.now.return_value = fake_now
assert m._should_send_digest() is False
def test_should_send_digest_correct_hour_not_yet_sent_returns_true(self, tmp_path):
m = self._make_monitor(tmp_path)
fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc) # 20:00 UTC
with patch("monitor.datetime") as mock_dt:
mock_dt.now.return_value = fake_now
assert m._should_send_digest() is True
def test_should_send_digest_already_sent_today_returns_false(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"last_digest_date": "2024-01-01"})
fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc)
with patch("monitor.datetime") as mock_dt:
mock_dt.now.return_value = fake_now
assert m._should_send_digest() is False
# ------------------------------------------------------------------
# run_daily_digest
# ------------------------------------------------------------------
def test_run_daily_digest_posts_count_and_resets(self, tmp_path):
m = self._make_monitor(tmp_path, state_data={"daily_count": 7})
m.run_daily_digest()
m.slack_client.post_digest.assert_called_once_with({"count": 7})
assert m.state["daily_count"] == 0
assert "last_digest_date" in m.state
# ------------------------------------------------------------------
# _run_once
# ------------------------------------------------------------------
def test_run_once_no_digest_returns_normal_interval(self, tmp_path):
from monitor import POLL_INTERVAL_SECONDS
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = [SAMPLE_TWEET]
with patch.object(m, "_should_send_digest", return_value=False):
interval = m._run_once()
assert m.state["daily_count"] == 1
assert interval == POLL_INTERVAL_SECONDS
def test_run_once_triggers_digest_when_due(self, tmp_path):
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = []
with patch.object(m, "_should_send_digest", return_value=True):
with patch.object(m, "run_daily_digest") as mock_digest:
m._run_once()
mock_digest.assert_called_once()
def test_run_once_returns_surge_interval_when_surge_active(self, tmp_path):
from monitor import SURGE_INTERVAL_SECONDS
m = self._make_monitor(tmp_path)
m.x_client.search_recent.return_value = []
m.surge.enable(duration_hours=6)
with patch.object(m, "_should_send_digest", return_value=False):
interval = m._run_once()
assert interval == SURGE_INTERVAL_SECONDS
# ------------------------------------------------------------------
# run (infinite loop)
# ------------------------------------------------------------------
def test_run_normal_path_sleeps_with_returned_interval(self, tmp_path):
from monitor import Monitor, POLL_INTERVAL_SECONDS
sf = str(tmp_path / "st.json")
surge_sf = str(tmp_path / "surge.json")
with patch.dict(os.environ, BASE_ENV, clear=False):
with patch("monitor.XClient"), patch("monitor.SlackClient"):
m = Monitor(state_file=sf, surge_state_file=surge_sf)
sleep_calls = []
def fake_sleep(n):
sleep_calls.append(n)
raise SystemExit("terminate test loop")
with patch.object(m, "_run_once", return_value=POLL_INTERVAL_SECONDS):
with patch("monitor.time.sleep", side_effect=fake_sleep):
with pytest.raises(SystemExit):
m.run()
assert sleep_calls == [POLL_INTERVAL_SECONDS]
def test_run_exception_in_run_once_falls_back_to_poll_interval(self, tmp_path):
from monitor import Monitor, POLL_INTERVAL_SECONDS
sf = str(tmp_path / "st.json")
surge_sf = str(tmp_path / "surge.json")
with patch.dict(os.environ, BASE_ENV, clear=False):
with patch("monitor.XClient"), patch("monitor.SlackClient"):
m = Monitor(state_file=sf, surge_state_file=surge_sf)
sleep_calls = []
def fake_sleep(n):
sleep_calls.append(n)
raise SystemExit("terminate test loop")
with patch.object(m, "_run_once", side_effect=RuntimeError("api exploded")):
with patch("monitor.time.sleep", side_effect=fake_sleep):
with pytest.raises(SystemExit):
m.run()
# On exception, sleep is called with the ambient interval
assert sleep_calls == [POLL_INTERVAL_SECONDS]

65
brand-monitor/x_client.py Normal file
View File

@ -0,0 +1,65 @@
"""X API v2 thin client for brand mention search."""
import os
import logging
import requests
logger = logging.getLogger(__name__)
SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
# Verbatim from issue #549 — drug-discovery SEO noise suppressed at query level
SEARCH_QUERY = (
'("Molecule AI" OR "@moleculeai") '
'(agent OR workflow OR orchestrat OR "multi-agent" OR developer OR SDK OR API OR "agent platform") '
'-moleculeai.com -molecule.ai -"drug discovery" -pharmaceutical -CRISPR -oncology '
'-is:retweet lang:en'
)
TWEET_FIELDS = "author_id,created_at,public_metrics,entities"
class XClient:
"""Thin wrapper around X API v2 recent-search endpoint.
Auth: Bearer token from X_BEARER_TOKEN env var.
"""
def __init__(self):
self.bearer_token = os.environ.get("X_BEARER_TOKEN")
if not self.bearer_token:
raise EnvironmentError("Missing required environment variable: X_BEARER_TOKEN")
def search_recent(self, since_id=None, start_time=None, max_results=100):
"""Search recent tweets matching SEARCH_QUERY.
Args:
since_id: Only return tweets newer than this tweet ID.
start_time: ISO 8601 datetime string; only return tweets after this time.
max_results: Max tweets per request (10-100).
Returns:
List of tweet dicts (newest first), empty list if none found.
Raises:
requests.HTTPError: On non-2xx API response.
"""
headers = {"Authorization": f"Bearer {self.bearer_token}"}
params = {
"query": SEARCH_QUERY,
"tweet.fields": TWEET_FIELDS,
"max_results": max_results,
}
if since_id:
params["since_id"] = since_id
if start_time:
params["start_time"] = start_time
logger.debug("Searching X API: since_id=%s start_time=%s", since_id, start_time)
response = requests.get(SEARCH_URL, headers=headers, params=params, timeout=30)
response.raise_for_status()
data = response.json()
tweets = data.get("data", [])
logger.info("X API returned %d tweet(s)", len(tweets))
return tweets