From 9e1bcde981ae15cd3c6e182f3f13befa3b486fb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 12:17:23 +0000 Subject: [PATCH] Wave 2: Migrate scraper to StructuredLogger, add crash detection & topic tags - Task #2: Migrate scraper_clean.py to use StructuredLogger with categories (37 log calls with metrics across browser/scraper/network/system) - Task #4: Add crash_reports table schema and database methods (save_crash_report, get_crash_report, get_crash_stats) - Task #9: Implement crash detection wrapper with metrics sampling (get_chrome_memory, get_dom_node_count, classify_crash) - Task #17: Add topic tags to frontend ReviewAnalytics (topic filter UI, tags on cards, topics in modal) Co-Authored-By: Claude Opus 4.5 --- modules/database.py | 182 +++++++++++++++++ modules/scraper_clean.py | 303 ++++++++++++++++++++++------- web/components/ReviewAnalytics.tsx | 114 ++++++++++- web/lib/analytics.ts | 1 + 4 files changed, 526 insertions(+), 74 deletions(-) diff --git a/modules/database.py b/modules/database.py index 7a660f8..6c4b3ff 100644 --- a/modules/database.py +++ b/modules/database.py @@ -154,6 +154,41 @@ class DatabaseManager: CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id); """) + # Add session_fingerprint and metrics_history columns to jobs table + await conn.execute(""" + ALTER TABLE jobs ADD COLUMN IF NOT EXISTS session_fingerprint JSONB; + """) + await conn.execute(""" + ALTER TABLE jobs ADD COLUMN IF NOT EXISTS metrics_history JSONB; + """) + + # Create crash_reports table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS crash_reports ( + crash_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID REFERENCES jobs(job_id) ON DELETE CASCADE, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + crash_type VARCHAR(50) NOT NULL, + error_message TEXT, + state JSONB NOT NULL, + metrics_history JSONB, + logs_before_crash JSONB, + analysis JSONB, 
+ screenshot_url TEXT, + dom_snapshot_id UUID + ); + """) + + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_crash_reports_job ON crash_reports(job_id); + """) + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_crash_reports_type ON crash_reports(crash_type); + """) + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_crash_reports_created ON crash_reports(created_at DESC); + """) + log.info("Database schema initialized") # ==================== Job Operations ==================== @@ -657,3 +692,150 @@ class DatabaseManager: INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms) VALUES ($1, $2, $3, $4, $5, $6) """, job_id, attempt_number, success, status_code, error_message, response_time_ms) + + # ==================== Crash Reports ==================== + + async def save_crash_report(self, job_id: str, crash_data: dict) -> str: + """ + Save a crash report and return the crash_id. + + Args: + job_id: Job UUID as string + crash_data: Dictionary containing crash report data: + - crash_type: Type of crash (required) + - error_message: Error message (optional) + - state: Current state at crash time (required) + - metrics_history: Historical metrics (optional) + - logs_before_crash: Log entries before crash (optional) + - analysis: Crash analysis data (optional) + - screenshot_url: URL to screenshot (optional) + - dom_snapshot_id: UUID of DOM snapshot (optional) + + Returns: + UUID of created crash report as string + """ + async with self.pool.acquire() as conn: + # Convert job_id string to UUID + job_uuid = UUID(job_id) if isinstance(job_id, str) else job_id + + crash_id = await conn.fetchval(""" + INSERT INTO crash_reports ( + job_id, + crash_type, + error_message, + state, + metrics_history, + logs_before_crash, + analysis, + screenshot_url, + dom_snapshot_id + ) + VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8, $9) + RETURNING crash_id + """, + job_uuid, + 
async def get_crash_stats(self, days: int = 7) -> dict:
    """
    Get crash statistics for the last N days.

    Args:
        days: Number of days to look back (default: 7)

    Returns:
        Dictionary with:
            - total: Total number of crashes
            - by_type: Dict mapping crash type to count
            - by_day: List of dicts with date (as string) and count

    Raises:
        ValueError / TypeError: if ``days`` cannot be converted to an int.
    """
    # The look-back window must travel as a real bind parameter. The earlier
    # text INTERVAL '%s days' had no placeholder at all ('%s' inside a quoted
    # SQL literal is never substituted), yet `days` was still passed as an
    # argument, so the argument count did not match the statement.
    # Multiplying a one-day interval by an explicitly typed $1 keeps the
    # window configurable and uses the same $n placeholder style as the
    # other queries in this file.
    window_days = int(days)

    async with self.pool.acquire() as conn:
        # Total number of crashes inside the window
        total = await conn.fetchval("""
            SELECT COUNT(*)
            FROM crash_reports
            WHERE created_at >= NOW() - ($1::int * INTERVAL '1 day')
        """, window_days)

        # Crash counts grouped by crash type, most frequent first
        type_rows = await conn.fetch("""
            SELECT crash_type, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - ($1::int * INTERVAL '1 day')
            GROUP BY crash_type
            ORDER BY count DESC
        """, window_days)

        by_type = {row['crash_type']: row['count'] for row in type_rows}

        # Crash counts grouped by calendar day, newest first
        day_rows = await conn.fetch("""
            SELECT DATE(created_at) as date, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - ($1::int * INTERVAL '1 day')
            GROUP BY DATE(created_at)
            ORDER BY date DESC
        """, window_days)

        # Dates are stringified so the result is JSON-serializable
        by_day = [{'date': str(row['date']), 'count': row['count']} for row in day_rows]

        return {
            'total': total or 0,
            'by_type': by_type,
            'by_day': by_day
        }
