#!/usr/bin/env python3 """ Production Google Reviews Scraper API Server with Phase 1 & 2 features: - PostgreSQL storage with JSONB - Webhook delivery with retries - Smart health checks with canary testing - Phase 2: Requester tracking (client_id, source, purpose) - Phase 2: Job priority for queue ordering - Phase 2: Callback URL alternative to webhooks - Phase 2: Scraper version/variant selection for A/B testing - Phase 2: Explicit job type endpoint (/api/scrape/google-reviews) """ import asyncio import json import logging import os import time from contextlib import asynccontextmanager from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from uuid import UUID from fastapi import FastAPI, HTTPException, Query, Header from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, HttpUrl, Field from fastapi.responses import JSONResponse, StreamingResponse from core.database import DatabaseManager, JobStatus from services.webhook_service import WebhookDispatcher, WebhookManager from utils.health_checks import HealthCheckSystem from scrapers.google_reviews.v1_0_0 import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper from utils.crash_analyzer import analyze_crash, summarize_crash_patterns, apply_auto_fix from utils.logger import StructuredLogger, LogEntry from workers.chrome_pool import ( start_worker_pools, stop_worker_pools, get_validation_worker, release_validation_worker, get_scraping_worker, release_scraping_worker, get_pool_stats ) from api.routes import ( batches_router, set_batches_db, dashboard_router, set_dashboard_db, admin_router, set_admin_db, ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) log = logging.getLogger("api_server") # Global instances db: Optional[DatabaseManager] = None webhook_dispatcher: Optional[WebhookDispatcher] = None health_system: Optional[HealthCheckSystem] = None # Concurrent job limiter (prevent too many Chrome instances) MAX_CONCURRENT_JOBS = int(os.getenv('MAX_CONCURRENT_JOBS', '5')) job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS) # SSE: Store for broadcasting job updates to connected clients # Format: {job_id: [asyncio.Queue, ...]} for job-specific streams # Format: {"all": [asyncio.Queue, ...]} for all-jobs stream job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []} @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown""" global db, webhook_dispatcher, health_system # Startup log.info("Starting Google Reviews Scraper API Server (Production)") # Get database URL from environment database_url = os.getenv( 'DATABASE_URL', 'postgresql://scraper:scraper@localhost:5432/scraper' ) # Initialize database db = DatabaseManager(database_url) await db.connect() await db.initialize_schema() log.info("Database initialized") # Inject database into route modules set_batches_db(db) set_dashboard_db(db) set_admin_db(db) # Initialize health check system with canary monitoring # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting # health_system = HealthCheckSystem(db) # await health_system.start() log.info("Health check system DISABLED (canary tests disabled to avoid rate limiting)") # Initialize webhook dispatcher webhook_dispatcher = WebhookDispatcher(db, interval_seconds=30) asyncio.create_task(webhook_dispatcher.start()) log.info("Webhook dispatcher started") # Start Chrome worker pools (1 for validation, 2 for scraping) # These pre-warm Chrome instances for instant availability # headless=False because Docker uses Xvfb virtual display for better compatibility await asyncio.to_thread( start_worker_pools, validation_size=1, scraping_size=2, headless=False ) log.info("Chrome worker pools started (1 validation + 2 scraping)") yield # Shutdown log.info("Shutting down Google Reviews Scraper API Server") if webhook_dispatcher: webhook_dispatcher.stop() # if health_system: # health_system.stop() # Stop worker pools await asyncio.to_thread(stop_worker_pools) log.info("Chrome worker pools stopped") if db: await db.disconnect() # Initialize FastAPI app app = FastAPI( title="Google Reviews Scraper API - Production", description="Production-ready REST API for Google Maps review scraping with webhooks and health monitoring", version="2.0.0", lifespan=lifespan ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include routers from api/routes/ app.include_router(batches_router) app.include_router(dashboard_router) app.include_router(admin_router) # ==================== Request/Response Models ==================== class GeolocationModel(BaseModel): """Geolocation coordinates""" lat: float = Field(..., description="Latitude") lng: float = Field(..., description="Longitude") class ViewportModel(BaseModel): """Browser viewport size""" width: int = Field(..., description="Viewport width") height: int = Field(..., description="Viewport height") class BrowserFingerprintModel(BaseModel): """Browser fingerprint to replicate user's browser""" geolocation: Optional[GeolocationModel] = None userAgent: Optional[str] = Field(None, description="User agent string") viewport: Optional[ViewportModel] = Field(None, description="Screen resolution") timezone: Optional[str] = Field(None, description="Timezone (e.g., Europe/Madrid)") language: Optional[str] = Field(None, description="Browser language (e.g., en-US)") platform: Optional[str] = Field(None, description="Platform (e.g., MacIntel, Win32)") class RequesterModel(BaseModel): """Information about the requester of a scrape job""" client_id: Optional[str] = Field(None, description="Client identifier") source: Optional[str] = Field(None, description="Source of the request (e.g., 'web', 'api', 'internal')") purpose: Optional[str] = Field(None, description="Purpose of the scrape (e.g., 'competitor_analysis', 'review_monitoring')") metadata: Optional[Dict[str, Any]] = Field(None, description="Additional requester metadata") class ScrapeRequest(BaseModel): """Request model for starting a scrape job (legacy endpoint, routes to google-reviews)""" url: HttpUrl = Field(..., description="Google Maps URL to scrape") webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications") webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature") metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata") geolocation: Optional[GeolocationModel] = Field(None, description="User's geolocation for Chrome") browser_fingerprint: Optional[BrowserFingerprintModel] = Field(None, description="User's browser fingerprint") # Phase 2: New optional fields for enhanced job tracking requester: Optional[RequesterModel] = Field(None, description="Information about who requested this job") priority: Optional[int] = Field(0, description="Job priority (higher = more important)", ge=0, le=100) callback_url: Optional[HttpUrl] = Field(None, description="URL to call when job completes (alternative to webhook)") scraper_version: Optional[str] = Field(None, description="Specific scraper version to use") scraper_variant: Optional[str] = Field(None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')") class GoogleReviewsScrapeRequest(BaseModel): """Request model for Google Reviews scraping - explicit job type endpoint""" url: HttpUrl = Field(..., description="Google Maps URL to scrape") webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications") webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature") metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata") geolocation: Optional[GeolocationModel] = Field(None, description="User's geolocation for Chrome") browser_fingerprint: Optional[BrowserFingerprintModel] = Field(None, description="User's browser fingerprint") # Phase 2: New optional fields for enhanced job tracking requester: Optional[RequesterModel] = Field(None, description="Information about who requested this job") priority: Optional[int] = Field(0, description="Job priority (higher = more important)", ge=0, le=100) callback_url: Optional[HttpUrl] = Field(None, description="URL to call when job completes (alternative to webhook)") scraper_version: Optional[str] = Field(None, description="Specific scraper version to use") scraper_variant: Optional[str] = Field(None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')") class JobResponse(BaseModel): """Response model for job information""" job_id: str status: str url: str created_at: str started_at: Optional[str] = None completed_at: Optional[str] = None updated_at: Optional[str] = None # Last update time for progress tracking reviews_count: Optional[int] = None total_reviews: Optional[int] = None # Total reviews available for this place scrape_time: Optional[float] = None error_message: Optional[str] = None webhook_url: Optional[str] = None # Business metadata business_name: Optional[str] = None business_address: Optional[str] = None business_category: Optional[str] = None # Category (e.g., "Barber shop") review_topics: Optional[List[Dict[str, Any]]] = None # Topic filters with mention counts class ReviewsResponse(BaseModel): """Response model for reviews data""" job_id: str reviews: List[Dict[str, Any]] count: int class StatsResponse(BaseModel): """Response model for statistics""" total_jobs: int pending: int running: int completed: int failed: int cancelled: int avg_scrape_time: Optional[float] = None total_reviews: Optional[int] = None class CrashAnalysisModel(BaseModel): """Crash analysis details""" pattern: str = Field(..., description="Identified crash pattern type") confidence: float = Field(..., description="Confidence score 0.0 to 1.0") description: str = Field(..., description="Description of the crash cause") suggested_fix: str = Field(..., description="Recommended fix action") auto_fix_params: Optional[Dict[str, Any]] = Field(None, description="Parameters for auto-fix") class CrashReportResponse(BaseModel): """Response model for crash report""" crash_id: str job_id: str crash_type: str error_message: Optional[str] = None analysis: Optional[CrashAnalysisModel] = None metrics_history: Optional[List[Dict[str, Any]]] = None logs_before_crash: Optional[List[Dict[str, Any]]] = None screenshot_url: Optional[str] = None created_at: str class RetryJobResponse(BaseModel): """Response model for retry job""" job_id: str status: str message: str applied_fixes: Optional[Dict[str, Any]] = None class CrashPatternStats(BaseModel): """Statistics for a single crash pattern""" count: int percentage: float avg_confidence: float class CrashStatsResponse(BaseModel): """Response model for aggregate crash statistics""" total_crashes: int patterns: Dict[str, CrashPatternStats] most_common: Optional[str] = None recommendations: List[Dict[str, Any]] # ==================== API Endpoints ==================== @app.get("/", summary="API Health Check") async def root(): """Basic health check endpoint""" return { "message": "Google Reviews Scraper API (Production)", "status": "healthy", "version": "2.0.0", "features": ["postgresql", "webhooks", "canary-testing"] } async def _create_google_reviews_job( url: str, webhook_url: Optional[str] = None, webhook_secret: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, browser_fingerprint: Optional[BrowserFingerprintModel] = None, geolocation: Optional[GeolocationModel] = None, requester: Optional[RequesterModel] = None, priority: int = 0, callback_url: Optional[str] = None, scraper_version: Optional[str] = None, scraper_variant: Optional[str] = None, job_type: str = "google-reviews" ) -> Dict[str, str]: """ Core logic for creating a Google Reviews scraping job. This is the shared implementation used by both /scrape and /api/scrape/google-reviews endpoints. Returns: Dict with job_id, status, and message """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") try: # Build metadata with all Phase 2 fields job_metadata = metadata.copy() if metadata else {} # Add browser fingerprint if provided if browser_fingerprint: fp = browser_fingerprint job_metadata['browser_fingerprint'] = { "userAgent": fp.userAgent, "timezone": fp.timezone, "language": fp.language, "platform": fp.platform, } if fp.viewport: job_metadata['browser_fingerprint']['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height} if fp.geolocation: job_metadata['browser_fingerprint']['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng} elif geolocation: job_metadata['geolocation'] = { 'lat': geolocation.lat, 'lng': geolocation.lng } # Phase 2: Add requester info if provided if requester: job_metadata['requester'] = { 'client_id': requester.client_id, 'source': requester.source, 'purpose': requester.purpose, 'metadata': requester.metadata } # Phase 2: Add job type for multi-scraper support job_metadata['job_type'] = job_type # Phase 2: Add priority for job queue ordering job_metadata['priority'] = priority # Phase 2: Add callback_url (alternative to webhook) if callback_url: job_metadata['callback_url'] = callback_url # Phase 2: Add scraper version/variant for A/B testing and version control if scraper_version: job_metadata['scraper_version'] = scraper_version if scraper_variant: job_metadata['scraper_variant'] = scraper_variant # Create job in database job_id = await db.create_job( url=url, webhook_url=webhook_url, webhook_secret=webhook_secret, metadata=job_metadata ) # Start scraping job in background asyncio.create_task(run_scraping_job(job_id)) log.info(f"Created and started job {job_id} (type={job_type}, priority={priority})") return { "job_id": str(job_id), "status": "started", "message": "Scraping job started successfully", "job_type": job_type } except Exception as e: log.error(f"Error creating scraping job: {e}") raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") @app.post("/api/scrape/google-reviews", response_model=Dict[str, str], summary="Start Google Reviews Scraping Job") async def scrape_google_reviews(request: GoogleReviewsScrapeRequest): """ Start a new Google Reviews scraping job. This is the primary endpoint for Phase 2 onwards. It explicitly creates a job of type 'google-reviews' with full support for all Phase 2 features: - Requester tracking (client_id, source, purpose) - Job priority for queue ordering - Callback URL (alternative to webhook) - Scraper version/variant selection for A/B testing The job runs asynchronously in the background. You can: - Poll GET /jobs/{job_id} for status - Provide webhook_url for automatic notification when complete - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates Returns the job ID for tracking. """ return await _create_google_reviews_job( url=str(request.url), webhook_url=str(request.webhook_url) if request.webhook_url else None, webhook_secret=request.webhook_secret, metadata=request.metadata, browser_fingerprint=request.browser_fingerprint, geolocation=request.geolocation, requester=request.requester, priority=request.priority or 0, callback_url=str(request.callback_url) if request.callback_url else None, scraper_version=request.scraper_version, scraper_variant=request.scraper_variant, job_type="google-reviews" ) @app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job (Legacy)") async def start_scrape(request: ScrapeRequest): """ Start a new scraping job (legacy endpoint, routes to google-reviews). **NOTE**: This endpoint is maintained for backwards compatibility. For new integrations, use POST /api/scrape/google-reviews instead. The job runs asynchronously in the background. You can: - Poll GET /jobs/{job_id} for status - Provide webhook_url for automatic notification when complete Returns the job ID for tracking. """ return await _create_google_reviews_job( url=str(request.url), webhook_url=str(request.webhook_url) if request.webhook_url else None, webhook_secret=request.webhook_secret, metadata=request.metadata, browser_fingerprint=request.browser_fingerprint, geolocation=request.geolocation, requester=request.requester, priority=request.priority or 0, callback_url=str(request.callback_url) if request.callback_url else None, scraper_version=request.scraper_version, scraper_variant=request.scraper_variant, job_type="google-reviews" ) @app.post("/api/scrape", response_model=Dict[str, str], summary="Start Scraping Job") async def api_start_scrape(request: ScrapeRequest): """ Start a new scraping job via the /api/scrape endpoint. This endpoint accepts the same request body as /scrape and routes to google-reviews. For explicit job type control, use POST /api/scrape/google-reviews instead. The job runs asynchronously in the background. You can: - Poll GET /jobs/{job_id} for status - Provide webhook_url for automatic notification when complete - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates Returns the job ID for tracking. """ return await _create_google_reviews_job( url=str(request.url), webhook_url=str(request.webhook_url) if request.webhook_url else None, webhook_secret=request.webhook_secret, metadata=request.metadata, browser_fingerprint=request.browser_fingerprint, geolocation=request.geolocation, requester=request.requester, priority=request.priority or 0, callback_url=str(request.callback_url) if request.callback_url else None, scraper_version=request.scraper_version, scraper_variant=request.scraper_variant, job_type="google-reviews" ) @app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status") async def get_job(job_id: UUID): """Get detailed information about a specific job""" if not db: raise HTTPException(status_code=500, detail="Database not initialized") job = await db.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") # Parse review_topics if it's a string (JSONB might be returned as string) review_topics = job.get('review_topics') if isinstance(review_topics, str): try: review_topics = json.loads(review_topics) except: review_topics = None # Extract business info from metadata if available metadata = job.get('metadata') if isinstance(metadata, str): try: metadata = json.loads(metadata) except: metadata = None business_name = metadata.get('business_name') if metadata else None business_category = metadata.get('business_category') if metadata else None return JobResponse( job_id=str(job['job_id']), status=job['status'], url=job['url'], created_at=job['created_at'].isoformat(), started_at=job['started_at'].isoformat() if job['started_at'] else None, completed_at=job['completed_at'].isoformat() if job['completed_at'] else None, updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None, reviews_count=job['reviews_count'], total_reviews=job.get('total_reviews'), scrape_time=job['scrape_time'], error_message=job['error_message'], webhook_url=job.get('webhook_url'), business_name=business_name, business_category=business_category, review_topics=review_topics ) @app.get("/jobs/{job_id}/logs", summary="Get Job Logs") async def get_job_logs(job_id: UUID): """ Get the scraper logs for a job. Returns logs from both successful and failed jobs. Useful for debugging scraping issues. """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") job = await db.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") # Get scrape_logs from job scrape_logs = job.get('scrape_logs') # Parse if string (asyncpg might return JSONB as string) if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = None return { "job_id": str(job_id), "status": job['status'], "error_message": job.get('error_message'), "logs": scrape_logs or [], "log_count": len(scrape_logs) if scrape_logs else 0 } # ==================== SSE Streaming Endpoints ==================== def format_sse_event(event_type: str, data: dict, use_structured_format: bool = True) -> str: """ Format an SSE event message. Args: event_type: The SSE event type (e.g., 'log', 'metrics', 'status') data: The data payload use_structured_format: If True, wrap in {"type": ..., "data": ...} structure If False, use legacy format for backward compatibility Returns: Formatted SSE message string """ if use_structured_format: payload = {"type": event_type, "data": data} else: payload = data return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n" def format_structured_log_event(log_entry: dict) -> str: """ Format a structured log entry as an SSE event. The log_entry should already be a dict from StructuredLogger.get_logs(). Returns: Formatted SSE message with type "log" """ return format_sse_event("log", log_entry, use_structured_format=True) def format_metrics_event(metrics: dict) -> str: """ Format a metrics event for SSE streaming. Args: metrics: Dict containing reviews_extracted, scroll_count, memory_mb, extraction_rate Returns: Formatted SSE message with type "metrics" """ return format_sse_event("metrics", metrics, use_structured_format=True) async def broadcast_job_update(job_id: str, event_type: str, data: dict): """Broadcast an update to all subscribers of a job stream and the all-jobs stream.""" message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" # Send to job-specific subscribers if job_id in job_update_queues: for queue in job_update_queues[job_id]: try: await queue.put(message) except: pass # Send to all-jobs subscribers for queue in job_update_queues.get("all", []): try: await queue.put(message) except: pass async def broadcast_structured_log(job_id: str, log_entry: dict): """Broadcast a structured log entry to job subscribers.""" message = format_structured_log_event(log_entry) if job_id in job_update_queues: for queue in job_update_queues[job_id]: try: await queue.put(message) except: pass async def broadcast_metrics(job_id: str, metrics: dict): """Broadcast metrics update to job subscribers.""" message = format_metrics_event(metrics) if job_id in job_update_queues: for queue in job_update_queues[job_id]: try: await queue.put(message) except: pass @app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)") async def stream_job_updates( job_id: UUID, format: str = Query("structured", description="Event format: 'structured' (new) or 'legacy' (backward compatible)") ): """ Server-Sent Events stream for real-time job updates. Event types (structured format): - init: Initial job state with all logs - log: Individual structured log entry {"type": "log", "data": {"timestamp": "...", "level": "INFO", ...}} - metrics: Periodic metrics {"type": "metrics", "data": {"reviews_extracted": 150, "scroll_count": 45, ...}} - status: Job status changes - complete: Job finished (completed/failed) Query parameters: - format: 'structured' (default) for new format, 'legacy' for backward compatibility Connect with EventSource in the browser: ```javascript const es = new EventSource('/jobs/{job_id}/stream'); es.addEventListener('log', (e) => { const event = JSON.parse(e.data); console.log('Log:', event.data); // Structured log entry }); es.addEventListener('metrics', (e) => { const event = JSON.parse(e.data); console.log('Metrics:', event.data); // {reviews_extracted, scroll_count, memory_mb, extraction_rate} }); ``` """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Verify job exists job = await db.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") job_id_str = str(job_id) use_structured = format.lower() != "legacy" # Create queue for this client queue: asyncio.Queue = asyncio.Queue() # Register subscriber if job_id_str not in job_update_queues: job_update_queues[job_id_str] = [] job_update_queues[job_id_str].append(queue) async def event_generator(): try: # Send initial state job_data = await db.get_job(job_id) scrape_logs = [] if job_data: scrape_logs = job_data.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] initial = { "job_id": job_id_str, "status": job_data['status'], "reviews_count": job_data.get('reviews_count'), "total_reviews": job_data.get('total_reviews'), "scrape_time": job_data.get('scrape_time'), "error_message": job_data.get('error_message'), "logs": scrape_logs or [] } yield f"event: init\ndata: {json.dumps(initial)}\n\n" # If job is already complete, send complete event and close if job_data and job_data['status'] in ['completed', 'failed', 'cancelled']: yield f"event: complete\ndata: {json.dumps({'status': job_data['status']})}\n\n" return # Keep connection alive and send updates last_log_count = len(scrape_logs) if scrape_logs else 0 last_reviews_count = job_data.get('reviews_count') if job_data else 0 last_metrics_time = time.time() metrics_interval = 5.0 # Emit metrics every 5 seconds while True: try: # Wait for update with timeout (for keepalive) try: message = await asyncio.wait_for(queue.get(), timeout=2.0) yield message except asyncio.TimeoutError: # Send keepalive comment yield ": keepalive\n\n" # Also poll database for updates (backup in case broadcast missed) job_data = await db.get_job(job_id) if job_data: # Check for status change if job_data['status'] in ['completed', 'failed', 'cancelled']: scrape_logs = job_data.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] final = { "job_id": job_id_str, "status": job_data['status'], "reviews_count": job_data.get('reviews_count'), "total_reviews": job_data.get('total_reviews'), "scrape_time": job_data.get('scrape_time'), "error_message": job_data.get('error_message'), "logs": scrape_logs or [] } yield f"event: complete\ndata: {json.dumps(final)}\n\n" return # Check for new logs or progress scrape_logs = job_data.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] current_log_count = len(scrape_logs) if scrape_logs else 0 current_reviews = job_data.get('reviews_count') or 0 current_time = time.time() # Emit individual structured log events for new logs if use_structured and current_log_count > last_log_count: new_logs = scrape_logs[last_log_count:] if scrape_logs else [] for log_entry in new_logs: yield format_structured_log_event(log_entry) # Emit metrics event every 5 seconds during job execution if use_structured and job_data['status'] == 'running': if current_time - last_metrics_time >= metrics_interval: # Calculate extraction rate (reviews per second) elapsed = job_data.get('scrape_time') or 0 extraction_rate = (current_reviews / elapsed) if elapsed > 0 else 0 # Count scroll events from logs scroll_count = 0 if scrape_logs: for entry in scrape_logs: msg = entry.get('message', '') if isinstance(entry, dict) else str(entry) if 'scroll' in msg.lower(): scroll_count += 1 # Get memory from latest log entry with metrics memory_mb = 0 if scrape_logs: for entry in reversed(scrape_logs): if isinstance(entry, dict) and entry.get('metrics'): memory_mb = entry['metrics'].get('memory_mb', 0) break metrics_data = { "reviews_extracted": current_reviews, "scroll_count": scroll_count, "memory_mb": memory_mb, "extraction_rate": round(extraction_rate, 2) } yield format_metrics_event(metrics_data) last_metrics_time = current_time # Send legacy update event if reviews or logs changed if current_log_count > last_log_count or current_reviews != last_reviews_count: if not use_structured: # Legacy format: send all logs in update event update = { "job_id": job_id_str, "status": job_data['status'], "reviews_count": current_reviews, "total_reviews": job_data.get('total_reviews'), "logs": scrape_logs or [] } yield f"event: update\ndata: {json.dumps(update)}\n\n" last_log_count = current_log_count last_reviews_count = current_reviews except Exception as e: log.error(f"Error in SSE stream for job {job_id}: {e}") break finally: # Unregister subscriber if job_id_str in job_update_queues: try: job_update_queues[job_id_str].remove(queue) if not job_update_queues[job_id_str]: del job_update_queues[job_id_str] except: pass return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Disable nginx buffering } ) @app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)") async def stream_all_jobs(): """ Server-Sent Events stream for all job updates. Streams: - job_created: New job was created - job_updated: Job status/progress changed - job_completed: Job finished Connect with EventSource in the browser: ```javascript const es = new EventSource('/jobs/stream'); es.addEventListener('job_updated', (e) => console.log('Update:', JSON.parse(e.data))); ``` """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Create queue for this client queue: asyncio.Queue = asyncio.Queue() # Register subscriber to all-jobs stream job_update_queues["all"].append(queue) async def event_generator(): try: # Send initial jobs list jobs = await db.list_jobs(limit=100) jobs_data = [ { "job_id": str(j['job_id']), "status": j['status'], "url": j['url'], "created_at": j['created_at'].isoformat(), "completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None, "reviews_count": j.get('reviews_count'), "scrape_time": j.get('scrape_time'), "error_message": j.get('error_message') } for j in jobs ] yield f"event: init\ndata: {json.dumps({'jobs': jobs_data})}\n\n" # Keep connection alive and send updates while True: try: # Wait for update with timeout (for keepalive) try: message = await asyncio.wait_for(queue.get(), timeout=5.0) yield message except asyncio.TimeoutError: # Send keepalive comment yield ": keepalive\n\n" except Exception as e: log.error(f"Error in all-jobs SSE stream: {e}") break finally: # Unregister subscriber try: job_update_queues["all"].remove(queue) except: pass return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) @app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews") async def get_job_reviews(job_id: UUID): """ Get reviews data for a job. Returns reviews for completed, partial, or running jobs (if reviews have been collected). Returns 404 if job not found or no reviews available yet. """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Get reviews (includes completed, running, and partial jobs) reviews = await db.get_job_reviews(job_id, include_partial=True) if reviews is None: job = await db.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") elif job['status'] == 'pending': raise HTTPException( status_code=400, detail="Job has not started yet" ) elif job['status'] == 'failed': raise HTTPException( status_code=400, detail=f"Job failed without saving any reviews: {job.get('error_message', 'Unknown error')}" ) else: raise HTTPException(status_code=404, detail="No reviews data available yet") return ReviewsResponse( job_id=str(job_id), reviews=reviews, count=len(reviews) ) @app.get("/jobs", response_model=List[JobResponse], summary="List Jobs") async def list_jobs( status: Optional[str] = Query(None, description="Filter by job status"), limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000), offset: int = Query(0, description="Number of jobs to skip", ge=0) ): """List all jobs, optionally filtered by status""" if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Validate status if provided job_status = None if status: try: job_status = JobStatus(status.lower()) except ValueError: raise HTTPException( status_code=400, detail=f"Invalid status. Must be one of: {[s.value for s in JobStatus]}" ) jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset) result = [] for job in jobs: # Extract business info from metadata if available metadata = job.get('metadata') if isinstance(metadata, str): try: metadata = json.loads(metadata) except: metadata = None business_name = metadata.get('business_name') if metadata else None business_address = metadata.get('business_address') if metadata else None business_category = metadata.get('business_category') if metadata else None # Parse review_topics if it's a string review_topics = job.get('review_topics') if isinstance(review_topics, str): try: review_topics = json.loads(review_topics) except: review_topics = None result.append(JobResponse( job_id=str(job['job_id']), status=job['status'], url=job['url'], created_at=job['created_at'].isoformat(), completed_at=job['completed_at'].isoformat() if job.get('completed_at') else None, reviews_count=job.get('reviews_count'), total_reviews=job.get('total_reviews'), scrape_time=job.get('scrape_time'), error_message=job.get('error_message'), business_name=business_name, business_address=business_address, business_category=business_category, review_topics=review_topics )) return result @app.delete("/jobs/{job_id}", summary="Delete Job") async def delete_job(job_id: UUID): """Delete a job from the system""" if not db: raise HTTPException(status_code=500, detail="Database not initialized") deleted = await db.delete_job(job_id) if not deleted: raise HTTPException(status_code=404, detail="Job not found") return {"message": "Job deleted successfully"} @app.post("/check-reviews", summary="Check if Reviews Exist") async def check_reviews(request: ScrapeRequest): """ Get business card information from Google Maps. Returns business name, address, rating, and review count. Creates a fresh Chrome instance for reliable results (same as full scraper). This is used to show the business confirmation card in the UI. """ try: url = str(request.url) # Use the SAME scraper algorithm with validation_only=True for early return # Creates a fresh Chrome instance (same as full scraper) to avoid stale browser state # Pooled browsers can have cookies/state that cause Google to render pages differently # Build fingerprint dict from request fingerprint = None if request.browser_fingerprint: fp = request.browser_fingerprint fingerprint = { "userAgent": fp.userAgent, "timezone": fp.timezone, "language": fp.language, "platform": fp.platform, } if fp.viewport: fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height} if fp.geolocation: fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng} log.info(f"Creating Chrome with user fingerprint: {fp.platform}, {fp.timezone}") elif request.geolocation: fingerprint = {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}} log.info(f"Creating Chrome with geolocation only") else: log.info(f"Creating Chrome with default settings") result = await asyncio.to_thread( fast_scrape_reviews, url=url, headless=False, # Use Xvfb display validation_only=True, # Return early after getting total_reviews browser_fingerprint=fingerprint # Pass user's browser fingerprint ) # Extract validation info from the result validation_info = result.get('validation_info', {}) total_reviews = validation_info.get('total_reviews') or result.get('total_reviews') or 0 name = validation_info.get('name') rating = validation_info.get('rating') category = validation_info.get('category') address = validation_info.get('address') # Has reviews if we found a business with the Reviews tab (indicated by total_reviews > 0) has_reviews = bool(name and total_reviews > 0) return { "has_reviews": has_reviews, # True if business has reviews "total_reviews": total_reviews, "name": name, "address": address, "rating": rating, "category": category, "success": result.get('success', True), "error": result.get('error') } except Exception as e: log.error(f"Error checking reviews: {e}") return { "has_reviews": False, "review_count": 0, "success": False, "error": str(e) } @app.get("/stats", response_model=StatsResponse, summary="Get Statistics") async def get_stats(): """Get job statistics""" if not db: raise HTTPException(status_code=500, detail="Database not initialized") stats = await db.get_stats() return StatsResponse(**stats) @app.get("/pool-stats", summary="Get Worker Pool Statistics") async def pool_stats(): """Get Chrome worker pool statistics""" return await asyncio.to_thread(get_pool_stats) # ==================== Crash Report Endpoints ==================== @app.get("/jobs/{job_id}/crash-report", response_model=CrashReportResponse, summary="Get Crash Report") async def get_crash_report(job_id: UUID): """ Get the crash report for a failed or partial job. Returns detailed crash analysis including: - Crash pattern identification (memory_exhaustion, rate_limited, etc.) - Confidence score for the pattern match - Suggested fixes and auto-fix parameters - Metrics history and logs before the crash """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Verify job exists job = await db.get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") # Only failed or partial jobs have crash reports if job['status'] not in ['failed', 'partial']: raise HTTPException( status_code=400, detail=f"Job status is '{job['status']}' - crash reports only available for failed or partial jobs" ) # Get crash report from database crash_report = await db.get_crash_report(str(job_id)) if not crash_report: # No stored crash report - generate one from job data # Build crash report from job data for analysis scrape_logs = job.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] # Get metrics_history if available metrics_history = job.get('metrics_history') if isinstance(metrics_history, str): try: metrics_history = json.loads(metrics_history) except: metrics_history = [] crash_data = { 'error_message': job.get('error_message', 'Unknown error'), 'metrics_history': metrics_history or [], 'logs_before_crash': scrape_logs or [], 'state': { 'reviews_extracted': job.get('reviews_count', 0), 'total_reviews': job.get('total_reviews') } } # Analyze the crash analysis = analyze_crash(crash_data) # Build response from job data and analysis return CrashReportResponse( crash_id=str(job_id), # Use job_id as crash_id when no stored report job_id=str(job_id), crash_type=analysis.pattern, error_message=job.get('error_message'), analysis=CrashAnalysisModel( pattern=analysis.pattern, confidence=analysis.confidence, description=analysis.description, suggested_fix=analysis.suggested_fix, auto_fix_params=analysis.auto_fix_params ), metrics_history=metrics_history, logs_before_crash=scrape_logs, screenshot_url=None, created_at=job['completed_at'].isoformat() if job.get('completed_at') else job['created_at'].isoformat() ) # Parse JSONB fields if needed metrics_history = crash_report.get('metrics_history') if isinstance(metrics_history, str): try: metrics_history = json.loads(metrics_history) except: metrics_history = [] logs_before_crash = crash_report.get('logs_before_crash') if isinstance(logs_before_crash, str): try: logs_before_crash = json.loads(logs_before_crash) except: logs_before_crash = [] stored_analysis = crash_report.get('analysis') if isinstance(stored_analysis, str): try: stored_analysis = json.loads(stored_analysis) except: stored_analysis = None # If no analysis stored, generate one if not stored_analysis: crash_data = { 'error_message': crash_report.get('error_message', ''), 'metrics_history': metrics_history or [], 'logs_before_crash': logs_before_crash or [], 'crash_type': crash_report.get('crash_type'), 'state': crash_report.get('state', {}) } analysis = analyze_crash(crash_data) stored_analysis = { 'pattern': analysis.pattern, 'confidence': analysis.confidence, 'description': analysis.description, 'suggested_fix': analysis.suggested_fix, 'auto_fix_params': analysis.auto_fix_params } return CrashReportResponse( crash_id=crash_report['crash_id'], job_id=crash_report['job_id'], crash_type=crash_report['crash_type'], error_message=crash_report.get('error_message'), analysis=CrashAnalysisModel(**stored_analysis) if stored_analysis else None, metrics_history=metrics_history, logs_before_crash=logs_before_crash, screenshot_url=crash_report.get('screenshot_url'), created_at=crash_report['created_at'].isoformat() ) @app.post("/jobs/{job_id}/retry", response_model=RetryJobResponse, summary="Retry Failed Job") async def retry_job( job_id: UUID, apply_fix: bool = Query(False, description="Apply auto-fix parameters based on crash analysis") ): """ Retry a failed or partial job, optionally applying auto-fix parameters. When apply_fix=true: - Analyzes the crash pattern from the original job - Applies recommended parameter adjustments (e.g., reduced batch size for memory issues) - Creates a new job with the adjusted parameters Returns the new job ID for tracking. """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Get original job original_job = await db.get_job(job_id) if not original_job: raise HTTPException(status_code=404, detail="Job not found") # Can only retry failed or partial jobs if original_job['status'] not in ['failed', 'partial']: raise HTTPException( status_code=400, detail=f"Cannot retry job with status '{original_job['status']}' - only failed or partial jobs can be retried" ) # Parse original metadata original_metadata = original_job.get('metadata') if isinstance(original_metadata, str): try: original_metadata = json.loads(original_metadata) except: original_metadata = {} original_metadata = original_metadata or {} applied_fixes = None if apply_fix: # Get crash analysis to determine fixes scrape_logs = original_job.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] metrics_history = original_job.get('metrics_history') if isinstance(metrics_history, str): try: metrics_history = json.loads(metrics_history) except: metrics_history = [] crash_data = { 'error_message': original_job.get('error_message', 'Unknown error'), 'metrics_history': metrics_history or [], 'logs_before_crash': scrape_logs or [], 'state': { 'reviews_extracted': original_job.get('reviews_count', 0), 'total_reviews': original_job.get('total_reviews') } } analysis = analyze_crash(crash_data) if analysis.auto_fix_params: # Get current scraper params from metadata or use defaults current_params = original_metadata.get('scraper_params', {}) # Apply the auto-fix parameters fixed_params = apply_auto_fix(analysis.pattern, current_params) # Store applied fixes in metadata original_metadata['scraper_params'] = fixed_params original_metadata['retry_info'] = { 'original_job_id': str(job_id), 'crash_pattern': analysis.pattern, 'applied_fixes': analysis.auto_fix_params } applied_fixes = analysis.auto_fix_params log.info(f"Applying auto-fix for pattern '{analysis.pattern}': {applied_fixes}") # Create new job with same URL and (possibly modified) metadata new_job_id = await db.create_job( url=original_job['url'], webhook_url=original_job.get('webhook_url'), webhook_secret=original_job.get('webhook_secret'), metadata=original_metadata ) # Start the new scraping job asyncio.create_task(run_scraping_job(new_job_id)) log.info(f"Created retry job {new_job_id} for original job {job_id}") return RetryJobResponse( job_id=str(new_job_id), status="started", message=f"Retry job created from original job {job_id}", applied_fixes=applied_fixes ) @app.get("/crashes/stats", response_model=CrashStatsResponse, summary="Get Crash Statistics") async def get_crash_stats( days: int = Query(7, description="Number of days to look back", ge=1, le=90) ): """ Get aggregate crash statistics and pattern analysis. Analyzes all crash reports from the specified time period to identify: - Most common crash patterns - Confidence scores for pattern detection - Recommended fixes based on recurring patterns Use this to identify systemic issues and optimize scraper configuration. """ if not db: raise HTTPException(status_code=500, detail="Database not initialized") # Get basic crash stats from database basic_stats = await db.get_crash_stats(days=days) # Get all failed/partial jobs for deeper analysis failed_jobs = await db.list_jobs(status=JobStatus.FAILED, limit=500) partial_jobs = await db.list_jobs(status=JobStatus.PARTIAL, limit=500) all_crash_jobs = failed_jobs + partial_jobs # Filter by time if needed (list_jobs doesn't have date filter) cutoff = datetime.now() - timedelta(days=days) recent_crash_jobs = [ job for job in all_crash_jobs if job.get('completed_at') and job['completed_at'] > cutoff ] if not recent_crash_jobs: return CrashStatsResponse( total_crashes=0, patterns={}, most_common=None, recommendations=[] ) # Build crash reports for analysis crash_reports = [] for job in recent_crash_jobs: scrape_logs = job.get('scrape_logs') if isinstance(scrape_logs, str): try: scrape_logs = json.loads(scrape_logs) except: scrape_logs = [] crash_reports.append({ 'error_message': job.get('error_message', ''), 'metrics_history': [], # Not stored in job list query 'logs_before_crash': scrape_logs or [], 'state': { 'reviews_extracted': job.get('reviews_count', 0), 'total_reviews': job.get('total_reviews') } }) # Use summarize_crash_patterns for deep analysis summary = summarize_crash_patterns(crash_reports) # Convert patterns to response model format patterns_response = {} for pattern_name, stats in summary.get('patterns', {}).items(): patterns_response[pattern_name] = CrashPatternStats( count=stats['count'], percentage=stats['percentage'], avg_confidence=stats['avg_confidence'] ) return CrashStatsResponse( total_crashes=summary.get('total_crashes', 0), patterns=patterns_response, most_common=summary.get('most_common'), recommendations=summary.get('recommendations', []) ) # ==================== Health Check Endpoints ==================== @app.get("/health/live", summary="Liveness Probe") async def liveness(): """ Liveness check: Is the server alive? Use this for Kubernetes liveness probe - restart container if fails. """ if not health_system: raise HTTPException(status_code=503, detail="Health system not initialized") return await health_system.check_liveness() @app.get("/health/ready", summary="Readiness Probe") async def readiness(): """ Readiness check: Can the server handle traffic? Use this for Kubernetes readiness probe - remove from load balancer if fails. """ if not health_system: raise HTTPException(status_code=503, detail="Health system not initialized") result = await health_system.check_readiness() if result["status"] != "ready": return JSONResponse(status_code=503, content=result) return result @app.get("/health/canary", summary="Canary Health Check") async def canary(): """ Canary check: Does scraping actually work? Returns the latest canary test result (runs every 4 hours in background). Use this for external monitoring (PagerDuty, DataDog) - alerts if fails. """ if not health_system: raise HTTPException(status_code=503, detail="Health system not initialized") result = await health_system.check_canary() if result["status"] not in ["healthy", "unknown"]: return JSONResponse(status_code=503, content=result) return result @app.get("/health/detailed", summary="Detailed Health Status") async def detailed_health(): """Get detailed health status of all components""" if not health_system: raise HTTPException(status_code=503, detail="Health system not initialized") return await health_system.get_detailed_health() # ==================== Background Job Runner ==================== async def run_scraping_job(job_id: UUID): """ Run scraping job in background with concurrency limit. Args: job_id: Job UUID """ job_id_str = str(job_id) async with job_semaphore: # Limit concurrent Chrome instances try: # Update status to running await db.update_job_status(job_id, JobStatus.RUNNING) log.info(f"Starting scraping job {job_id}") # Get job details job = await db.get_job(job_id) url = job['url'] # Extract browser fingerprint from metadata if available browser_fingerprint = None metadata = job.get('metadata') if isinstance(metadata, str): try: metadata = json.loads(metadata) except: metadata = None if metadata and 'browser_fingerprint' in metadata: browser_fingerprint = metadata['browser_fingerprint'] log.info(f"Using user fingerprint: {browser_fingerprint.get('platform')}, {browser_fingerprint.get('timezone')}") elif metadata and 'geolocation' in metadata: browser_fingerprint = {'geolocation': metadata['geolocation']} log.info(f"Using user geolocation only") # Broadcast job started via SSE await broadcast_job_update(job_id_str, "job_started", { "job_id": job_id_str, "status": "running", "url": url }) # Get the event loop for progress updates from worker thread loop = asyncio.get_running_loop() # Create log capture instance that we can access for real-time logs log_capture = LogCapture() # Track total reviews for incremental saves total_reviews_seen = [None] # Accumulate all reviews for incremental saves (flush_callback receives batches) all_reviews_collected = [] # Track last broadcasted log count for streaming new logs last_broadcasted_log_count = [0] # Track last metrics broadcast time last_metrics_broadcast_time = [time.time()] metrics_broadcast_interval = 5.0 # Emit metrics every 5 seconds # Progress callback to update job status with current/total counts AND logs def progress_callback(current_count: int, total_count: int): """Update job progress and logs from worker thread""" if total_count: total_reviews_seen[0] = total_count async def update(): # Get current logs from the shared log_capture current_logs = log_capture.get_logs() await db.update_job_status( job_id, JobStatus.RUNNING, reviews_count=current_count, total_reviews=total_count, scrape_logs=current_logs ) # Broadcast individual structured log events for new logs current_log_count = len(current_logs) if current_log_count > last_broadcasted_log_count[0]: new_logs = current_logs[last_broadcasted_log_count[0]:] for log_entry in new_logs: await broadcast_structured_log(job_id_str, log_entry) last_broadcasted_log_count[0] = current_log_count # Broadcast metrics event every 5 seconds current_time = time.time() if current_time - last_metrics_broadcast_time[0] >= metrics_broadcast_interval: # Calculate extraction rate elapsed = current_time - last_metrics_broadcast_time[0] extraction_rate = (current_count / elapsed) if elapsed > 0 else 0 # Count scroll events from logs scroll_count = 0 for entry in current_logs: msg = entry.get('message', '') if isinstance(entry, dict) else str(entry) if 'scroll' in msg.lower(): scroll_count += 1 # Get memory from latest log entry with metrics memory_mb = 0 for entry in reversed(current_logs): if isinstance(entry, dict) and entry.get('metrics'): memory_mb = entry['metrics'].get('memory_mb', 0) break metrics_data = { "reviews_extracted": current_count, "scroll_count": scroll_count, "memory_mb": memory_mb, "extraction_rate": round(extraction_rate, 2) } await broadcast_metrics(job_id_str, metrics_data) last_metrics_broadcast_time[0] = current_time # Broadcast progress via SSE (legacy format for backward compatibility) await broadcast_job_update(job_id_str, "job_progress", { "job_id": job_id_str, "status": "running", "reviews_count": current_count, "total_reviews": total_count, "logs": current_logs }) # Schedule the coroutine on the event loop asyncio.run_coroutine_threadsafe(update(), loop) # Flush callback to save reviews incrementally (crash recovery) # Note: flush_callback receives batches, so we accumulate them def flush_callback(reviews_batch: list): """Accumulate and save reviews to DB incrementally from worker thread""" # Extend our collection with the new batch all_reviews_collected.extend(reviews_batch) async def save(): await db.save_reviews_incremental( job_id=job_id, reviews=all_reviews_collected, # Save ALL reviews so far total_reviews=total_reviews_seen[0] ) # Schedule the coroutine on the event loop asyncio.run_coroutine_threadsafe(save(), loop) # Run scraping with progress callback and shared log capture # headless=False because Docker uses Xvfb virtual display result = await asyncio.to_thread( fast_scrape_reviews, url=url, headless=False, progress_callback=progress_callback, log_capture=log_capture, flush_callback=flush_callback, browser_fingerprint=browser_fingerprint # Pass user's browser fingerprint ) if result['success']: # Save session fingerprint if captured if result.get('session_fingerprint'): await db.update_session_fingerprint(job_id, result['session_fingerprint']) log.info(f"Saved session fingerprint for job {job_id}") # Save results to database (including scraper logs and review topics) await db.save_job_result( job_id=job_id, reviews=result['reviews'], scrape_time=result['time'], total_reviews=result.get('total_reviews'), scrape_logs=result.get('logs'), review_topics=result.get('review_topics') ) log.info( f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s" ) # Broadcast job completed via SSE await broadcast_job_update(job_id_str, "job_completed", { "job_id": job_id_str, "status": "completed", "reviews_count": result['count'], "total_reviews": result.get('total_reviews'), "scrape_time": result['time'], "logs": result.get('logs', []) }) # Send webhook if configured if job.get('webhook_url'): webhook_manager = WebhookManager() api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000') await webhook_manager.send_job_completed_webhook( webhook_url=job['webhook_url'], job_id=job_id, status='completed', reviews_count=result['count'], scrape_time=result['time'], reviews_url=f"{api_base_url}/jobs/{job_id}/reviews", secret=job.get('webhook_secret'), db=db ) else: # Save session fingerprint even on failure (useful for debugging bot detection) if result.get('session_fingerprint'): await db.update_session_fingerprint(job_id, result['session_fingerprint']) log.info(f"Saved session fingerprint for failed job {job_id}") # Job failed - check if we have partial reviews saved current_job = await db.get_job(job_id) partial_count = current_job.get('reviews_count', 0) if current_job else 0 if partial_count > 0: # Mark as partial - we have some reviews saved await db.mark_job_partial( job_id, error_message=result.get('error', 'Unknown error'), scrape_logs=result.get('logs') ) log.warning(f"Partial job {job_id}: {partial_count} reviews saved before error: {result.get('error')}") # Broadcast job partial via SSE await broadcast_job_update(job_id_str, "job_partial", { "job_id": job_id_str, "status": "partial", "reviews_count": partial_count, "total_reviews": current_job.get('total_reviews'), "error_message": result.get('error'), "logs": result.get('logs', []) }) # Send partial webhook if configured if job.get('webhook_url'): webhook_manager = WebhookManager() await webhook_manager.send_job_completed_webhook( webhook_url=job['webhook_url'], job_id=job_id, status='partial', reviews_count=partial_count, error_message=result.get('error'), secret=job.get('webhook_secret'), db=db ) else: # No reviews saved - mark as failed await db.update_job_status( job_id, JobStatus.FAILED, error_message=result.get('error', 'Unknown error'), scrape_logs=result.get('logs') ) log.error(f"Failed job {job_id}: {result.get('error')}") # Broadcast job failed via SSE await broadcast_job_update(job_id_str, "job_failed", { "job_id": job_id_str, "status": "failed", "error_message": result.get('error'), "logs": result.get('logs', []) }) # Send failure webhook if configured if job.get('webhook_url'): webhook_manager = WebhookManager() await webhook_manager.send_job_completed_webhook( webhook_url=job['webhook_url'], job_id=job_id, status='failed', error_message=result.get('error'), secret=job.get('webhook_secret'), db=db ) except Exception as e: log.error(f"Error in scraping job {job_id}: {e}") import traceback traceback.print_exc() # Check if we have partial reviews saved current_job = await db.get_job(job_id) partial_count = current_job.get('reviews_count', 0) if current_job else 0 if partial_count > 0: # Mark as partial - we have some reviews saved await db.mark_job_partial( job_id, error_message=str(e), scrape_logs=log_capture.get_logs() if log_capture else None ) log.warning(f"Partial job {job_id}: {partial_count} reviews saved before exception: {e}") # Broadcast job partial via SSE await broadcast_job_update(job_id_str, "job_partial", { "job_id": job_id_str, "status": "partial", "reviews_count": partial_count, "total_reviews": current_job.get('total_reviews'), "error_message": str(e), "logs": log_capture.get_logs() if log_capture else [] }) # Send partial webhook if current_job and current_job.get('webhook_url'): webhook_manager = WebhookManager() await webhook_manager.send_job_completed_webhook( webhook_url=current_job['webhook_url'], job_id=job_id, status='partial', reviews_count=partial_count, error_message=str(e), secret=current_job.get('webhook_secret'), db=db ) else: # No reviews saved - mark as failed await db.update_job_status( job_id, JobStatus.FAILED, error_message=str(e) ) # Broadcast job failed via SSE await broadcast_job_update(job_id_str, "job_failed", { "job_id": job_id_str, "status": "failed", "error_message": str(e), "logs": [] }) # Send failure webhook if current_job and current_job.get('webhook_url'): webhook_manager = WebhookManager() await webhook_manager.send_job_completed_webhook( webhook_url=current_job['webhook_url'], job_id=job_id, status='failed', error_message=str(e), secret=current_job.get('webhook_secret'), db=db ) if __name__ == "__main__": import uvicorn port = int(os.getenv('PORT', 8000)) log.info(f"Starting production server on port {port}...") uvicorn.run( "api_server_production:app", host="0.0.0.0", port=port, reload=False, # Disable reload in production log_level="info" )