Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
383 lines
13 KiB
Python
383 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
FastAPI server for Google Reviews Scraper.
|
|
Provides REST API endpoints to trigger and manage scraping jobs.
|
|
"""
|
|
|
|
import logging
|
|
import asyncio
|
|
from contextlib import asynccontextmanager
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, HttpUrl, Field
|
|
|
|
from modules.job_manager import JobManager, JobStatus, ScrapingJob
|
|
from modules.chrome_pool import start_worker_pools, stop_worker_pools, get_pool_stats, get_validation_worker, release_validation_worker
|
|
from modules.fast_scraper import check_reviews_available, get_business_card_info
|
|
|
|
# Logging setup: emit INFO-and-above records, each stamped with the time,
# the logger name and the severity level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide logger shared by every endpoint and background task below.
log = logging.getLogger("api_server")
|
|
|
|
# Global job manager instance; stays None until `lifespan` has run its startup phase.
job_manager: Optional[JobManager] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup and shutdown.

    Startup (before ``yield``):
        * starts the Chrome worker pools (1 validation worker, 2 scraping
          workers, headless),
        * creates the global ``JobManager`` (max 3 concurrent jobs),
        * schedules the periodic job-cleanup task.

    Shutdown (after ``yield``):
        * cancels the cleanup task and waits for it to finish,
        * shuts the job manager down,
        * stops the Chrome worker pools.

    Args:
        app: The FastAPI application this lifespan is attached to (unused here).
    """
    global job_manager

    # Startup
    log.info("Starting Google Reviews Scraper API Server")

    # Start Chrome worker pools
    log.info("Initializing Chrome worker pools...")
    start_worker_pools(
        validation_size=1,  # 1 pre-warmed worker for validation
        scraping_size=2,    # 2 pre-warmed workers for scraping
        headless=True
    )

    job_manager = JobManager(max_concurrent_jobs=3)

    # Start auto-cleanup task. Keep a strong reference for the lifetime of the
    # application: the event loop only holds weak references to tasks, so a
    # task whose handle is discarded may be garbage-collected mid-execution.
    cleanup_task = asyncio.create_task(cleanup_jobs_periodically())

    try:
        yield
    finally:
        # Shutdown -- placed in `finally` so teardown also runs when an
        # exception is thrown into the generator at the `yield` point.
        log.info("Shutting down Google Reviews Scraper API Server")

        # Stop the background loop. `gather(..., return_exceptions=True)`
        # absorbs both the resulting CancelledError and any earlier failure
        # of the task so the remaining teardown steps still run.
        cleanup_task.cancel()
        await asyncio.gather(cleanup_task, return_exceptions=True)

        if job_manager:
            job_manager.shutdown()

        # Stop Chrome worker pools
        log.info("Stopping Chrome worker pools...")
        stop_worker_pools()
|
|
|
|
|
|
# Application object; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="Google Reviews Scraper API",
    description="REST API for triggering and managing Google Maps review scraping jobs",
)

# CORS policy: every origin, method and header is currently accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation -- list explicit origins
# before exposing this server outside a development environment.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # Configure appropriately for production
)
|
|
|
|
|
|
# Pydantic models for API
|
|
class ScrapeRequest(BaseModel):
    """Body accepted by the endpoint that starts a scrape job.

    Only ``url`` is mandatory. Every other field defaults to ``None``; the
    handler forwards only the fields that were explicitly set (non-None) as
    configuration overrides.
    """

    # Required target of the scrape
    url: HttpUrl = Field(..., description="Google Maps URL to scrape")

    # Browser / scrolling behaviour
    headless: Optional[bool] = Field(default=None, description="Run Chrome in headless mode (default: True)")
    max_scrolls: Optional[int] = Field(default=None, description="Maximum scrolls (default: unlimited - stops via idle detection)")
    sort_by: Optional[str] = Field(default=None, description="Sort order: newest, highest, lowest, relevance")

    # Handling of reviews that already exist
    stop_on_match: Optional[bool] = Field(default=None, description="Stop when first already-seen review is encountered")
    overwrite_existing: Optional[bool] = Field(default=None, description="Overwrite existing reviews instead of appending")

    # Image handling
    download_images: Optional[bool] = Field(default=None, description="Download images from reviews")
    use_s3: Optional[bool] = Field(default=None, description="Upload images to S3")

    # Free-form extras
    custom_params: Optional[Dict[str, Any]] = Field(default=None, description="Custom parameters to add to each document")
|
|
|
|
|
|
class JobResponse(BaseModel):
    """Job information as returned by the job endpoints.

    Built from the dictionary produced by a job's ``to_dict()``; everything
    apart from the identity fields and ``created_at`` is optional.
    """

    # Identity and current state
    job_id: str
    status: JobStatus
    url: str

    # Timestamps (string-typed)
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    updated_at: Optional[str] = None    # last update time, used for progress tracking

    # Outcome details
    error_message: Optional[str] = None
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # total reviews available for this place
    images_count: Optional[int] = None
    progress: Optional[Dict[str, Any]] = None
    scrape_time: Optional[float] = None  # time taken to scrape, in seconds