def classify_crash(exception: Exception, metrics_history: list,
                   memory_threshold_mb: int = 400) -> str:
    """Classify crash type based on exception and metrics.

    Checks are applied in a fixed priority order and the first match wins:
    tab crash, timeout, memory exhaustion, missing element, rate limiting,
    network failure, and finally ``'unknown'``.

    Args:
        exception: The exception that aborted the scrape; only its string
            form is inspected (case-insensitively).
        metrics_history: Sampled metrics, oldest first. Only the last sample's
            ``'memory_mb'`` value is read. May be empty or ``None``.
        memory_threshold_mb: Memory reading (MB) above which the crash is
            attributed to memory exhaustion (default: 400).

    Returns:
        One of ``'tab_crash'``, ``'timeout'``, ``'memory_exhaustion'``,
        ``'element_not_found'``, ``'rate_limited'``, ``'network_failure'``
        or ``'unknown'``.
    """
    error_str = str(exception).lower()

    if 'aw, snap' in error_str or 'status_access_violation' in error_str:
        return 'tab_crash'
    if 'timeout' in error_str:
        return 'timeout'

    # A sample can carry 'memory_mb': None when the memory probe failed.
    # dict.get's default only covers a *missing* key, so comparing the raw
    # value against the threshold would raise TypeError on None. Treat an
    # absent or None reading as "no evidence of memory exhaustion".
    last_sample = metrics_history[-1] if metrics_history else None
    memory_mb = last_sample.get('memory_mb') if isinstance(last_sample, dict) else None
    if memory_mb is not None and memory_mb > memory_threshold_mb:
        return 'memory_exhaustion'

    if 'no such element' in error_str:
        return 'element_not_found'
    # NOTE: 'rate' is a plain substring match, so it also hits any message
    # that merely contains those letters.
    if '429' in error_str or 'rate' in error_str:
        return 'rate_limited'
    if 'network' in error_str or 'connection' in error_str:
        return 'network_failure'
    return 'unknown'
