""" Structured Logger Module Provides a thread-safe, structured logging system with JSON-serializable output. Designed to replace the LogCapture class with enhanced categorization and metrics support. """ from dataclasses import dataclass, field, asdict from datetime import datetime, timezone from typing import Dict, List, Literal, Optional import threading import time LogLevel = Literal['DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'] LogCategory = Literal['scraper', 'browser', 'network', 'system'] @dataclass class LogEntry: """Structured log entry with timestamp, level, category, and optional metrics.""" timestamp: str # ISO 8601 with Z suffix timestamp_ms: int # Unix milliseconds level: LogLevel category: LogCategory message: str metrics: Optional[Dict] = None # memory_mb, reviews_count, scroll_position, dom_nodes, etc. network: Optional[Dict] = None # url, method, status, size_bytes, duration_ms snapshot_id: Optional[str] = None def to_dict(self) -> Dict: """Convert to JSON-serializable dictionary, excluding None values.""" result = { 'timestamp': self.timestamp, 'timestamp_ms': self.timestamp_ms, 'level': self.level, 'category': self.category, 'message': self.message, } if self.metrics is not None: result['metrics'] = self.metrics if self.network is not None: result['network'] = self.network if self.snapshot_id is not None: result['snapshot_id'] = self.snapshot_id return result class StructuredLogger: """ Thread-safe structured logger with categorized log entries and automatic pruning. Example usage: logger = StructuredLogger() logger.info('browser', 'Navigating to URL', metrics={'memory_mb': 245}) logger.warn('network', 'Rate limit detected', network={'status': 429, 'url': '...'}) logger.error('system', 'Chrome crashed', metrics={'memory_mb': 489, 'dom_nodes': 12000}) """ def __init__(self, max_entries: int = 10000): """ Initialize the structured logger. Args: max_entries: Maximum number of log entries to retain (default 10000). Oldest entries are pruned when limit is exceeded. """ self._entries: List[LogEntry] = [] self._lock = threading.Lock() self._max_entries = max_entries def _create_entry( self, level: LogLevel, category: LogCategory, message: str, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> LogEntry: """Create a new log entry with current timestamp.""" now = datetime.now(timezone.utc) timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.') + f'{now.microsecond // 1000:03d}Z' timestamp_ms = int(now.timestamp() * 1000) return LogEntry( timestamp=timestamp, timestamp_ms=timestamp_ms, level=level, category=category, message=message, metrics=metrics, network=network, snapshot_id=snapshot_id, ) def _add_entry(self, entry: LogEntry) -> None: """Add an entry to the log with thread-safety and automatic pruning.""" with self._lock: self._entries.append(entry) # Prune oldest entries if limit exceeded if len(self._entries) > self._max_entries: # Remove oldest 10% to avoid frequent pruning prune_count = max(1, self._max_entries // 10) self._entries = self._entries[prune_count:] def debug( self, category: LogCategory, message: str, *, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> None: """Log a DEBUG level message.""" entry = self._create_entry('DEBUG', category, message, metrics, network, snapshot_id) self._add_entry(entry) def info( self, category: LogCategory, message: str, *, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> None: """Log an INFO level message.""" entry = self._create_entry('INFO', category, message, metrics, network, snapshot_id) self._add_entry(entry) def warn( self, category: LogCategory, message: str, *, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> None: """Log a WARN level message.""" entry = self._create_entry('WARN', category, message, metrics, network, snapshot_id) self._add_entry(entry) def error( self, category: LogCategory, message: str, *, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> None: """Log an ERROR level message.""" entry = self._create_entry('ERROR', category, message, metrics, network, snapshot_id) self._add_entry(entry) def fatal( self, category: LogCategory, message: str, *, metrics: Optional[Dict] = None, network: Optional[Dict] = None, snapshot_id: Optional[str] = None, ) -> None: """Log a FATAL level message.""" entry = self._create_entry('FATAL', category, message, metrics, network, snapshot_id) self._add_entry(entry) def log(self, message: str, level: str = 'INFO') -> None: """ Backward-compatible log method for legacy code. Maps to 'system' category by default. Args: message: The log message level: Log level as string (DEBUG, INFO, WARN, ERROR, FATAL) """ level_upper = level.upper() if level_upper not in ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL'): level_upper = 'INFO' entry = self._create_entry(level_upper, 'system', message) self._add_entry(entry) def get_logs(self) -> List[Dict]: """ Get all log entries as JSON-serializable dictionaries. Returns: List of log entry dictionaries. """ with self._lock: return [entry.to_dict() for entry in self._entries] def get_logs_by_category(self, category: LogCategory) -> List[Dict]: """ Get log entries filtered by category. Args: category: The category to filter by ('scraper', 'browser', 'network', 'system') Returns: List of log entry dictionaries matching the category. """ with self._lock: return [entry.to_dict() for entry in self._entries if entry.category == category] def get_logs_by_level(self, level: LogLevel) -> List[Dict]: """ Get log entries filtered by level. Args: level: The level to filter by ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL') Returns: List of log entry dictionaries matching the level. """ with self._lock: return [entry.to_dict() for entry in self._entries if entry.level == level] def get_logs_since(self, timestamp_ms: int) -> List[Dict]: """ Get log entries since a specific timestamp. Args: timestamp_ms: Unix timestamp in milliseconds Returns: List of log entry dictionaries with timestamp >= timestamp_ms. """ with self._lock: return [entry.to_dict() for entry in self._entries if entry.timestamp_ms >= timestamp_ms] def clear(self) -> None: """Clear all log entries.""" with self._lock: self._entries.clear() def count(self) -> int: """Get the current number of log entries.""" with self._lock: return len(self._entries) def __len__(self) -> int: """Get the current number of log entries.""" return self.count()