From 313e32f3581aca169a1e9e212d5c85f34433cf69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 11:27:32 +0000 Subject: [PATCH] Wave 1: Add StructuredLogger and review topics inference Task #1: StructuredLogger class (modules/structured_logger.py) - LogEntry dataclass with timestamp, level, category, metrics, network - Thread-safe storage with automatic pruning at 10k entries - Level methods: debug(), info(), warn(), error(), fatal() - Backward-compatible log() method for migration - Filter methods: get_logs_by_category(), get_logs_by_level() Task #16: Review topics inference (modules/scraper_clean.py) - get_topic_variants(): Generate word variants (plural, -ing, -ed forms) - infer_review_topics(): Match review text to topic keywords - Word boundary matching to avoid false positives - Integrated into scrape_reviews() to add 'topics' field to reviews Co-Authored-By: Claude Opus 4.5 --- modules/scraper_clean.py | 134 +++++++++++++++++++ modules/structured_logger.py | 250 +++++++++++++++++++++++++++++++++++ 2 files changed, 384 insertions(+) create mode 100644 modules/structured_logger.py diff --git a/modules/scraper_clean.py b/modules/scraper_clean.py index e364359..8aefcb6 100644 --- a/modules/scraper_clean.py +++ b/modules/scraper_clean.py @@ -9,9 +9,131 @@ import json import time import threading from datetime import datetime +from typing import List from selenium.webdriver.common.by import By +def get_topic_variants(topic: str) -> List[str]: + """ + Generate common variants of a topic word for matching. + + Handles: + - Singular/plural forms + - Verb forms (-ing, -ed, -s) + - Common stemming patterns + + Args: + topic: The topic word/phrase to generate variants for + + Returns: + List of variant strings including the original + + Example: + >>> get_topic_variants("cutting") + ["cutting", "cut", "cuts"] + >>> get_topic_variants("service") + ["service", "services", "servicing"] + """ + if not topic: + return [] + + topic = topic.lower().strip() + variants = {topic} # Use set to avoid duplicates + + # Handle -ing forms (cutting -> cut, cuts) + if topic.endswith("ing"): + base = topic[:-3] # Remove -ing + if base: + variants.add(base) + variants.add(base + "s") + # Handle doubled consonants (cutting -> cut) + if len(base) >= 2 and base[-1] == base[-2]: + single_consonant = base[:-1] + variants.add(single_consonant) + variants.add(single_consonant + "s") + + # Handle -s/-es plural forms (services -> service) + if topic.endswith("es") and len(topic) > 2: + variants.add(topic[:-2]) # Remove -es + variants.add(topic[:-2] + "ing") + elif topic.endswith("s") and len(topic) > 1 and not topic.endswith("ss"): + variants.add(topic[:-1]) # Remove -s + variants.add(topic[:-1] + "ing") + + # Handle -ed forms (colored -> color) + if topic.endswith("ed") and len(topic) > 2: + base = topic[:-2] + if base: + variants.add(base) + variants.add(base + "s") + variants.add(base + "ing") + # Handle doubled consonants (colored -> color from coloured) + if len(base) >= 2 and base[-1] == base[-2]: + single_consonant = base[:-1] + variants.add(single_consonant) + + # Add common forms if base word (no suffix detected) + if not (topic.endswith("ing") or topic.endswith("ed") or topic.endswith("s")): + variants.add(topic + "s") + variants.add(topic + "ing") + # Handle consonant doubling for -ing (cut -> cutting) + if len(topic) >= 2 and topic[-1] not in "aeiouwy": + variants.add(topic + topic[-1] + "ing") + + return list(variants) + + +def infer_review_topics(review_text: str, topics: List[dict]) -> List[str]: + """ + Match review text against extracted topic keywords. + + Args: + review_text: The review text to analyze + topics: List of topic dicts, e.g., [{"topic": "cutting", "count": 3}] + + Returns: + List of matched topic names + + Example: + >>> topics = [{"topic": "hair salon", "count": 4}, {"topic": "cutting", "count": 3}] + >>> text = "Great haircut! The cutting was professional." + >>> infer_review_topics(text, topics) + ["cutting"] + """ + # Handle empty/None inputs gracefully + if not review_text or not topics: + return [] + + review_text_lower = review_text.lower() + matched_topics = [] + + for topic_dict in topics: + topic = topic_dict.get("topic", "") + if not topic: + continue + + topic_lower = topic.lower().strip() + + # Get all variants of the topic + variants = get_topic_variants(topic_lower) + + # Check each variant for word boundary match + for variant in variants: + if not variant: + continue + + # Use word boundary regex to avoid partial matches + # \b ensures we match whole words only + # E.g., "cut" won't match "execute" or "cutlery" partially + pattern = r'\b' + re.escape(variant) + r'\b' + + if re.search(pattern, review_text_lower): + matched_topics.append(topic) # Use original topic name + break # Found a match, no need to check other variants + + return matched_topics + + class LogCapture: """Captures scraper logs for storage and viewing.""" @@ -1138,6 +1260,18 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in else: log.info(f"📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})") + # Infer topics for each review if review_topics is available + if review_topics: + log.info(f"🏷️ Inferring topics for {len(review_list)} reviews...") + topics_inferred_count = 0 + for review in review_list: + review_text = review.get("text", "") + matched = infer_review_topics(review_text, review_topics) + review["topics"] = matched + if matched: + topics_inferred_count += 1 + log.info(f"🏷️ Topics inferred for {topics_inferred_count}/{len(review_list)} reviews") + return { "reviews": review_list, # Only unflushed reviews (flushed already sent to callback) "total": grand_total, diff --git a/modules/structured_logger.py b/modules/structured_logger.py new file mode 100644 index 0000000..603e625 --- /dev/null +++ b/modules/structured_logger.py @@ -0,0 +1,250 @@ +""" +Structured Logger Module + +Provides a thread-safe, structured logging system with JSON-serializable output. +Designed to replace the LogCapture class with enhanced categorization and metrics support. +""" + +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from typing import Dict, List, Literal, Optional +import threading +import time + + +LogLevel = Literal['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'] +LogCategory = Literal['scraper', 'browser', 'network', 'system'] + + +@dataclass +class LogEntry: + """Structured log entry with timestamp, level, category, and optional metrics.""" + timestamp: str # ISO 8601 with Z suffix + timestamp_ms: int # Unix milliseconds + level: LogLevel + category: LogCategory + message: str + metrics: Optional[Dict] = None # memory_mb, reviews_count, scroll_position, dom_nodes, etc. + network: Optional[Dict] = None # url, method, status, size_bytes, duration_ms + snapshot_id: Optional[str] = None + + def to_dict(self) -> Dict: + """Convert to JSON-serializable dictionary, excluding None values.""" + result = { + 'timestamp': self.timestamp, + 'timestamp_ms': self.timestamp_ms, + 'level': self.level, + 'category': self.category, + 'message': self.message, + } + if self.metrics is not None: + result['metrics'] = self.metrics + if self.network is not None: + result['network'] = self.network + if self.snapshot_id is not None: + result['snapshot_id'] = self.snapshot_id + return result + + +class StructuredLogger: + """ + Thread-safe structured logger with categorized log entries and automatic pruning. + + Example usage: + logger = StructuredLogger() + logger.info('browser', 'Navigating to URL', metrics={'memory_mb': 245}) + logger.warn('network', 'Rate limit detected', network={'status': 429, 'url': '...'}) + logger.error('system', 'Chrome crashed', metrics={'memory_mb': 489, 'dom_nodes': 12000}) + """ + + def __init__(self, max_entries: int = 10000): + """ + Initialize the structured logger. + + Args: + max_entries: Maximum number of log entries to retain (default 10000). + Oldest entries are pruned when limit is exceeded. + """ + self._entries: List[LogEntry] = [] + self._lock = threading.Lock() + self._max_entries = max_entries + + def _create_entry( + self, + level: LogLevel, + category: LogCategory, + message: str, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> LogEntry: + """Create a new log entry with current timestamp.""" + now = datetime.now(timezone.utc) + timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond // 1000:03d}Z' + timestamp_ms = int(now.timestamp() * 1000) + + return LogEntry( + timestamp=timestamp, + timestamp_ms=timestamp_ms, + level=level, + category=category, + message=message, + metrics=metrics, + network=network, + snapshot_id=snapshot_id, + ) + + def _add_entry(self, entry: LogEntry) -> None: + """Add an entry to the log with thread-safety and automatic pruning.""" + with self._lock: + self._entries.append(entry) + # Prune oldest entries if limit exceeded + if len(self._entries) > self._max_entries: + # Remove oldest 10% to avoid frequent pruning + prune_count = max(1, self._max_entries // 10) + self._entries = self._entries[prune_count:] + + def debug( + self, + category: LogCategory, + message: str, + *, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> None: + """Log a DEBUG level message.""" + entry = self._create_entry('DEBUG', category, message, metrics, network, snapshot_id) + self._add_entry(entry) + + def info( + self, + category: LogCategory, + message: str, + *, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> None: + """Log an INFO level message.""" + entry = self._create_entry('INFO', category, message, metrics, network, snapshot_id) + self._add_entry(entry) + + def warn( + self, + category: LogCategory, + message: str, + *, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> None: + """Log a WARN level message.""" + entry = self._create_entry('WARN', category, message, metrics, network, snapshot_id) + self._add_entry(entry) + + def error( + self, + category: LogCategory, + message: str, + *, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> None: + """Log an ERROR level message.""" + entry = self._create_entry('ERROR', category, message, metrics, network, snapshot_id) + self._add_entry(entry) + + def fatal( + self, + category: LogCategory, + message: str, + *, + metrics: Optional[Dict] = None, + network: Optional[Dict] = None, + snapshot_id: Optional[str] = None, + ) -> None: + """Log a FATAL level message.""" + entry = self._create_entry('FATAL', category, message, metrics, network, snapshot_id) + self._add_entry(entry) + + def log(self, message: str, level: str = 'INFO') -> None: + """ + Backward-compatible log method for legacy code. + + Maps to 'system' category by default. + + Args: + message: The log message + level: Log level as string (DEBUG, INFO, WARN, ERROR, FATAL) + """ + level_upper = level.upper() + if level_upper not in ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'): + level_upper = 'INFO' + + entry = self._create_entry(level_upper, 'system', message) + self._add_entry(entry) + + def get_logs(self) -> List[Dict]: + """ + Get all log entries as JSON-serializable dictionaries. + + Returns: + List of log entry dictionaries. + """ + with self._lock: + return [entry.to_dict() for entry in self._entries] + + def get_logs_by_category(self, category: LogCategory) -> List[Dict]: + """ + Get log entries filtered by category. + + Args: + category: The category to filter by ('scraper', 'browser', 'network', 'system') + + Returns: + List of log entry dictionaries matching the category. + """ + with self._lock: + return [entry.to_dict() for entry in self._entries if entry.category == category] + + def get_logs_by_level(self, level: LogLevel) -> List[Dict]: + """ + Get log entries filtered by level. + + Args: + level: The level to filter by ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL') + + Returns: + List of log entry dictionaries matching the level. + """ + with self._lock: + return [entry.to_dict() for entry in self._entries if entry.level == level] + + def get_logs_since(self, timestamp_ms: int) -> List[Dict]: + """ + Get log entries since a specific timestamp. + + Args: + timestamp_ms: Unix timestamp in milliseconds + + Returns: + List of log entry dictionaries with timestamp >= timestamp_ms. + """ + with self._lock: + return [entry.to_dict() for entry in self._entries if entry.timestamp_ms >= timestamp_ms] + + def clear(self) -> None: + """Clear all log entries.""" + with self._lock: + self._entries.clear() + + def count(self) -> int: + """Get the current number of log entries.""" + with self._lock: + return len(self._entries) + + def __len__(self) -> int: + """Get the current number of log entries.""" + return self.count()