#!/usr/bin/env python3
"""
Production Google Reviews Scraper API Server with Phase 1 features:
- PostgreSQL storage with JSONB
- Webhook delivery with retries
- Smart health checks with canary testing
"""

import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
from typing import Optional, List, Dict, Any
from uuid import UUID

from fastapi import FastAPI, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl, Field
from fastapi.responses import JSONResponse, StreamingResponse

from modules.database import DatabaseManager, JobStatus
from modules.webhooks import WebhookDispatcher, WebhookManager
from modules.health_checks import HealthCheckSystem
from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info  # Clean scraper
from modules.chrome_pool import (
    start_worker_pools,
    stop_worker_pools,
    get_validation_worker,
    release_validation_worker,
    get_scraping_worker,
    release_scraping_worker,
    get_pool_stats
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
log = logging.getLogger("api_server")

# Global instances (populated by lifespan() on startup)
db: Optional[DatabaseManager] = None
webhook_dispatcher: Optional[WebhookDispatcher] = None
health_system: Optional[HealthCheckSystem] = None

# Concurrent job limiter (prevent too many Chrome instances)
MAX_CONCURRENT_JOBS = int(os.getenv('MAX_CONCURRENT_JOBS', '5'))
job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)

# SSE: Store for broadcasting job updates to connected clients
# Format: {job_id: [asyncio.Queue, ...]} for job-specific streams
# Format: {"all": [asyncio.Queue, ...]} for all-jobs stream
job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []}

# Job statuses after which no further updates are expected
_TERMINAL_STATUSES = ('completed', 'failed', 'cancelled')

# Response headers shared by both SSE endpoints
_SSE_HEADERS = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no"  # Disable nginx buffering
}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-flight.
_background_tasks: set = set()


# ==================== Internal Helpers ====================

def _on_background_task_done(task: asyncio.Task) -> None:
    """Drop a finished background task and log any exception it raised."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        log.error("Background task failed: %s", error, exc_info=error)


def _spawn_background(coro) -> asyncio.Task:
    """Schedule *coro* as a task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
    return task


def _parse_json_field(value: Any, default: Any = None) -> Any:
    """
    Decode a JSON column that may arrive as a raw string.

    Non-string values are returned untouched; a string that is not valid
    JSON yields *default*.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return default


def _sse_event(event_type: str, data: Any) -> str:
    """Format one Server-Sent Events frame with a JSON payload."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


def _job_snapshot(job_id_str: str, job_data: Dict[str, Any], logs: Any) -> Dict[str, Any]:
    """Build the full-state payload used by the `init` and final `complete` events."""
    return {
        "job_id": job_id_str,
        "status": job_data['status'],
        "reviews_count": job_data.get('reviews_count'),
        "total_reviews": job_data.get('total_reviews'),
        "scrape_time": job_data.get('scrape_time'),
        "error_message": job_data.get('error_message'),
        "logs": logs or []
    }


def _subscribe(key: str, queue: asyncio.Queue) -> None:
    """Register *queue* as a subscriber of stream *key*."""
    job_update_queues.setdefault(key, []).append(queue)


