Files
whyrating-engine-legacy/api_server.py
Alejandro Gutiérrez faa0704737 Optimize scraper performance and add fallback selectors for robustness
Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-18 19:49:24 +00:00

383 lines
13 KiB
Python

#!/usr/bin/env python3
"""
FastAPI server for Google Reviews Scraper.
Provides REST API endpoints to trigger and manage scraping jobs.
"""
import logging
import asyncio
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Optional
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl, Field
from modules.job_manager import JobManager, JobStatus, ScrapingJob
from modules.chrome_pool import start_worker_pools, stop_worker_pools, get_pool_stats, get_validation_worker, release_validation_worker
from modules.fast_scraper import check_reviews_available, get_business_card_info
# Set up process-wide logging: INFO level, with timestamp, logger name and
# severity prepended to every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module logger used by every endpoint and background task in this file.
log = logging.getLogger("api_server")
# Global job manager instance.
# Stays None until lifespan() assigns a JobManager during application startup;
# the endpoints below check it and answer with HTTP 500 while it is unset.
job_manager: Optional[JobManager] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for application startup and shutdown.

    Startup: starts the Chrome worker pools, creates the global JobManager
    and launches the periodic cleanup task.
    Shutdown: cancels the cleanup task, shuts the JobManager down and stops
    the Chrome worker pools. Shutdown runs from a ``finally`` block so it is
    executed even when an exception is thrown into the generator.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global job_manager

    # Startup
    log.info("Starting Google Reviews Scraper API Server")

    # Start Chrome worker pools
    log.info("Initializing Chrome worker pools...")
    start_worker_pools(
        validation_size=1,  # 1 pre-warmed worker for validation
        scraping_size=2,  # 2 pre-warmed workers for scraping
        headless=True
    )

    cleanup_task = None
    try:
        job_manager = JobManager(max_concurrent_jobs=3)

        # Keep a strong reference to the auto-cleanup task: the event loop only
        # holds weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        cleanup_task = asyncio.create_task(cleanup_jobs_periodically())

        yield
    finally:
        # Shutdown
        log.info("Shutting down Google Reviews Scraper API Server")

        if cleanup_task is not None:
            # Stop the hourly loop and wait for it to unwind before tearing
            # down the objects it uses.
            cleanup_task.cancel()
            try:
                await cleanup_task
            except asyncio.CancelledError:
                pass
            except Exception:
                # The task had already died with an error; report it but do
                # not let it prevent the rest of the shutdown.
                log.exception("Periodic cleanup task terminated with an error")

        if job_manager:
            job_manager.shutdown()

        # Stop Chrome worker pools
        log.info("Stopping Chrome worker pools...")
        stop_worker_pools()
# Build the FastAPI application; startup/shutdown work is delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="Google Reviews Scraper API",
    description="REST API for triggering and managing Google Maps review scraping jobs",
)

# Register CORS handling.
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. The FastAPI CORS documentation advises against combining
# wildcard values with allow_credentials=True - restrict allow_origins to the
# known frontend origin(s) before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # Configure appropriately for production
)
# Pydantic models for API
class ScrapeRequest(BaseModel):
    """
    Body of a request that starts a scrape job.

    Only ``url`` is required. Every other field defaults to None, which the
    scrape endpoint treats as "not provided" and leaves out of the config
    overrides handed to the job manager.
    """

    # Target place; validated as an HTTP(S) URL.
    url: HttpUrl = Field(
        ...,
        description="Google Maps URL to scrape",
    )

    # Browser / scrolling behaviour.
    headless: Optional[bool] = Field(
        None,
        description="Run Chrome in headless mode (default: True)",
    )
    max_scrolls: Optional[int] = Field(
        None,
        description="Maximum scrolls (default: unlimited - stops via idle detection)",
    )
    sort_by: Optional[str] = Field(
        None,
        description="Sort order: newest, highest, lowest, relevance",
    )

    # Handling of previously stored reviews.
    stop_on_match: Optional[bool] = Field(
        None,
        description="Stop when first already-seen review is encountered",
    )
    overwrite_existing: Optional[bool] = Field(
        None,
        description="Overwrite existing reviews instead of appending",
    )

    # Image handling.
    download_images: Optional[bool] = Field(
        None,
        description="Download images from reviews",
    )
    use_s3: Optional[bool] = Field(
        None,
        description="Upload images to S3",
    )

    # Free-form extras.
    custom_params: Optional[Dict[str, Any]] = Field(
        None,
        description="Custom parameters to add to each document",
    )
class JobResponse(BaseModel):
    """
    Serialized view of a single scraping job.

    Instances are built from ``job.to_dict()`` by the job endpoints, so the
    field names here must match the keys that dictionary provides.
    """

    # Identity and state
    job_id: str
    status: JobStatus
    url: str

    # Timestamps (string-typed; only created_at is required)
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    updated_at: Optional[str] = None  # Last update time for progress tracking

    # Outcome
    error_message: Optional[str] = None
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # Total reviews available for this place
    images_count: Optional[int] = None
    progress: Optional[Dict[str, Any]] = None
    scrape_time: Optional[float] = None  # Time taken to scrape in seconds