|
|
|
|
|
|
class JobStatsResponse(BaseModel):
    """Job statistics payload returned by the ``/stats`` endpoint.

    Populated directly from the dictionary returned by the job manager's
    ``get_stats()``.
    """

    total_jobs: int             # number of jobs known to the manager
    by_status: Dict[str, int]   # job count keyed by status
    running_jobs: int           # jobs currently running
    max_concurrent_jobs: int    # configured concurrency limit
|
|
|
|
|
|
class ReviewsResponse(BaseModel):
    """Reviews payload returned for a single job."""

    job_id: str                     # job the reviews belong to
    reviews: List[Dict[str, Any]]   # one dictionary per review
    count: int                      # number of entries in `reviews`
|
|
|
|
|
|
# Background task for periodic cleanup
async def cleanup_jobs_periodically(interval_seconds: float = 3600, max_age_hours: int = 24):
    """Periodically clean up old jobs.

    Runs forever: sleeps ``interval_seconds``, then asks the global job
    manager (if one exists) to drop jobs older than ``max_age_hours``.
    The loop ends only when the task is cancelled.

    Args:
        interval_seconds: Pause between cleanup passes (default: one hour).
        max_age_hours: Age threshold forwarded to ``cleanup_old_jobs``
            (default: 24).
    """
    while True:
        await asyncio.sleep(interval_seconds)

        if not job_manager:
            continue

        try:
            job_manager.cleanup_old_jobs(max_age_hours=max_age_hours)
        except Exception:
            # A single failed pass must not end the loop for good; log the
            # traceback and try again next cycle. Cancellation is not an
            # `Exception` subclass and therefore still propagates.
            log.exception("Periodic job cleanup failed; will retry on the next cycle")
|
|
|
|
|
|
# API Endpoints
|
|
|
|
@app.get("/", summary="API Health Check")
async def root():
    """Report that the API is up, together with its version."""
    health = {
        "message": "Google Reviews Scraper API is running",
        "status": "healthy",
        "version": "1.0.0",
    }
    return health
|
|
|
|
|
|
@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """
    Create a scraping job and try to start it right away.

    Every request field other than ``url`` that is not None is passed to the
    job manager as a configuration override. ``background_tasks`` is accepted
    but not used by this handler.

    Returns the job ID, whether the job was "started" or "queued", and a
    human-readable message. Responds with 500 when the job manager is not
    initialized or when creating/starting the job raises.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    # Overrides = every explicitly supplied option except the URL itself
    config_overrides = {
        name: supplied
        for name, supplied in request.dict().items()
        if name != "url" and supplied is not None
    }

    # The job manager works with a plain string URL
    url = str(request.url)

    try:
        job_id = job_manager.create_job(url, config_overrides)

        # Runs immediately when possible; otherwise the job is reported as queued
        started = job_manager.start_job(job_id)

        log.info(f"Created scraping job {job_id} for URL: {url}")

        response = {
            "job_id": job_id,
            "status": "started" if started else "queued",
            "message": f"Scraping job {'started' if started else 'queued'} successfully"
        }
        return response

    except Exception as e:
        log.error(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}")
|
|
|
|
|
|
@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: str):
    """Return the details of one job.

    Responds with 500 when the job manager is not initialized and 404 when
    no job with the given ID exists.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    found = job_manager.get_job(job_id)
    if found:
        return JobResponse(**found.to_dict())

    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
|
|
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: str):
    """
    Get the actual reviews data for a completed job.

    Responds with 404 if the job does not exist, 400 if the job exists but
    has not completed yet, and 404 if the job completed but no reviews data
    is available.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    reviews = job_manager.get_job_reviews(job_id)

    # Happy path: reviews are available
    if reviews is not None:
        return ReviewsResponse(job_id=job_id, reviews=reviews, count=len(reviews))

    # No reviews: work out which error describes the situation
    job = job_manager.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.status != JobStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=f"Job not completed yet (current status: {job.status})"
        )

    raise HTTPException(status_code=404, detail="Reviews data not available")
|
|
|
|
|
|
@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[JobStatus] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000)
):
    """Return up to ``limit`` jobs, optionally restricted to one status.

    Responds with 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    selected = job_manager.list_jobs(status=status, limit=limit)
    payload = [JobResponse(**entry.to_dict()) for entry in selected]
    return payload