def _unsubscribe(key: str, queue: asyncio.Queue) -> None:
    """Remove *queue* from stream *key*, dropping empty per-job lists."""
    subscribers = job_update_queues.get(key)
    if subscribers is None:
        return
    try:
        subscribers.remove(queue)
    except ValueError:
        pass  # Already removed
    # The "all" list is permanent; per-job lists are deleted once empty
    if not subscribers and key != "all":
        del job_update_queues[key]


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for startup and shutdown.

    Startup connects the database, starts the webhook dispatcher and the
    Chrome worker pools. Shutdown stops them in reverse order of dependency.
    """
    global db, webhook_dispatcher, health_system

    # Startup
    log.info("Starting Google Reviews Scraper API Server (Production)")

    # Get database URL from environment
    database_url = os.getenv(
        'DATABASE_URL',
        'postgresql://scraper:scraper@localhost:5432/scraper'
    )

    # Initialize database
    db = DatabaseManager(database_url)
    await db.connect()
    await db.initialize_schema()
    log.info("Database initialized")

    # Initialize health check system with canary monitoring
    # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting
    # health_system = HealthCheckSystem(db)
    # await health_system.start()
    log.info("Health check system DISABLED (canary tests disabled to avoid rate limiting)")

    # Initialize webhook dispatcher (task is kept referenced so it cannot be
    # garbage-collected and its failures are logged)
    webhook_dispatcher = WebhookDispatcher(db, interval_seconds=30)
    _spawn_background(webhook_dispatcher.start())
    log.info("Webhook dispatcher started")

    # Start Chrome worker pools (1 for validation, 2 for scraping)
    # These pre-warm Chrome instances for instant availability
    # headless=False because Docker uses Xvfb virtual display for better compatibility
    await asyncio.to_thread(
        start_worker_pools,
        validation_size=1,
        scraping_size=2,
        headless=False
    )
    log.info("Chrome worker pools started (1 validation + 2 scraping)")

    yield

    # Shutdown
    log.info("Shutting down Google Reviews Scraper API Server")

    if webhook_dispatcher:
        webhook_dispatcher.stop()

    # if health_system:
    #     health_system.stop()

    # Stop worker pools
    await asyncio.to_thread(stop_worker_pools)
    log.info("Chrome worker pools stopped")

    if db:
        await db.disconnect()


# Initialize FastAPI app
app = FastAPI(
    title="Google Reviews Scraper API - Production",
    description="Production-ready REST API for Google Maps review scraping with webhooks and health monitoring",
    version="2.0.0",
    lifespan=lifespan
)

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins before exposing this publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ==================== Request/Response Models ====================

class ScrapeRequest(BaseModel):
    """Request model for starting a scrape job"""
    url: HttpUrl = Field(..., description="Google Maps URL to scrape")
    webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications")
    webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata")


class JobResponse(BaseModel):
    """Response model for job information"""
    job_id: str
    status: str
    url: str
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # Total reviews available for this place
    scrape_time: Optional[float] = None
    error_message: Optional[str] = None
    webhook_url: Optional[str] = None
    # Business metadata
    business_name: Optional[str] = None
    business_address: Optional[str] = None


class ReviewsResponse(BaseModel):
    """Response model for reviews data"""
    job_id: str
    reviews: List[Dict[str, Any]]
    count: int


class StatsResponse(BaseModel):
    """Response model for statistics"""
    total_jobs: int
    pending: int
    running: int
    completed: int
    failed: int
    cancelled: int
    avg_scrape_time: Optional[float] = None
    total_reviews: Optional[int] = None


# ==================== API Endpoints ====================

@app.get("/", summary="API Health Check")
async def root():
    """Basic health check endpoint"""
    return {
        "message": "Google Reviews Scraper API (Production)",
        "status": "healthy",
        "version": "2.0.0",
        "features": ["postgresql", "webhooks", "canary-testing"]
    }


@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def start_scrape(request: ScrapeRequest):
    """
    Start a new scraping job.

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete

    Returns the job ID for tracking.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    try:
        # Create job in database
        job_id = await db.create_job(
            url=str(request.url),
            webhook_url=str(request.webhook_url) if request.webhook_url else None,
            webhook_secret=request.webhook_secret,
            metadata=request.metadata
        )

        # Start scraping job in background (referenced so it is not GC'd)
        _spawn_background(run_scraping_job(job_id))

        log.info(f"Created and started job {job_id}")

        return {
            "job_id": str(job_id),
            "status": "started",
            "message": "Scraping job started successfully"
        }

    except Exception as e:
        log.exception(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}")


# ==================== SSE Streaming Endpoints ====================