class JobStatsResponse(BaseModel):
    """
    Aggregate job statistics returned by the /stats endpoint.

    Populated by unpacking the dictionary returned from
    ``job_manager.get_stats()``.
    """

    total_jobs: int
    by_status: Dict[str, int]  # job count per status name
    running_jobs: int
    max_concurrent_jobs: int
class ReviewsResponse(BaseModel):
    """Payload carrying the reviews collected by one job."""

    job_id: str
    reviews: List[Dict[str, Any]]  # one dictionary per review
    count: int  # set to len(reviews) by the reviews endpoint
# Background task for periodic cleanup
async def cleanup_jobs_periodically():
    """
    Periodically clean up old jobs.

    Runs forever: sleeps one hour, then asks the global job manager (if one
    is set) to drop jobs older than 24 hours. A failing cleanup pass is
    logged and the loop continues, so one error does not permanently
    disable cleanup. The loop ends only when the task is cancelled.
    """
    while True:
        # Cancellation surfaces here, outside the try block below, so it is
        # never swallowed by the error handler.
        await asyncio.sleep(3600)  # Run every hour

        if not job_manager:
            continue

        try:
            job_manager.cleanup_old_jobs(max_age_hours=24)
        except Exception:
            log.exception("Periodic job cleanup failed; will retry on the next cycle")
# API Endpoints
@app.get("/", summary="API Health Check")
async def root():
    """Report that the API is up, together with its version string."""
    health = {
        "message": "Google Reviews Scraper API is running",
        "status": "healthy",
        "version": "1.0.0",
    }
    return health
@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """
    Start a new scraping job in the background.

    Args:
        request: Scrape options; every non-None field except ``url`` is
            forwarded to the job manager as a config override.
        background_tasks: Injected by FastAPI; not used by this handler.

    Returns:
        A dict with the ``job_id`` (usable with the job endpoints), a
        ``status`` of "started" or "queued", and a human-readable ``message``.

    Raises:
        HTTPException: 500 if the job manager is not initialized or the job
            could not be created/started.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    # Only explicitly provided (non-None) options become overrides; the URL is
    # passed separately.
    # NOTE(review): .dict() is deprecated in Pydantic v2 in favour of
    # model_dump() - confirm which Pydantic version this project pins.
    config_overrides = {
        name: value
        for name, value in request.dict().items()
        if name != "url" and value is not None
    }

    # Convert URL to string
    url = str(request.url)

    try:
        job_id = job_manager.create_job(url, config_overrides)
        # Start job immediately if possible
        started = job_manager.start_job(job_id)
    except Exception as e:
        # log.exception keeps the traceback; chaining keeps the root cause
        # attached to the HTTP error.
        log.exception(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") from e

    log.info(f"Created scraping job {job_id} for URL: {url}")

    return {
        "job_id": job_id,
        "status": "started" if started else "queued",
        "message": f"Scraping job {'started' if started else 'queued'} successfully"
    }
@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: str):
    """
    Return the full record of one job.

    Raises:
        HTTPException: 500 if the job manager is not initialized,
            404 if no job has the given id.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    found = job_manager.get_job(job_id)
    if not found:
        raise HTTPException(status_code=404, detail="Job not found")

    job_fields = found.to_dict()
    return JobResponse(**job_fields)
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: str):
    """
    Return the reviews collected by a job.

    Raises:
        HTTPException: 500 if the job manager is not initialized,
            404 if the job does not exist,
            400 if the job exists but is not completed,
            404 if the job is completed but has no reviews data.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    reviews = job_manager.get_job_reviews(job_id)

    # Happy path: the manager handed back review data.
    if reviews is not None:
        return ReviewsResponse(
            job_id=job_id,
            reviews=reviews,
            count=len(reviews)
        )

    # No data - look at the job itself to pick the most specific error.
    job = job_manager.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != JobStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=f"Job not completed yet (current status: {job.status})"
        )
    raise HTTPException(status_code=404, detail="Reviews data not available")
@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[JobStatus] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000)
):
    """
    List jobs known to the job manager.

    Args:
        status: When given, only jobs in this status are requested.
        limit: Maximum number of jobs to return (1-1000, default 100).

    Raises:
        HTTPException: 500 if the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    selected = job_manager.list_jobs(status=status, limit=limit)
    return [JobResponse(**entry.to_dict()) for entry in selected]