|
|
|
|
|
|
@app.post("/jobs/{job_id}/start", summary="Start Pending Job")
async def start_job(job_id: str):
    """Manually start a job that is still pending.

    Error responses when the job could not be started:
        * 404 - no job with this ID,
        * 400 - the job exists but is not pending,
        * 429 - the job is pending but could not be started (reported as the
          concurrency limit having been reached).
    Responds with 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.start_job(job_id):
        return {"message": "Job started successfully"}

    # Start was refused: pick the most specific explanation
    job = job_manager.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.status != JobStatus.PENDING:
        raise HTTPException(status_code=400, detail=f"Job is not pending (current status: {job.status})")

    raise HTTPException(status_code=429, detail="Maximum concurrent jobs reached")
|
|
|
|
|
|
@app.post("/jobs/{job_id}/cancel", summary="Cancel Job")
async def cancel_job(job_id: str):
    """Cancel a job that is pending or running.

    Responds with 404 when the job does not exist, 400 when the job manager
    refuses the cancellation, and 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.cancel_job(job_id):
        return {"message": "Job cancelled successfully"}

    # Cancellation was refused: unknown job, or a job that can no longer be cancelled
    if not job_manager.get_job(job_id):
        raise HTTPException(status_code=404, detail="Job not found")

    raise HTTPException(status_code=400, detail="Job cannot be cancelled (already completed, failed, or cancelled)")
|
|
|
|
|
|
@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: str):
    """Remove a job from the system.

    Responds with 404 when the job manager reports that nothing was deleted
    and 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    if job_manager.delete_job(job_id):
        return {"message": "Job deleted successfully"}

    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
|
|
@app.get("/stats", response_model=JobStatsResponse, summary="Get Job Statistics")
async def get_stats():
    """Return the job manager's statistics.

    Responds with 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    return JobStatsResponse(**job_manager.get_stats())
|
|
|
|
|
|
@app.post("/check-reviews", summary="Check if Business Has Reviews")
async def check_reviews(request: Dict[str, str]):
    """
    Lightweight validation endpoint to check if a business has reviews.
    Uses the Chrome validation pool for fast response.

    The request body must contain a non-empty ``"url"`` entry. The dictionary
    produced by ``get_business_card_info`` is returned as-is, minus its
    internal ``'driver'`` entry; its ``name``, ``rating`` and
    ``total_reviews`` values are logged.

    Error responses:
        * 400 - ``url`` missing or empty,
        * 503 - no validation worker became available within 10 seconds,
        * 500 - the validation itself raised.

    The borrowed worker is handed back to the pool exactly once, and is
    flagged for recycling when the validation failed.
    """
    url = request.get("url")
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")

    log.info(f"Validating business at: {url}")

    # NOTE(review): the pool and scraper calls below are invoked synchronously
    # inside this coroutine; if they block (e.g. on browser I/O) the event
    # loop is stalled for their duration -- consider offloading to a thread.

    # Get a worker from validation pool
    worker = get_validation_worker(timeout=10)
    if not worker:
        raise HTTPException(
            status_code=503,
            detail="No validation workers available. Please try again in a few seconds."
        )

    failed = False
    try:
        # Use the worker's driver to get business card info (faster than check_reviews_available)
        result = get_business_card_info(
            url=url,
            headless=True,
            driver=worker.driver,
            return_driver=True  # Don't close the driver
        )

        # The driver belongs to the pool and must not leak into the response
        result.pop('driver', None)

        log.info(f"Validation result: name={result.get('name')}, rating={result.get('rating')}, reviews={result.get('total_reviews')}")
        return result

    except Exception as e:
        failed = True
        log.error(f"Error during validation: {e}")
        raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}") from e

    finally:
        # Single release point. Previously the worker was released in the
        # `except` branch and possibly again here, relying on recycling to
        # clear `worker.driver`. A worker that failed, or that no longer has
        # a driver, is sent back for recycling instead of plain reuse.
        release_validation_worker(worker, recycle=failed or not worker.driver)
|
|
|
|
|
|
@app.get("/pool-stats", summary="Get Chrome Pool Statistics")
async def pool_stats():
    """Return whatever ``get_pool_stats()`` reports for the Chrome worker pools."""
    return get_pool_stats()
|
|
|
|
|
|
@app.post("/cleanup", summary="Manual Job Cleanup")
async def cleanup_jobs(max_age_hours: int = Query(24, description="Maximum age in hours", ge=1)):
    """Trigger an immediate cleanup pass for jobs older than ``max_age_hours``.

    Responds with 500 when the job manager is not initialized.
    """
    if not job_manager:
        raise HTTPException(status_code=500, detail="Job manager not initialized")

    job_manager.cleanup_old_jobs(max_age_hours=max_age_hours)

    confirmation = {"message": f"Cleaned up jobs older than {max_age_hours} hours"}
    return confirmation
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script
    import uvicorn

    log.info("Starting FastAPI server...")

    # Serve `app` from this module on all interfaces, port 8000, with auto-reload on
    uvicorn.run(
        "api_server:app",
        port=8000,
        host="0.0.0.0",
        log_level="info",
        reload=True,
    )