Files
Alejandro Gutiérrez 544e028c3f Phase 0: Project restructure to ReviewIQ platform architecture
New structure:
- scrapers/google_reviews/v1_0_0.py (was modules/scraper_clean.py)
- scrapers/base.py (BaseScraper interface)
- scrapers/registry.py (ScraperRegistry for version routing)
- core/database.py, models.py, config.py, enums.py
- utils/logger.py, crash_analyzer.py, health_checks.py, helpers.py, date_converter.py
- workers/chrome_pool.py
- services/webhook_service.py
- api/ routes structure (empty, ready for Phase 2)
- tests/ structure mirroring source

All imports updated in:
- api_server_production.py (7 import paths updated)
- utils/health_checks.py (scraper import path)

Legacy modules moved to modules/_legacy/:
- data_storage.py, image_handler.py, s3_handler.py (unused)

Syntax verified, frontend build passing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:22:08 +00:00

251 lines
8.1 KiB
Python

"""
Structured Logger Module
Provides a thread-safe, structured logging system with JSON-serializable output.
Designed to replace the LogCapture class with enhanced categorization and metrics support.
"""
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, List, Literal, Optional
import threading
import time
# Closed sets of accepted severity levels and subsystem categories.
LogLevel = Literal['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL']
LogCategory = Literal['scraper', 'browser', 'network', 'system']


