From ee677b8c633f9535b7babcf31c4628761dc01f56 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Thu, 16 Apr 2026 21:00:58 -0700 Subject: [PATCH] chore: remove brand-monitor from monorepo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standalone operational tool — doesn't belong in the platform core. Should live in its own repo if needed. Co-Authored-By: Claude Opus 4.6 (1M context) --- brand-monitor/README.md | 139 ------ brand-monitor/monitor.py | 225 ---------- brand-monitor/requirements.txt | 6 - brand-monitor/slack_client.py | 149 ------- brand-monitor/surge.py | 114 ----- brand-monitor/test_monitor.py | 758 --------------------------------- brand-monitor/x_client.py | 65 --- 7 files changed, 1456 deletions(-) delete mode 100644 brand-monitor/README.md delete mode 100644 brand-monitor/monitor.py delete mode 100644 brand-monitor/requirements.txt delete mode 100644 brand-monitor/slack_client.py delete mode 100644 brand-monitor/surge.py delete mode 100644 brand-monitor/test_monitor.py delete mode 100644 brand-monitor/x_client.py diff --git a/brand-monitor/README.md b/brand-monitor/README.md deleted file mode 100644 index adc914b7..00000000 --- a/brand-monitor/README.md +++ /dev/null @@ -1,139 +0,0 @@ -# Molecule AI Brand Monitor - -A cron-based X API v2 poller that posts new brand mentions of **Molecule AI** to Slack `#brand-monitoring`. - -Features: -- Smart query filter (from issue #549) suppresses drug-discovery SEO noise -- Deduplication via `since_id` — never posts the same tweet twice -- First run automatically backfills the last 24 hours -- **Surge mode** — 15-min polling for launch days / crisis windows (see below) -- `@here` alert when engagement > 10 or a competitor name appears -- Daily digest at 20:00 UTC - ---- - -## Setup - -### 1. Install dependencies - -```bash -cd brand-monitor -pip install -r requirements.txt -``` - -### 2. Set environment variables - -| Variable | Required | Description | -|---|---|---| -| `X_BEARER_TOKEN` | ✅ | X API Bearer token (from the Developer Portal) | -| `X_API_KEY` | ✅ | X API key (available for future OAuth use) | -| `X_API_SECRET` | ✅ | X API secret | -| `SLACK_WEBHOOK_URL` | ✅ | Slack incoming webhook URL for `#brand-monitoring` | -| `POLL_INTERVAL_SECONDS` | optional | Ambient polling cadence (default: `1800` = 30 min) | -| `SURGE_DURATION_HOURS` | optional | Surge window length in hours (default: `6`) | - -For local development, create a `.env` file (never commit it): - -```bash -X_BEARER_TOKEN=AAA... -X_API_KEY=BBB... -X_API_SECRET=CCC... -SLACK_WEBHOOK_URL=https://hooks.slack.com/services/... -``` - -> **TODO (DevOps):** Provision `X_BEARER_TOKEN`, `X_API_KEY`, `X_API_SECRET`, and `SLACK_WEBHOOK_URL` -> as workspace secrets. The X Developer App credentials are pending approval — blocked on that before -> the monitor can run in production. - -### 3. Run - -```bash -python monitor.py -``` - -The monitor logs to stdout and polls until interrupted (Ctrl-C or process signal). - ---- - -## Polling Cadence - -| Mode | Interval | How long | -|---|---|---| -| **Ambient** | 30 min (`POLL_INTERVAL_SECONDS`) | Continuous | -| **Surge** | 15 min (fixed) | `SURGE_DURATION_HOURS` (default 6 h) | - ---- - -## Surge Mode - -Surge mode temporarily increases the polling frequency to 15 minutes for a configurable window (default 6 hours). State is persisted in `.surge_state.json` — if the process restarts during a surge window, it picks back up automatically. - -### Activating manually (Slack slash command) - -> **TODO:** Configure the Slack app with a `/surge-monitor` slash command that calls the -> `enable_surge_mode()` Python function (or a thin wrapper HTTP endpoint). The Slack app -> configuration is a separate step; the state machine here is ready. - -When the command is wired up: -``` -/surge-monitor on # enable for default 6 h -/surge-monitor on 12h # enable for 12 h -/surge-monitor off # deactivate immediately -``` - -### Auto-trigger on `feat:` PR merge - -In your CI/CD pipeline (e.g. GitHub Actions), call `enable_surge_mode()` when a PR with a `feat:` prefix is merged: - -```python -# In a post-merge CI step: -import sys -sys.path.insert(0, "brand-monitor") -from monitor import enable_surge_mode -enable_surge_mode() # activates for SURGE_DURATION_HOURS -``` - -Or from the shell: -```bash -python -c "from monitor import enable_surge_mode; enable_surge_mode()" -``` - -### Deactivation - -Surge mode deactivates automatically when its window expires. To force early deactivation: - -```python -from surge import SurgeState -SurgeState().disable() -``` - ---- - -## Tests - -```bash -cd brand-monitor -pip install -r requirements.txt -pytest test_monitor.py -v --cov=. --cov-report=term-missing --cov-fail-under=100 -``` - -All HTTP calls are mocked — no live credentials needed in CI. - ---- - -## Gitignored runtime files - -- `.surge_state.json` — surge mode state -- `.monitor_state.json` — polling state (since_id, daily counts) - ---- - -## API Cost Estimate - -X API pay-per-use: **$0.005 / tweet read** - -| Scenario | Reads/month | Est. cost | -|---|---|---| -| Ambient (30 min), ~5 mentions/day | ~150 | $0.75 | -| Surge (15 min) for 6 h, 10 surge events/month | ~300 extra | $1.50 | -| **Total estimate** | **~450–800** | **$2–4/month** | diff --git a/brand-monitor/monitor.py b/brand-monitor/monitor.py deleted file mode 100644 index 2ac5092f..00000000 --- a/brand-monitor/monitor.py +++ /dev/null @@ -1,225 +0,0 @@ -"""Brand monitor — main poller entry point. - -Entry point: - python monitor.py - -Environment variables (all required at startup): - X_BEARER_TOKEN — X API Bearer token - X_API_KEY — X API key (available for future OAuth use) - X_API_SECRET — X API secret - SLACK_WEBHOOK_URL — Slack incoming webhook URL - -Optional tuning: - POLL_INTERVAL_SECONDS — ambient polling cadence in seconds (default: 1800 = 30 min) - SURGE_DURATION_HOURS — surge window length in hours (default: 6) -""" - -import json -import logging -import os -import time -from datetime import datetime, timedelta, timezone - -from slack_client import SlackClient -from surge import SurgeState -from x_client import XClient - -logger = logging.getLogger(__name__) - -# ------------------------------------------------------------------ -# Constants -# ------------------------------------------------------------------ - -REQUIRED_ENV_VARS = ["X_BEARER_TOKEN", "X_API_KEY", "X_API_SECRET", "SLACK_WEBHOOK_URL"] - -DEFAULT_STATE_FILE = ".monitor_state.json" - -# Ambient cadence: 30 min per issue spec (configurable via env) -POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", "1800")) - -# Surge cadence: fixed at 15 min -SURGE_INTERVAL_SECONDS = 900 - -# Surge window length (configurable via env) -SURGE_DURATION_HOURS = int(os.environ.get("SURGE_DURATION_HOURS", "6")) - -# UTC hour at which the daily digest is sent -DIGEST_HOUR_UTC = 20 - - -# ------------------------------------------------------------------ -# Startup validation -# ------------------------------------------------------------------ - -def validate_env(): - """Raise EnvironmentError if any required env var is absent.""" - missing = [v for v in REQUIRED_ENV_VARS if not os.environ.get(v)] - if missing: - raise EnvironmentError( - f"Missing required environment variable(s): {', '.join(missing)}" - ) - - -# ------------------------------------------------------------------ -# Surge mode public entry point (callable from CI/CD on feat: PR merge) -# ------------------------------------------------------------------ - -def enable_surge_mode(duration_hours=None, state_file=None): - """Enable surge mode. Call this from CI/CD hooks on feat: PR merges. - - Args: - duration_hours: Override for surge window length. Defaults to the - SURGE_DURATION_HOURS env var (or 6 h). - state_file: Override path for .surge_state.json (mainly for tests). - """ - hours = duration_hours if duration_hours is not None else SURGE_DURATION_HOURS - kwargs = {} - if state_file is not None: - kwargs["state_file"] = state_file - surge = SurgeState(**kwargs) - surge.enable(hours) - logger.info("enable_surge_mode: activated for %d hour(s)", hours) - - -# ------------------------------------------------------------------ -# Monitor class -# ------------------------------------------------------------------ - -class Monitor: - """Cron-style poller: fetches new X mentions and posts them to Slack. - - Args: - state_file: Path to the JSON file that persists polling state - (since_id, daily_count, etc.). Defaults to - ``.monitor_state.json`` in the current directory. - surge_state_file: Path to the surge state JSON file. - """ - - def __init__(self, state_file=DEFAULT_STATE_FILE, surge_state_file=None): - validate_env() - self.x_client = XClient() - self.slack_client = SlackClient() - surge_kwargs = {} - if surge_state_file is not None: - surge_kwargs["state_file"] = surge_state_file - self.surge = SurgeState(**surge_kwargs) - self.state_file = state_file - self.state = self._load_state() - - # ------------------------------------------------------------------ - # State persistence - # ------------------------------------------------------------------ - - def _load_state(self): - if os.path.exists(self.state_file): - with open(self.state_file) as fh: - return json.load(fh) - return {} - - def _save_state(self): - with open(self.state_file, "w") as fh: - json.dump(self.state, fh, indent=2) - - # ------------------------------------------------------------------ - # Core poll - # ------------------------------------------------------------------ - - def run_poll(self): - """Fetch new tweets and post them to Slack. - - On first run (no saved since_id) backfills the last 24 h. - Tracks the newest tweet ID so subsequent runs avoid duplicates. - - Returns: - list: tweets posted this cycle (may be empty). - """ - since_id = self.state.get("since_id") - start_time = None - - if not since_id: - # First run: backfill last 24 h - start_time = ( - datetime.now(timezone.utc) - timedelta(hours=24) - ).strftime("%Y-%m-%dT%H:%M:%SZ") - logger.info("First run — backfilling last 24 h (start_time=%s)", start_time) - - tweets = self.x_client.search_recent(since_id=since_id, start_time=start_time) - - if tweets: - self.slack_client.post_mentions(tweets) - # X API returns tweets newest-first; store the top ID as next since_id - self.state["since_id"] = tweets[0]["id"] - - return tweets - - # ------------------------------------------------------------------ - # Daily digest - # ------------------------------------------------------------------ - - def _should_send_digest(self): - """True if it's 20:00 UTC and today's digest hasn't been sent yet.""" - now = datetime.now(timezone.utc) - if now.hour != DIGEST_HOUR_UTC: - return False - today = now.strftime("%Y-%m-%d") - return self.state.get("last_digest_date") != today - - def run_daily_digest(self): - """Compile and post the daily summary to Slack, then reset the counter.""" - mention_count = self.state.get("daily_count", 0) - self.slack_client.post_digest({"count": mention_count}) - self.state["daily_count"] = 0 - self.state["last_digest_date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d") - self._save_state() - logger.info("Daily digest sent (count=%d)", mention_count) - - # ------------------------------------------------------------------ - # Main loop - # ------------------------------------------------------------------ - - def _run_once(self): - """Execute one full polling cycle. - - Returns: - int: seconds to sleep before the next cycle. - """ - self.surge.check_expiry() - tweets = self.run_poll() - - # Accumulate daily mention count - self.state["daily_count"] = self.state.get("daily_count", 0) + len(tweets) - self._save_state() - - if self._should_send_digest(): - self.run_daily_digest() - - return self.surge.get_interval(POLL_INTERVAL_SECONDS, SURGE_INTERVAL_SECONDS) - - def run(self): - """Blocking main loop. Runs until interrupted.""" - logger.info( - "Brand monitor starting — ambient interval %ds, surge interval %ds", - POLL_INTERVAL_SECONDS, - SURGE_INTERVAL_SECONDS, - ) - while True: - try: - interval = self._run_once() - except Exception as exc: # noqa: BLE001 - logger.error("Poll cycle failed: %s", exc) - interval = POLL_INTERVAL_SECONDS - logger.debug("Sleeping %ds until next poll", interval) - time.sleep(interval) - - -# ------------------------------------------------------------------ -# Entry point -# ------------------------------------------------------------------ - -if __name__ == "__main__": # pragma: no cover - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s %(levelname)s %(name)s — %(message)s", - ) - monitor = Monitor() - monitor.run() diff --git a/brand-monitor/requirements.txt b/brand-monitor/requirements.txt deleted file mode 100644 index 341445eb..00000000 --- a/brand-monitor/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -requests==2.33.1 -python-dotenv==1.0.1 - -# Test / dev -pytest==8.3.5 -pytest-cov==6.1.0 diff --git a/brand-monitor/slack_client.py b/brand-monitor/slack_client.py deleted file mode 100644 index 7ed584a8..00000000 --- a/brand-monitor/slack_client.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Slack webhook client for posting brand mentions and daily digest.""" - -import os -import logging -import requests - -logger = logging.getLogger(__name__) - -# Competitor names that auto-trigger @here alert -COMPETITOR_NAMES = [ - "openai", "langchain", "langgraph", "autogen", "crewai", "crew ai", - "llamaindex", "dify", "flowise", "n8n", "zapier", "make.com", -] - -# Engagement threshold above which @here is triggered -AT_HERE_ENGAGEMENT_THRESHOLD = 10 - - -class SlackClient: - """Posts brand mention alerts and daily digests to a Slack webhook. - - Webhook URL from SLACK_WEBHOOK_URL env var. - """ - - def __init__(self): - self.webhook_url = os.environ.get("SLACK_WEBHOOK_URL") - if not self.webhook_url: - raise EnvironmentError("Missing required environment variable: SLACK_WEBHOOK_URL") - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - - def _engagement_score(self, tweet): - """Sum of likes + retweets + replies.""" - metrics = tweet.get("public_metrics", {}) - return ( - metrics.get("like_count", 0) - + metrics.get("retweet_count", 0) - + metrics.get("reply_count", 0) - ) - - def _escape_mrkdwn(self, text: str) -> str: - """Escape Slack mrkdwn special characters in untrusted content.""" - return text.replace("&", "&").replace("<", "<").replace(">", ">") - - def _should_at_here(self, tweet): - """Return True if the tweet warrants an @here ping.""" - if self._engagement_score(tweet) > AT_HERE_ENGAGEMENT_THRESHOLD: - return True - text = tweet.get("text", "").lower() - return any(comp in text for comp in COMPETITOR_NAMES) - - def _format_tweet_block(self, tweet): - """Format a single tweet as a Slack mrkdwn string.""" - tweet_id = tweet.get("id", "") - author_id = tweet.get("author_id", "unknown") - text = tweet.get("text", "").replace("&", "&").replace("<", "<").replace(">", ">") - created_at = tweet.get("created_at", "") - metrics = tweet.get("public_metrics", {}) - url = f"https://twitter.com/i/web/status/{tweet_id}" - - return ( - f"*New mention* — <{url}|view>\n" - f">{text}\n" - f"Author: `{author_id}` | " - f"❤️ {metrics.get('like_count', 0)} " - f"🔁 {metrics.get('retweet_count', 0)} " - f"💬 {metrics.get('reply_count', 0)}\n" - f"_Posted: {created_at}_" - ) - - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - - def post_mentions(self, tweets): - """Bundle and post new brand mentions to Slack. - - Multiple tweets are sent in a single webhook payload, not one per tweet. - - Args: - tweets: List of tweet dicts from XClient.search_recent(). - - Returns: - None. No-ops on empty list. - - Raises: - requests.HTTPError: On non-2xx Slack response. - """ - if not tweets: - return - - has_at_here = any(self._should_at_here(t) for t in tweets) - - blocks = [] - if has_at_here: - blocks.append( - {"type": "section", "text": {"type": "mrkdwn", "text": ""}} - ) - - count = len(tweets) - header = f"*{count} new Molecule AI mention{'s' if count > 1 else ''}* in #brand-monitoring" - blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": header}}) - blocks.append({"type": "divider"}) - - for tweet in tweets: - blocks.append( - {"type": "section", "text": {"type": "mrkdwn", "text": self._format_tweet_block(tweet)}} - ) - blocks.append({"type": "divider"}) - - payload = {"blocks": blocks} - logger.info("Posting %d mention(s) to Slack (at_here=%s)", count, has_at_here) - response = requests.post(self.webhook_url, json=payload, timeout=15) - response.raise_for_status() - - def post_digest(self, summary): - """Post the daily 20:00 UTC mention digest to Slack. - - Args: - summary: Dict with keys: - count (int): total mentions today - top_tweets (list, optional): list of high-engagement tweet dicts - - Raises: - requests.HTTPError: On non-2xx Slack response. - """ - count = summary.get("count", 0) - top_tweets = summary.get("top_tweets", []) - - lines = [ - "*📊 Daily Digest — Molecule AI Brand Mentions*", - f"Total mentions today: *{count}*", - ] - - if top_tweets: - lines.append("\n*Top engagements:*") - for tweet in top_tweets[:3]: - snippet = self._escape_mrkdwn(tweet.get("text", "")[:120]) - score = self._engagement_score(tweet) - tweet_id = tweet.get("id", "") - url = f"https://twitter.com/i/web/status/{tweet_id}" - lines.append(f"• <{url}|{snippet}…> _(score: {score})_") - - payload = {"text": "\n".join(lines)} - logger.info("Posting daily digest to Slack (count=%d)", count) - response = requests.post(self.webhook_url, json=payload, timeout=15) - response.raise_for_status() diff --git a/brand-monitor/surge.py b/brand-monitor/surge.py deleted file mode 100644 index 9a11800c..00000000 --- a/brand-monitor/surge.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Surge mode state machine. - -Surge mode increases polling frequency from 30 min to 15 min for a -configurable window (default 6 h). State is persisted in a JSON file so -restarts during an active surge window continue in surge mode. - -Activation paths: - 1. Manual: call enable_surge_mode() (or the Slack slash command /surge-monitor on) - 2. Auto: any PR merged with a 'feat:' prefix calls enable_surge_mode() -""" - -import json -import logging -import os -from datetime import datetime, timedelta, timezone - -logger = logging.getLogger(__name__) - -DEFAULT_SURGE_FILE = ".surge_state.json" -DEFAULT_SURGE_DURATION_HOURS = 6 - - -class SurgeState: - """Persist and query surge mode activation. - - Args: - state_file: Path to the JSON state file. Defaults to - ``.surge_state.json`` in the current directory. - """ - - def __init__(self, state_file=DEFAULT_SURGE_FILE): - self.state_file = state_file - - # ------------------------------------------------------------------ - # State I/O - # ------------------------------------------------------------------ - - def _load(self): - """Return parsed state dict, or None if the file doesn't exist.""" - if not os.path.exists(self.state_file): - return None - with open(self.state_file) as fh: - return json.load(fh) - - def _write(self, state): - with open(self.state_file, "w") as fh: - json.dump(state, fh, indent=2) - - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - - def enable(self, duration_hours=DEFAULT_SURGE_DURATION_HOURS): - """Activate surge mode for *duration_hours* hours. - - Writes ``.surge_state.json`` so that restarts re-enter surge mode. - - Args: - duration_hours: How long surge mode stays active (default 6 h). - """ - expires_at = ( - datetime.now(timezone.utc) + timedelta(hours=duration_hours) - ).isoformat() - state = { - "active": True, - "enabled_at": datetime.now(timezone.utc).isoformat(), - "expires_at": expires_at, - "duration_hours": duration_hours, - } - self._write(state) - logger.info("Surge mode enabled for %dh — expires at %s", duration_hours, expires_at) - - def disable(self): - """Deactivate surge mode and remove the state file.""" - if os.path.exists(self.state_file): - os.remove(self.state_file) - logger.info("Surge mode disabled") - - def is_active(self): - """Return True if surge mode is currently active (and not expired). - - Side effect: auto-disables if the expiry timestamp has passed. - """ - state = self._load() - if not state: - return False - expires_at = datetime.fromisoformat(state["expires_at"]) - if datetime.now(timezone.utc) >= expires_at: - logger.info("Surge mode expired — auto-disabling") - self.disable() - return False - return True - - def check_expiry(self): - """Auto-disable surge if its window has elapsed. - - Returns: - bool: whether surge mode is still active after the check. - """ - return self.is_active() - - def get_interval(self, normal_interval, surge_interval): - """Return the appropriate polling interval in seconds. - - Args: - normal_interval: Seconds to sleep in ambient mode. - surge_interval: Seconds to sleep while surge is active. - - Returns: - int: surge_interval if surge is active, else normal_interval. - """ - if self.is_active(): - return surge_interval - return normal_interval diff --git a/brand-monitor/test_monitor.py b/brand-monitor/test_monitor.py deleted file mode 100644 index 649a443a..00000000 --- a/brand-monitor/test_monitor.py +++ /dev/null @@ -1,758 +0,0 @@ -"""Full test suite for brand-monitor modules. - -Run: - pytest test_monitor.py -v --cov=. --cov-report=term-missing --cov-fail-under=100 - -All HTTP calls are mocked — no live API calls, no credentials needed. -""" - -import json -import os -from datetime import datetime, timedelta, timezone -from unittest.mock import MagicMock, call, patch - -import pytest -import requests - -# --------------------------------------------------------------------------- -# Shared fixtures / constants -# --------------------------------------------------------------------------- - -BASE_ENV = { - "X_BEARER_TOKEN": "test-bearer-token", - "X_API_KEY": "test-api-key", - "X_API_SECRET": "test-api-secret", - "SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/TEST", -} - -SAMPLE_TWEET = { - "id": "1111111111", - "text": "Really excited about Molecule AI's agent platform — great SDK!", - "author_id": "9876543210", - "created_at": "2024-01-01T12:00:00Z", - "public_metrics": { - "like_count": 3, - "retweet_count": 1, - "reply_count": 2, - }, -} - -SAMPLE_TWEET_HIGH_ENGAGEMENT = { - "id": "2222222222", - "text": "Molecule AI multi-agent workflow is incredible", - "author_id": "1111111111", - "created_at": "2024-01-01T13:00:00Z", - "public_metrics": { - "like_count": 50, - "retweet_count": 20, - "reply_count": 15, - }, -} - -SAMPLE_TWEET_COMPETITOR = { - "id": "3333333333", - "text": "Comparing Molecule AI with langchain for our orchestration workflow", - "author_id": "2222222222", - "created_at": "2024-01-01T14:00:00Z", - "public_metrics": { - "like_count": 0, - "retweet_count": 0, - "reply_count": 0, - }, -} - - -# =========================================================================== -# x_client tests -# =========================================================================== - - -class TestXClient: - - def test_init_missing_token_raises(self): - from x_client import XClient - - with patch.dict(os.environ, {}, clear=True): - with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"): - XClient() - - def test_init_success(self): - from x_client import XClient - - with patch.dict(os.environ, {"X_BEARER_TOKEN": "my-token"}): - client = XClient() - assert client.bearer_token == "my-token" - - def _make_client(self): - from x_client import XClient - - with patch.dict(os.environ, {"X_BEARER_TOKEN": "tok"}): - return XClient() - - def test_search_recent_returns_tweets(self): - from x_client import SEARCH_QUERY, SEARCH_URL - - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - mock_resp.json.return_value = {"data": [SAMPLE_TWEET]} - - with patch("x_client.requests.get", return_value=mock_resp) as mock_get: - result = client.search_recent() - - assert result == [SAMPLE_TWEET] - # Verify URL, auth header and query string - args, kwargs = mock_get.call_args - assert args[0] == SEARCH_URL - assert kwargs["headers"]["Authorization"] == "Bearer tok" - assert kwargs["params"]["query"] == SEARCH_QUERY - - def test_search_recent_no_data_key_returns_empty_list(self): - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - mock_resp.json.return_value = {"meta": {"result_count": 0}} - - with patch("x_client.requests.get", return_value=mock_resp): - result = client.search_recent() - - assert result == [] - - def test_search_recent_with_since_id_adds_param(self): - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - mock_resp.json.return_value = {"data": [SAMPLE_TWEET]} - - with patch("x_client.requests.get", return_value=mock_resp) as mock_get: - client.search_recent(since_id="9999") - - params = mock_get.call_args.kwargs["params"] - assert params["since_id"] == "9999" - - def test_search_recent_with_start_time_adds_param(self): - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - mock_resp.json.return_value = {"data": []} - - with patch("x_client.requests.get", return_value=mock_resp) as mock_get: - client.search_recent(start_time="2024-01-01T00:00:00Z") - - params = mock_get.call_args.kwargs["params"] - assert params["start_time"] == "2024-01-01T00:00:00Z" - - def test_search_recent_no_since_id_no_start_time_omits_params(self): - """Neither since_id nor start_time in params when not provided.""" - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - mock_resp.json.return_value = {"data": []} - - with patch("x_client.requests.get", return_value=mock_resp) as mock_get: - client.search_recent() - - params = mock_get.call_args.kwargs["params"] - assert "since_id" not in params - assert "start_time" not in params - - def test_search_recent_http_error_propagates(self): - client = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.side_effect = requests.HTTPError("403 Forbidden") - - with patch("x_client.requests.get", return_value=mock_resp): - with pytest.raises(requests.HTTPError): - client.search_recent() - - -# =========================================================================== -# slack_client tests -# =========================================================================== - - -class TestSlackClient: - - def _make_client(self): - from slack_client import SlackClient - - with patch.dict(os.environ, {"SLACK_WEBHOOK_URL": "https://hooks.slack.com/test"}): - return SlackClient() - - def test_init_missing_webhook_raises(self): - from slack_client import SlackClient - - with patch.dict(os.environ, {}, clear=True): - with pytest.raises(EnvironmentError, match="SLACK_WEBHOOK_URL"): - SlackClient() - - def test_init_success(self): - c = self._make_client() - assert c.webhook_url == "https://hooks.slack.com/test" - - def test_engagement_score_sums_correctly(self): - c = self._make_client() - tweet = {"public_metrics": {"like_count": 5, "retweet_count": 3, "reply_count": 2}} - assert c._engagement_score(tweet) == 10 - - def test_engagement_score_missing_metrics_returns_zero(self): - c = self._make_client() - assert c._engagement_score({}) == 0 - - def test_should_at_here_high_engagement_returns_true(self): - c = self._make_client() - assert c._should_at_here(SAMPLE_TWEET_HIGH_ENGAGEMENT) is True - - def test_should_at_here_competitor_name_returns_true(self): - c = self._make_client() - # SAMPLE_TWEET_COMPETITOR contains "langchain" — engagement is 0 - assert c._should_at_here(SAMPLE_TWEET_COMPETITOR) is True - - def test_should_at_here_normal_tweet_returns_false(self): - c = self._make_client() - # SAMPLE_TWEET: engagement=6 (<=10), no competitor - assert c._should_at_here(SAMPLE_TWEET) is False - - def test_post_mentions_empty_list_is_noop(self): - c = self._make_client() - with patch("slack_client.requests.post") as mock_post: - c.post_mentions([]) - mock_post.assert_not_called() - - def test_post_mentions_single_tweet_no_at_here(self): - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_mentions([SAMPLE_TWEET]) - - mock_post.assert_called_once() - payload = mock_post.call_args.kwargs["json"] - section_texts = [ - b["text"]["text"] - for b in payload["blocks"] - if b.get("type") == "section" - ] - # No @here for normal engagement tweet - assert not any("" in t for t in section_texts) - # Header mentions "1 new … mention" - assert any("1 new" in t for t in section_texts) - - def test_post_mentions_multiple_tweets_with_at_here(self): - """High-engagement tweet triggers @here; both tweets appear in payload.""" - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_mentions([SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET]) - - payload = mock_post.call_args.kwargs["json"] - section_texts = [ - b["text"]["text"] - for b in payload["blocks"] - if b.get("type") == "section" - ] - assert any("" in t for t in section_texts) - assert any("2 new" in t for t in section_texts) - - def test_post_mentions_html_escaping_in_tweet_text(self): - """< > & in tweet text are escaped to prevent Slack mrkdwn injection.""" - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - tweet = {**SAMPLE_TWEET, "text": "X < Y & Z > W"} - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_mentions([tweet]) - - raw = str(mock_post.call_args.kwargs["json"]) - assert "<" in raw - assert ">" in raw - assert "&" in raw - - def test_post_mentions_http_error_propagates(self): - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.side_effect = requests.HTTPError("500") - - with patch("slack_client.requests.post", return_value=mock_resp): - with pytest.raises(requests.HTTPError): - c.post_mentions([SAMPLE_TWEET]) - - def test_post_digest_count_only_no_top_tweets(self): - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_digest({"count": 42}) - - text = mock_post.call_args.kwargs["json"]["text"] - assert "42" in text - assert "Top engagements" not in text - - def test_post_digest_with_top_tweets_included(self): - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_digest({"count": 10, "top_tweets": [SAMPLE_TWEET_HIGH_ENGAGEMENT, SAMPLE_TWEET]}) - - text = mock_post.call_args.kwargs["json"]["text"] - assert "Top engagements" in text - - def test_post_digest_mrkdwn_escaping_in_snippet(self): - """< > & in top-tweet snippets are escaped to prevent mrkdwn injection.""" - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.return_value = None - malicious_tweet = {**SAMPLE_TWEET, "text": "X < Y & Z > W "} - - with patch("slack_client.requests.post", return_value=mock_resp) as mock_post: - c.post_digest({"count": 1, "top_tweets": [malicious_tweet]}) - - text = mock_post.call_args.kwargs["json"]["text"] - assert "<" in text - assert ">" in text - assert "&" in text - assert "" not in text - assert "<" not in text.split("twitter.com")[1] # no raw < after the URL - - def test_post_digest_http_error_propagates(self): - c = self._make_client() - mock_resp = MagicMock() - mock_resp.raise_for_status.side_effect = requests.HTTPError("500") - - with patch("slack_client.requests.post", return_value=mock_resp): - with pytest.raises(requests.HTTPError): - c.post_digest({"count": 1}) - - -# =========================================================================== -# surge tests -# =========================================================================== - - -class TestSurgeState: - - def _make_surge(self, tmp_path): - from surge import SurgeState - - return SurgeState(state_file=str(tmp_path / ".surge_state.json")) - - def test_init_default_state_file(self): - from surge import DEFAULT_SURGE_FILE, SurgeState - - s = SurgeState() - assert s.state_file == DEFAULT_SURGE_FILE - - def test_init_custom_state_file(self, tmp_path): - s = self._make_surge(tmp_path) - assert ".surge_state.json" in s.state_file - - def test_enable_writes_state_file_with_correct_fields(self, tmp_path): - s = self._make_surge(tmp_path) - s.enable(duration_hours=3) - state = json.loads(open(s.state_file).read()) - assert state["active"] is True - assert state["duration_hours"] == 3 - assert "expires_at" in state - assert "enabled_at" in state - - def test_enable_default_duration(self, tmp_path): - from surge import DEFAULT_SURGE_DURATION_HOURS - - s = self._make_surge(tmp_path) - s.enable() - state = json.loads(open(s.state_file).read()) - assert state["duration_hours"] == DEFAULT_SURGE_DURATION_HOURS - - def test_disable_removes_file(self, tmp_path): - s = self._make_surge(tmp_path) - s.enable() - assert os.path.exists(s.state_file) - s.disable() - assert not os.path.exists(s.state_file) - - def test_disable_no_file_does_not_raise(self, tmp_path): - s = self._make_surge(tmp_path) - # File doesn't exist — should be silent - s.disable() - - def test_is_active_no_file_returns_false(self, tmp_path): - s = self._make_surge(tmp_path) - assert s.is_active() is False - - def test_is_active_not_expired_returns_true(self, tmp_path): - s = self._make_surge(tmp_path) - s.enable(duration_hours=6) - assert s.is_active() is True - - def test_is_active_expired_auto_disables_returns_false(self, tmp_path): - s = self._make_surge(tmp_path) - # Write an already-expired state - past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() - json.dump({"active": True, "expires_at": past, "duration_hours": 1}, open(s.state_file, "w")) - assert s.is_active() is False - assert not os.path.exists(s.state_file) - - def test_check_expiry_returns_true_when_active(self, tmp_path): - s = self._make_surge(tmp_path) - s.enable(duration_hours=6) - assert s.check_expiry() is True - - def test_check_expiry_returns_false_when_expired(self, tmp_path): - s = self._make_surge(tmp_path) - past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() - json.dump({"active": True, "expires_at": past, "duration_hours": 1}, open(s.state_file, "w")) - assert s.check_expiry() is False - - def test_get_interval_surge_active_returns_surge_interval(self, tmp_path): - s = self._make_surge(tmp_path) - s.enable(duration_hours=6) - assert s.get_interval(1800, 900) == 900 - - def test_get_interval_surge_inactive_returns_normal_interval(self, tmp_path): - s = self._make_surge(tmp_path) - assert s.get_interval(1800, 900) == 1800 - - -# =========================================================================== -# monitor — validate_env tests -# =========================================================================== - - -class TestValidateEnv: - - def test_all_vars_present_passes(self): - from monitor import validate_env - - with patch.dict(os.environ, BASE_ENV, clear=False): - validate_env() # must not raise - - def test_single_missing_var_raises_with_name(self): - from monitor import validate_env - - env = {k: v for k, v in BASE_ENV.items() if k != "X_BEARER_TOKEN"} - with patch.dict(os.environ, env, clear=True): - with pytest.raises(EnvironmentError, match="X_BEARER_TOKEN"): - validate_env() - - def test_multiple_missing_vars_raises_with_all_names(self): - from monitor import validate_env - - with patch.dict(os.environ, {}, clear=True): - with pytest.raises(EnvironmentError) as exc_info: - validate_env() - msg = str(exc_info.value) - assert "X_BEARER_TOKEN" in msg - assert "SLACK_WEBHOOK_URL" in msg - - -# =========================================================================== -# monitor — enable_surge_mode tests -# =========================================================================== - - -class TestEnableSurgeMode: - - def test_default_duration_uses_env_default(self, tmp_path): - from monitor import SURGE_DURATION_HOURS, enable_surge_mode - - sf = str(tmp_path / ".surge.json") - enable_surge_mode(state_file=sf) - state = json.loads(open(sf).read()) - assert state["duration_hours"] == SURGE_DURATION_HOURS - - def test_custom_duration_overrides_default(self, tmp_path): - from monitor import enable_surge_mode - - sf = str(tmp_path / ".surge.json") - enable_surge_mode(duration_hours=12, state_file=sf) - state = json.loads(open(sf).read()) - assert state["duration_hours"] == 12 - - def test_no_state_file_override_uses_default_path(self): - """When state_file=None, SurgeState() is constructed with no kwargs.""" - from monitor import enable_surge_mode - - with patch("monitor.SurgeState") as MockSurge: - mock_instance = MagicMock() - MockSurge.return_value = mock_instance - enable_surge_mode(duration_hours=3) - - MockSurge.assert_called_once_with() - mock_instance.enable.assert_called_once_with(3) - - -# =========================================================================== -# monitor — Monitor class tests -# =========================================================================== - - -class TestMonitor: - """Tests for the Monitor class.""" - - # ------------------------------------------------------------------ - # Constructor helpers - # ------------------------------------------------------------------ - - def _make_monitor(self, tmp_path, state_data=None): - """Build a Monitor with temp files and mocked HTTP clients.""" - from monitor import Monitor - - state_file = str(tmp_path / "monitor_state.json") - surge_file = str(tmp_path / "surge_state.json") - - if state_data is not None: - json.dump(state_data, open(state_file, "w")) - - with patch.dict(os.environ, BASE_ENV, clear=False): - with patch("monitor.XClient"), patch("monitor.SlackClient"): - m = Monitor(state_file=state_file, surge_state_file=surge_file) - return m - - # ------------------------------------------------------------------ - # __init__ - # ------------------------------------------------------------------ - - def test_init_success_with_empty_state(self, tmp_path): - m = self._make_monitor(tmp_path) - assert m.state == {} - - def test_init_loads_existing_state_file(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"since_id": "abc"}) - assert m.state["since_id"] == "abc" - - def test_init_missing_env_raises(self, tmp_path): - from monitor import Monitor - - sf = str(tmp_path / "st.json") - with patch.dict(os.environ, {}, clear=True): - with pytest.raises(EnvironmentError): - Monitor(state_file=sf) - - def test_init_surge_state_file_none_uses_default(self, tmp_path): - """surge_state_file=None → SurgeState constructed with no kwargs.""" - from monitor import Monitor - - sf = str(tmp_path / "st.json") - with patch.dict(os.environ, BASE_ENV, clear=False): - with patch("monitor.XClient"), patch("monitor.SlackClient"): - with patch("monitor.SurgeState") as MockSurge: - Monitor(state_file=sf) # surge_state_file defaults to None - - MockSurge.assert_called_once_with() - - def test_init_surge_state_file_provided_passes_kwarg(self, tmp_path): - """surge_state_file provided → SurgeState(state_file=...) is called.""" - from monitor import Monitor - - sf = str(tmp_path / "st.json") - surge_sf = str(tmp_path / "surge.json") - with patch.dict(os.environ, BASE_ENV, clear=False): - with patch("monitor.XClient"), patch("monitor.SlackClient"): - with patch("monitor.SurgeState") as MockSurge: - Monitor(state_file=sf, surge_state_file=surge_sf) - - MockSurge.assert_called_once_with(state_file=surge_sf) - - # ------------------------------------------------------------------ - # _load_state / _save_state - # ------------------------------------------------------------------ - - def test_load_state_no_file_returns_empty_dict(self, tmp_path): - m = self._make_monitor(tmp_path) - assert m._load_state() == {} - - def test_load_state_existing_file_returns_contents(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"since_id": "XYZ"}) - assert m._load_state()["since_id"] == "XYZ" - - def test_save_state_persists_to_disk(self, tmp_path): - m = self._make_monitor(tmp_path) - m.state["since_id"] = "saved" - m._save_state() - on_disk = json.loads(open(m.state_file).read()) - assert on_disk["since_id"] == "saved" - - # ------------------------------------------------------------------ - # run_poll - # ------------------------------------------------------------------ - - def test_run_poll_first_run_uses_start_time_backfill(self, tmp_path): - """No since_id → search_recent called with start_time set, since_id=None.""" - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [SAMPLE_TWEET] - - tweets = m.run_poll() - - kw = m.x_client.search_recent.call_args.kwargs - assert kw["since_id"] is None - assert kw["start_time"] is not None # 24h backfill - assert tweets == [SAMPLE_TWEET] - assert m.state["since_id"] == SAMPLE_TWEET["id"] - - def test_run_poll_subsequent_run_passes_since_id(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"since_id": "prev_tweet_id"}) - m.x_client.search_recent.return_value = [SAMPLE_TWEET] - - m.run_poll() - - kw = m.x_client.search_recent.call_args.kwargs - assert kw["since_id"] == "prev_tweet_id" - - def test_run_poll_no_tweets_does_not_post_to_slack(self, tmp_path): - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [] - - tweets = m.run_poll() - - m.slack_client.post_mentions.assert_not_called() - assert "since_id" not in m.state - assert tweets == [] - - def test_run_poll_no_tweets_preserves_existing_since_id(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"since_id": "old_id"}) - m.x_client.search_recent.return_value = [] - - m.run_poll() - - assert m.state["since_id"] == "old_id" - - def test_run_poll_new_tweets_posts_to_slack_and_updates_since_id(self, tmp_path): - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [SAMPLE_TWEET] - - m.run_poll() - - m.slack_client.post_mentions.assert_called_once_with([SAMPLE_TWEET]) - assert m.state["since_id"] == SAMPLE_TWEET["id"] - - # ------------------------------------------------------------------ - # _should_send_digest - # ------------------------------------------------------------------ - - def test_should_send_digest_wrong_hour_returns_false(self, tmp_path): - m = self._make_monitor(tmp_path) - fake_now = datetime(2024, 1, 1, 15, 0, 0, tzinfo=timezone.utc) # 15:00 UTC - with patch("monitor.datetime") as mock_dt: - mock_dt.now.return_value = fake_now - assert m._should_send_digest() is False - - def test_should_send_digest_correct_hour_not_yet_sent_returns_true(self, tmp_path): - m = self._make_monitor(tmp_path) - fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc) # 20:00 UTC - with patch("monitor.datetime") as mock_dt: - mock_dt.now.return_value = fake_now - assert m._should_send_digest() is True - - def test_should_send_digest_already_sent_today_returns_false(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"last_digest_date": "2024-01-01"}) - fake_now = datetime(2024, 1, 1, 20, 0, 0, tzinfo=timezone.utc) - with patch("monitor.datetime") as mock_dt: - mock_dt.now.return_value = fake_now - assert m._should_send_digest() is False - - # ------------------------------------------------------------------ - # run_daily_digest - # ------------------------------------------------------------------ - - def test_run_daily_digest_posts_count_and_resets(self, tmp_path): - m = self._make_monitor(tmp_path, state_data={"daily_count": 7}) - - m.run_daily_digest() - - m.slack_client.post_digest.assert_called_once_with({"count": 7}) - assert m.state["daily_count"] == 0 - assert "last_digest_date" in m.state - - # ------------------------------------------------------------------ - # _run_once - # ------------------------------------------------------------------ - - def test_run_once_no_digest_returns_normal_interval(self, tmp_path): - from monitor import POLL_INTERVAL_SECONDS - - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [SAMPLE_TWEET] - - with patch.object(m, "_should_send_digest", return_value=False): - interval = m._run_once() - - assert m.state["daily_count"] == 1 - assert interval == POLL_INTERVAL_SECONDS - - def test_run_once_triggers_digest_when_due(self, tmp_path): - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [] - - with patch.object(m, "_should_send_digest", return_value=True): - with patch.object(m, "run_daily_digest") as mock_digest: - m._run_once() - - mock_digest.assert_called_once() - - def test_run_once_returns_surge_interval_when_surge_active(self, tmp_path): - from monitor import SURGE_INTERVAL_SECONDS - - m = self._make_monitor(tmp_path) - m.x_client.search_recent.return_value = [] - m.surge.enable(duration_hours=6) - - with patch.object(m, "_should_send_digest", return_value=False): - interval = m._run_once() - - assert interval == SURGE_INTERVAL_SECONDS - - # ------------------------------------------------------------------ - # run (infinite loop) - # ------------------------------------------------------------------ - - def test_run_normal_path_sleeps_with_returned_interval(self, tmp_path): - from monitor import Monitor, POLL_INTERVAL_SECONDS - - sf = str(tmp_path / "st.json") - surge_sf = str(tmp_path / "surge.json") - with patch.dict(os.environ, BASE_ENV, clear=False): - with patch("monitor.XClient"), patch("monitor.SlackClient"): - m = Monitor(state_file=sf, surge_state_file=surge_sf) - - sleep_calls = [] - - def fake_sleep(n): - sleep_calls.append(n) - raise SystemExit("terminate test loop") - - with patch.object(m, "_run_once", return_value=POLL_INTERVAL_SECONDS): - with patch("monitor.time.sleep", side_effect=fake_sleep): - with pytest.raises(SystemExit): - m.run() - - assert sleep_calls == [POLL_INTERVAL_SECONDS] - - def test_run_exception_in_run_once_falls_back_to_poll_interval(self, tmp_path): - from monitor import Monitor, POLL_INTERVAL_SECONDS - - sf = str(tmp_path / "st.json") - surge_sf = str(tmp_path / "surge.json") - with patch.dict(os.environ, BASE_ENV, clear=False): - with patch("monitor.XClient"), patch("monitor.SlackClient"): - m = Monitor(state_file=sf, surge_state_file=surge_sf) - - sleep_calls = [] - - def fake_sleep(n): - sleep_calls.append(n) - raise SystemExit("terminate test loop") - - with patch.object(m, "_run_once", side_effect=RuntimeError("api exploded")): - with patch("monitor.time.sleep", side_effect=fake_sleep): - with pytest.raises(SystemExit): - m.run() - - # On exception, sleep is called with the ambient interval - assert sleep_calls == [POLL_INTERVAL_SECONDS] diff --git a/brand-monitor/x_client.py b/brand-monitor/x_client.py deleted file mode 100644 index af05523e..00000000 --- a/brand-monitor/x_client.py +++ /dev/null @@ -1,65 +0,0 @@ -"""X API v2 thin client for brand mention search.""" - -import os -import logging -import requests - -logger = logging.getLogger(__name__) - -SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent" - -# Verbatim from issue #549 — drug-discovery SEO noise suppressed at query level -SEARCH_QUERY = ( - '("Molecule AI" OR "@moleculeai") ' - '(agent OR workflow OR orchestrat OR "multi-agent" OR developer OR SDK OR API OR "agent platform") ' - '-moleculeai.com -molecule.ai -"drug discovery" -pharmaceutical -CRISPR -oncology ' - '-is:retweet lang:en' -) - -TWEET_FIELDS = "author_id,created_at,public_metrics,entities" - - -class XClient: - """Thin wrapper around X API v2 recent-search endpoint. - - Auth: Bearer token from X_BEARER_TOKEN env var. - """ - - def __init__(self): - self.bearer_token = os.environ.get("X_BEARER_TOKEN") - if not self.bearer_token: - raise EnvironmentError("Missing required environment variable: X_BEARER_TOKEN") - - def search_recent(self, since_id=None, start_time=None, max_results=100): - """Search recent tweets matching SEARCH_QUERY. - - Args: - since_id: Only return tweets newer than this tweet ID. - start_time: ISO 8601 datetime string; only return tweets after this time. - max_results: Max tweets per request (10–100). - - Returns: - List of tweet dicts (newest first), empty list if none found. - - Raises: - requests.HTTPError: On non-2xx API response. - """ - headers = {"Authorization": f"Bearer {self.bearer_token}"} - params = { - "query": SEARCH_QUERY, - "tweet.fields": TWEET_FIELDS, - "max_results": max_results, - } - if since_id: - params["since_id"] = since_id - if start_time: - params["start_time"] = start_time - - logger.debug("Searching X API: since_id=%s start_time=%s", since_id, start_time) - response = requests.get(SEARCH_URL, headers=headers, params=params, timeout=30) - response.raise_for_status() - - data = response.json() - tweets = data.get("data", []) - logger.info("X API returned %d tweet(s)", len(tweets)) - return tweets