diff --git a/.gitignore b/.gitignore index ddfa7a84..a3a4a2a1 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,10 @@ venv/ *.egg-info/ .pytest_cache/ +# Brand monitor runtime state (never commit) +brand-monitor/.surge_state.json +brand-monitor/.monitor_state.json + # Docker *.log diff --git a/brand-monitor/README.md b/brand-monitor/README.md new file mode 100644 index 00000000..adc914b7 --- /dev/null +++ b/brand-monitor/README.md @@ -0,0 +1,139 @@ +# Molecule AI Brand Monitor + +A cron-based X API v2 poller that posts new brand mentions of **Molecule AI** to Slack `#brand-monitoring`. + +Features: +- Smart query filter (from issue #549) suppresses drug-discovery SEO noise +- Deduplication via `since_id` — never posts the same tweet twice +- First run automatically backfills the last 24 hours +- **Surge mode** — 15-min polling for launch days / crisis windows (see below) +- `@here` alert when engagement > 10 or a competitor name appears +- Daily digest at 20:00 UTC + +--- + +## Setup + +### 1. Install dependencies + +```bash +cd brand-monitor +pip install -r requirements.txt +``` + +### 2. Set environment variables + +| Variable | Required | Description | +|---|---|---| +| `X_BEARER_TOKEN` | ✅ | X API Bearer token (from the Developer Portal) | +| `X_API_KEY` | ✅ | X API key (available for future OAuth use) | +| `X_API_SECRET` | ✅ | X API secret | +| `SLACK_WEBHOOK_URL` | ✅ | Slack incoming webhook URL for `#brand-monitoring` | +| `POLL_INTERVAL_SECONDS` | optional | Ambient polling cadence (default: `1800` = 30 min) | +| `SURGE_DURATION_HOURS` | optional | Surge window length in hours (default: `6`) | + +For local development, create a `.env` file (never commit it): + +```bash +X_BEARER_TOKEN=AAA... +X_API_KEY=BBB... +X_API_SECRET=CCC... +SLACK_WEBHOOK_URL=https://hooks.slack.com/services/... +``` + +> **TODO (DevOps):** Provision `X_BEARER_TOKEN`, `X_API_KEY`, `X_API_SECRET`, and `SLACK_WEBHOOK_URL` +> as workspace secrets. 
The X Developer App credentials are pending approval — blocked on that before +> the monitor can run in production. + +### 3. Run + +```bash +python monitor.py +``` + +The monitor logs to stdout and polls until interrupted (Ctrl-C or process signal). + +--- + +## Polling Cadence + +| Mode | Interval | How long | +|---|---|---| +| **Ambient** | 30 min (`POLL_INTERVAL_SECONDS`) | Continuous | +| **Surge** | 15 min (fixed) | `SURGE_DURATION_HOURS` (default 6 h) | + +--- + +## Surge Mode + +Surge mode temporarily increases the polling frequency to 15 minutes for a configurable window (default 6 hours). State is persisted in `.surge_state.json` — if the process restarts during a surge window, it picks back up automatically. + +### Activating manually (Slack slash command) + +> **TODO:** Configure the Slack app with a `/surge-monitor` slash command that calls the +> `enable_surge_mode()` Python function (or a thin wrapper HTTP endpoint). The Slack app +> configuration is a separate step; the state machine here is ready. + +When the command is wired up: +``` +/surge-monitor on # enable for default 6 h +/surge-monitor on 12h # enable for 12 h +/surge-monitor off # deactivate immediately +``` + +### Auto-trigger on `feat:` PR merge + +In your CI/CD pipeline (e.g. GitHub Actions), call `enable_surge_mode()` when a PR with a `feat:` prefix is merged: + +```python +# In a post-merge CI step: +import sys +sys.path.insert(0, "brand-monitor") +from monitor import enable_surge_mode +enable_surge_mode() # activates for SURGE_DURATION_HOURS +``` + +Or from the shell: +```bash +python -c "from monitor import enable_surge_mode; enable_surge_mode()" +``` + +### Deactivation + +Surge mode deactivates automatically when its window expires. To force early deactivation: + +```python +from surge import SurgeState +SurgeState().disable() +``` + +--- + +## Tests + +```bash +cd brand-monitor +pip install -r requirements.txt +pytest test_monitor.py -v --cov=. 
--cov-report=term-missing --cov-fail-under=100 +``` + +All HTTP calls are mocked — no live credentials needed in CI. + +--- + +## Gitignored runtime files + +- `.surge_state.json` — surge mode state +- `.monitor_state.json` — polling state (since_id, daily counts) + +--- + +## API Cost Estimate + +X API pay-per-use: **$0.005 / tweet read** + +| Scenario | Reads/month | Est. cost | +|---|---|---| +| Ambient (30 min), ~5 mentions/day | ~150 | $0.75 | +| Surge (15 min) for 6 h, 10 surge events/month | ~300 extra | $1.50 | +| **Total estimate** | **~450–800** | **$2–4/month** | diff --git a/brand-monitor/monitor.py b/brand-monitor/monitor.py new file mode 100644 index 00000000..2ac5092f --- /dev/null +++ b/brand-monitor/monitor.py @@ -0,0 +1,225 @@ +"""Brand monitor — main poller entry point. + +Entry point: + python monitor.py + +Environment variables (all required at startup): + X_BEARER_TOKEN — X API Bearer token + X_API_KEY — X API key (available for future OAuth use) + X_API_SECRET — X API secret + SLACK_WEBHOOK_URL — Slack incoming webhook URL + +Optional tuning: + POLL_INTERVAL_SECONDS — ambient polling cadence in seconds (default: 1800 = 30 min) + SURGE_DURATION_HOURS — surge window length in hours (default: 6) +""" + +import json +import logging +import os +import time +from datetime import datetime, timedelta, timezone + +from slack_client import SlackClient +from surge import SurgeState +from x_client import XClient + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------ +# Constants +# ------------------------------------------------------------------ + +REQUIRED_ENV_VARS = ["X_BEARER_TOKEN", "X_API_KEY", "X_API_SECRET", "SLACK_WEBHOOK_URL"] + +DEFAULT_STATE_FILE = ".monitor_state.json" + +# Ambient cadence: 30 min per issue spec (configurable via env) +POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", "1800")) + +# Surge cadence: fixed at 15 min +SURGE_INTERVAL_SECONDS = 900 + +# 
Surge window length (configurable via env) +SURGE_DURATION_HOURS = int(os.environ.get("SURGE_DURATION_HOURS", "6")) + +# UTC hour at which the daily digest is sent +DIGEST_HOUR_UTC = 20 + + +# ------------------------------------------------------------------ +# Startup validation +# ------------------------------------------------------------------ + +def validate_env(): + """Raise EnvironmentError if any required env var is absent.""" + missing = [v for v in REQUIRED_ENV_VARS if not os.environ.get(v)] + if missing: + raise EnvironmentError( + f"Missing required environment variable(s): {', '.join(missing)}" + ) + + +# ------------------------------------------------------------------ +# Surge mode public entry point (callable from CI/CD on feat: PR merge) +# ------------------------------------------------------------------ + +def enable_surge_mode(duration_hours=None, state_file=None): + """Enable surge mode. Call this from CI/CD hooks on feat: PR merges. + + Args: + duration_hours: Override for surge window length. Defaults to the + SURGE_DURATION_HOURS env var (or 6 h). + state_file: Override path for .surge_state.json (mainly for tests). + """ + hours = duration_hours if duration_hours is not None else SURGE_DURATION_HOURS + kwargs = {} + if state_file is not None: + kwargs["state_file"] = state_file + surge = SurgeState(**kwargs) + surge.enable(hours) + logger.info("enable_surge_mode: activated for %d hour(s)", hours) + + +# ------------------------------------------------------------------ +# Monitor class +# ------------------------------------------------------------------ + +class Monitor: + """Cron-style poller: fetches new X mentions and posts them to Slack. + + Args: + state_file: Path to the JSON file that persists polling state + (since_id, daily_count, etc.). Defaults to + ``.monitor_state.json`` in the current directory. + surge_state_file: Path to the surge state JSON file. 
+ """ + + def __init__(self, state_file=DEFAULT_STATE_FILE, surge_state_file=None): + validate_env() + self.x_client = XClient() + self.slack_client = SlackClient() + surge_kwargs = {} + if surge_state_file is not None: + surge_kwargs["state_file"] = surge_state_file + self.surge = SurgeState(**surge_kwargs) + self.state_file = state_file + self.state = self._load_state() + + # ------------------------------------------------------------------ + # State persistence + # ------------------------------------------------------------------ + + def _load_state(self): + if os.path.exists(self.state_file): + with open(self.state_file) as fh: + return json.load(fh) + return {} + + def _save_state(self): + with open(self.state_file, "w") as fh: + json.dump(self.state, fh, indent=2) + + # ------------------------------------------------------------------ + # Core poll + # ------------------------------------------------------------------ + + def run_poll(self): + """Fetch new tweets and post them to Slack. + + On first run (no saved since_id) backfills the last 24 h. + Tracks the newest tweet ID so subsequent runs avoid duplicates. + + Returns: + list: tweets posted this cycle (may be empty). 
+ """ + since_id = self.state.get("since_id") + start_time = None + + if not since_id: + # First run: backfill last 24 h + start_time = ( + datetime.now(timezone.utc) - timedelta(hours=24) + ).strftime("%Y-%m-%dT%H:%M:%SZ") + logger.info("First run — backfilling last 24 h (start_time=%s)", start_time) + + tweets = self.x_client.search_recent(since_id=since_id, start_time=start_time) + + if tweets: + self.slack_client.post_mentions(tweets) + # X API returns tweets newest-first; store the top ID as next since_id + self.state["since_id"] = tweets[0]["id"] + + return tweets + + # ------------------------------------------------------------------ + # Daily digest + # ------------------------------------------------------------------ + + def _should_send_digest(self): + """True if it's 20:00 UTC and today's digest hasn't been sent yet.""" + now = datetime.now(timezone.utc) + if now.hour != DIGEST_HOUR_UTC: + return False + today = now.strftime("%Y-%m-%d") + return self.state.get("last_digest_date") != today + + def run_daily_digest(self): + """Compile and post the daily summary to Slack, then reset the counter.""" + mention_count = self.state.get("daily_count", 0) + self.slack_client.post_digest({"count": mention_count}) + self.state["daily_count"] = 0 + self.state["last_digest_date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d") + self._save_state() + logger.info("Daily digest sent (count=%d)", mention_count) + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ + + def _run_once(self): + """Execute one full polling cycle. + + Returns: + int: seconds to sleep before the next cycle. 
+ """ + self.surge.check_expiry() + tweets = self.run_poll() + + # Accumulate daily mention count + self.state["daily_count"] = self.state.get("daily_count", 0) + len(tweets) + self._save_state() + + if self._should_send_digest(): + self.run_daily_digest() + + return self.surge.get_interval(POLL_INTERVAL_SECONDS, SURGE_INTERVAL_SECONDS) + + def run(self): + """Blocking main loop. Runs until interrupted.""" + logger.info( + "Brand monitor starting — ambient interval %ds, surge interval %ds", + POLL_INTERVAL_SECONDS, + SURGE_INTERVAL_SECONDS, + ) + while True: + try: + interval = self._run_once() + except Exception as exc: # noqa: BLE001 + logger.error("Poll cycle failed: %s", exc) + interval = POLL_INTERVAL_SECONDS + logger.debug("Sleeping %ds until next poll", interval) + time.sleep(interval) + + +# ------------------------------------------------------------------ +# Entry point +# ------------------------------------------------------------------ + +if __name__ == "__main__": # pragma: no cover + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s — %(message)s", + ) + monitor = Monitor() + monitor.run() diff --git a/brand-monitor/requirements.txt b/brand-monitor/requirements.txt new file mode 100644 index 00000000..97db594a --- /dev/null +++ b/brand-monitor/requirements.txt @@ -0,0 +1,6 @@ +requests==2.32.3 +python-dotenv==1.0.1 + +# Test / dev +pytest==8.3.5 +pytest-cov==6.1.0 diff --git a/brand-monitor/slack_client.py b/brand-monitor/slack_client.py new file mode 100644 index 00000000..6a5f5fe5 --- /dev/null +++ b/brand-monitor/slack_client.py @@ -0,0 +1,145 @@ +"""Slack webhook client for posting brand mentions and daily digest.""" + +import os +import logging +import requests + +logger = logging.getLogger(__name__) + +# Competitor names that auto-trigger @here alert +COMPETITOR_NAMES = [ + "openai", "langchain", "langgraph", "autogen", "crewai", "crew ai", + "llamaindex", "dify", "flowise", "n8n", "zapier", "make.com", +] 
+ +# Engagement threshold above which @here is triggered +AT_HERE_ENGAGEMENT_THRESHOLD = 10 + + +class SlackClient: + """Posts brand mention alerts and daily digests to a Slack webhook. + + Webhook URL from SLACK_WEBHOOK_URL env var. + """ + + def __init__(self): + self.webhook_url = os.environ.get("SLACK_WEBHOOK_URL") + if not self.webhook_url: + raise EnvironmentError("Missing required environment variable: SLACK_WEBHOOK_URL") + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _engagement_score(self, tweet): + """Sum of likes + retweets + replies.""" + metrics = tweet.get("public_metrics", {}) + return ( + metrics.get("like_count", 0) + + metrics.get("retweet_count", 0) + + metrics.get("reply_count", 0) + ) + + def _should_at_here(self, tweet): + """Return True if the tweet warrants an @here ping.""" + if self._engagement_score(tweet) > AT_HERE_ENGAGEMENT_THRESHOLD: + return True + text = tweet.get("text", "").lower() + return any(comp in text for comp in COMPETITOR_NAMES) + + def _format_tweet_block(self, tweet): + """Format a single tweet as a Slack mrkdwn string.""" + tweet_id = tweet.get("id", "") + author_id = tweet.get("author_id", "unknown") + text = tweet.get("text", "").replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") + created_at = tweet.get("created_at", "") + metrics = tweet.get("public_metrics", {}) + url = f"https://twitter.com/i/web/status/{tweet_id}" + + return ( + f"*New mention* — <{url}|view>\n" + f">{text}\n" + f"Author: `{author_id}` | " + f"❤️ {metrics.get('like_count', 0)} " + f"🔁 {metrics.get('retweet_count', 0)} " + f"💬 {metrics.get('reply_count', 0)}\n" + f"_Posted: {created_at}_" + ) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def post_mentions(self, tweets): + """Bundle and post new brand 
mentions to Slack. + + Multiple tweets are sent in a single webhook payload, not one per tweet. + + Args: + tweets: List of tweet dicts from XClient.search_recent(). + + Returns: + None. No-ops on empty list. + + Raises: + requests.HTTPError: On non-2xx Slack response. + """ + if not tweets: + return + + has_at_here = any(self._should_at_here(t) for t in tweets) + + blocks = [] + if has_at_here: + blocks.append( + {"type": "section", "text": {"type": "mrkdwn", "text": "<!here>"}} + ) + + count = len(tweets) + header = f"*{count} new Molecule AI mention{'s' if count > 1 else ''}* in #brand-monitoring" + blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": header}}) + blocks.append({"type": "divider"}) + + for tweet in tweets: + blocks.append( + {"type": "section", "text": {"type": "mrkdwn", "text": self._format_tweet_block(tweet)}} + ) + blocks.append({"type": "divider"}) + + payload = {"blocks": blocks} + logger.info("Posting %d mention(s) to Slack (at_here=%s)", count, has_at_here) + response = requests.post(self.webhook_url, json=payload, timeout=15) + response.raise_for_status() + + def post_digest(self, summary): + """Post the daily 20:00 UTC mention digest to Slack. + + Args: + summary: Dict with keys: + count (int): total mentions today + top_tweets (list, optional): list of high-engagement tweet dicts + + Raises: + requests.HTTPError: On non-2xx Slack response. 
+ """ + count = summary.get("count", 0) + top_tweets = summary.get("top_tweets", []) + + lines = [ + "*📊 Daily Digest — Molecule AI Brand Mentions*", + f"Total mentions today: *{count}*", + ] + + if top_tweets: + lines.append("\n*Top engagements:*") + for tweet in top_tweets[:3]: + snippet = tweet.get("text", "")[:120] + score = self._engagement_score(tweet) + tweet_id = tweet.get("id", "") + url = f"https://twitter.com/i/web/status/{tweet_id}" + lines.append(f"• <{url}|{snippet}…> _(score: {score})_") + + payload = {"text": "\n".join(lines)} + logger.info("Posting daily digest to Slack (count=%d)", count) + response = requests.post(self.webhook_url, json=payload, timeout=15) + response.raise_for_status() diff --git a/brand-monitor/surge.py b/brand-monitor/surge.py new file mode 100644 index 00000000..9a11800c --- /dev/null +++ b/brand-monitor/surge.py @@ -0,0 +1,114 @@ +"""Surge mode state machine. + +Surge mode increases polling frequency from 30 min to 15 min for a +configurable window (default 6 h). State is persisted in a JSON file so +restarts during an active surge window continue in surge mode. + +Activation paths: + 1. Manual: call enable_surge_mode() (or the Slack slash command /surge-monitor on) + 2. Auto: any PR merged with a 'feat:' prefix calls enable_surge_mode() +""" + +import json +import logging +import os +from datetime import datetime, timedelta, timezone + +logger = logging.getLogger(__name__) + +DEFAULT_SURGE_FILE = ".surge_state.json" +DEFAULT_SURGE_DURATION_HOURS = 6 + + +class SurgeState: + """Persist and query surge mode activation. + + Args: + state_file: Path to the JSON state file. Defaults to + ``.surge_state.json`` in the current directory. 
+ """ + + def __init__(self, state_file=DEFAULT_SURGE_FILE): + self.state_file = state_file + + # ------------------------------------------------------------------ + # State I/O + # ------------------------------------------------------------------ + + def _load(self): + """Return parsed state dict, or None if the file doesn't exist.""" + if not os.path.exists(self.state_file): + return None + with open(self.state_file) as fh: + return json.load(fh) + + def _write(self, state): + with open(self.state_file, "w") as fh: + json.dump(state, fh, indent=2) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def enable(self, duration_hours=DEFAULT_SURGE_DURATION_HOURS): + """Activate surge mode for *duration_hours* hours. + + Writes ``.surge_state.json`` so that restarts re-enter surge mode. + + Args: + duration_hours: How long surge mode stays active (default 6 h). + """ + expires_at = ( + datetime.now(timezone.utc) + timedelta(hours=duration_hours) + ).isoformat() + state = { + "active": True, + "enabled_at": datetime.now(timezone.utc).isoformat(), + "expires_at": expires_at, + "duration_hours": duration_hours, + } + self._write(state) + logger.info("Surge mode enabled for %dh — expires at %s", duration_hours, expires_at) + + def disable(self): + """Deactivate surge mode and remove the state file.""" + if os.path.exists(self.state_file): + os.remove(self.state_file) + logger.info("Surge mode disabled") + + def is_active(self): + """Return True if surge mode is currently active (and not expired). + + Side effect: auto-disables if the expiry timestamp has passed. 
+ """ + state = self._load() + if not state: + return False + expires_at = datetime.fromisoformat(state["expires_at"]) + if datetime.now(timezone.utc) >= expires_at: + logger.info("Surge mode expired — auto-disabling") + self.disable() + return False + return True + + def check_expiry(self): + """Auto-disable surge if its window has elapsed. + + Returns: + bool: whether surge mode is still active after the check. + """ + return self.is_active() + + def get_interval(self, normal_interval, surge_interval): + """Return the appropriate polling interval in seconds. + + Args: + normal_interval: Seconds to sleep in ambient mode. + surge_interval: Seconds to sleep while surge is active. + + Returns: + int: surge_interval if surge is active, else normal_interval. + """ + if self.is_active(): + return surge_interval + return normal_interval diff --git a/brand-monitor/test_monitor.py b/brand-monitor/test_monitor.py new file mode 100644 index 00000000..ec8bb8ad --- /dev/null +++ b/brand-monitor/test_monitor.py @@ -0,0 +1,741 @@ +"""Full test suite for brand-monitor modules. + +Run: + pytest test_monitor.py -v --cov=. --cov-report=term-missing --cov-fail-under=100 + +All HTTP calls are mocked — no live API calls, no credentials needed. 
+""" + +import json +import os +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, call, patch + +import pytest +import requests + +# --------------------------------------------------------------------------- +# Shared fixtures / constants +# --------------------------------------------------------------------------- + +BASE_ENV = { + "X_BEARER_TOKEN": "test-bearer-token", + "X_API_KEY": "test-api-key", + "X_API_SECRET": "test-api-secret", + "SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/TEST", +} + +SAMPLE_TWEET = { + "id": "1111111111", + "text": "Really excited about Molecule AI's agent platform — great SDK!", + "author_id": "9876543210", + "created_at": "2024-01-01T12:00:00Z", + "public_metrics": { + "like_count": 3, + "retweet_count": 1, + "reply_count": 2, + }, +} + +SAMPLE_TWEET_HIGH_ENGAGEMENT = { + "id": "2222222222", + "text": "Molecule AI multi-agent workflow is incredible", + "author_id": "1111111111", + "created_at": "2024-01-01T13:00:00Z", + "public_metrics": { + "like_count": 50, + "retweet_count": 20, + "reply_count": 15, + }, +} + +SAMPLE_TWEET_COMPETITOR = { + "id": "3333333333", + "text": "Comparing Molecule AI with langchain for our orchestration workflow", + "author_id": "2222222222", + "created_at": "2024-01-01T14:00:00Z", + "public_metrics": { + "like_count": 0, + "retweet_count": 0, + "reply_count": 0, + }, +} + + +# =========================================================================== +# x_client tests +# =========================================================================== + + +class TestXClient: + + def test_init_missing_token_raises(self): + from x_client import XClient + + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"): + XClient() + + def test_init_success(self): + from x_client import XClient + + with patch.dict(os.environ, {"X_BEARER_TOKEN": "my-token"}): + client = XClient() + assert client.bearer_token == 
"my-token" + + def _make_client(self): + from x_client import XClient + + with patch.dict(os.environ, {"X_BEARER_TOKEN": "tok"}): + return XClient() + + def test_search_recent_returns_tweets(self): + from x_client import SEARCH_QUERY, SEARCH_URL + + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_resp.json.return_value = {"data": [SAMPLE_TWEET]} + + with patch("x_client.requests.get", return_value=mock_resp) as mock_get: + result = client.search_recent() + + assert result == [SAMPLE_TWEET] + # Verify URL, auth header and query string + args, kwargs = mock_get.call_args + assert args[0] == SEARCH_URL + assert kwargs["headers"]["Authorization"] == "Bearer tok" + assert kwargs["params"]["query"] == SEARCH_QUERY + + def test_search_recent_no_data_key_returns_empty_list(self): + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_resp.json.return_value = {"meta": {"result_count": 0}} + + with patch("x_client.requests.get", return_value=mock_resp): + result = client.search_recent() + + assert result == [] + + def test_search_recent_with_since_id_adds_param(self): + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_resp.json.return_value = {"data": [SAMPLE_TWEET]} + + with patch("x_client.requests.get", return_value=mock_resp) as mock_get: + client.search_recent(since_id="9999") + + params = mock_get.call_args.kwargs["params"] + assert params["since_id"] == "9999" + + def test_search_recent_with_start_time_adds_param(self): + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_resp.json.return_value = {"data": []} + + with patch("x_client.requests.get", return_value=mock_resp) as mock_get: + client.search_recent(start_time="2024-01-01T00:00:00Z") + + params = mock_get.call_args.kwargs["params"] + assert params["start_time"] == 
"2024-01-01T00:00:00Z" + + def test_search_recent_no_since_id_no_start_time_omits_params(self): + """Neither since_id nor start_time in params when not provided.""" + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_resp.json.return_value = {"data": []} + + with patch("x_client.requests.get", return_value=mock_resp) as mock_get: + client.search_recent() + + params = mock_get.call_args.kwargs["params"] + assert "since_id" not in params + assert "start_time" not in params + + def test_search_recent_http_error_propagates(self): + client = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.side_effect = requests.HTTPError("403 Forbidden") + + with patch("x_client.requests.get", return_value=mock_resp): + with pytest.raises(requests.HTTPError): + client.search_recent() + + +# =========================================================================== +# slack_client tests +# =========================================================================== + + +class TestSlackClient: + + def _make_client(self): + from slack_client import SlackClient + + with patch.dict(os.environ, {"SLACK_WEBHOOK_URL": "https://hooks.slack.com/test"}): + return SlackClient() + + def test_init_missing_webhook_raises(self): + from slack_client import SlackClient + + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(EnvironmentError, match="SLACK_WEBHOOK_URL"): + SlackClient() + + def test_init_success(self): + c = self._make_client() + assert c.webhook_url == "https://hooks.slack.com/test" + + def test_engagement_score_sums_correctly(self): + c = self._make_client() + tweet = {"public_metrics": {"like_count": 5, "retweet_count": 3, "reply_count": 2}} + assert c._engagement_score(tweet) == 10 + + def test_engagement_score_missing_metrics_returns_zero(self): + c = self._make_client() + assert c._engagement_score({}) == 0 + + def test_should_at_here_high_engagement_returns_true(self): + c = 
self._make_client() + assert c._should_at_here(SAMPLE_TWEET_HIGH_ENGAGEMENT) is True + + def test_should_at_here_competitor_name_returns_true(self): + c = self._make_client() + # SAMPLE_TWEET_COMPETITOR contains "langchain" — engagement is 0 + assert c._should_at_here(SAMPLE_TWEET_COMPETITOR) is True + + def test_should_at_here_normal_tweet_returns_false(self): + c = self._make_client() + # SAMPLE_TWEET: engagement=6 (<=10), no competitor + assert c._should_at_here(SAMPLE_TWEET) is False + + def test_post_mentions_empty_list_is_noop(self): + c = self._make_client() + with patch("slack_client.requests.post") as mock_post: + c.post_mentions([]) + mock_post.assert_not_called() + + def test_post_mentions_single_tweet_no_at_here(self): + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + + with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: + c.post_mentions([SAMPLE_TWEET]) + + mock_post.assert_called_once() + payload = mock_post.call_args.kwargs["json"] + section_texts = [ + b["text"]["text"] + for b in payload["blocks"] + if b.get("type") == "section" + ] + # No @here for normal engagement tweet + assert not any("<!here>" in t for t in section_texts) + # Header mentions "1 new … mention" + assert any("1 new" in t for t in section_texts) + + def test_post_mentions_multiple_tweets_with_at_here(self): + """High-engagement tweet triggers @here; both tweets appear in payload.""" + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + + with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: + c.post_mentions([SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET]) + + payload = mock_post.call_args.kwargs["json"] + section_texts = [ + b["text"]["text"] + for b in payload["blocks"] + if b.get("type") == "section" + ] + assert any("<!here>" in t for t in section_texts) + assert any("2 new" in t for t in section_texts) + + def 
test_post_mentions_html_escaping_in_tweet_text(self): + """< > & in tweet text are escaped to prevent Slack mrkdwn injection.""" + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + tweet = {**SAMPLE_TWEET, "text": "X < Y & Z > W"} + + with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: + c.post_mentions([tweet]) + + raw = str(mock_post.call_args.kwargs["json"]) + assert "&lt;" in raw + assert "&gt;" in raw + assert "&amp;" in raw + + def test_post_mentions_http_error_propagates(self): + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.side_effect = requests.HTTPError("500") + + with patch("slack_client.requests.post", return_value=mock_resp): + with pytest.raises(requests.HTTPError): + c.post_mentions([SAMPLE_TWEET]) + + def test_post_digest_count_only_no_top_tweets(self): + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + + with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: + c.post_digest({"count": 42}) + + text = mock_post.call_args.kwargs["json"]["text"] + assert "42" in text + assert "Top engagements" not in text + + def test_post_digest_with_top_tweets_included(self): + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + + with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: + c.post_digest({"count": 10, "top_tweets": [SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET]}) + + text = mock_post.call_args.kwargs["json"]["text"] + assert "Top engagements" in text + + def test_post_digest_http_error_propagates(self): + c = self._make_client() + mock_resp = MagicMock() + mock_resp.raise_for_status.side_effect = requests.HTTPError("500") + + with patch("slack_client.requests.post", return_value=mock_resp): + with pytest.raises(requests.HTTPError): + c.post_digest({"count": 1}) + + +# 
# ===========================================================================
# surge tests
# ===========================================================================


class TestSurgeState:
    """Tests for surge.SurgeState persistence, expiry, and interval logic."""

    def _make_surge(self, tmp_path):
        # Helper: SurgeState backed by a throwaway file so tests never touch
        # the repo-level .surge_state.json.
        from surge import SurgeState

        return SurgeState(state_file=str(tmp_path / ".surge_state.json"))

    def test_init_default_state_file(self):
        from surge import DEFAULT_SURGE_FILE, SurgeState

        s = SurgeState()
        assert s.state_file == DEFAULT_SURGE_FILE

    def test_init_custom_state_file(self, tmp_path):
        s = self._make_surge(tmp_path)
        assert ".surge_state.json" in s.state_file

    def test_enable_writes_state_file_with_correct_fields(self, tmp_path):
        s = self._make_surge(tmp_path)
        s.enable(duration_hours=3)
        # Context manager closes the handle deterministically; the previous
        # bare open() leaked the file object (ResourceWarning under -W error).
        with open(s.state_file) as fh:
            state = json.load(fh)
        assert state["active"] is True
        assert state["duration_hours"] == 3
        assert "expires_at" in state
        assert "enabled_at" in state

    def test_enable_default_duration(self, tmp_path):
        from surge import DEFAULT_SURGE_DURATION_HOURS

        s = self._make_surge(tmp_path)
        s.enable()
        with open(s.state_file) as fh:
            state = json.load(fh)
        assert state["duration_hours"] == DEFAULT_SURGE_DURATION_HOURS

    def test_disable_removes_file(self, tmp_path):
        s = self._make_surge(tmp_path)
        s.enable()
        assert os.path.exists(s.state_file)
        s.disable()
        assert not os.path.exists(s.state_file)

    def test_disable_no_file_does_not_raise(self, tmp_path):
        s = self._make_surge(tmp_path)
        # File doesn't exist — should be silent
        s.disable()

    def test_is_active_no_file_returns_false(self, tmp_path):
        s = self._make_surge(tmp_path)
        assert s.is_active() is False

    def test_is_active_not_expired_returns_true(self, tmp_path):
        s = self._make_surge(tmp_path)
        s.enable(duration_hours=6)
        assert s.is_active() is True

    def test_is_active_expired_auto_disables_returns_false(self, tmp_path):
        s = self._make_surge(tmp_path)
        # Write an already-expired state; closing the handle guarantees the
        # JSON is flushed before is_active() reads it back.
        past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
        with open(s.state_file, "w") as fh:
            json.dump({"active": True, "expires_at": past, "duration_hours": 1}, fh)
        assert s.is_active() is False
        assert not os.path.exists(s.state_file)

    def test_check_expiry_returns_true_when_active(self, tmp_path):
        s = self._make_surge(tmp_path)
        s.enable(duration_hours=6)
        assert s.check_expiry() is True

    def test_check_expiry_returns_false_when_expired(self, tmp_path):
        s = self._make_surge(tmp_path)
        past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
        with open(s.state_file, "w") as fh:
            json.dump({"active": True, "expires_at": past, "duration_hours": 1}, fh)
        assert s.check_expiry() is False

    def test_get_interval_surge_active_returns_surge_interval(self, tmp_path):
        s = self._make_surge(tmp_path)
        s.enable(duration_hours=6)
        assert s.get_interval(1800, 900) == 900

    def test_get_interval_surge_inactive_returns_normal_interval(self, tmp_path):
        s = self._make_surge(tmp_path)
        assert s.get_interval(1800, 900) == 1800


# ===========================================================================
# monitor — validate_env tests
# ===========================================================================


class TestValidateEnv:
    """Tests for monitor.validate_env required-variable checking."""

    def test_all_vars_present_passes(self):
        from monitor import validate_env

        with patch.dict(os.environ, BASE_ENV, clear=False):
            validate_env()  # must not raise

    def test_single_missing_var_raises_with_name(self):
        from monitor import validate_env

        env = {k: v for k, v in BASE_ENV.items() if k != "X_BEARER_TOKEN"}
        with patch.dict(os.environ, env, clear=True):
            with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"):
                validate_env()

    def test_multiple_missing_vars_raises_with_all_names(self):
        from monitor import validate_env

        with patch.dict(os.environ, {}, clear=True):
            with pytest.raises(EnvironmentError) as exc_info:
                validate_env()
        msg = str(exc_info.value)
        assert "X_BEARER_TOKEN" in msg
        assert "SLACK_WEBHOOK_URL" in msg


# ===========================================================================
# monitor — enable_surge_mode tests
# ===========================================================================


class TestEnableSurgeMode:
    """Tests for the monitor.enable_surge_mode entry point (CI/slash-command hook)."""

    def test_default_duration_uses_env_default(self, tmp_path):
        from monitor import SURGE_DURATION_HOURS, enable_surge_mode

        sf = str(tmp_path / ".surge.json")
        enable_surge_mode(state_file=sf)
        with open(sf) as fh:
            state = json.load(fh)
        assert state["duration_hours"] == SURGE_DURATION_HOURS

    def test_custom_duration_overrides_default(self, tmp_path):
        from monitor import enable_surge_mode

        sf = str(tmp_path / ".surge.json")
        enable_surge_mode(duration_hours=12, state_file=sf)
        with open(sf) as fh:
            state = json.load(fh)
        assert state["duration_hours"] == 12

    def test_no_state_file_override_uses_default_path(self):
        """When state_file=None, SurgeState() is constructed with no kwargs."""
        from monitor import enable_surge_mode

        with patch("monitor.SurgeState") as MockSurge:
            mock_instance = MagicMock()
            MockSurge.return_value = mock_instance
            enable_surge_mode(duration_hours=3)

        MockSurge.assert_called_once_with()
        mock_instance.enable.assert_called_once_with(3)


# ===========================================================================
# monitor — Monitor class tests
# ===========================================================================


class TestMonitor:
    """Tests for the Monitor class."""

    # ------------------------------------------------------------------
    # Constructor helpers
    # ------------------------------------------------------------------

    def _make_monitor(self, tmp_path, state_data=None):
        """Build a Monitor with temp files and mocked HTTP clients."""
        from monitor import Monitor

        state_file = str(tmp_path / "monitor_state.json")
        surge_file = str(tmp_path / "surge_state.json")

        if state_data is not None:
            # Close the handle so the seed state is flushed before Monitor
            # reads it in __init__ (bare open() left this to GC timing).
            with open(state_file, "w") as fh:
                json.dump(state_data, fh)

        with patch.dict(os.environ, BASE_ENV, clear=False):
            with patch("monitor.XClient"), patch("monitor.SlackClient"):
                m = Monitor(state_file=state_file, surge_state_file=surge_file)
        return m

    # ------------------------------------------------------------------
    # __init__
    # ------------------------------------------------------------------

    def test_init_success_with_empty_state(self, tmp_path):
        m = self._make_monitor(tmp_path)
        assert m.state == {}

    def test_init_loads_existing_state_file(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"since_id": "abc"})
        assert m.state["since_id"] == "abc"

    def test_init_missing_env_raises(self, tmp_path):
        from monitor import Monitor

        sf = str(tmp_path / "st.json")
        with patch.dict(os.environ, {}, clear=True):
            with pytest.raises(EnvironmentError):
                Monitor(state_file=sf)

    def test_init_surge_state_file_none_uses_default(self, tmp_path):
        """surge_state_file=None → SurgeState constructed with no kwargs."""
        from monitor import Monitor

        sf = str(tmp_path / "st.json")
        with patch.dict(os.environ, BASE_ENV, clear=False):
            with patch("monitor.XClient"), patch("monitor.SlackClient"):
                with patch("monitor.SurgeState") as MockSurge:
                    Monitor(state_file=sf)  # surge_state_file defaults to None

        MockSurge.assert_called_once_with()

    def test_init_surge_state_file_provided_passes_kwarg(self, tmp_path):
        """surge_state_file provided → SurgeState(state_file=...) is called."""
        from monitor import Monitor

        sf = str(tmp_path / "st.json")
        surge_sf = str(tmp_path / "surge.json")
        with patch.dict(os.environ, BASE_ENV, clear=False):
            with patch("monitor.XClient"), patch("monitor.SlackClient"):
                with patch("monitor.SurgeState") as MockSurge:
                    Monitor(state_file=sf, surge_state_file=surge_sf)

        MockSurge.assert_called_once_with(state_file=surge_sf)

    # ------------------------------------------------------------------
    # _load_state / _save_state
    # ------------------------------------------------------------------

    def test_load_state_no_file_returns_empty_dict(self, tmp_path):
        m = self._make_monitor(tmp_path)
        assert m._load_state() == {}

    def test_load_state_existing_file_returns_contents(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"since_id": "XYZ"})
        assert m._load_state()["since_id"] == "XYZ"

    def test_save_state_persists_to_disk(self, tmp_path):
        m = self._make_monitor(tmp_path)
        m.state["since_id"] = "saved"
        m._save_state()
        with open(m.state_file) as fh:
            on_disk = json.load(fh)
        assert on_disk["since_id"] == "saved"

    # ------------------------------------------------------------------
    # run_poll
    # ------------------------------------------------------------------

    def test_run_poll_first_run_uses_start_time_backfill(self, tmp_path):
        """No since_id → search_recent called with start_time set, since_id=None."""
        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = [SAMPLE_TWEET]

        tweets = m.run_poll()

        kw = m.x_client.search_recent.call_args.kwargs
        assert kw["since_id"] is None
        assert kw["start_time"] is not None  # 24h backfill
        assert tweets == [SAMPLE_TWEET]
        assert m.state["since_id"] == SAMPLE_TWEET["id"]

    def test_run_poll_subsequent_run_passes_since_id(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"since_id": "prev_tweet_id"})
        m.x_client.search_recent.return_value = [SAMPLE_TWEET]

        m.run_poll()

        kw = m.x_client.search_recent.call_args.kwargs
        assert kw["since_id"] == "prev_tweet_id"

    def test_run_poll_no_tweets_does_not_post_to_slack(self, tmp_path):
        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = []

        tweets = m.run_poll()

        m.slack_client.post_mentions.assert_not_called()
        assert "since_id" not in m.state
        assert tweets == []

    def test_run_poll_no_tweets_preserves_existing_since_id(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"since_id": "old_id"})
        m.x_client.search_recent.return_value = []

        m.run_poll()

        assert m.state["since_id"] == "old_id"

    def test_run_poll_new_tweets_posts_to_slack_and_updates_since_id(self, tmp_path):
        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = [SAMPLE_TWEET]

        m.run_poll()

        m.slack_client.post_mentions.assert_called_once_with([SAMPLE_TWEET])
        assert m.state["since_id"] == SAMPLE_TWEET["id"]

    # ------------------------------------------------------------------
    # _should_send_digest
    # ------------------------------------------------------------------

    def test_should_send_digest_wrong_hour_returns_false(self, tmp_path):
        m = self._make_monitor(tmp_path)
        fake_now = datetime(2024, 1, 1, 15, 0, 0, tzinfo=timezone.utc)  # 15:00 UTC
        with patch("monitor.datetime") as mock_dt:
            mock_dt.now.return_value = fake_now
            assert m._should_send_digest() is False

    def test_should_send_digest_correct_hour_not_yet_sent_returns_true(self, tmp_path):
        m = self._make_monitor(tmp_path)
        fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc)  # 20:00 UTC
        with patch("monitor.datetime") as mock_dt:
            mock_dt.now.return_value = fake_now
            assert m._should_send_digest() is True

    def test_should_send_digest_already_sent_today_returns_false(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"last_digest_date": "2024-01-01"})
        fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc)
        with patch("monitor.datetime") as mock_dt:
            mock_dt.now.return_value = fake_now
            assert m._should_send_digest() is False

    # ------------------------------------------------------------------
    # run_daily_digest
    # ------------------------------------------------------------------

    def test_run_daily_digest_posts_count_and_resets(self, tmp_path):
        m = self._make_monitor(tmp_path, state_data={"daily_count": 7})

        m.run_daily_digest()

        m.slack_client.post_digest.assert_called_once_with({"count": 7})
        assert m.state["daily_count"] == 0
        assert "last_digest_date" in m.state

    # ------------------------------------------------------------------
    # _run_once
    # ------------------------------------------------------------------

    def test_run_once_no_digest_returns_normal_interval(self, tmp_path):
        from monitor import POLL_INTERVAL_SECONDS

        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = [SAMPLE_TWEET]

        with patch.object(m, "_should_send_digest", return_value=False):
            interval = m._run_once()

        assert m.state["daily_count"] == 1
        assert interval == POLL_INTERVAL_SECONDS

    def test_run_once_triggers_digest_when_due(self, tmp_path):
        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = []

        with patch.object(m, "_should_send_digest", return_value=True):
            with patch.object(m, "run_daily_digest") as mock_digest:
                m._run_once()

        mock_digest.assert_called_once()

    def test_run_once_returns_surge_interval_when_surge_active(self, tmp_path):
        from monitor import SURGE_INTERVAL_SECONDS

        m = self._make_monitor(tmp_path)
        m.x_client.search_recent.return_value = []
        m.surge.enable(duration_hours=6)

        with patch.object(m, "_should_send_digest", return_value=False):
            interval = m._run_once()

        assert interval == SURGE_INTERVAL_SECONDS

    # ------------------------------------------------------------------
    # run (infinite loop)
    # ------------------------------------------------------------------
    # ------------------------------------------------------------------

    def test_run_normal_path_sleeps_with_returned_interval(self, tmp_path):
        # run() must sleep for exactly the interval _run_once returned.
        from monitor import Monitor, POLL_INTERVAL_SECONDS

        sf = str(tmp_path / "st.json")
        surge_sf = str(tmp_path / "surge.json")
        with patch.dict(os.environ, BASE_ENV, clear=False):
            with patch("monitor.XClient"), patch("monitor.SlackClient"):
                m = Monitor(state_file=sf, surge_state_file=surge_sf)

        sleep_calls = []

        def fake_sleep(n):
            # Record the interval, then abort the otherwise-infinite loop.
            sleep_calls.append(n)
            raise SystemExit("terminate test loop")

        with patch.object(m, "_run_once", return_value=POLL_INTERVAL_SECONDS):
            with patch("monitor.time.sleep", side_effect=fake_sleep):
                with pytest.raises(SystemExit):
                    m.run()

        assert sleep_calls == [POLL_INTERVAL_SECONDS]

    def test_run_exception_in_run_once_falls_back_to_poll_interval(self, tmp_path):
        # run() must survive a _run_once failure and keep polling at the
        # ambient cadence rather than crashing the process.
        from monitor import Monitor, POLL_INTERVAL_SECONDS

        sf = str(tmp_path / "st.json")
        surge_sf = str(tmp_path / "surge.json")
        with patch.dict(os.environ, BASE_ENV, clear=False):
            with patch("monitor.XClient"), patch("monitor.SlackClient"):
                m = Monitor(state_file=sf, surge_state_file=surge_sf)

        sleep_calls = []

        def fake_sleep(n):
            # Record the interval, then abort the otherwise-infinite loop.
            sleep_calls.append(n)
            raise SystemExit("terminate test loop")

        with patch.object(m, "_run_once", side_effect=RuntimeError("api exploded")):
            with patch("monitor.time.sleep", side_effect=fake_sleep):
                with pytest.raises(SystemExit):
                    m.run()

        # On exception, sleep is called with the ambient interval
        assert sleep_calls == [POLL_INTERVAL_SECONDS]
diff --git a/brand-monitor/x_client.py b/brand-monitor/x_client.py
new file mode 100644
index 00000000..af05523e
--- /dev/null
+++ b/brand-monitor/x_client.py
@@ -0,0 +1,65 @@
"""X API v2 thin client for brand mention search."""

import os
import logging
import requests

logger = logging.getLogger(__name__)

# X API v2 recent-search endpoint (last ~7 days of tweets).
SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"

# Verbatim from issue #549 —
# drug-discovery SEO noise suppressed at query level
SEARCH_QUERY = (
    '("Molecule AI" OR "@moleculeai") '
    '(agent OR workflow OR orchestrat OR "multi-agent" OR developer OR SDK OR API OR "agent platform") '
    '-moleculeai.com -molecule.ai -"drug discovery" -pharmaceutical -CRISPR -oncology '
    '-is:retweet lang:en'
)
# NOTE(review): X search does no stemming — the bare token "orchestrat" likely
# matches only that literal word, not "orchestrate"/"orchestration". Kept
# verbatim per issue #549; confirm the query behaves as intended.

# Expansions requested for every returned tweet.
TWEET_FIELDS = "author_id,created_at,public_metrics,entities"

# Documented bounds for the recent-search max_results parameter.
_MIN_RESULTS = 10
_MAX_RESULTS = 100


class XClient:
    """Thin wrapper around X API v2 recent-search endpoint.

    Auth: Bearer token from X_BEARER_TOKEN env var.
    """

    def __init__(self):
        # Fail fast at construction time if the credential is absent, rather
        # than on the first API call.
        self.bearer_token = os.environ.get("X_BEARER_TOKEN")
        if not self.bearer_token:
            raise EnvironmentError("Missing required environment variable: X_BEARER_TOKEN")

    def search_recent(self, since_id=None, start_time=None, max_results=100):
        """Search recent tweets matching SEARCH_QUERY.

        Args:
            since_id: Only return tweets newer than this tweet ID.
            start_time: ISO 8601 datetime string; only return tweets after this time.
            max_results: Max tweets per request (10–100).

        Returns:
            List of tweet dicts (newest first), empty list if none found.

        Raises:
            ValueError: If max_results is outside the API's 10–100 range.
            requests.HTTPError: On non-2xx API response.
        """
        # Validate locally so callers get a clear ValueError instead of an
        # opaque HTTP 400 from the API.
        if not _MIN_RESULTS <= max_results <= _MAX_RESULTS:
            raise ValueError(
                f"max_results must be between {_MIN_RESULTS} and {_MAX_RESULTS}, got {max_results}"
            )

        headers = {"Authorization": f"Bearer {self.bearer_token}"}
        params = {
            "query": SEARCH_QUERY,
            "tweet.fields": TWEET_FIELDS,
            "max_results": max_results,
        }
        # Optional narrowing parameters are only sent when provided.
        if since_id:
            params["since_id"] = since_id
        if start_time:
            params["start_time"] = start_time

        logger.debug("Searching X API: since_id=%s start_time=%s", since_id, start_time)
        response = requests.get(SEARCH_URL, headers=headers, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        # A query with no matches returns a payload without a "data" key.
        tweets = data.get("data", [])
        logger.info("X API returned %d tweet(s)", len(tweets))
        return tweets