async def broadcast_job_update(job_id: str, event_type: str, data: dict):
    """
    Broadcast an update to all subscribers of a job stream and the all-jobs stream.

    Broadcasting is best-effort: a payload that cannot be serialized is
    logged and dropped rather than failing the caller.
    """
    try:
        message = _sse_event(event_type, data)
    except (TypeError, ValueError):
        log.exception(f"Could not serialize SSE event '{event_type}' for job {job_id}")
        return

    # Snapshot both subscriber lists so later (un)subscriptions cannot
    # disturb the iteration. Job-specific subscribers go first.
    targets = list(job_update_queues.get(job_id, []))
    targets.extend(job_update_queues.get("all", []))

    for queue in targets:
        try:
            # Queues are unbounded, so this never blocks
            queue.put_nowait(message)
        except asyncio.QueueFull:
            log.warning(f"SSE subscriber queue full; dropped '{event_type}' for job {job_id}")


# NOTE: this static route must be registered BEFORE "/jobs/{job_id}".
# FastAPI matches routes in declaration order, so otherwise "stream" would be
# captured as a job_id and rejected as an invalid UUID.
@app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)")
async def stream_all_jobs():
    """
    Server-Sent Events stream for all job updates.

    Streams:
    - init: Snapshot of the most recent jobs (sent once on connect)
    - job_started: A job began running
    - job_progress: Review count / logs changed
    - job_completed: Job finished successfully
    - job_failed: Job finished with an error

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/stream');
    es.addEventListener('job_progress', (e) => console.log('Update:', JSON.parse(e.data)));
    ```
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Create queue for this client
    queue: asyncio.Queue = asyncio.Queue()

    async def event_generator():
        # Register inside the generator so the `finally` below always pairs
        # with it, even if the response is never iterated.
        _subscribe("all", queue)
        try:
            # Send initial jobs list
            jobs = await db.list_jobs(limit=100)
            jobs_data = [
                {
                    "job_id": str(j['job_id']),
                    "status": j['status'],
                    "url": j['url'],
                    "created_at": j['created_at'].isoformat(),
                    "completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None,
                    "reviews_count": j.get('reviews_count'),
                    "scrape_time": j.get('scrape_time'),
                    "error_message": j.get('error_message')
                }
                for j in jobs
            ]
            yield _sse_event("init", {'jobs': jobs_data})

            # Keep connection alive and send updates
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=5.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"
                except Exception as e:
                    log.error(f"Error in all-jobs SSE stream: {e}")
                    break
        finally:
            # Unregister subscriber
            _unsubscribe("all", queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS)
    )


@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: UUID):
    """Get detailed information about a specific job"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    return JobResponse(
        job_id=str(job['job_id']),
        status=job['status'],
        url=job['url'],
        created_at=job['created_at'].isoformat(),
        started_at=job['started_at'].isoformat() if job['started_at'] else None,
        completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
        reviews_count=job['reviews_count'],
        total_reviews=job.get('total_reviews'),
        scrape_time=job['scrape_time'],
        error_message=job['error_message'],
        webhook_url=job.get('webhook_url')
    )