@dataclass
class LogEntry:
    """A single structured log record.

    The first five fields are always present. ``metrics``, ``network`` and
    ``snapshot_id`` are optional attachments that default to ``None``.
    """

    timestamp: str  # ISO 8601 with Z suffix
    timestamp_ms: int  # Unix milliseconds
    level: LogLevel
    category: LogCategory
    message: str
    metrics: Optional[Dict] = None  # memory_mb, reviews_count, scroll_position, dom_nodes, etc.
    network: Optional[Dict] = None  # url, method, status, size_bytes, duration_ms
    snapshot_id: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the entry as a plain dictionary.

        The five core fields are always emitted. Each optional attachment is
        emitted only when it is not ``None`` (empty dicts and empty strings
        are kept). Attachments are stored by reference, not copied.

        Returns:
            Dictionary with keys in declaration order.
        """
        payload = dict(
            timestamp=self.timestamp,
            timestamp_ms=self.timestamp_ms,
            level=self.level,
            category=self.category,
            message=self.message,
        )
        # Optional attachments, in the same order the fields are declared.
        for optional_name in ('metrics', 'network', 'snapshot_id'):
            optional_value = getattr(self, optional_name)
            if optional_value is not None:
                payload[optional_name] = optional_value
        return payload
class StructuredLogger:
    """
    Thread-safe, in-memory structured logger with categorized entries and automatic pruning.

    Entries are kept in insertion order in a list; every read and write of
    that list happens under a single ``threading.Lock``. When the list grows
    past ``max_entries`` the oldest ~10% of the limit is dropped in one go.

    Example usage:
        logger = StructuredLogger()
        logger.info('browser', 'Navigating to URL', metrics={'memory_mb': 245})
        logger.warn('network', 'Rate limit detected', network={'status': 429, 'url': '...'})
        logger.error('system', 'Chrome crashed', metrics={'memory_mb': 489, 'dom_nodes': 12000})
    """

    # Levels an entry may carry.
    _VALID_LEVELS = ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')
    # Stdlib-``logging`` spellings that legacy callers of log() may pass.
    _LEVEL_ALIASES = {'WARNING': 'WARN', 'CRITICAL': 'FATAL'}

    def __init__(self, max_entries: int = 10000):
        """
        Initialize the structured logger.

        Args:
            max_entries: Maximum number of log entries to retain (default 10000).
                Oldest entries are pruned when limit is exceeded.
        """
        self._entries: List[LogEntry] = []
        self._lock = threading.Lock()
        self._max_entries = max_entries

    def _create_entry(
        self,
        level: LogLevel,
        category: LogCategory,
        message: str,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> LogEntry:
        """Build a LogEntry stamped with the current UTC time.

        The same ``datetime`` instant feeds both the ISO 8601 string
        (millisecond precision, ``Z`` suffix) and the Unix-millisecond integer.
        """
        now = datetime.now(timezone.utc)
        timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond // 1000:03d}Z'
        timestamp_ms = int(now.timestamp() * 1000)
        return LogEntry(
            timestamp=timestamp,
            timestamp_ms=timestamp_ms,
            level=level,
            category=category,
            message=message,
            metrics=metrics,
            network=network,
            snapshot_id=snapshot_id,
        )

    def _add_entry(self, entry: LogEntry) -> None:
        """Append an entry under the lock, pruning the oldest entries when over the limit."""
        with self._lock:
            self._entries.append(entry)
            if len(self._entries) > self._max_entries:
                # Drop the oldest 10% of the limit (at least one entry) so that
                # pruning does not run on every subsequent append. Deleting in
                # place avoids copying the surviving entries into a new list.
                prune_count = max(1, self._max_entries // 10)
                del self._entries[:prune_count]

    def _record(
        self,
        level: LogLevel,
        category: LogCategory,
        message: str,
        metrics: Optional[Dict],
        network: Optional[Dict],
        snapshot_id: Optional[str],
    ) -> None:
        """Create an entry at the given level and store it (shared by the per-level methods)."""
        self._add_entry(
            self._create_entry(level, category, message, metrics, network, snapshot_id)
        )

    def debug(
        self,
        category: LogCategory,
        message: str,
        *,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> None:
        """Log a DEBUG level message."""
        self._record('DEBUG', category, message, metrics, network, snapshot_id)

    def info(
        self,
        category: LogCategory,
        message: str,
        *,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> None:
        """Log an INFO level message."""
        self._record('INFO', category, message, metrics, network, snapshot_id)

    def warn(
        self,
        category: LogCategory,
        message: str,
        *,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> None:
        """Log a WARN level message."""
        self._record('WARN', category, message, metrics, network, snapshot_id)

    def error(
        self,
        category: LogCategory,
        message: str,
        *,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> None:
        """Log an ERROR level message."""
        self._record('ERROR', category, message, metrics, network, snapshot_id)

    def fatal(
        self,
        category: LogCategory,
        message: str,
        *,
        metrics: Optional[Dict] = None,
        network: Optional[Dict] = None,
        snapshot_id: Optional[str] = None,
    ) -> None:
        """Log a FATAL level message."""
        self._record('FATAL', category, message, metrics, network, snapshot_id)

    def log(self, message: str, level: str = 'INFO') -> None:
        """
        Backward-compatible log method for legacy code.

        The entry is always filed under the 'system' category.

        Args:
            message: The log message
            level: Log level as string (DEBUG, INFO, WARN, ERROR, FATAL).
                Matching ignores case and surrounding whitespace. The
                stdlib-style names WARNING and CRITICAL are accepted as
                aliases for WARN and FATAL so their severity is not lost.
                Anything else, including a non-string value, is recorded
                as INFO rather than raising.
        """
        level_upper = level.strip().upper() if isinstance(level, str) else 'INFO'
        level_upper = self._LEVEL_ALIASES.get(level_upper, level_upper)
        if level_upper not in self._VALID_LEVELS:
            level_upper = 'INFO'
        self._add_entry(self._create_entry(level_upper, 'system', message))

    def get_logs(self) -> List[Dict]:
        """
        Get all log entries as dictionaries (see LogEntry.to_dict).

        Returns:
            List of log entry dictionaries, oldest first. The list is new on
            every call; ``metrics`` / ``network`` values are the same dict
            objects held by the stored entries.
        """
        with self._lock:
            return [entry.to_dict() for entry in self._entries]

    def get_logs_by_category(self, category: LogCategory) -> List[Dict]:
        """
        Get log entries filtered by category.

        Args:
            category: The category to filter by ('scraper', 'browser', 'network', 'system')

        Returns:
            List of log entry dictionaries matching the category.
        """
        with self._lock:
            return [entry.to_dict() for entry in self._entries if entry.category == category]

    def get_logs_by_level(self, level: LogLevel) -> List[Dict]:
        """
        Get log entries filtered by level.

        Args:
            level: The level to filter by ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')

        Returns:
            List of log entry dictionaries matching the level.
        """
        with self._lock:
            return [entry.to_dict() for entry in self._entries if entry.level == level]

    def get_logs_since(self, timestamp_ms: int) -> List[Dict]:
        """
        Get log entries since a specific timestamp.

        Args:
            timestamp_ms: Unix timestamp in milliseconds

        Returns:
            List of log entry dictionaries with timestamp >= timestamp_ms.
        """
        with self._lock:
            return [entry.to_dict() for entry in self._entries if entry.timestamp_ms >= timestamp_ms]

    def clear(self) -> None:
        """Clear all log entries."""
        with self._lock:
            self._entries.clear()

    def count(self) -> int:
        """Get the current number of log entries."""
        with self._lock:
            return len(self._entries)

    def __len__(self) -> int:
        """Get the current number of log entries.

        Note: because ``__len__`` is defined and ``__bool__`` is not, a logger
        with no entries evaluates as falsy. Callers guarding an optional
        logger should test ``logger is not None`` rather than ``if logger:``.
        """
        return self.count()