Wave 1: Add StructuredLogger and review topics inference

Task #1: StructuredLogger class (modules/structured_logger.py)
- LogEntry dataclass with timestamp, level, category, metrics, network
- Thread-safe storage with automatic pruning at 10k entries
- Level methods: debug(), info(), warn(), error(), fatal()
- Backward-compatible log() method for migration
- Filter methods: get_logs_by_category(), get_logs_by_level()

Task #16: Review topics inference (modules/scraper_clean.py)
- get_topic_variants(): Generate word variants (plural, -ing, -ed forms)
- infer_review_topics(): Match review text to topic keywords
- Word boundary matching to avoid false positives
- Integrated into scrape_reviews() to add 'topics' field to reviews

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 11:27:32 +00:00
parent 3da243be79
commit 313e32f358
2 changed files with 384 additions and 0 deletions

View File

@@ -9,9 +9,131 @@ import json
import time
import threading
from datetime import datetime
from typing import List
from selenium.webdriver.common.by import By
def get_topic_variants(topic: str) -> List[str]:
"""
Generate common variants of a topic word for matching.
Handles:
- Singular/plural forms
- Verb forms (-ing, -ed, -s)
- Common stemming patterns
Args:
topic: The topic word/phrase to generate variants for
Returns:
List of variant strings including the original
Example:
>>> get_topic_variants("cutting")
["cutting", "cut", "cuts"]
>>> get_topic_variants("service")
["service", "services", "servicing"]
"""
if not topic:
return []
topic = topic.lower().strip()
variants = {topic} # Use set to avoid duplicates
# Handle -ing forms (cutting -> cut, cuts)
if topic.endswith("ing"):
base = topic[:-3] # Remove -ing
if base:
variants.add(base)
variants.add(base + "s")
# Handle doubled consonants (cutting -> cut)
if len(base) >= 2 and base[-1] == base[-2]:
single_consonant = base[:-1]
variants.add(single_consonant)
variants.add(single_consonant + "s")
# Handle -s/-es plural forms (services -> service)
if topic.endswith("es") and len(topic) > 2:
variants.add(topic[:-2]) # Remove -es
variants.add(topic[:-2] + "ing")
elif topic.endswith("s") and len(topic) > 1 and not topic.endswith("ss"):
variants.add(topic[:-1]) # Remove -s
variants.add(topic[:-1] + "ing")
# Handle -ed forms (colored -> color)
if topic.endswith("ed") and len(topic) > 2:
base = topic[:-2]
if base:
variants.add(base)
variants.add(base + "s")
variants.add(base + "ing")
# Handle doubled consonants (colored -> color from coloured)
if len(base) >= 2 and base[-1] == base[-2]:
single_consonant = base[:-1]
variants.add(single_consonant)
# Add common forms if base word (no suffix detected)
if not (topic.endswith("ing") or topic.endswith("ed") or topic.endswith("s")):
variants.add(topic + "s")
variants.add(topic + "ing")
# Handle consonant doubling for -ing (cut -> cutting)
if len(topic) >= 2 and topic[-1] not in "aeiouwy":
variants.add(topic + topic[-1] + "ing")
return list(variants)
def infer_review_topics(review_text: str, topics: List[dict]) -> List[str]:
"""
Match review text against extracted topic keywords.
Args:
review_text: The review text to analyze
topics: List of topic dicts, e.g., [{"topic": "cutting", "count": 3}]
Returns:
List of matched topic names
Example:
>>> topics = [{"topic": "hair salon", "count": 4}, {"topic": "cutting", "count": 3}]
>>> text = "Great haircut! The cutting was professional."
>>> infer_review_topics(text, topics)
["cutting"]
"""
# Handle empty/None inputs gracefully
if not review_text or not topics:
return []
review_text_lower = review_text.lower()
matched_topics = []
for topic_dict in topics:
topic = topic_dict.get("topic", "")
if not topic:
continue
topic_lower = topic.lower().strip()
# Get all variants of the topic
variants = get_topic_variants(topic_lower)
# Check each variant for word boundary match
for variant in variants:
if not variant:
continue
# Use word boundary regex to avoid partial matches
# \b ensures we match whole words only
# E.g., "cut" won't match "execute" or "cutlery" partially
pattern = r'\b' + re.escape(variant) + r'\b'
if re.search(pattern, review_text_lower):
matched_topics.append(topic) # Use original topic name
break # Found a match, no need to check other variants
return matched_topics
class LogCapture:
"""Captures scraper logs for storage and viewing."""
@@ -1138,6 +1260,18 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in
else:
log.info(f"📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})")
# Infer topics for each review if review_topics is available
if review_topics:
log.info(f"🏷️ Inferring topics for {len(review_list)} reviews...")
topics_inferred_count = 0
for review in review_list:
review_text = review.get("text", "")
matched = infer_review_topics(review_text, review_topics)
review["topics"] = matched
if matched:
topics_inferred_count += 1
log.info(f"🏷️ Topics inferred for {topics_inferred_count}/{len(review_list)} reviews")
return {
"reviews": review_list, # Only unflushed reviews (flushed already sent to callback)
"total": grand_total,

View File

@@ -0,0 +1,250 @@
"""
Structured Logger Module
Provides a thread-safe, structured logging system with JSON-serializable output.
Designed to replace the LogCapture class with enhanced categorization and metrics support.
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, List, Literal, Optional
import threading
import time
LogLevel = Literal['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']
LogCategory = Literal['scraper', 'browser', 'network', 'system']
@dataclass
class LogEntry:
"""Structured log entry with timestamp, level, category, and optional metrics."""
timestamp: str # ISO 8601 with Z suffix
timestamp_ms: int # Unix milliseconds
level: LogLevel
category: LogCategory
message: str
metrics: Optional[Dict] = None # memory_mb, reviews_count, scroll_position, dom_nodes, etc.
network: Optional[Dict] = None # url, method, status, size_bytes, duration_ms
snapshot_id: Optional[str] = None
def to_dict(self) -> Dict:
"""Convert to JSON-serializable dictionary, excluding None values."""
result = {
'timestamp': self.timestamp,
'timestamp_ms': self.timestamp_ms,
'level': self.level,
'category': self.category,
'message': self.message,
}
if self.metrics is not None:
result['metrics'] = self.metrics
if self.network is not None:
result['network'] = self.network
if self.snapshot_id is not None:
result['snapshot_id'] = self.snapshot_id
return result
class StructuredLogger:
"""
Thread-safe structured logger with categorized log entries and automatic pruning.
Example usage:
logger = StructuredLogger()
logger.info('browser', 'Navigating to URL', metrics={'memory_mb': 245})
logger.warn('network', 'Rate limit detected', network={'status': 429, 'url': '...'})
logger.error('system', 'Chrome crashed', metrics={'memory_mb': 489, 'dom_nodes': 12000})
"""
def __init__(self, max_entries: int = 10000):
"""
Initialize the structured logger.
Args:
max_entries: Maximum number of log entries to retain (default 10000).
Oldest entries are pruned when limit is exceeded.
"""
self._entries: List[LogEntry] = []
self._lock = threading.Lock()
self._max_entries = max_entries
def _create_entry(
self,
level: LogLevel,
category: LogCategory,
message: str,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> LogEntry:
"""Create a new log entry with current timestamp."""
now = datetime.now(timezone.utc)
timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond // 1000:03d}Z'
timestamp_ms = int(now.timestamp() * 1000)
return LogEntry(
timestamp=timestamp,
timestamp_ms=timestamp_ms,
level=level,
category=category,
message=message,
metrics=metrics,
network=network,
snapshot_id=snapshot_id,
)
def _add_entry(self, entry: LogEntry) -> None:
"""Add an entry to the log with thread-safety and automatic pruning."""
with self._lock:
self._entries.append(entry)
# Prune oldest entries if limit exceeded
if len(self._entries) > self._max_entries:
# Remove oldest 10% to avoid frequent pruning
prune_count = max(1, self._max_entries // 10)
self._entries = self._entries[prune_count:]
def debug(
self,
category: LogCategory,
message: str,
*,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> None:
"""Log a DEBUG level message."""
entry = self._create_entry('DEBUG', category, message, metrics, network, snapshot_id)
self._add_entry(entry)
def info(
self,
category: LogCategory,
message: str,
*,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> None:
"""Log an INFO level message."""
entry = self._create_entry('INFO', category, message, metrics, network, snapshot_id)
self._add_entry(entry)
def warn(
self,
category: LogCategory,
message: str,
*,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> None:
"""Log a WARN level message."""
entry = self._create_entry('WARN', category, message, metrics, network, snapshot_id)
self._add_entry(entry)
def error(
self,
category: LogCategory,
message: str,
*,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> None:
"""Log an ERROR level message."""
entry = self._create_entry('ERROR', category, message, metrics, network, snapshot_id)
self._add_entry(entry)
def fatal(
self,
category: LogCategory,
message: str,
*,
metrics: Optional[Dict] = None,
network: Optional[Dict] = None,
snapshot_id: Optional[str] = None,
) -> None:
"""Log a FATAL level message."""
entry = self._create_entry('FATAL', category, message, metrics, network, snapshot_id)
self._add_entry(entry)
def log(self, message: str, level: str = 'INFO') -> None:
"""
Backward-compatible log method for legacy code.
Maps to 'system' category by default.
Args:
message: The log message
level: Log level as string (DEBUG, INFO, WARN, ERROR, FATAL)
"""
level_upper = level.upper()
if level_upper not in ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'):
level_upper = 'INFO'
entry = self._create_entry(level_upper, 'system', message)
self._add_entry(entry)
def get_logs(self) -> List[Dict]:
"""
Get all log entries as JSON-serializable dictionaries.
Returns:
List of log entry dictionaries.
"""
with self._lock:
return [entry.to_dict() for entry in self._entries]
def get_logs_by_category(self, category: LogCategory) -> List[Dict]:
"""
Get log entries filtered by category.
Args:
category: The category to filter by ('scraper', 'browser', 'network', 'system')
Returns:
List of log entry dictionaries matching the category.
"""
with self._lock:
return [entry.to_dict() for entry in self._entries if entry.category == category]
def get_logs_by_level(self, level: LogLevel) -> List[Dict]:
"""
Get log entries filtered by level.
Args:
level: The level to filter by ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')
Returns:
List of log entry dictionaries matching the level.
"""
with self._lock:
return [entry.to_dict() for entry in self._entries if entry.level == level]
def get_logs_since(self, timestamp_ms: int) -> List[Dict]:
"""
Get log entries since a specific timestamp.
Args:
timestamp_ms: Unix timestamp in milliseconds
Returns:
List of log entry dictionaries with timestamp >= timestamp_ms.
"""
with self._lock:
return [entry.to_dict() for entry in self._entries if entry.timestamp_ms >= timestamp_ms]
def clear(self) -> None:
"""Clear all log entries."""
with self._lock:
self._entries.clear()
def count(self) -> int:
"""Get the current number of log entries."""
with self._lock:
return len(self._entries)
def __len__(self) -> int:
"""Get the current number of log entries."""
return self.count()