@app.get("/jobs/{job_id}/logs", summary="Get Job Logs")
async def get_job_logs(job_id: UUID):
    """
    Get the scraper logs for a job.

    Returns logs from both successful and failed jobs.
    Useful for debugging scraping issues.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Parse if string (asyncpg might return JSONB as string)
    scrape_logs = _parse_json_field(job.get('scrape_logs'))

    return {
        "job_id": str(job_id),
        "status": job['status'],
        "error_message": job.get('error_message'),
        "logs": scrape_logs or [],
        "log_count": len(scrape_logs) if scrape_logs else 0
    }


@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
async def stream_job_updates(job_id: UUID):
    """
    Server-Sent Events stream for real-time job updates.

    Streams:
    - init: Current job state (sent once on connect)
    - job_started / job_progress / job_completed / job_failed: Live
      broadcasts from the running job
    - update: State change detected by polling the database (backup path)
    - complete: Job reached a terminal status; the stream closes afterwards

    All frames are named events, so use addEventListener (onmessage only
    receives unnamed events):
    ```javascript
    const es = new EventSource('/jobs/{job_id}/stream');
    es.addEventListener('job_progress', (e) => console.log(JSON.parse(e.data)));
    es.addEventListener('complete', (e) => { console.log('Done:', JSON.parse(e.data)); es.close(); });
    ```
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Verify job exists
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    job_id_str = str(job_id)

    # Create queue for this client
    queue: asyncio.Queue = asyncio.Queue()

    async def event_generator():
        # Register inside the generator so the `finally` below always pairs
        # with it. Nothing is missed: the state is read from the database
        # right after registering.
        _subscribe(job_id_str, queue)
        try:
            # Send initial state
            job_data = await db.get_job(job_id)
            scrape_logs: Any = []
            if job_data:
                scrape_logs = _parse_json_field(job_data.get('scrape_logs'), default=[])
                yield _sse_event("init", _job_snapshot(job_id_str, job_data, scrape_logs))

                # If job is already complete, send complete event and close
                if job_data['status'] in _TERMINAL_STATUSES:
                    yield _sse_event("complete", {'status': job_data['status']})
                    return

            # Baselines for change detection. Normalized to ints so a NULL
            # reviews_count does not trigger a spurious first update.
            last_log_count = len(scrape_logs) if scrape_logs else 0
            last_reviews_count = (job_data.get('reviews_count') or 0) if job_data else 0

            # Keep connection alive and send updates
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=2.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"

                        # Also poll database for updates (backup in case broadcast missed)
                        job_data = await db.get_job(job_id)
                        if job_data:
                            scrape_logs = _parse_json_field(job_data.get('scrape_logs'), default=[])

                            # Check for status change
                            if job_data['status'] in _TERMINAL_STATUSES:
                                yield _sse_event(
                                    "complete",
                                    _job_snapshot(job_id_str, job_data, scrape_logs)
                                )
                                return

                            # Check for new logs or progress
                            current_log_count = len(scrape_logs) if scrape_logs else 0
                            current_reviews = job_data.get('reviews_count') or 0

                            if current_log_count > last_log_count or current_reviews != last_reviews_count:
                                update = {
                                    "job_id": job_id_str,
                                    "status": job_data['status'],
                                    "reviews_count": current_reviews,
                                    "total_reviews": job_data.get('total_reviews'),
                                    "logs": scrape_logs or []
                                }
                                yield _sse_event("update", update)
                                last_log_count = current_log_count
                                last_reviews_count = current_reviews

                except Exception as e:
                    log.error(f"Error in SSE stream for job {job_id}: {e}")
                    break
        finally:
            # Unregister subscriber
            _unsubscribe(job_id_str, queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS)
    )


@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: UUID):
    """
    Get the actual reviews data for a completed job.

    Returns 404 if the job is not found or has no stored reviews, and 400
    if the job exists but has not completed yet.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    reviews = await db.get_job_reviews(job_id)

    if reviews is None:
        # Look the job up only to pick the most precise error
        job = await db.get_job(job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")
        if job['status'] != 'completed':
            raise HTTPException(
                status_code=400,
                detail=f"Job not completed yet (current status: {job['status']})"
            )
        raise HTTPException(status_code=404, detail="Reviews data not available")

    return ReviewsResponse(
        job_id=str(job_id),
        reviews=reviews,
        count=len(reviews)
    )


@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[str] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of jobs to skip", ge=0)
):
    """List all jobs, optionally filtered by status"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Validate status if provided
    job_status = None
    if status:
        try:
            job_status = JobStatus(status.lower())
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status. Must be one of: {[s.value for s in JobStatus]}"
            ) from None

    jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset)

    result = []
    for job in jobs:
        # Extract business info from metadata if available. Metadata is
        # caller-supplied, so only trust it when it decodes to a dict.
        metadata = _parse_json_field(job.get('metadata'))
        if not isinstance(metadata, dict):
            metadata = {}

        result.append(JobResponse(
            job_id=str(job['job_id']),
            status=job['status'],
            url=job['url'],
            created_at=job['created_at'].isoformat(),
            completed_at=job['completed_at'].isoformat() if job.get('completed_at') else None,
            reviews_count=job.get('reviews_count'),
            total_reviews=job.get('total_reviews'),
            scrape_time=job.get('scrape_time'),
            error_message=job.get('error_message'),
            business_name=metadata.get('business_name'),
            business_address=metadata.get('business_address')
        ))

    return result