@app.post("/jobs/{job_id}/start", summary="Start Pending Job")
async def start_job(job_id: str):
    """
    Manually start a job that is still pending.

    Raises:
        HTTPException: 500 if the job manager is not initialized,
            404 if the job does not exist,
            400 if the job is not in the pending state,
            429 if the job is pending but could not be started.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.start_job(job_id):
        return {"message": "Job started successfully"}

    # The manager refused: work out why so the caller gets a precise error.
    job = job_manager.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != JobStatus.PENDING:
        raise HTTPException(status_code=400, detail=f"Job is not pending (current status: {job.status})")
    raise HTTPException(status_code=429, detail="Maximum concurrent jobs reached")
@app.post("/jobs/{job_id}/cancel", summary="Cancel Job")
async def cancel_job(job_id: str):
    """
    Cancel a job through the job manager.

    Raises:
        HTTPException: 500 if the job manager is not initialized,
            404 if the job does not exist,
            400 if the job exists but the manager could not cancel it.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.cancel_job(job_id):
        return {"message": "Job cancelled successfully"}

    # Cancellation was refused; distinguish an unknown id from a finished job.
    if not job_manager.get_job(job_id):
        raise HTTPException(status_code=404, detail="Job not found")
    raise HTTPException(status_code=400, detail="Job cannot be cancelled (already completed, failed, or cancelled)")
@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: str):
    """
    Remove a job from the job manager.

    Raises:
        HTTPException: 500 if the job manager is not initialized,
            404 if the manager reports that nothing was deleted.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.delete_job(job_id):
        return {"message": "Job deleted successfully"}

    raise HTTPException(status_code=404, detail="Job not found")
@app.get("/stats", response_model=JobStatsResponse, summary="Get Job Statistics")
async def get_stats():
    """
    Return the job manager's statistics as a JobStatsResponse.

    Raises:
        HTTPException: 500 if the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    return JobStatsResponse(**job_manager.get_stats())
@app.post("/check-reviews", summary="Check if Business Has Reviews")
async def check_reviews(request: Dict[str, str]):
    """
    Lightweight validation endpoint to check if a business has reviews.

    Borrows a worker from the Chrome validation pool, runs
    ``get_business_card_info`` on its driver and returns the resulting
    dictionary with the ``driver`` entry removed.

    Args:
        request: JSON object that must contain a non-empty ``url`` entry.

    Returns:
        The dictionary produced by ``get_business_card_info`` (its ``name``,
        ``rating`` and ``total_reviews`` entries are logged), without the
        ``driver`` key.

    Raises:
        HTTPException: 400 if ``url`` is missing or empty,
            503 if no validation worker became available,
            500 if the validation itself failed.
    """
    url = request.get("url")
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")

    log.info(f"Validating business at: {url}")

    # NOTE(review): the pool and scraper calls below are invoked synchronously
    # from this coroutine; if they block (likely for browser automation) they
    # stall the event loop - consider moving them to a worker thread.

    # Get a worker from validation pool
    worker = get_validation_worker(timeout=10)
    if not worker:
        raise HTTPException(
            status_code=503,
            detail="No validation workers available. Please try again in a few seconds."
        )

    failed = False
    try:
        # Use the worker's driver to get business card info (faster than check_reviews_available)
        result = get_business_card_info(
            url=url,
            headless=True,
            driver=worker.driver,
            return_driver=True  # Don't close the driver
        )

        # Pop the driver from result before returning
        result.pop('driver', None)

        log.info(f"Validation result: name={result.get('name')}, rating={result.get('rating')}, reviews={result.get('total_reviews')}")
        return result
    except Exception as e:
        failed = True
        log.exception(f"Error during validation: {e}")
        raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}") from e
    finally:
        # Release the worker exactly once. Previously a failure released it
        # here AND in the except branch whenever worker.driver was still set,
        # which could hand the same worker back to the pool twice.
        if failed:
            # Recycle worker if there was an error
            release_validation_worker(worker, recycle=True)
        elif worker.driver:
            release_validation_worker(worker, recycle=False)
@app.get("/pool-stats", summary="Get Chrome Pool Statistics")
async def pool_stats():
    """Return whatever get_pool_stats() reports for the Chrome worker pools."""
    return get_pool_stats()
@app.post("/cleanup", summary="Manual Job Cleanup")
async def cleanup_jobs(max_age_hours: int = Query(24, description="Maximum age in hours", ge=1)):
    """
    Trigger the job manager's cleanup of old jobs on demand.

    Args:
        max_age_hours: Age threshold in hours passed to the job manager
            (minimum 1, default 24).

    Raises:
        HTTPException: 500 if the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    job_manager.cleanup_old_jobs(max_age_hours=max_age_hours)

    confirmation = f"Cleaned up jobs older than {max_age_hours} hours"
    return {"message": confirmation}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when run directly.
    import uvicorn

    log.info("Starting FastAPI server...")

    # The app is passed as an import string; listens on all interfaces,
    # port 8000, with auto-reload enabled.
    uvicorn.run("api_server:app", host="0.0.0.0", port=8000, reload=True, log_level="info")