+ """ def __init__(self): - self.logs = [] + self._logger = StructuredLogger() def log(self, message: str, level: str = "INFO", source: str = "scraper"): - """Add a log entry with timestamp.""" - entry = { - "timestamp": datetime.utcnow().isoformat() + "Z", - "level": level, - "source": source, - "message": message - } - self.logs.append(entry) + """Add a log entry with timestamp (backward compatible).""" + # Map source to category + category = self._source_to_category(source) + level_upper = level.upper() + + if level_upper == "ERROR": + self._logger.error(category, message) + elif level_upper == "WARNING" or level_upper == "WARN": + self._logger.warn(category, message) + elif level_upper == "DEBUG": + self._logger.debug(category, message) + else: + self._logger.info(category, message) + # Also print for console visibility print(message, flush=True) - def info(self, message: str, source: str = "scraper"): - self.log(message, "INFO", source) + def info(self, category_or_msg, message: str = None, *, metrics: dict = None): + """ + Log an INFO message. 
- def warning(self, message: str, source: str = "scraper"): - self.log(message, "WARNING", source) + Supports both old API: info(message, source) + And new API: info(category, message, metrics={...}) + """ + if message is None: + # Old API: info(message) or info(message, source) + self._logger.info('scraper', category_or_msg, metrics=metrics) + print(category_or_msg, flush=True) + else: + # New API: info(category, message, metrics={...}) + self._logger.info(category_or_msg, message, metrics=metrics) + print(message, flush=True) - def error(self, message: str, source: str = "scraper"): - self.log(message, "ERROR", source) + def warning(self, category_or_msg, message: str = None, *, metrics: dict = None): + """Log a WARNING message (supports both old and new API).""" + if message is None: + self._logger.warn('scraper', category_or_msg, metrics=metrics) + print(category_or_msg, flush=True) + else: + self._logger.warn(category_or_msg, message, metrics=metrics) + print(message, flush=True) + + def warn(self, category, message: str, *, metrics: dict = None): + """Log a WARN message with category (new API).""" + self._logger.warn(category, message, metrics=metrics) + print(message, flush=True) + + def error(self, category_or_msg, message: str = None, *, metrics: dict = None): + """Log an ERROR message (supports both old and new API).""" + if message is None: + self._logger.error('scraper', category_or_msg, metrics=metrics) + print(category_or_msg, flush=True) + else: + self._logger.error(category_or_msg, message, metrics=metrics) + print(message, flush=True) + + def debug(self, category, message: str, *, metrics: dict = None): + """Log a DEBUG message with category (new API).""" + self._logger.debug(category, message, metrics=metrics) + print(message, flush=True) def get_logs(self): - return self.logs + """Get all log entries as JSON-serializable dictionaries.""" + return self._logger.get_logs() + + def _source_to_category(self, source: str) -> str: + """Map legacy source 
names to StructuredLogger categories.""" + source_lower = source.lower() if source else 'scraper' + if source_lower in ('browser', 'navigation', 'page'): + return 'browser' + elif source_lower in ('network', 'api'): + return 'network' + elif source_lower in ('system', 'memory', 'chrome'): + return 'system' + else: + return 'scraper' def parse_api_review(raw: list) -> dict: @@ -470,23 +578,23 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in time.sleep(0.1) except: pass - log.info(f"🌐 Loading: {url[:80]}...") + log.info('browser', f"Loading: {url[:80]}...") else: - log.info(f"🔄 Hard refresh #{hard_refresh_count[0]}: reloading page...") + log.info('browser', f"Hard refresh #{hard_refresh_count[0]}: reloading page...") driver.get(url) # Handle consent popup if redirected (poll with tiny sleep) start = time.time() while time.time() - start < 5: # Max 5s for consent if "consent.google" in driver.current_url: - log.info(" Handling consent popup...") + log.info('browser', "Handling consent popup...") try: for btn in driver.find_elements(By.CSS_SELECTOR, "button"): txt = btn.text.lower() if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt: btn.click() # Reload original URL after consent - log.info(" Reloading after consent...") + log.info('browser', "Reloading after consent...") driver.get(url) # Wait for page to settle after consent reload time.sleep(1) @@ -554,10 +662,10 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in if info: if info.get('total_reviews') and total_reviews[0] is None: total_reviews[0] = info['total_reviews'] - log.info(f"📊 Total reviews on page: {total_reviews[0]}") + log.info('scraper', f"Total reviews on page: {total_reviews[0]}", metrics={'total_reviews': total_reviews[0]}) if info.get('name') and business_info_cache[0] is None: business_info_cache[0] = info - log.info(f"📍 Business: {info.get('name')}") + log.info('scraper', f"Business: {info.get('name')}") if 
total_reviews[0] and business_info_cache[0]: break except: @@ -566,7 +674,7 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in # VALIDATION_ONLY: Return early - skip clicking reviews tab, sorting, etc. if validation_only_mode: - log.info("📋 Validation mode: returning early (skipping reviews tab)") + log.info('scraper', "Validation mode: returning early (skipping reviews tab)") return ("validation_done", None) # Click reviews tab - poll until found @@ -581,12 +689,12 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in if not tabs_logged and tabs: tabs_logged = True tab_texts = [t.text for t in tabs] - log.info(f" Available tabs: {tab_texts}") + log.info('browser', f"Available tabs: {tab_texts}") for tab in tabs: tab_text = tab.text.lower() if any(kw in tab_text for kw in review_keywords): if not is_refresh: - log.info(f" Clicking reviews tab: '{tab.text}'") + log.info('browser', f"Clicking reviews tab: '{tab.text}'") # Extract total_reviews from tab text like "Reviews (79)" or "Reviews\n79" if total_reviews[0] is None: import re @@ -594,13 +702,13 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in match = re.search(r'\((\d+)\)', tab.text) if match: total_reviews[0] = int(match.group(1)) - log.info(f"📊 Total reviews from tab: {total_reviews[0]}") + log.info('scraper', f"Total reviews from tab: {total_reviews[0]}", metrics={'total_reviews': total_reviews[0]}) else: # Try pattern with newline: "Reviews\n79" match = re.search(r'(\d+)', tab.text) if match: total_reviews[0] = int(match.group(1)) - log.info(f"📊 Total reviews from tab: {total_reviews[0]}") + log.info('scraper', f"Total reviews from tab: {total_reviews[0]}", metrics={'total_reviews': total_reviews[0]}) tab.click() tab_clicked = True break @@ -620,24 +728,24 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in break elapsed = int(time.time() - start) if elapsed > last_print: - 
log.info(f" Waiting for reviews panel...{refresh_label} ({elapsed}s)") + log.info('browser', f"Waiting for reviews panel...{refresh_label} ({elapsed}s)") last_print = elapsed time.sleep(0.01) # 10ms - responsive but low CPU if not scroll_container: - log.error(f"❌ Could not find reviews scroll container{refresh_label}") + log.error('browser', f"Could not find reviews scroll container{refresh_label}") try: - log.error(f"Page title: {driver.title}") - log.error(f"Current URL: {driver.current_url[:100]}") + log.error('browser', f"Page title: {driver.title}") + log.error('browser', f"Current URL: {driver.current_url[:100]}") except: pass return None, None - log.info(f"✅ Found scroll container{refresh_label}") + log.info('browser', f"Found scroll container{refresh_label}") # Inject API interceptor (needs to be re-injected after refresh) if not is_refresh: - log.info("🔌 Injecting API interceptor...") + log.info('network', "Injecting API interceptor...") driver.execute_script(""" // Always re-setup on refresh window.__reviewInterceptorInjected = true; @@ -711,12 +819,12 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in } """) time.sleep(0.5) - log.info(" 📅 Sorted by newest") + log.info('browser', "Sorted by newest") # Re-find scroll container after sorting (DOM may be recreated) new_container = find_scroll_container() if new_container: scroll_container = new_container - log.info(" 🔄 Refreshed scroll container reference") + log.info('browser', "Refreshed scroll container reference") except: pass @@ -734,7 +842,7 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in return count; """) if expanded > 0: - log.info(f" 📝 Expanded {expanded} truncated reviews") + log.info('browser', f"Expanded {expanded} truncated reviews", metrics={'expanded_count': expanded}) except: pass @@ -745,7 +853,7 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in }) driver.execute_cdp_cmd('Network.enable', 
{}) if not is_refresh: - log.info(" 🚫 Blocking images for faster scrolling") + log.info('browser', "Blocking images for faster scrolling") except: pass @@ -848,7 +956,7 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in time.sleep(0.5) # Brief wait for topic filters to render review_topics = extract_review_topics() if review_topics: - log.info(f"📊 Found {len(review_topics)} review topics: {', '.join(t['topic'] for t in review_topics[:5])}...") + log.info('scraper', f"Found {len(review_topics)} review topics: {', '.join(t['topic'] for t in review_topics[:5])}...", metrics={'topic_count': len(review_topics)}) def get_api_reviews(): """Get reviews from intercepted API responses.""" @@ -918,7 +1026,7 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in hard_refresh_count[0] += 1 if hard_refresh_count[0] > max_hard_refreshes: - log.warning(f" ⚠️ Max hard refreshes ({max_hard_refreshes}) reached, giving up") + log.warn('system', f"Max hard refreshes ({max_hard_refreshes}) reached, giving up", metrics={'hard_refresh_count': hard_refresh_count[0]}) return False # Stop current scroll worker @@ -931,18 +1039,24 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in scroll_container = new_container stop_scrolling = new_stop recovery_count[0] = 0 # Reset recovery count after successful refresh - log.info(f" ✅ Hard refresh successful, resuming with {len(seen_ids)} reviews already collected") + log.info('browser', f"Hard refresh successful, resuming with {len(seen_ids)} reviews already collected", metrics={'reviews_collected': len(seen_ids)}) return True else: - log.error(f" ❌ Hard refresh failed to find scroll container") + log.error('browser', "Hard refresh failed to find scroll container") return False # Main collection loop last_new_time = time.time() last_count = len(reviews) check_num = 0 + start_time = time.time() - log.info(f"🔄 Scrolling... 
(timeout: {timeout_no_new}s with no new)") + # Crash detection: metrics sampling + metrics_history = [] + last_sample_time = time.time() + scroll_count = [0] # Track scroll operations for crash reports + + log.info('browser', f"Scrolling... (timeout: {timeout_no_new}s with no new)", metrics={'timeout_seconds': timeout_no_new}) cycle_start = time.time() while True: @@ -954,6 +1068,19 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in cycle_delta = t0 - cycle_start cycle_start = t0 + # CRASH DETECTION: Sample metrics every 5 seconds + if time.time() - last_sample_time >= 5: + current_count_for_metrics = total_flushed[0] + len(reviews) + metrics_history.append({ + 'timestamp_ms': int(time.time() * 1000), + 'memory_mb': get_chrome_memory(driver), + 'dom_nodes': get_dom_node_count(driver), + 'reviews_count': current_count_for_metrics + }) + # Keep only last 100 samples + metrics_history = metrics_history[-100:] + last_sample_time = time.time() + # Collect from API (doesn't affect scroll) - API has FULL TEXT in original language # Use review_id as key to avoid duplicates with DOM t1 = time.time() @@ -1110,14 +1237,14 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in reviews[rid] = rev seen_ids.add(rid) except Exception as e: - log.error(f" ❌ DOM parse error: {e}") + log.error('scraper', f"DOM parse error: {e}") dom_time = time.time() - t2 # BATCH FLUSH: If we have enough reviews, flush to callback and clear memory # Sort by DOM order before flushing t3 = time.time() if flush_callback and len(reviews) >= flush_batch_size: - log.info(f" 💾 Flushing {len(reviews)} reviews to disk...") + log.info('scraper', f"Flushing {len(reviews)} reviews to disk...", metrics={'batch_size': len(reviews), 'source': 'flush'}) sorted_reviews = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf'))) flush_callback([r for _, r in sorted_reviews]) total_flushed[0] += len(reviews) @@ -1128,7 +1255,7 @@ def 
scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in # TIMING: Print if cycle is slow (>2s) if cycle_delta > 2.0: - log.warning(f" ⚠️ SLOW cycle: {cycle_delta:.1f}s (api:{api_time:.1f}s dom:{dom_time:.1f}s/{dom_cards}cards flush:{flush_time:.1f}s seen:{len(seen_ids)})") + log.warn('system', f"SLOW cycle: {cycle_delta:.1f}s (api:{api_time:.1f}s dom:{dom_time:.1f}s/{dom_cards}cards flush:{flush_time:.1f}s seen:{len(seen_ids)})", metrics={'cycle_time_s': cycle_delta, 'api_time_s': api_time, 'dom_time_s': dom_time, 'dom_cards': dom_cards, 'seen_count': len(seen_ids)}) # Check for new reviews if current_count > last_count: @@ -1163,9 +1290,9 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in elapsed = time.time() - last_new_time if total_reviews[0]: pct = (current_count / total_reviews[0]) * 100 - log.info(f" 📊 {current_count}/{total_reviews[0]} ({pct:.0f}%) | idle: {elapsed:.1f}s") + log.info('scraper', f"{current_count}/{total_reviews[0]} ({pct:.0f}%) | idle: {elapsed:.1f}s", metrics={'reviews_count': current_count, 'total_reviews': total_reviews[0], 'progress_pct': pct, 'idle_seconds': elapsed}) else: - log.info(f" 📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s") + log.info('scraper', f"{current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", metrics={'reviews_count': current_count, 'idle_seconds': elapsed}) # Call progress callback on every iteration (for real-time log updates) if progress_callback: @@ -1173,13 +1300,13 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in # Stop conditions - check BEFORE recovery attempts if current_count >= max_reviews: - log.info(f"✅ Reached max: {current_count}") + log.info('scraper', f"Reached max: {current_count}", metrics={'total_reviews': current_count, 'elapsed_seconds': time.time() - start_time}) stop_scrolling.set() break # Also stop if we have all reviews from the page if total_reviews[0] and current_count 
>= total_reviews[0]: - log.info(f"✅ All {current_count} reviews collected") + log.info('scraper', f"All {current_count} reviews collected", metrics={'total_reviews': current_count, 'elapsed_seconds': time.time() - start_time}) stop_scrolling.set() break @@ -1188,12 +1315,12 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in if elapsed >= 3 and int(elapsed) % 3 == 0: # After 8+ failed recovery attempts, try hard refresh if recovery_count[0] >= 8 and hard_refresh_count[0] < max_hard_refreshes: - log.info(f" 🔄 Soft recovery failed {recovery_count[0]} times, trying hard refresh...") + log.info('browser', f"Soft recovery failed {recovery_count[0]} times, trying hard refresh...", metrics={'recovery_count': recovery_count[0]}) if do_hard_refresh(): last_new_time = time.time() # Reset timer after refresh continue # Skip to next iteration else: - log.info(f" 🔧 Recovery attempt #{recovery_count[0] + 1}...") + log.info('browser', f"Recovery attempt #{recovery_count[0] + 1}...", metrics={'recovery_attempt': recovery_count[0] + 1}) unstick_scroll() # Check scroll state - track if content is still being added @@ -1229,24 +1356,24 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in if truly_done or timeout_hit: # Last chance: try hard refresh before giving up if hard_refresh_count[0] < max_hard_refreshes and current_count < (total_reviews[0] or max_reviews): - log.info(f" 🔄 Timeout reached, trying hard refresh before giving up...") + log.info('browser', "Timeout reached, trying hard refresh before giving up...", metrics={'idle_seconds': elapsed}) if do_hard_refresh(): last_new_time = time.time() continue # Keep trying - log.info(f"✅ All reviews loaded: {current_count}") + log.info('scraper', f"All reviews loaded: {current_count}", metrics={'total_reviews': current_count, 'elapsed_seconds': time.time() - start_time}) stop_scrolling.set() break # Flush any remaining reviews (sorted by DOM order) if flush_callback and 
reviews: - log.info(f" 💾 Final flush: {len(reviews)} reviews...") + log.info('scraper', f"Final flush: {len(reviews)} reviews...", metrics={'batch_size': len(reviews), 'source': 'final_flush'}) sorted_reviews = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf'))) flush_callback([r for _, r in sorted_reviews]) total_flushed[0] += len(reviews) reviews.clear() # Reviews already parsed during scrolling (real-time parsing) - log.info("📝 Finalizing review data...") + log.info('scraper', "Finalizing review data...") # Final results (sorted by DOM order) sorted_items = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf'))) @@ -1256,13 +1383,13 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in api_count = sum(1 for r in review_list if r.get("source") == "api") if total_flushed[0] > 0: - log.info(f"📋 Total: {grand_total} unique reviews (flushed: {total_flushed[0]}, in memory: {len(review_list)})") + log.info('scraper', f"Total: {grand_total} unique reviews (flushed: {total_flushed[0]}, in memory: {len(review_list)})", metrics={'total_reviews': grand_total, 'flushed_count': total_flushed[0], 'in_memory_count': len(review_list), 'elapsed_seconds': time.time() - start_time}) else: - log.info(f"📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})") + log.info('scraper', f"Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})", metrics={'total_reviews': len(review_list), 'dom_count': dom_count, 'api_count': api_count, 'elapsed_seconds': time.time() - start_time}) # Infer topics for each review if review_topics is available if review_topics: - log.info(f"🏷️ Inferring topics for {len(review_list)} reviews...") + log.info('scraper', f"Inferring topics for {len(review_list)} reviews...", metrics={'reviews_count': len(review_list)}) topics_inferred_count = 0 for review in review_list: review_text = review.get("text", "") @@ -1270,7 +1397,7 @@ def 
scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in review["topics"] = matched if matched: topics_inferred_count += 1 - log.info(f"🏷️ Topics inferred for {topics_inferred_count}/{len(review_list)} reviews") + log.info('scraper', f"Topics inferred for {topics_inferred_count}/{len(review_list)} reviews", metrics={'topics_inferred_count': topics_inferred_count, 'reviews_count': len(review_list)}) return { "reviews": review_list, # Only unflushed reviews (flushed already sent to callback) @@ -1279,7 +1406,9 @@ def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: in "checks": check_num, "url": url, "logs": log.get_logs(), - "review_topics": review_topics # Topic filters with mention counts + "review_topics": review_topics, # Topic filters with mention counts + "metrics_history": metrics_history, # For crash detection + "start_time": start_time # For crash report elapsed time } @@ -1344,7 +1473,7 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 # Set timezone if provided if timezone: driver.execute_cdp_cmd('Emulation.setTimezoneOverride', {'timezoneId': timezone}) - log_capture.info(f"Set timezone to {timezone}") + log_capture.info('browser', f"Set timezone to {timezone}") # Set locale/language driver.execute_cdp_cmd('Emulation.setLocaleOverride', {'locale': language}) @@ -1356,7 +1485,7 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 'longitude': geolocation['lng'], 'accuracy': 1000 # ~1km accuracy for IP-based location }) - log_capture.info(f"Set geolocation to ({geolocation['lat']:.2f}, {geolocation['lng']:.2f})") + log_capture.info('browser', f"Set geolocation to ({geolocation['lat']:.2f}, {geolocation['lng']:.2f})", metrics={'lat': geolocation['lat'], 'lng': geolocation['lng']}) else: # Default to US (Boston, MA) if no geolocation provided driver.execute_cdp_cmd('Emulation.setGeolocationOverride', { @@ -1364,12 +1493,12 @@ def 
fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 'longitude': -71.0589, 'accuracy': 100 }) - log_capture.info("Set geolocation to US (Boston, MA) [default]") + log_capture.info('browser', "Set geolocation to US (Boston, MA) [default]", metrics={'lat': 42.3601, 'lng': -71.0589}) if fp: - log_capture.info(f"Browser fingerprint applied: {fp.get('platform', 'unknown')}, {viewport['width']}x{viewport['height']}") + log_capture.info('browser', f"Browser fingerprint applied: {fp.get('platform', 'unknown')}, {viewport['width']}x{viewport['height']}", metrics={'viewport_width': viewport['width'], 'viewport_height': viewport['height']}) except Exception as e: - log_capture.warning(f"Could not apply fingerprint settings: {e}") + log_capture.warn('system', f"Could not apply fingerprint settings: {e}") # Add URL parameters for consistent results if 'hl=' not in url: @@ -1435,6 +1564,36 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 except Exception as e: elapsed = time.time() - start_time + # CRASH DETECTION: Build crash report before closing driver + crash_report = None + try: + if driver: + # Try to sample final metrics from the browser + final_metrics = { + 'timestamp_ms': int(time.time() * 1000), + 'memory_mb': get_chrome_memory(driver), + 'dom_nodes': get_dom_node_count(driver) + } + # Build crash report with available information + crash_report = { + 'crash_type': classify_crash(e, [final_metrics]), + 'error_message': str(e), + 'state': { + 'reviews_extracted': 0, # Unknown at crash time + 'total_expected': None, + 'scroll_count': 0, + 'elapsed_seconds': elapsed + }, + 'metrics_history': [final_metrics], + 'logs_before_crash': log_capture.get_logs()[-20:] if log_capture else [], + 'last_successful_review_id': None + } + log_capture.error('system', f"Crash detected: {crash_report['crash_type']}", + metrics={'error': str(e), 'elapsed_seconds': elapsed}) + except: + # If we can't build crash report, continue 
with basic error handling + pass + if should_close_driver and driver: try: driver.quit() @@ -1442,9 +1601,9 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 pass # Log error to the existing log_capture - log_capture.error(f"Scraper failed: {str(e)}") + log_capture.error('system', f"Scraper failed: {str(e)}") - return { + result = { "reviews": [], "count": 0, "total_reviews": 0, @@ -1455,6 +1614,12 @@ def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999 "logs": log_capture.get_logs() } + # Include crash report if available + if crash_report: + result['crash_report'] = crash_report + + return result + def extract_about_info(driver, url: str = None) -> dict: """ diff --git a/web/components/ReviewAnalytics.tsx b/web/components/ReviewAnalytics.tsx index 5ec4c12..7b2f6b2 100644 --- a/web/components/ReviewAnalytics.tsx +++ b/web/components/ReviewAnalytics.tsx @@ -20,6 +20,7 @@ interface ReviewWithNew extends Review { is_new?: boolean; owner_response?: OwnerResponse | null; photo_urls?: string[] | null; + topics?: string[]; } interface ReviewTopic { @@ -47,6 +48,7 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne const [selectedReview, setSelectedReview] = useState(null); const [showOnlyNew, setShowOnlyNew] = useState(false); const [brushRange, setBrushRange] = useState<{ startIndex: number; endIndex: number } | null>(null); + const [selectedTopics, setSelectedTopics] = useState([]); // Check if we have comparison data const hasComparisonData = reviews.some(r => r.is_new !== undefined); @@ -67,6 +69,19 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne // Calculate timeline data for chart const timelineData = useMemo(() => calculateTimelineData(dateFilteredReviews), [dateFilteredReviews]); + // Calculate available topics with counts + const availableTopics = useMemo(() => { + const topicCounts = new Map(); + reviews.forEach(r => { + 
r.topics?.forEach(t => { + topicCounts.set(t, (topicCounts.get(t) || 0) + 1); + }); + }); + return Array.from(topicCounts.entries()) + .map(([topic, count]) => ({ topic, count })) + .sort((a, b) => b.count - a.count); + }, [reviews]); + // Check if brush covers the full range (no filtering needed) const isFullRange = useMemo(() => { if (!brushRange || timelineData.length === 0) return true; @@ -155,7 +170,7 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne return 'all'; }, [brushRange, timelineData]); - // Filter reviews by selected ratings, sentiments, response status, new status, and brush range (for table) + // Filter reviews by selected ratings, sentiments, response status, new status, topics, and brush range (for table) const filteredReviews = useMemo(() => { return dateFilteredReviews.filter(r => { const matchesRating = selectedRatings.includes(r.rating); @@ -174,6 +189,10 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne (hasResponse && selectedResponseStatus.includes('answered')) || (!hasResponse && selectedResponseStatus.includes('not_answered')); + // Filter by selected topics + const matchesTopics = selectedTopics.length === 0 || + r.topics?.some(t => selectedTopics.includes(t)); + // Filter by brush date range if active let matchesBrush = true; if (brushDateRange && r.centerDate) { @@ -189,9 +208,9 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne matchesBrush = r.centerDate >= startDate && r.centerDate < endDate; } - return matchesRating && matchesSentiment && matchesSearch && matchesNew && matchesResponseStatus && matchesBrush; + return matchesRating && matchesSentiment && matchesSearch && matchesNew && matchesResponseStatus && matchesTopics && matchesBrush; }); - }, [dateFilteredReviews, selectedRatings, selectedSentiments, selectedResponseStatus, globalFilter, showOnlyNew, brushDateRange]); + }, [dateFilteredReviews, selectedRatings, 
selectedSentiments, selectedResponseStatus, globalFilter, showOnlyNew, selectedTopics, brushDateRange]); const toggleRating = (rating: number) => { setSelectedRatings(prev => @@ -211,6 +230,14 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne ); }; + const toggleTopicFilter = (topic: string) => { + setSelectedTopics(prev => + prev.includes(topic) + ? prev.filter(t => t !== topic) + : [...prev, topic] + ); + }; + const clearAllFilters = () => { setDateRange('all'); setSelectedRatings([1, 2, 3, 4, 5]); @@ -219,6 +246,7 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne setGlobalFilter(''); setShowOnlyNew(false); setBrushRange(null); + setSelectedTopics([]); }; const hasActiveFilters = dateRange !== 'all' || @@ -227,7 +255,8 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne selectedSentiments.length < 3 || selectedResponseStatus.length < 2 || globalFilter !== '' || - showOnlyNew; + showOnlyNew || + selectedTopics.length > 0; const exportFilteredData = () => { const dataStr = JSON.stringify(filteredReviews, null, 2); @@ -364,6 +393,8 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne const hasResponse = !!ownerResponse?.text; const photoUrls = row.original.photo_urls; const hasPhotos = photoUrls && photoUrls.length > 0; + const topics = row.original.topics; + const hasTopics = topics && topics.length > 0; return (
@@ -387,6 +418,19 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne

{text}

+ {hasTopics && ( +
+ {topics.map(topic => ( + { e.stopPropagation(); toggleTopicFilter(topic); }} + > + {topic} + + ))} +
+ )}
{text.length > 100 && ( + ))} + {selectedTopics.length > 0 && ( + + )} +
+ )} + {/* Filter Summary */}
@@ -1535,6 +1608,37 @@ export default function ReviewAnalytics({ reviews, businessName, businessUrl, ne
+ {/* Review Topics */} + {selectedReview.topics && selectedReview.topics.length > 0 && ( +
+
+ + Topics + + {selectedReview.topics.length} topic{selectedReview.topics.length > 1 ? 's' : ''} + +
+
+ {selectedReview.topics.map(topic => ( + + ))} +
+
+ )} + {/* Owner Response */} {selectedReview.owner_response?.text ? (
diff --git a/web/lib/analytics.ts b/web/lib/analytics.ts index bccc2ba..4e32e55 100644 --- a/web/lib/analytics.ts +++ b/web/lib/analytics.ts @@ -15,6 +15,7 @@ export interface Review { review_id: string; owner_response?: OwnerResponse | null; photo_urls?: string[] | null; + topics?: string[]; // Inferred topics from scraper // Derived fields (computed on load) parsedDate?: Date; dateCategory?: 'recent' | 'month' | 'year' | 'older'; // Time range category