@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: UUID):
    """Delete a job from the system"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    deleted = await db.delete_job(job_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Job not found")

    return {"message": "Job deleted successfully"}


@app.post("/check-reviews", summary="Check if Reviews Exist")
async def check_reviews(request: ScrapeRequest):
    """
    Get business card information from Google Maps.

    Returns business name, address, rating, and review count.
    Uses pre-warmed Chrome worker from pool for instant response.

    This is used to show the business confirmation card in the UI.
    Errors are reported in the response body (success=False), not as an
    HTTP error status.
    """
    worker = None
    recycle_worker = False

    try:
        url = str(request.url)

        # Get pre-warmed worker from validation pool
        worker = await asyncio.to_thread(get_validation_worker, timeout=10)

        if worker:
            log.info(f"Using worker {worker.worker_id} for business card extraction")
            # Use the pooled worker (don't close it)
            result = await asyncio.to_thread(
                get_business_card_info,
                url=url,
                driver=worker.driver,
                return_driver=True
            )

            # Check if the result indicates a session error
            if not result['success'] and result.get('error'):
                if 'session' in str(result['error']).lower():
                    log.warning(f"Worker {worker.worker_id} has invalid session, will recycle")
                    recycle_worker = True
        else:
            # Fallback: create temporary worker
            log.warning("No pooled worker available, creating temporary instance")
            result = await asyncio.to_thread(
                get_business_card_info,
                url=url
            )

        # SIMPLIFIED VALIDATION: If we found a business (name + rating), assume it has reviews
        # Let the actual scraper determine if reviews exist
        has_business = bool(result.get('name') and result.get('rating'))

        return {
            "has_reviews": has_business,  # Boolean: true if business exists
            "total_reviews": result.get('total_reviews') or 0,  # Show 0 if unknown
            "name": result.get('name'),
            "address": result.get('address'),
            "rating": result.get('rating'),
            "success": result['success'],
            "error": result.get('error')
        }

    except Exception as e:
        log.exception(f"Error checking reviews: {e}")

        # If it's a session error, recycle the worker
        if worker and 'session' in str(e).lower():
            recycle_worker = True

        # Same keys as the success payload so clients see one shape;
        # "review_count" is retained for existing consumers.
        return {
            "has_reviews": False,
            "total_reviews": 0,
            "review_count": 0,
            "name": None,
            "address": None,
            "rating": None,
            "success": False,
            "error": str(e)
        }

    finally:
        # Release worker back to pool (or recycle if broken)
        if worker:
            await asyncio.to_thread(release_validation_worker, worker, recycle=recycle_worker)


@app.get("/stats", response_model=StatsResponse, summary="Get Statistics")
async def get_stats():
    """Get job statistics"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    stats = await db.get_stats()
    return StatsResponse(**stats)


@app.get("/pool-stats", summary="Get Worker Pool Statistics")
async def pool_stats():
    """Get Chrome worker pool statistics"""
    return await asyncio.to_thread(get_pool_stats)


# ==================== Health Check Endpoints ====================

@app.get("/health/live", summary="Liveness Probe")
async def liveness():
    """
    Liveness check: Is the server alive?

    Use this for Kubernetes liveness probe - restart container if fails.
    """
    if not health_system:
        # The health system is deliberately disabled in lifespan(). Being
        # able to answer this request is itself proof of liveness; a 503
        # here would make an orchestrator restart a healthy container.
        # NOTE(review): payload shape is a minimal stand-in — confirm it
        # matches what HealthCheckSystem.check_liveness() returns.
        return {"status": "alive", "health_system": "disabled"}

    return await health_system.check_liveness()


