diff --git a/api_server_production.py b/api_server_production.py index 62b96e8..03d3ced 100644 --- a/api_server_production.py +++ b/api_server_production.py @@ -9,6 +9,7 @@ import asyncio import json import logging import os +import time from contextlib import asynccontextmanager from typing import Optional, List, Dict, Any from uuid import UUID @@ -22,6 +23,7 @@ from modules.database import DatabaseManager, JobStatus from modules.webhooks import WebhookDispatcher, WebhookManager from modules.health_checks import HealthCheckSystem from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper +from modules.structured_logger import StructuredLogger, LogEntry from modules.chrome_pool import ( start_worker_pools, stop_worker_pools, @@ -361,6 +363,51 @@ async def get_job_logs(job_id: UUID): # ==================== SSE Streaming Endpoints ==================== +def format_sse_event(event_type: str, data: dict, use_structured_format: bool = True) -> str: + """ + Format an SSE event message. + + Args: + event_type: The SSE event type (e.g., 'log', 'metrics', 'status') + data: The data payload + use_structured_format: If True, wrap in {"type": ..., "data": ...} structure + If False, use legacy format for backward compatibility + + Returns: + Formatted SSE message string + """ + if use_structured_format: + payload = {"type": event_type, "data": data} + else: + payload = data + return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n" + + +def format_structured_log_event(log_entry: dict) -> str: + """ + Format a structured log entry as an SSE event. + + The log_entry should already be a dict from StructuredLogger.get_logs(). + + Returns: + Formatted SSE message with type "log" + """ + return format_sse_event("log", log_entry, use_structured_format=True) + + +def format_metrics_event(metrics: dict) -> str: + """ + Format a metrics event for SSE streaming. + + Args: + metrics: Dict containing reviews_extracted, scroll_count, memory_mb, extraction_rate + + Returns: + Formatted SSE message with type "metrics" + """ + return format_sse_event("metrics", metrics, use_structured_format=True) + + async def broadcast_job_update(job_id: str, event_type: str, data: dict): """Broadcast an update to all subscribers of a job stream and the all-jobs stream.""" message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" @@ -381,22 +428,59 @@ async def broadcast_job_update(job_id: str, event_type: str, data: dict): pass +async def broadcast_structured_log(job_id: str, log_entry: dict): + """Broadcast a structured log entry to job subscribers.""" + message = format_structured_log_event(log_entry) + + if job_id in job_update_queues: + for queue in job_update_queues[job_id]: + try: + await queue.put(message) + except: + pass + + +async def broadcast_metrics(job_id: str, metrics: dict): + """Broadcast metrics update to job subscribers.""" + message = format_metrics_event(metrics) + + if job_id in job_update_queues: + for queue in job_update_queues[job_id]: + try: + await queue.put(message) + except: + pass + + @app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)") -async def stream_job_updates(job_id: UUID): +async def stream_job_updates( + job_id: UUID, + format: str = Query("structured", description="Event format: 'structured' (new) or 'legacy' (backward compatible)") +): """ Server-Sent Events stream for real-time job updates. - Streams: + Event types (structured format): + - init: Initial job state with all logs + - log: Individual structured log entry {"type": "log", "data": {"timestamp": "...", "level": "INFO", ...}} + - metrics: Periodic metrics {"type": "metrics", "data": {"reviews_extracted": 150, "scroll_count": 45, ...}} - status: Job status changes - - progress: Review count and progress updates - - logs: New log entries - complete: Job finished (completed/failed) + Query parameters: + - format: 'structured' (default) for new format, 'legacy' for backward compatibility + Connect with EventSource in the browser: ```javascript const es = new EventSource('/jobs/{job_id}/stream'); - es.onmessage = (e) => console.log(JSON.parse(e.data)); - es.addEventListener('logs', (e) => console.log('Logs:', JSON.parse(e.data))); + es.addEventListener('log', (e) => { + const event = JSON.parse(e.data); + console.log('Log:', event.data); // Structured log entry + }); + es.addEventListener('metrics', (e) => { + const event = JSON.parse(e.data); + console.log('Metrics:', event.data); // {reviews_extracted, scroll_count, memory_mb, extraction_rate} + }); ``` """ if not db: @@ -408,6 +492,7 @@ async def stream_job_updates(job_id: UUID): raise HTTPException(status_code=404, detail="Job not found") job_id_str = str(job_id) + use_structured = format.lower() != "legacy" # Create queue for this client queue: asyncio.Queue = asyncio.Queue() @@ -421,6 +506,7 @@ async def stream_job_updates(job_id: UUID): try: # Send initial state job_data = await db.get_job(job_id) + scrape_logs = [] if job_data: scrape_logs = job_data.get('scrape_logs') if isinstance(scrape_logs, str): @@ -448,6 +534,8 @@ async def stream_job_updates(job_id: UUID): # Keep connection alive and send updates last_log_count = len(scrape_logs) if scrape_logs else 0 last_reviews_count = job_data.get('reviews_count') if job_data else 0 + last_metrics_time = time.time() + metrics_interval = 5.0 # Emit metrics every 5 seconds while True: try: @@ -459,31 +547,11 @@ async def stream_job_updates(job_id: UUID): # Send keepalive comment yield ": keepalive\n\n" - # Also poll database for updates (backup in case broadcast missed) - job_data = await db.get_job(job_id) - if job_data: - # Check for status change - if job_data['status'] in ['completed', 'failed', 'cancelled']: - scrape_logs = job_data.get('scrape_logs') - if isinstance(scrape_logs, str): - try: - scrape_logs = json.loads(scrape_logs) - except: - scrape_logs = [] - - final = { - "job_id": job_id_str, - "status": job_data['status'], - "reviews_count": job_data.get('reviews_count'), - "total_reviews": job_data.get('total_reviews'), - "scrape_time": job_data.get('scrape_time'), - "error_message": job_data.get('error_message'), - "logs": scrape_logs or [] - } - yield f"event: complete\ndata: {json.dumps(final)}\n\n" - return - - # Check for new logs or progress + # Also poll database for updates (backup in case broadcast missed) + job_data = await db.get_job(job_id) + if job_data: + # Check for status change + if job_data['status'] in ['completed', 'failed', 'cancelled']: scrape_logs = job_data.get('scrape_logs') if isinstance(scrape_logs, str): try: @@ -491,10 +559,72 @@ async def stream_job_updates(job_id: UUID): except: scrape_logs = [] - current_log_count = len(scrape_logs) if scrape_logs else 0 - current_reviews = job_data.get('reviews_count') or 0 + final = { + "job_id": job_id_str, + "status": job_data['status'], + "reviews_count": job_data.get('reviews_count'), + "total_reviews": job_data.get('total_reviews'), + "scrape_time": job_data.get('scrape_time'), + "error_message": job_data.get('error_message'), + "logs": scrape_logs or [] + } + yield f"event: complete\ndata: {json.dumps(final)}\n\n" + return - if current_log_count > last_log_count or current_reviews != last_reviews_count: + # Check for new logs or progress + scrape_logs = job_data.get('scrape_logs') + if isinstance(scrape_logs, str): + try: + scrape_logs = json.loads(scrape_logs) + except: + scrape_logs = [] + + current_log_count = len(scrape_logs) if scrape_logs else 0 + current_reviews = job_data.get('reviews_count') or 0 + current_time = time.time() + + # Emit individual structured log events for new logs + if use_structured and current_log_count > last_log_count: + new_logs = scrape_logs[last_log_count:] if scrape_logs else [] + for log_entry in new_logs: + yield format_structured_log_event(log_entry) + + # Emit metrics event every 5 seconds during job execution + if use_structured and job_data['status'] == 'running': + if current_time - last_metrics_time >= metrics_interval: + # Calculate extraction rate (reviews per second) + elapsed = job_data.get('scrape_time') or 0 + extraction_rate = (current_reviews / elapsed) if elapsed > 0 else 0 + + # Count scroll events from logs + scroll_count = 0 + if scrape_logs: + for entry in scrape_logs: + msg = entry.get('message', '') if isinstance(entry, dict) else str(entry) + if 'scroll' in msg.lower(): + scroll_count += 1 + + # Get memory from latest log entry with metrics + memory_mb = 0 + if scrape_logs: + for entry in reversed(scrape_logs): + if isinstance(entry, dict) and entry.get('metrics'): + memory_mb = entry['metrics'].get('memory_mb', 0) + break + + metrics_data = { + "reviews_extracted": current_reviews, + "scroll_count": scroll_count, + "memory_mb": memory_mb, + "extraction_rate": round(extraction_rate, 2) + } + yield format_metrics_event(metrics_data) + last_metrics_time = current_time + + # Send legacy update event if reviews or logs changed + if current_log_count > last_log_count or current_reviews != last_reviews_count: + if not use_structured: + # Legacy format: send all logs in update event update = { "job_id": job_id_str, "status": job_data['status'], @@ -503,8 +633,8 @@ async def stream_job_updates(job_id: UUID): "logs": scrape_logs or [] } yield f"event: update\ndata: {json.dumps(update)}\n\n" - last_log_count = current_log_count - last_reviews_count = current_reviews + last_log_count = current_log_count + last_reviews_count = current_reviews except Exception as e: log.error(f"Error in SSE stream for job {job_id}: {e}") @@ -930,6 +1060,11 @@ async def run_scraping_job(job_id: UUID): total_reviews_seen = [None] # Accumulate all reviews for incremental saves (flush_callback receives batches) all_reviews_collected = [] + # Track last broadcasted log count for streaming new logs + last_broadcasted_log_count = [0] + # Track last metrics broadcast time + last_metrics_broadcast_time = [time.time()] + metrics_broadcast_interval = 5.0 # Emit metrics every 5 seconds # Progress callback to update job status with current/total counts AND logs def progress_callback(current_count: int, total_count: int): @@ -948,7 +1083,45 @@ async def run_scraping_job(job_id: UUID): scrape_logs=current_logs ) - # Broadcast progress via SSE + # Broadcast individual structured log events for new logs + current_log_count = len(current_logs) + if current_log_count > last_broadcasted_log_count[0]: + new_logs = current_logs[last_broadcasted_log_count[0]:] + for log_entry in new_logs: + await broadcast_structured_log(job_id_str, log_entry) + last_broadcasted_log_count[0] = current_log_count + + # Broadcast metrics event every 5 seconds + current_time = time.time() + if current_time - last_metrics_broadcast_time[0] >= metrics_broadcast_interval: + # Calculate extraction rate + elapsed = current_time - last_metrics_broadcast_time[0] + extraction_rate = (current_count / elapsed) if elapsed > 0 else 0 + + # Count scroll events from logs + scroll_count = 0 + for entry in current_logs: + msg = entry.get('message', '') if isinstance(entry, dict) else str(entry) + if 'scroll' in msg.lower(): + scroll_count += 1 + + # Get memory from latest log entry with metrics + memory_mb = 0 + for entry in reversed(current_logs): + if isinstance(entry, dict) and entry.get('metrics'): + memory_mb = entry['metrics'].get('memory_mb', 0) + break + + metrics_data = { + "reviews_extracted": current_count, + "scroll_count": scroll_count, + "memory_mb": memory_mb, + "extraction_rate": round(extraction_rate, 2) + } + await broadcast_metrics(job_id_str, metrics_data) + last_metrics_broadcast_time[0] = current_time + + # Broadcast progress via SSE (legacy format for backward compatibility) await broadcast_job_update(job_id_str, "job_progress", { "job_id": job_id_str, "status": "running", @@ -989,6 +1162,11 @@ async def run_scraping_job(job_id: UUID): ) if result['success']: + # Save session fingerprint if captured + if result.get('session_fingerprint'): + await db.update_session_fingerprint(job_id, result['session_fingerprint']) + log.info(f"Saved session fingerprint for job {job_id}") + # Save results to database (including scraper logs and review topics) await db.save_job_result( job_id=job_id, @@ -1030,6 +1208,11 @@ async def run_scraping_job(job_id: UUID): ) else: + # Save session fingerprint even on failure (useful for debugging bot detection) + if result.get('session_fingerprint'): + await db.update_session_fingerprint(job_id, result['session_fingerprint']) + log.info(f"Saved session fingerprint for failed job {job_id}") + # Job failed - check if we have partial reviews saved current_job = await db.get_job(job_id) partial_count = current_job.get('reviews_count', 0) if current_job else 0 diff --git a/modules/crash_analyzer.py b/modules/crash_analyzer.py new file mode 100644 index 0000000..ded5a03 --- /dev/null +++ b/modules/crash_analyzer.py @@ -0,0 +1,666 @@ +""" +Crash Pattern Analyzer Module + +Provides deep analysis of scraper crashes with pattern detection, +confidence scoring, and auto-fix parameter suggestions. + +Builds on top of the basic classify_crash function in scraper_clean.py +with more sophisticated pattern matching and multi-signal analysis. +""" + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional +import re + + +@dataclass +class CrashAnalysis: + """ + Result of crash pattern analysis. + + Attributes: + pattern: The identified crash pattern type (e.g., "memory_exhaustion", "dom_bloat") + confidence: Confidence score from 0.0 to 1.0 based on multiple signals + description: Human-readable description of the crash cause + suggested_fix: Recommended action to prevent this crash + auto_fix_params: Parameters that can be applied automatically to prevent recurrence + """ + pattern: str # e.g., "memory_exhaustion", "dom_bloat", "rate_limited" + confidence: float # 0.0 to 1.0 + description: str + suggested_fix: str + auto_fix_params: Optional[Dict[str, Any]] + + +# Thresholds for pattern detection +MEMORY_EXHAUSTION_THRESHOLD_MB = 1500 # 1.5GB in MB +MEMORY_GROWTH_RATE_THRESHOLD_MB_S = 10 # 10MB/s +DOM_BLOAT_THRESHOLD = 50000 # 50000 nodes +SCROLL_TIMEOUT_MIN_SCROLLS = 10 # Minimum scrolls before considering scroll_timeout + + +# Auto-fix parameters for each crash pattern +AUTO_FIX_PARAMS = { + "memory_exhaustion": { + "max_reviews": 500, + "restart_browser_after": 200 + }, + "dom_bloat": { + "scroll_cleanup": True, + "lazy_load": True + }, + "rate_limited": { + "delay_multiplier": 2.0, + "use_different_proxy": True + }, + "consent_loop": { + "skip_consent_retries": True + }, + "scroll_timeout": { + "reduce_target": True, + "target_reviews": "current - 10%" + }, + "element_stale": { + "retry_with_fresh_elements": True + } +} + + +def _calculate_memory_growth_rate(metrics_history: List[Dict]) -> Optional[float]: + """ + Calculate memory growth rate in MB/s from metrics history. + + Args: + metrics_history: List of metric samples with timestamp_ms and memory_mb + + Returns: + Growth rate in MB/s, or None if cannot be calculated + """ + if not metrics_history or len(metrics_history) < 2: + return None + + # Filter samples that have valid memory readings + valid_samples = [ + m for m in metrics_history + if m.get('memory_mb') is not None and m.get('timestamp_ms') is not None + ] + + if len(valid_samples) < 2: + return None + + # Use first and last valid samples + first = valid_samples[0] + last = valid_samples[-1] + + time_delta_s = (last['timestamp_ms'] - first['timestamp_ms']) / 1000 + if time_delta_s <= 0: + return None + + memory_delta_mb = last['memory_mb'] - first['memory_mb'] + return memory_delta_mb / time_delta_s + + +def _get_max_memory(metrics_history: List[Dict]) -> Optional[int]: + """Get maximum memory usage from metrics history.""" + if not metrics_history: + return None + + memories = [m.get('memory_mb') for m in metrics_history if m.get('memory_mb') is not None] + return max(memories) if memories else None + + +def _get_max_dom_nodes(metrics_history: List[Dict]) -> Optional[int]: + """Get maximum DOM node count from metrics history.""" + if not metrics_history: + return None + + nodes = [m.get('dom_nodes') for m in metrics_history if m.get('dom_nodes') is not None] + return max(nodes) if nodes else None + + +def _check_memory_exhaustion( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict] +) -> tuple[float, str]: + """ + Check for memory exhaustion pattern. + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check for high memory usage + max_memory = _get_max_memory(metrics_history) + if max_memory is not None: + if max_memory >= MEMORY_EXHAUSTION_THRESHOLD_MB: + confidence += 0.5 + signals.append(f"Memory reached {max_memory}MB (threshold: {MEMORY_EXHAUSTION_THRESHOLD_MB}MB)") + elif max_memory >= MEMORY_EXHAUSTION_THRESHOLD_MB * 0.8: + confidence += 0.3 + signals.append(f"Memory at {max_memory}MB approaching threshold") + + # Check for rapid memory growth + growth_rate = _calculate_memory_growth_rate(metrics_history) + if growth_rate is not None and growth_rate >= MEMORY_GROWTH_RATE_THRESHOLD_MB_S: + confidence += 0.3 + signals.append(f"Memory growing at {growth_rate:.1f}MB/s (threshold: {MEMORY_GROWTH_RATE_THRESHOLD_MB_S}MB/s)") + + # Check error message for memory-related keywords + error_lower = error_message.lower() + memory_keywords = ['memory', 'heap', 'out of memory', 'oom', 'aw, snap', 'status_access_violation'] + for keyword in memory_keywords: + if keyword in error_lower: + confidence += 0.2 + signals.append(f"Error contains '{keyword}'") + break + + # Check logs for memory warnings + for log_entry in logs: + msg = log_entry.get('message', '').lower() + if 'memory' in msg and ('high' in msg or 'warning' in msg or 'exceeded' in msg): + confidence += 0.1 + signals.append("Memory warning found in logs") + break + + description = "; ".join(signals) if signals else "No memory exhaustion signals detected" + return min(confidence, 1.0), description + + +def _check_dom_bloat( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict] +) -> tuple[float, str]: + """ + Check for DOM bloat pattern. + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check for high DOM node count + max_nodes = _get_max_dom_nodes(metrics_history) + if max_nodes is not None: + if max_nodes >= DOM_BLOAT_THRESHOLD: + confidence += 0.6 + signals.append(f"DOM nodes reached {max_nodes} (threshold: {DOM_BLOAT_THRESHOLD})") + elif max_nodes >= DOM_BLOAT_THRESHOLD * 0.8: + confidence += 0.3 + signals.append(f"DOM nodes at {max_nodes} approaching threshold") + + # Check error message for DOM-related keywords + error_lower = error_message.lower() + dom_keywords = ['dom', 'element', 'node', 'render', 'paint', 'layout'] + for keyword in dom_keywords: + if keyword in error_lower: + confidence += 0.2 + signals.append(f"Error contains '{keyword}'") + break + + # Check if memory is high too (DOM bloat often causes memory issues) + max_memory = _get_max_memory(metrics_history) + if max_memory is not None and max_memory >= 800: # 800MB + confidence += 0.1 + signals.append(f"Memory also elevated ({max_memory}MB)") + + # Check logs for DOM-related messages + for log_entry in logs: + msg = log_entry.get('message', '').lower() + if 'dom' in msg and ('large' in msg or 'cleanup' in msg or 'remove' in msg): + confidence += 0.1 + signals.append("DOM warning found in logs") + break + + description = "; ".join(signals) if signals else "No DOM bloat signals detected" + return min(confidence, 1.0), description + + +def _check_rate_limited( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict] +) -> tuple[float, str]: + """ + Check for rate limiting pattern. + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check error message for rate limit indicators + error_lower = error_message.lower() + if '429' in error_message: + confidence += 0.6 + signals.append("HTTP 429 status code in error") + + rate_keywords = ['rate limit', 'too many requests', 'unusual traffic', 'captcha', 'blocked'] + for keyword in rate_keywords: + if keyword in error_lower: + confidence += 0.4 + signals.append(f"Error contains '{keyword}'") + break + + # Check logs for rate limiting signals + rate_log_count = 0 + for log_entry in logs: + msg = log_entry.get('message', '').lower() + network = log_entry.get('network', {}) + status = network.get('status') + + if status == 429: + rate_log_count += 1 + confidence += 0.2 + + if 'unusual traffic' in msg or 'rate' in msg or 'blocked' in msg: + rate_log_count += 1 + confidence += 0.1 + + if rate_log_count > 0: + signals.append(f"Found {rate_log_count} rate-limiting indicators in logs") + + description = "; ".join(signals) if signals else "No rate limiting signals detected" + return min(confidence, 1.0), description + + +def _check_consent_loop( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict] +) -> tuple[float, str]: + """ + Check for consent popup loop pattern. + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check error message for consent keywords + error_lower = error_message.lower() + if 'consent' in error_lower: + confidence += 0.3 + signals.append("Error mentions consent") + + # Count consent-related log entries + consent_count = 0 + consent_messages = [] + for log_entry in logs: + msg = log_entry.get('message', '').lower() + if 'consent' in msg: + consent_count += 1 + consent_messages.append(msg[:50]) + + # Multiple consent messages indicate a loop + if consent_count >= 3: + confidence += 0.5 + signals.append(f"Consent popup appeared {consent_count} times in logs") + elif consent_count >= 2: + confidence += 0.3 + signals.append(f"Consent popup appeared {consent_count} times") + elif consent_count == 1: + confidence += 0.1 + signals.append("Single consent popup detected") + + # Check for timeout after consent handling + if 'timeout' in error_lower and consent_count > 0: + confidence += 0.2 + signals.append("Timeout occurred with consent activity") + + description = "; ".join(signals) if signals else "No consent loop signals detected" + return min(confidence, 1.0), description + + +def _check_scroll_timeout( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict], + state: Optional[Dict] = None +) -> tuple[float, str]: + """ + Check for scroll timeout pattern (no new reviews after many scrolls). + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check state for scroll count + scroll_count = 0 + reviews_count = 0 + if state: + scroll_count = state.get('scroll_count', 0) + reviews_count = state.get('reviews_extracted', 0) + + # Check error for timeout indicators + error_lower = error_message.lower() + if 'timeout' in error_lower: + confidence += 0.2 + signals.append("Timeout in error message") + + # Count recovery attempts in logs (indicate stuck scrolling) + recovery_count = 0 + no_new_count = 0 + for log_entry in logs: + msg = log_entry.get('message', '').lower() + if 'recovery attempt' in msg: + recovery_count += 1 + if 'no new' in msg or 'stuck' in msg: + no_new_count += 1 + + if recovery_count >= SCROLL_TIMEOUT_MIN_SCROLLS: + confidence += 0.5 + signals.append(f"Made {recovery_count} recovery attempts") + elif recovery_count >= 5: + confidence += 0.3 + signals.append(f"Made {recovery_count} recovery attempts") + + if no_new_count > 0: + confidence += 0.2 + signals.append(f"Found {no_new_count} 'no new reviews' log entries") + + # Check if reviews stopped growing + if metrics_history and len(metrics_history) >= 5: + # Check if reviews count plateaued + recent_counts = [m.get('reviews_count', 0) for m in metrics_history[-5:] if m.get('reviews_count')] + if recent_counts and len(set(recent_counts)) == 1: + confidence += 0.2 + signals.append(f"Review count stuck at {recent_counts[0]}") + + description = "; ".join(signals) if signals else "No scroll timeout signals detected" + return min(confidence, 1.0), description + + +def _check_element_stale( + error_message: str, + metrics_history: List[Dict], + logs: List[Dict] +) -> tuple[float, str]: + """ + Check for stale element reference pattern. + + Returns: + Tuple of (confidence, description) + """ + confidence = 0.0 + signals = [] + + # Check error message for stale element indicators + error_lower = error_message.lower() + stale_keywords = [ + 'stale element', 'staleelement', 'stale_element', + 'element is not attached', 'element reference', + 'no such element', 'element not found', + 'element is no longer valid' + ] + + for keyword in stale_keywords: + if keyword in error_lower: + confidence += 0.6 + signals.append(f"Error contains '{keyword}'") + break + + # Check logs for stale element patterns + stale_log_count = 0 + for log_entry in logs: + msg = log_entry.get('message', '').lower() + for keyword in stale_keywords: + if keyword in msg: + stale_log_count += 1 + break + + if stale_log_count > 0: + confidence += 0.2 + signals.append(f"Found {stale_log_count} stale element references in logs") + + # Check if DOM was changing rapidly (indicates dynamic page) + if metrics_history and len(metrics_history) >= 3: + dom_counts = [m.get('dom_nodes') for m in metrics_history if m.get('dom_nodes')] + if len(dom_counts) >= 3: + # Calculate variance + avg = sum(dom_counts) / len(dom_counts) + variance = sum((x - avg) ** 2 for x in dom_counts) / len(dom_counts) + std_dev = variance ** 0.5 + # High variance indicates rapidly changing DOM + if std_dev > 1000: + confidence += 0.2 + signals.append(f"High DOM variability (std dev: {std_dev:.0f})") + + description = "; ".join(signals) if signals else "No stale element signals detected" + return min(confidence, 1.0), description + + +def analyze_crash(crash_report: Dict) -> CrashAnalysis: + """ + Analyze a crash report to determine the most likely crash pattern. + + Examines error_message, metrics_history, and logs_before_crash to + calculate confidence scores for each crash pattern type. + + Args: + crash_report: Dictionary containing: + - error_message: str - The exception message + - metrics_history: List[Dict] - Sampled metrics with timestamp_ms, memory_mb, dom_nodes + - logs_before_crash: List[Dict] - Recent log entries before the crash + - state: Optional[Dict] - Scraper state (reviews_extracted, scroll_count, etc.) + - crash_type: Optional[str] - Basic crash classification from classify_crash() + + Returns: + CrashAnalysis with the highest-confidence pattern match + """ + # Extract data from crash report + error_message = crash_report.get('error_message', '') + metrics_history = crash_report.get('metrics_history', []) + logs = crash_report.get('logs_before_crash', []) + state = crash_report.get('state', {}) + basic_type = crash_report.get('crash_type', 'unknown') + + # Run all pattern checks + pattern_results = {} + + # Memory exhaustion + conf, desc = _check_memory_exhaustion(error_message, metrics_history, logs) + pattern_results['memory_exhaustion'] = (conf, desc) + + # DOM bloat + conf, desc = _check_dom_bloat(error_message, metrics_history, logs) + pattern_results['dom_bloat'] = (conf, desc) + + # Rate limited + conf, desc = _check_rate_limited(error_message, metrics_history, logs) + pattern_results['rate_limited'] = (conf, desc) + + # Consent loop + conf, desc = _check_consent_loop(error_message, metrics_history, logs) + pattern_results['consent_loop'] = (conf, desc) + + # Scroll timeout + conf, desc = _check_scroll_timeout(error_message, metrics_history, logs, state) + pattern_results['scroll_timeout'] = (conf, desc) + + # Element stale + conf, desc = _check_element_stale(error_message, metrics_history, logs) + pattern_results['element_stale'] = (conf, desc) + + # Find the pattern with highest confidence + best_pattern = max(pattern_results.items(), key=lambda x: x[1][0]) + pattern_name = best_pattern[0] + confidence = best_pattern[1][0] + description = best_pattern[1][1] + + # If confidence is too low, fall back to basic classification + if confidence < 0.2: + # Map basic crash types to our patterns + basic_to_pattern = { + 'memory_exhaustion': 'memory_exhaustion', + 'tab_crash': 'memory_exhaustion', # Tab crashes often from memory + 'timeout': 'scroll_timeout', + 'element_not_found': 'element_stale', + 'rate_limited': 'rate_limited', + 'network_failure': 'rate_limited', # Could be blocking + } + + if basic_type in basic_to_pattern: + pattern_name = basic_to_pattern[basic_type] + confidence = 0.3 # Low confidence fallback + description = f"Inferred from basic crash type '{basic_type}'" + else: + pattern_name = 'unknown' + confidence = 0.0 + description = f"Unable to determine crash pattern (basic type: {basic_type})" + + # Generate suggested fix based on pattern + suggested_fixes = { + 'memory_exhaustion': ( + "Reduce batch size and restart browser more frequently. " + "Consider limiting max_reviews to 500 and restarting browser after every 200 reviews." + ), + 'dom_bloat': ( + "Enable DOM cleanup during scrolling. " + "Hide processed review cards and remove separator elements to keep DOM light." + ), + 'rate_limited': ( + "Increase delays between requests and consider rotating proxies. " + "Double the delay multiplier and switch to a different proxy if available." + ), + 'consent_loop': ( + "Skip consent handling after initial attempt to avoid infinite loops. " + "The consent popup may be appearing due to cookie clearing or navigation issues." + ), + 'scroll_timeout': ( + "The page may have stopped loading new reviews. " + "Try reducing the target review count by 10% and accepting partial results." + ), + 'element_stale': ( + "Page elements are being removed/replaced during scraping. " + "Retry operations with freshly-located elements and add defensive waits." + ), + 'unknown': ( + "Unable to determine specific crash cause. " + "Review logs and consider restarting with fresh browser session." + ) + } + + suggested_fix = suggested_fixes.get(pattern_name, suggested_fixes['unknown']) + auto_fix_params = AUTO_FIX_PARAMS.get(pattern_name) + + return CrashAnalysis( + pattern=pattern_name, + confidence=confidence, + description=description, + suggested_fix=suggested_fix, + auto_fix_params=auto_fix_params + ) + + +def get_auto_fix_params(pattern: str) -> Optional[Dict[str, Any]]: + """ + Get auto-fix parameters for a specific crash pattern. + + Args: + pattern: The crash pattern name + + Returns: + Dictionary of auto-fix parameters, or None if pattern not recognized + """ + return AUTO_FIX_PARAMS.get(pattern) + + +def apply_auto_fix(pattern: str, current_params: Dict[str, Any]) -> Dict[str, Any]: + """ + Apply auto-fix parameters to current scraper parameters. + + Args: + pattern: The crash pattern name + current_params: Current scraper parameters to modify + + Returns: + Updated parameters dictionary with fixes applied + """ + fix_params = AUTO_FIX_PARAMS.get(pattern, {}) + updated = current_params.copy() + + for key, value in fix_params.items(): + if key == 'target_reviews' and value == 'current - 10%': + # Special case: reduce target by 10% + current_target = updated.get('max_reviews', 1000) + updated['max_reviews'] = int(current_target * 0.9) + elif key == 'delay_multiplier': + # Multiply existing delay + current_delay = updated.get('scroll_delay', 1.0) + updated['scroll_delay'] = current_delay * value + else: + updated[key] = value + + return updated + + +def summarize_crash_patterns(crash_reports: List[Dict]) -> Dict[str, Any]: + """ + Analyze multiple crash reports to identify recurring patterns. + + Args: + crash_reports: List of crash report dictionaries + + Returns: + Summary dictionary with pattern frequencies and recommendations + """ + if not crash_reports: + return { + 'total_crashes': 0, + 'patterns': {}, + 'most_common': None, + 'recommendations': [] + } + + pattern_counts: Dict[str, int] = {} + pattern_confidences: Dict[str, List[float]] = {} + + for report in crash_reports: + analysis = analyze_crash(report) + pattern = analysis.pattern + + pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1 + if pattern not in pattern_confidences: + pattern_confidences[pattern] = [] + pattern_confidences[pattern].append(analysis.confidence) + + # Calculate average confidence per pattern + patterns_summary = {} + for pattern, count in pattern_counts.items(): + avg_confidence = sum(pattern_confidences[pattern]) / len(pattern_confidences[pattern]) + patterns_summary[pattern] = { + 'count': count, + 'percentage': count / len(crash_reports) * 100, + 'avg_confidence': avg_confidence + } + + # Find most common pattern + most_common = max(pattern_counts.items(), key=lambda x: x[1])[0] if pattern_counts else None + + # Generate recommendations + recommendations = [] + for pattern, stats in sorted(patterns_summary.items(), key=lambda x: x[1]['count'], reverse=True): + if stats['count'] >= 2: # Only recommend for recurring patterns + fix_params = AUTO_FIX_PARAMS.get(pattern) + if fix_params: + recommendations.append({ + 'pattern': pattern, + 'occurrences': stats['count'], + 'auto_fix_params': fix_params + }) + + return { + 'total_crashes': len(crash_reports), + 'patterns': patterns_summary, + 'most_common': most_common, + 'recommendations': recommendations + } diff --git a/modules/database.py b/modules/database.py index 6c4b3ff..f288e43 100644 --- a/modules/database.py +++ b/modules/database.py @@ -430,6 +430,47 @@ class DatabaseManager: log.debug(f"Incremental save: {len(reviews)} reviews for job {job_id}") + async def update_session_fingerprint( + self, + job_id: UUID, + session_fingerprint: Dict[str, Any] + ): + """ + Update the session fingerprint for a job. + + This should be called early in the scraping process after the browser + fingerprint is captured, to record browser characteristics for + bot detection analysis. + + Args: + job_id: Job UUID + session_fingerprint: Dictionary containing browser fingerprint data: + - user_agent: Browser user agent string + - platform: OS platform + - language: Primary language + - languages: List of accepted languages + - timezone: Timezone string + - screen: {width, height, colorDepth} + - viewport: {width, height} + - webgl_vendor: WebGL vendor string + - webgl_renderer: WebGL renderer string + - canvas_fingerprint: Canvas fingerprint hash + - hardware_concurrency: Number of CPU cores + - device_memory: Device memory in GB + - bot_detection_tests: {webdriver_hidden, chrome_runtime, permissions_query} + - captured_at: ISO timestamp when fingerprint was captured + """ + async with self.pool.acquire() as conn: + await conn.execute(""" + UPDATE jobs + SET + session_fingerprint = $2::jsonb, + updated_at = NOW() + WHERE job_id = $1 + """, job_id, json.dumps(session_fingerprint)) + + log.debug(f"Updated session fingerprint for job {job_id}") + async def mark_job_partial( self, job_id: UUID, diff --git a/modules/scraper_clean.py b/modules/scraper_clean.py index cd6ea57..a8048a2 100644 --- a/modules/scraper_clean.py +++ b/modules/scraper_clean.py @@ -35,6 +35,214 @@ def get_dom_node_count(driver) -> Optional[int]: return None +def capture_session_fingerprint(driver) -> dict: + """ + Capture browser session fingerprint for bot detection analysis. + + This captures various browser attributes that can be used to: + 1. Verify bot detection evasion is working + 2. Debug issues when scraping fails + 3. Track session characteristics for analysis + + Args: + driver: Selenium WebDriver instance (must be initialized) + + Returns: + Dictionary containing session fingerprint data + """ + fingerprint = { + "user_agent": None, + "platform": None, + "language": None, + "languages": None, + "timezone": None, + "screen": { + "width": None, + "height": None, + "colorDepth": None + }, + "viewport": { + "width": None, + "height": None + }, + "webgl_vendor": None, + "webgl_renderer": None, + "canvas_fingerprint": None, + "hardware_concurrency": None, + "device_memory": None, + "bot_detection_tests": { + "webdriver_hidden": None, + "chrome_runtime": None, + "permissions_query": None + }, + "captured_at": None + } + + try: + # Navigate to about:blank first to ensure we can execute JS + # (in case driver was just created and hasn't navigated yet) + current_url = driver.current_url + if not current_url or current_url == "data:,": + driver.get("about:blank") + + # Capture timestamp + fingerprint["captured_at"] = datetime.now().isoformat() + + # Basic navigator properties + try: + fingerprint["user_agent"] = driver.execute_script("return navigator.userAgent") + except: + pass + + try: + fingerprint["platform"] = driver.execute_script("return navigator.platform") + except: + pass + + try: + fingerprint["language"] = driver.execute_script("return navigator.language") + except: + pass + + try: + fingerprint["languages"] = driver.execute_script("return navigator.languages") + except: + pass + + try: + fingerprint["timezone"] = driver.execute_script( + "return Intl.DateTimeFormat().resolvedOptions().timeZone" + ) + except: + pass + + # Screen properties + try: + fingerprint["screen"]["width"] = driver.execute_script("return screen.width") + fingerprint["screen"]["height"] = driver.execute_script("return screen.height") + fingerprint["screen"]["colorDepth"] = driver.execute_script("return screen.colorDepth") + except: + pass + + # Viewport properties + try: + fingerprint["viewport"]["width"] = driver.execute_script("return window.innerWidth") + fingerprint["viewport"]["height"] = driver.execute_script("return window.innerHeight") + except: + pass + + # WebGL vendor and renderer (important for fingerprinting) + try: + webgl_info = driver.execute_script(""" + try { + var canvas = document.createElement('canvas'); + var gl = canvas.getContext('webgl') || canvas.getContext('experimental-webgl'); + if (gl) { + var debugInfo = gl.getExtension('WEBGL_debug_renderer_info'); + if (debugInfo) { + return { + vendor: gl.getParameter(debugInfo.UNMASKED_VENDOR_WEBGL), + renderer: gl.getParameter(debugInfo.UNMASKED_RENDERER_WEBGL) + }; + } + } + } catch(e) {} + return {vendor: null, renderer: null}; + """) + fingerprint["webgl_vendor"] = webgl_info.get("vendor") + fingerprint["webgl_renderer"] = webgl_info.get("renderer") + except: + pass + + # Canvas fingerprint (hash of canvas drawing) + try: + canvas_hash = driver.execute_script(""" + try { + var canvas = document.createElement('canvas'); + canvas.width = 200; + canvas.height = 50; + var ctx = canvas.getContext('2d'); + ctx.textBaseline = 'top'; + ctx.font = '14px Arial'; + ctx.fillStyle = '#f60'; + ctx.fillRect(125, 1, 62, 20); + ctx.fillStyle = '#069'; + ctx.fillText('Fingerprint', 2, 15); + ctx.fillStyle = 'rgba(102, 204, 0, 0.7)'; + ctx.fillText('Fingerprint', 4, 17); + var dataUrl = canvas.toDataURL(); + // Simple hash + var hash = 0; + for (var i = 0; i < dataUrl.length; i++) { + var char = dataUrl.charCodeAt(i); + hash = ((hash << 5) - hash) + char; + hash = hash & hash; + } + return hash.toString(16); + } catch(e) { + return null; + } + """) + fingerprint["canvas_fingerprint"] = canvas_hash + except: + pass + + # Hardware info + try: + fingerprint["hardware_concurrency"] = driver.execute_script( + "return navigator.hardwareConcurrency" + ) + except: + pass + + try: + fingerprint["device_memory"] = driver.execute_script( + "return navigator.deviceMemory" + ) + except: + pass + + # Bot detection tests + try: + # Test 1: webdriver property should be hidden/false for undetected Chrome + webdriver_hidden = driver.execute_script( + "return navigator.webdriver === undefined || navigator.webdriver === false" + ) + fingerprint["bot_detection_tests"]["webdriver_hidden"] = webdriver_hidden + except: + pass + + try: + # Test 2: chrome runtime should exist in real Chrome + chrome_runtime = driver.execute_script( + "return typeof window.chrome !== 'undefined'" + ) + fingerprint["bot_detection_tests"]["chrome_runtime"] = chrome_runtime + except: + pass + + try: + # Test 3: permissions.query should work in real Chrome + permissions_query = driver.execute_script(""" + try { + if (navigator.permissions && navigator.permissions.query) { + return true; + } + return false; + } catch(e) { + return false; + } + """) + fingerprint["bot_detection_tests"]["permissions_query"] = permissions_query + except: + pass + + except Exception as e: + fingerprint["capture_error"] = str(e) + + return fingerprint + + def classify_crash(exception: Exception, metrics_history: list) -> str: """Classify crash type based on exception and metrics.""" error_str = str(exception).lower() @@ -519,6 +727,16 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in # Use provided log_capture or create a dummy that just prints log = log_capture or LogCapture() + # Capture session fingerprint early (before navigation) for bot detection analysis + session_fingerprint = capture_session_fingerprint(driver) + log.info('browser', "Session fingerprint captured", metrics={ + 'user_agent': session_fingerprint.get('user_agent', 'unknown')[:50] + '...' if session_fingerprint.get('user_agent') else 'unknown', + 'platform': session_fingerprint.get('platform'), + 'timezone': session_fingerprint.get('timezone'), + 'webdriver_hidden': session_fingerprint.get('bot_detection_tests', {}).get('webdriver_hidden'), + 'chrome_runtime': session_fingerprint.get('bot_detection_tests', {}).get('chrome_runtime') + }) + # Storage - use review ID as key reviews = {} # review_id -> review seen_ids = set() # Track all IDs we've seen (persists after flush) @@ -946,11 +1164,12 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in "category": business_info.get("category"), "address": business_info.get("address"), "total_reviews": total_reviews[0] - } + }, + "session_fingerprint": session_fingerprint # Browser fingerprint for bot detection analysis } if not scroll_container: - return {"reviews": [], "total": 0, "scrolls": 0, "error": "No scroll container found"} + return {"reviews": [], "total": 0, "scrolls": 0, "error": "No scroll container found", "session_fingerprint": session_fingerprint} # Extract review topics after reviews tab is loaded (before scrolling begins) time.sleep(0.5) # Brief wait for topic filters to render @@ -1408,7 +1627,8 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in "logs": log.get_logs(), "review_topics": review_topics, # Topic filters with mention counts "metrics_history": metrics_history, # For crash detection - "start_time": start_time # For crash report elapsed time + "start_time": start_time, # For crash report elapsed time + "session_fingerprint": session_fingerprint # Browser fingerprint for bot detection analysis } @@ -1544,7 +1764,8 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 "success": True, "error": None, "logs": result.get("logs", []), - "review_topics": result.get("review_topics", []) # Topic filters with mention counts + "review_topics": result.get("review_topics", []), # Topic filters with mention counts + "session_fingerprint": result.get("session_fingerprint") # Browser fingerprint for bot detection } # Include validation_info if in validation_only mode