@app.get("/health/ready", summary="Readiness Probe")
async def readiness():
    """
    Readiness check: Can the server handle traffic?

    Use this for Kubernetes readiness probe - remove from load balancer if fails.
    """
    if not health_system:
        # Health system disabled: fall back to "database initialized" so
        # the instance is not kept out of rotation permanently.
        if not db:
            return JSONResponse(
                status_code=503,
                content={"status": "not_ready", "detail": "Database not initialized"}
            )
        return {"status": "ready", "health_system": "disabled"}

    result = await health_system.check_readiness()

    if result["status"] != "ready":
        return JSONResponse(status_code=503, content=result)

    return result


@app.get("/health/canary", summary="Canary Health Check")
async def canary():
    """
    Canary check: Does scraping actually work?

    Returns the latest canary test result (runs every 4 hours in background).
    Use this for external monitoring (PagerDuty, DataDog) - alerts if fails.
    """
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")

    result = await health_system.check_canary()

    if result["status"] not in ["healthy", "unknown"]:
        return JSONResponse(status_code=503, content=result)

    return result


@app.get("/health/detailed", summary="Detailed Health Status")
async def detailed_health():
    """Get detailed health status of all components"""
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")

    return await health_system.get_detailed_health()


# ==================== Background Job Runner ====================

async def _drain_progress_updates(pending: List[Any], job_id: UUID) -> None:
    """Wait for every scheduled progress write to finish, logging failures."""
    if not pending:
        return
    outcomes = await asyncio.gather(
        *(asyncio.wrap_future(future) for future in pending),
        return_exceptions=True
    )
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            log.warning(f"Progress update for job {job_id} failed: {outcome}")


async def _send_job_webhook(job: Optional[Dict[str, Any]], job_id: UUID, **payload: Any) -> None:
    """Deliver the job webhook if one is configured; errors are logged, never raised."""
    if not job or not job.get('webhook_url'):
        return
    try:
        webhook_manager = WebhookManager()
        await webhook_manager.send_job_completed_webhook(
            webhook_url=job['webhook_url'],
            job_id=job_id,
            secret=job.get('webhook_secret'),
            db=db,
            **payload
        )
    except Exception:
        # A delivery problem must not change the job's recorded outcome
        log.exception(f"Failed to send webhook for job {job_id}")


async def run_scraping_job(job_id: UUID):
    """
    Run scraping job in background with concurrency limit.

    Marks the job RUNNING, runs the scraper in a worker thread while
    persisting and broadcasting progress, then records the outcome,
    broadcasts it over SSE and sends the webhook if one is configured.
    Never raises: any error is recorded on the job as FAILED.

    Args:
        job_id: Job UUID
    """
    job_id_str = str(job_id)

    async with job_semaphore:  # Limit concurrent Chrome instances
        try:
            # Update status to running
            await db.update_job_status(job_id, JobStatus.RUNNING)
            log.info(f"Starting scraping job {job_id}")

            # Get job details
            job = await db.get_job(job_id)
            if not job:
                raise RuntimeError(f"Job {job_id} not found")
            url = job['url']

            # Broadcast job started via SSE
            await broadcast_job_update(job_id_str, "job_started", {
                "job_id": job_id_str,
                "status": "running",
                "url": url
            })

            # Get the event loop for progress updates from worker thread
            loop = asyncio.get_running_loop()

            # Create log capture instance that we can access for real-time logs
            log_capture = LogCapture()

            # Futures of progress writes scheduled from the worker thread
            pending_updates: List[Any] = []

            # Progress callback to update job status with current/total counts AND logs
            def progress_callback(current_count: int, total_count: int):
                """Update job progress and logs from worker thread"""
                async def update():
                    # Get current logs from the shared log_capture
                    current_logs = log_capture.get_logs()
                    await db.update_job_status(
                        job_id,
                        JobStatus.RUNNING,
                        reviews_count=current_count,
                        total_reviews=total_count,
                        scrape_logs=current_logs
                    )
                    # Broadcast progress via SSE
                    await broadcast_job_update(job_id_str, "job_progress", {
                        "job_id": job_id_str,
                        "status": "running",
                        "reviews_count": current_count,
                        "total_reviews": total_count,
                        "logs": current_logs
                    })

                # Schedule the coroutine on the event loop and remember it
                pending_updates.append(asyncio.run_coroutine_threadsafe(update(), loop))

            # Run scraping with progress callback and shared log capture
            # headless=False because Docker uses Xvfb virtual display
            try:
                result = await asyncio.to_thread(
                    fast_scrape_reviews,
                    url=url,
                    headless=False,
                    progress_callback=progress_callback,
                    log_capture=log_capture
                )
            finally:
                # Let in-flight progress writes land first; otherwise a late
                # RUNNING update could overwrite the terminal status below.
                await _drain_progress_updates(pending_updates, job_id)

            if result['success']:
                # Save results to database (including scraper logs)
                await db.save_job_result(
                    job_id=job_id,
                    reviews=result['reviews'],
                    scrape_time=result['time'],
                    total_reviews=result.get('total_reviews'),
                    scrape_logs=result.get('logs')
                )

                log.info(
                    f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s"
                )

                # Broadcast job completed via SSE
                await broadcast_job_update(job_id_str, "job_completed", {
                    "job_id": job_id_str,
                    "status": "completed",
                    "reviews_count": result['count'],
                    "total_reviews": result.get('total_reviews'),
                    "scrape_time": result['time'],
                    "logs": result.get('logs', [])
                })

                # Send webhook if configured
                api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000')
                await _send_job_webhook(
                    job,
                    job_id,
                    status='completed',
                    reviews_count=result['count'],
                    scrape_time=result['time'],
                    reviews_url=f"{api_base_url}/jobs/{job_id}/reviews"
                )

            else:
                # Job failed - save logs for debugging
                await db.update_job_status(
                    job_id,
                    JobStatus.FAILED,
                    error_message=result.get('error', 'Unknown error'),
                    scrape_logs=result.get('logs')
                )

                log.error(f"Failed job {job_id}: {result.get('error')}")

                # Broadcast job failed via SSE
                await broadcast_job_update(job_id_str, "job_failed", {
                    "job_id": job_id_str,
                    "status": "failed",
                    "error_message": result.get('error'),
                    "logs": result.get('logs', [])
                })

                # Send failure webhook if configured
                await _send_job_webhook(
                    job,
                    job_id,
                    status='failed',
                    error_message=result.get('error')
                )

        except Exception as e:
            # log.exception records the traceback alongside the message
            log.exception(f"Error in scraping job {job_id}: {e}")

            try:
                await db.update_job_status(
                    job_id,
                    JobStatus.FAILED,
                    error_message=str(e)
                )
            except Exception:
                log.exception(f"Could not mark job {job_id} as failed")

            # Broadcast job failed via SSE
            await broadcast_job_update(job_id_str, "job_failed", {
                "job_id": job_id_str,
                "status": "failed",
                "error_message": str(e),
                "logs": []
            })

            # Send failure webhook (re-read the job: it may never have been loaded)
            try:
                failed_job = await db.get_job(job_id)
            except Exception:
                log.exception(f"Could not load job {job_id} for failure webhook")
                failed_job = None

            await _send_job_webhook(
                failed_job,
                job_id,
                status='failed',
                error_message=str(e)
            )


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv('PORT', 8000))

    log.info(f"Starting production server on port {port}...")
    uvicorn.run(
        "api_server_production:app",
        host="0.0.0.0",
        port=port,
        reload=False,  # Disable reload in production
        log_level="info"
    )