Files
whyrating-engine-legacy/api_server_production.py
Alejandro Gutiérrez a540ab97b1 Add browser fingerprint support and analytics metadata display
- Transfer user's browser fingerprint (user-agent, viewport, timezone,
  language, geolocation) to Chrome for more authentic scraping
- Display review topics from Google Maps in analytics dashboard
- Show business category badge in analytics header
- Fix date_text null handling in analytics (handle undefined/timestamp fields)
- Add review_topics and business_category to JobStatus interface

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 10:36:06 +00:00

1183 lines
44 KiB
Python

#!/usr/bin/env python3
"""
Production Google Reviews Scraper API Server with Phase 1 features:
- PostgreSQL storage with JSONB
- Webhook delivery with retries
- Smart health checks with canary testing
"""
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
from typing import Optional, List, Dict, Any
from uuid import UUID
from fastapi import FastAPI, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl, Field
from fastapi.responses import JSONResponse, StreamingResponse
from modules.database import DatabaseManager, JobStatus
from modules.webhooks import WebhookDispatcher, WebhookManager
from modules.health_checks import HealthCheckSystem
from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper
from modules.chrome_pool import (
start_worker_pools,
stop_worker_pools,
get_validation_worker,
release_validation_worker,
get_scraping_worker,
release_scraping_worker,
get_pool_stats
)
# Configure logging
# Root logger is set up once at import time: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
log = logging.getLogger("api_server")
# Global instances
# These start as None; lifespan() assigns db and webhook_dispatcher at startup.
db: Optional[DatabaseManager] = None
webhook_dispatcher: Optional[WebhookDispatcher] = None
# lifespan() currently leaves this as None because the health system is disabled there.
health_system: Optional[HealthCheckSystem] = None
# Concurrent job limiter (prevent too many Chrome instances)
# Read from the MAX_CONCURRENT_JOBS environment variable; defaults to 5.
MAX_CONCURRENT_JOBS = int(os.getenv('MAX_CONCURRENT_JOBS', '5'))
# NOTE(review): the semaphore is created at import time, before any event loop
# is running; presumably fine on Python >= 3.10 - confirm the target runtime.
job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
# SSE: Store for broadcasting job updates to connected clients
# Format: {job_id: [asyncio.Queue, ...]} for job-specific streams
# Format: {"all": [asyncio.Queue, ...]} for all-jobs stream
job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup connects to PostgreSQL (``DATABASE_URL`` or a localhost default),
    initialises the schema, launches the webhook dispatcher as a background
    task and pre-warms the Chrome worker pools.  Shutdown stops the
    dispatcher, stops the worker pools and disconnects the database; it runs
    even when the application exits with an error.

    The health-check system is deliberately not started, so the global
    ``health_system`` remains ``None``.
    """
    global db, webhook_dispatcher, health_system
    # Startup
    log.info("Starting Google Reviews Scraper API Server (Production)")
    # Get database URL from environment
    database_url = os.getenv(
        'DATABASE_URL',
        'postgresql://scraper:scraper@localhost:5432/scraper'
    )
    # Initialize database
    db = DatabaseManager(database_url)
    await db.connect()
    await db.initialize_schema()
    log.info("Database initialized")
    # Initialize health check system with canary monitoring
    # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting
    # health_system = HealthCheckSystem(db)
    # await health_system.start()
    log.info("Health check system DISABLED (canary tests disabled to avoid rate limiting)")
    # Initialize webhook dispatcher
    webhook_dispatcher = WebhookDispatcher(db, interval_seconds=30)
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    dispatcher_task = asyncio.create_task(webhook_dispatcher.start())
    log.info("Webhook dispatcher started")
    # Start Chrome worker pools (1 for validation, 2 for scraping)
    # These pre-warm Chrome instances for instant availability
    # headless=False because Docker uses Xvfb virtual display for better compatibility
    await asyncio.to_thread(
        start_worker_pools,
        validation_size=1,
        scraping_size=2,
        headless=False
    )
    log.info("Chrome worker pools started (1 validation + 2 scraping)")
    try:
        yield
    finally:
        # Shutdown
        log.info("Shutting down Google Reviews Scraper API Server")
        if webhook_dispatcher:
            webhook_dispatcher.stop()
        # Make sure the dispatcher is no longer running before the database
        # connection it uses is closed.
        if not dispatcher_task.done():
            dispatcher_task.cancel()
        await asyncio.gather(dispatcher_task, return_exceptions=True)
        # if health_system:
        #     health_system.stop()
        # Stop worker pools
        await asyncio.to_thread(stop_worker_pools)
        log.info("Chrome worker pools stopped")
        if db:
            await db.disconnect()
# Initialize FastAPI app
# Startup/shutdown work is delegated to lifespan().
app = FastAPI(
    title="Google Reviews Scraper API - Production",
    description="Production-ready REST API for Google Maps review scraping with webhooks and health monitoring",
    version="2.0.0",
    lifespan=lifespan
)
# Add CORS middleware
# NOTE(review): every origin, method and header is allowed and credentials are
# enabled at the same time; restrict allow_origins before exposing this
# service publicly - confirm the intended deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ==================== Request/Response Models ====================
class GeolocationModel(BaseModel):
    """Latitude/longitude pair; both coordinates are required."""
    lat: float = Field(default=..., description="Latitude")
    lng: float = Field(default=..., description="Longitude")
class ViewportModel(BaseModel):
    """Width and height of the browser viewport; both are required."""
    width: int = Field(default=..., description="Viewport width")
    height: int = Field(default=..., description="Viewport height")
class BrowserFingerprintModel(BaseModel):
    """Optional browser characteristics supplied by the client.

    Every attribute may be omitted; the endpoints copy whatever is present
    into the fingerprint dictionary handed to the scraper.
    """
    geolocation: Optional[GeolocationModel] = None
    userAgent: Optional[str] = Field(default=None, description="User agent string")
    viewport: Optional[ViewportModel] = Field(default=None, description="Screen resolution")
    timezone: Optional[str] = Field(default=None, description="Timezone (e.g., Europe/Madrid)")
    language: Optional[str] = Field(default=None, description="Browser language (e.g., en-US)")
    platform: Optional[str] = Field(default=None, description="Platform (e.g., MacIntel, Win32)")
class ScrapeRequest(BaseModel):
    """Body accepted by POST /scrape and POST /check-reviews.

    Only ``url`` is mandatory.  When both ``browser_fingerprint`` and
    ``geolocation`` are given, the endpoints use the fingerprint and ignore
    the top-level geolocation.
    """
    url: HttpUrl = Field(default=..., description="Google Maps URL to scrape")
    webhook_url: Optional[HttpUrl] = Field(default=None, description="Webhook URL for async notifications")
    webhook_secret: Optional[str] = Field(default=None, description="Secret for webhook HMAC signature")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Optional custom metadata")
    geolocation: Optional[GeolocationModel] = Field(default=None, description="User's geolocation for Chrome")
    browser_fingerprint: Optional[BrowserFingerprintModel] = Field(default=None, description="User's browser fingerprint")
class JobResponse(BaseModel):
    """Response model for job information.

    Returned by GET /jobs/{job_id} and, as a list, by GET /jobs.  Timestamps
    are ISO-8601 strings produced with ``datetime.isoformat()``.
    """
    job_id: str
    status: str
    url: str
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    updated_at: Optional[str] = None  # Last update time for progress tracking
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # Total reviews available for this place
    scrape_time: Optional[float] = None
    error_message: Optional[str] = None
    webhook_url: Optional[str] = None
    # Business metadata (the endpoints read these from the job's metadata)
    business_name: Optional[str] = None
    business_address: Optional[str] = None
    business_category: Optional[str] = None  # Category (e.g., "Barber shop")
    review_topics: Optional[List[Dict[str, Any]]] = None  # Topic filters with mention counts
class ReviewsResponse(BaseModel):
    """Response model for reviews data (GET /jobs/{job_id}/reviews)."""
    job_id: str
    reviews: List[Dict[str, Any]]
    # The endpoint sets this to len(reviews).
    count: int
class StatsResponse(BaseModel):
    """Response model for statistics (GET /stats).

    Built directly from the dictionary returned by ``db.get_stats()``.
    """
    total_jobs: int
    # Number of jobs per status
    pending: int
    running: int
    completed: int
    failed: int
    cancelled: int
    avg_scrape_time: Optional[float] = None
    total_reviews: Optional[int] = None
# ==================== API Endpoints ====================
@app.get("/", summary="API Health Check")
async def root():
    """Return a static service banner; no dependency is probed."""
    banner = dict(
        message="Google Reviews Scraper API (Production)",
        status="healthy",
        version="2.0.0",
        features=["postgresql", "webhooks", "canary-testing"],
    )
    return banner
# Strong references to in-flight scraping tasks.  The event loop only keeps
# weak references, so a task nobody holds may be garbage-collected before it
# finishes; each task removes itself from the set when done.
_scrape_tasks = set()


@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def start_scrape(request: ScrapeRequest):
    """
    Start a new scraping job.

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete

    The client's browser fingerprint (or, failing that, its geolocation) is
    stored in the job metadata so the background runner can hand it to Chrome.

    Returns the job ID for tracking.

    Raises:
        HTTPException: 500 when the database is not initialised or the job
            could not be created.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    try:
        # Work on a copy so the caller-supplied metadata is never mutated.
        metadata = dict(request.metadata or {})
        if request.browser_fingerprint:
            fp = request.browser_fingerprint
            fingerprint = {
                "userAgent": fp.userAgent,
                "timezone": fp.timezone,
                "language": fp.language,
                "platform": fp.platform,
            }
            if fp.viewport:
                fingerprint['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height}
            if fp.geolocation:
                fingerprint['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
            metadata['browser_fingerprint'] = fingerprint
        elif request.geolocation:
            metadata['geolocation'] = {
                'lat': request.geolocation.lat,
                'lng': request.geolocation.lng
            }
        # Create job in database
        job_id = await db.create_job(
            url=str(request.url),
            webhook_url=str(request.webhook_url) if request.webhook_url else None,
            webhook_secret=request.webhook_secret,
            metadata=metadata
        )
        # Start scraping job in background, keeping a reference until it ends
        task = asyncio.create_task(run_scraping_job(job_id))
        _scrape_tasks.add(task)
        task.add_done_callback(_scrape_tasks.discard)
        log.info(f"Created and started job {job_id}")
        return {
            "job_id": str(job_id),
            "status": "started",
            "message": "Scraping job started successfully"
        }
    except Exception as e:
        # log.exception keeps the traceback, which log.error dropped.
        log.exception(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") from e
@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: UUID):
    """Get detailed information about a specific job.

    Business name, address and category are read from the job's metadata;
    ``review_topics`` and ``metadata`` are decoded when the database returns
    them as JSON strings.

    Raises:
        HTTPException: 500 when the database is not initialised, 404 when the
            job does not exist.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    # Parse review_topics if it's a string (JSONB might be returned as string)
    review_topics = job.get('review_topics')
    if isinstance(review_topics, str):
        try:
            review_topics = json.loads(review_topics)
        except ValueError:  # json.JSONDecodeError is a ValueError
            review_topics = None
    if not isinstance(review_topics, list):
        # Anything but a list would fail response validation.
        review_topics = None
    # Extract business info from metadata if available
    metadata = job.get('metadata')
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = None
    if not isinstance(metadata, dict):
        metadata = {}
    return JobResponse(
        job_id=str(job['job_id']),
        status=job['status'],
        url=job['url'],
        created_at=job['created_at'].isoformat(),
        started_at=job['started_at'].isoformat() if job['started_at'] else None,
        completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
        updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None,
        reviews_count=job['reviews_count'],
        total_reviews=job.get('total_reviews'),
        scrape_time=job['scrape_time'],
        error_message=job['error_message'],
        webhook_url=job.get('webhook_url'),
        business_name=metadata.get('business_name'),
        # Previously never populated here although GET /jobs returns it.
        business_address=metadata.get('business_address'),
        business_category=metadata.get('business_category'),
        review_topics=review_topics
    )
@app.get("/jobs/{job_id}/logs", summary="Get Job Logs")
async def get_job_logs(job_id: UUID):
    """
    Get the scraper logs for a job.

    Returns logs from both successful and failed jobs.
    Useful for debugging scraping issues.

    Returns:
        A dict with ``job_id``, ``status``, ``error_message``, ``logs`` (empty
        list when none are stored or they cannot be decoded) and ``log_count``.

    Raises:
        HTTPException: 500 when the database is not initialised, 404 when the
            job does not exist.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    # Get scrape_logs from job
    scrape_logs = job.get('scrape_logs')
    # Parse if string (asyncpg might return JSONB as string)
    if isinstance(scrape_logs, str):
        try:
            scrape_logs = json.loads(scrape_logs)
        except ValueError:  # json.JSONDecodeError is a ValueError
            scrape_logs = None
    logs = scrape_logs or []
    return {
        "job_id": str(job_id),
        "status": job['status'],
        "error_message": job.get('error_message'),
        "logs": logs,
        "log_count": len(logs)
    }
# ==================== SSE Streaming Endpoints ====================
async def broadcast_job_update(job_id: str, event_type: str, data: dict):
    """Broadcast an update to all subscribers of a job stream and the all-jobs stream.

    The payload is formatted once as a Server-Sent Events message and placed
    on every subscriber queue registered under ``job_id`` and under ``"all"``.

    Args:
        job_id: Job identifier used as the key in ``job_update_queues``.
        event_type: SSE event name.
        data: JSON-serialisable payload.
    """
    message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
    # Snapshot both subscriber lists: stream handlers add and remove their
    # queues concurrently.  Job-specific subscribers first, then all-jobs.
    subscribers = list(job_update_queues.get(job_id, ()))
    subscribers.extend(job_update_queues.get("all", ()))
    for queue in subscribers:
        try:
            # Non-blocking on purpose: a slow consumer must never stall the
            # job runner that is broadcasting.
            queue.put_nowait(message)
        except asyncio.QueueFull:
            log.warning(f"Dropping '{event_type}' event for job {job_id}: subscriber queue is full")
# Statuses after which a job produces no further updates.  'partial' is the
# status the job runner broadcasts when a scrape fails after saving reviews.
_STREAM_TERMINAL_STATUSES = ('completed', 'failed', 'cancelled', 'partial')


def _stream_logs(job_row) -> list:
    """Return the job's scrape logs as a list, decoding a JSON string if needed."""
    logs = job_row.get('scrape_logs')
    if isinstance(logs, str):
        try:
            logs = json.loads(logs)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return []
    return logs or []


def _stream_event(event_type: str, payload: dict) -> str:
    """Format one Server-Sent Events message."""
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


def _stream_job_snapshot(job_id_str: str, job_row, logs: list) -> dict:
    """Build the full-state payload used by the 'init' and final 'complete' events."""
    return {
        "job_id": job_id_str,
        "status": job_row['status'],
        "reviews_count": job_row.get('reviews_count'),
        "total_reviews": job_row.get('total_reviews'),
        "scrape_time": job_row.get('scrape_time'),
        "error_message": job_row.get('error_message'),
        "logs": logs
    }


@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
async def stream_job_updates(job_id: UUID):
    """
    Server-Sent Events stream for real-time updates of one job.

    Events emitted by this endpoint:
    - init: Full job state, sent once when the stream opens
    - update: Sent when a database poll finds new logs or a new review count
    - complete: Job reached a final state; the stream then closes

    Events broadcast by the job runner are forwarded unchanged:
    job_started, job_progress, job_completed, job_partial, job_failed.

    A ``: keepalive`` comment is sent whenever two seconds pass without a
    broadcast; the database is polled at the same moment as a backup in case
    a broadcast was missed.

    All events are named, so subscribe with addEventListener:
    ```javascript
    const es = new EventSource('/jobs/{job_id}/stream');
    es.addEventListener('init', (e) => console.log('State:', JSON.parse(e.data)));
    es.addEventListener('update', (e) => console.log('Update:', JSON.parse(e.data)));
    es.addEventListener('complete', (e) => es.close());
    ```

    Raises:
        HTTPException: 500 when the database is not initialised, 404 when the
            job does not exist.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    # Verify job exists
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    job_id_str = str(job_id)

    async def event_generator():
        # Register inside the generator so the matching clean-up in `finally`
        # is guaranteed to run once streaming has started.
        queue: asyncio.Queue = asyncio.Queue()
        job_update_queues.setdefault(job_id_str, []).append(queue)
        try:
            # Send initial state
            job_data = await db.get_job(job_id)
            if not job_data:
                # Job disappeared between the existence check and now.
                return
            logs = _stream_logs(job_data)
            yield _stream_event("init", _stream_job_snapshot(job_id_str, job_data, logs))
            # If job is already complete, send complete event and close
            if job_data['status'] in _STREAM_TERMINAL_STATUSES:
                yield _stream_event("complete", {'status': job_data['status']})
                return
            # Keep connection alive and send updates
            last_log_count = len(logs)
            # Normalise None to 0 so the first poll does not report a change
            # that never happened.
            last_reviews_count = job_data.get('reviews_count') or 0
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=2.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"
                        # Also poll database for updates (backup in case broadcast missed)
                        job_data = await db.get_job(job_id)
                        if not job_data:
                            continue
                        logs = _stream_logs(job_data)
                        # Check for status change
                        if job_data['status'] in _STREAM_TERMINAL_STATUSES:
                            yield _stream_event(
                                "complete",
                                _stream_job_snapshot(job_id_str, job_data, logs)
                            )
                            return
                        # Check for new logs or progress
                        current_log_count = len(logs)
                        current_reviews = job_data.get('reviews_count') or 0
                        if current_log_count > last_log_count or current_reviews != last_reviews_count:
                            yield _stream_event("update", {
                                "job_id": job_id_str,
                                "status": job_data['status'],
                                "reviews_count": current_reviews,
                                "total_reviews": job_data.get('total_reviews'),
                                "logs": logs
                            })
                            last_log_count = current_log_count
                            last_reviews_count = current_reviews
                except Exception as e:
                    log.error(f"Error in SSE stream for job {job_id}: {e}")
                    break
        finally:
            # Unregister subscriber
            subscribers = job_update_queues.get(job_id_str)
            if subscribers is not None:
                if queue in subscribers:
                    subscribers.remove(queue)
                if not subscribers:
                    del job_update_queues[job_id_str]

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
@app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)")
async def stream_all_jobs():
    """
    Server-Sent Events stream for updates of every job.

    Events:
    - init: Sent once on connect with the 100 most recent jobs under "jobs"
    - job_started, job_progress, job_completed, job_partial, job_failed:
      forwarded from the job runner as they are broadcast

    A ``: keepalive`` comment is sent whenever five seconds pass without an
    event.

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/stream');
    es.addEventListener('job_progress', (e) => console.log('Update:', JSON.parse(e.data)));
    ```

    Raises:
        HTTPException: 500 when the database is not initialised.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    async def event_generator():
        # Register inside the generator so the matching clean-up in `finally`
        # is guaranteed to run once streaming has started.
        queue: asyncio.Queue = asyncio.Queue()
        job_update_queues["all"].append(queue)
        try:
            # Send initial jobs list
            jobs = await db.list_jobs(limit=100)
            jobs_data = [
                {
                    "job_id": str(j['job_id']),
                    "status": j['status'],
                    "url": j['url'],
                    "created_at": j['created_at'].isoformat(),
                    "completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None,
                    "reviews_count": j.get('reviews_count'),
                    "scrape_time": j.get('scrape_time'),
                    "error_message": j.get('error_message')
                }
                for j in jobs
            ]
            yield f"event: init\ndata: {json.dumps({'jobs': jobs_data})}\n\n"
            # Keep connection alive and send updates
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=5.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"
                except Exception as e:
                    log.error(f"Error in all-jobs SSE stream: {e}")
                    break
        finally:
            # Unregister subscriber
            if queue in job_update_queues["all"]:
                job_update_queues["all"].remove(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )


# Routes are matched in registration order.  When "/jobs/{job_id}" is
# registered earlier in the file it also matches "/jobs/stream" and rejects it
# with a 422 because "stream" is not a UUID, so this static route is moved to
# the front of the routing table.  (Defining this endpoint above the
# parameterised one would make the reordering unnecessary.)
for _route_index, _route in enumerate(app.router.routes):
    if getattr(_route, "path", None) == "/jobs/stream":
        app.router.routes.insert(0, app.router.routes.pop(_route_index))
        break
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: UUID):
    """
    Return the reviews collected for a job.

    Reviews are served for completed, partial and still-running jobs as soon
    as any have been stored.  When nothing is stored the job is looked up to
    pick the error: 404 for an unknown job, 400 for a job that is pending or
    failed, 404 otherwise.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Includes completed, running and partial jobs
    collected = await db.get_job_reviews(job_id, include_partial=True)
    if collected is not None:
        return ReviewsResponse(
            job_id=str(job_id),
            reviews=collected,
            count=len(collected)
        )

    # Nothing stored: work out why
    record = await db.get_job(job_id)
    if not record:
        raise HTTPException(status_code=404, detail="Job not found")
    state = record['status']
    if state == 'pending':
        raise HTTPException(
            status_code=400,
            detail="Job has not started yet"
        )
    if state == 'failed':
        failure_reason = record.get('error_message', 'Unknown error')
        raise HTTPException(
            status_code=400,
            detail=f"Job failed without saving any reviews: {failure_reason}"
        )
    raise HTTPException(status_code=404, detail="No reviews data available yet")
@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[str] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of jobs to skip", ge=0)
):
    """List all jobs, optionally filtered by status.

    Business fields are read from each job's metadata; ``metadata`` and
    ``review_topics`` are decoded when the database returns them as JSON
    strings.

    Raises:
        HTTPException: 500 when the database is not initialised, 400 when
            ``status`` is not a valid job status.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    # Validate status if provided
    job_status = None
    if status:
        try:
            job_status = JobStatus(status.lower())
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status. Must be one of: {[s.value for s in JobStatus]}"
            ) from None
    jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset)
    result = []
    for job in jobs:
        # Extract business info from metadata if available
        metadata = job.get('metadata')
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except ValueError:  # json.JSONDecodeError is a ValueError
                metadata = None
        if not isinstance(metadata, dict):
            metadata = {}
        # Parse review_topics if it's a string
        review_topics = job.get('review_topics')
        if isinstance(review_topics, str):
            try:
                review_topics = json.loads(review_topics)
            except ValueError:
                review_topics = None
        if not isinstance(review_topics, list):
            # Anything but a list would fail response validation.
            review_topics = None
        result.append(JobResponse(
            job_id=str(job['job_id']),
            status=job['status'],
            url=job['url'],
            created_at=job['created_at'].isoformat(),
            # started_at / updated_at mirror GET /jobs/{job_id}; .get() keeps
            # this safe if the listing query does not select those columns.
            started_at=job['started_at'].isoformat() if job.get('started_at') else None,
            completed_at=job['completed_at'].isoformat() if job.get('completed_at') else None,
            updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None,
            reviews_count=job.get('reviews_count'),
            total_reviews=job.get('total_reviews'),
            scrape_time=job.get('scrape_time'),
            error_message=job.get('error_message'),
            business_name=metadata.get('business_name'),
            business_address=metadata.get('business_address'),
            business_category=metadata.get('business_category'),
            review_topics=review_topics
        ))
    return result
@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: UUID):
    """Remove a job; answers 404 when the database reports nothing was deleted."""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")
    if await db.delete_job(job_id):
        return {"message": "Job deleted successfully"}
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/check-reviews", summary="Check if Reviews Exist")
async def check_reviews(request: ScrapeRequest):
    """
    Get business card information from Google Maps.

    Runs the regular scraper with ``validation_only=True`` in a worker thread
    and returns business name, address, rating, category and review count.
    This is used to show the business confirmation card in the UI.

    Returns:
        A dict with the keys ``has_reviews``, ``total_reviews``, ``name``,
        ``address``, ``rating``, ``category``, ``success`` and ``error``.
        Errors are reported in the same shape with ``success`` set to False
        rather than as an HTTP error; the legacy ``review_count`` key is kept
        in that case for existing clients.
    """
    try:
        url = str(request.url)
        # Use the SAME scraper algorithm with validation_only=True for early return
        # Creates a fresh Chrome instance (same as full scraper) to avoid stale browser state
        # Pooled browsers can have cookies/state that cause Google to render pages differently
        # Build fingerprint dict from request
        fingerprint = None
        if request.browser_fingerprint:
            fp = request.browser_fingerprint
            fingerprint = {
                "userAgent": fp.userAgent,
                "timezone": fp.timezone,
                "language": fp.language,
                "platform": fp.platform,
            }
            if fp.viewport:
                fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height}
            if fp.geolocation:
                fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
            log.info(f"Creating Chrome with user fingerprint: {fp.platform}, {fp.timezone}")
        elif request.geolocation:
            fingerprint = {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}}
            log.info("Creating Chrome with geolocation only")
        else:
            log.info("Creating Chrome with default settings")
        result = await asyncio.to_thread(
            fast_scrape_reviews,
            url=url,
            headless=False,  # Use Xvfb display
            validation_only=True,  # Return early after getting total_reviews
            browser_fingerprint=fingerprint  # Pass user's browser fingerprint
        )
        # Extract validation info from the result ("or {}" also covers an
        # explicit None stored under the key)
        validation_info = result.get('validation_info') or {}
        total_reviews = validation_info.get('total_reviews') or result.get('total_reviews') or 0
        name = validation_info.get('name')
        # Has reviews if we found a business with the Reviews tab (indicated by total_reviews > 0)
        has_reviews = bool(name and total_reviews > 0)
        return {
            "has_reviews": has_reviews,  # True if business has reviews
            "total_reviews": total_reviews,
            "name": name,
            "address": validation_info.get('address'),
            "rating": validation_info.get('rating'),
            "category": validation_info.get('category'),
            "success": result.get('success', True),
            "error": result.get('error')
        }
    except Exception as e:
        log.exception(f"Error checking reviews: {e}")
        # Same keys as the success payload so clients can read one shape.
        return {
            "has_reviews": False,
            "total_reviews": 0,
            "review_count": 0,  # legacy key, kept for backward compatibility
            "name": None,
            "address": None,
            "rating": None,
            "category": None,
            "success": False,
            "error": str(e)
        }
@app.get("/stats", response_model=StatsResponse, summary="Get Statistics")
async def get_stats():
    """Return aggregate job statistics as reported by the database layer."""
    if db:
        job_totals = await db.get_stats()
        return StatsResponse(**job_totals)
    raise HTTPException(status_code=500, detail="Database not initialized")
@app.get("/pool-stats", summary="Get Worker Pool Statistics")
async def pool_stats():
    """Return Chrome worker pool statistics, collected in a worker thread."""
    snapshot = await asyncio.to_thread(get_pool_stats)
    return snapshot
# ==================== Health Check Endpoints ====================
@app.get("/health/live", summary="Liveness Probe")
async def liveness():
    """
    Liveness check: Is the server alive?

    Use this for Kubernetes liveness probe - restart container if fails.

    The health system is currently disabled at startup, so ``health_system``
    is ``None``.  Answering this request already proves the process is alive,
    so a minimal success payload is returned in that case instead of a 503
    that would make an orchestrator restart a healthy container forever.
    """
    if not health_system:
        return {"status": "alive", "health_system": "disabled"}
    return await health_system.check_liveness()
@app.get("/health/ready", summary="Readiness Probe")
async def readiness():
    """
    Readiness check: Can the server handle traffic?

    Use this for Kubernetes readiness probe - remove from load balancer if fails.

    The health system is currently disabled at startup, so ``health_system``
    is ``None``.  In that case readiness falls back to whether the database
    manager has been initialised, instead of answering 503 permanently.
    """
    if not health_system:
        if not db:
            return JSONResponse(
                status_code=503,
                content={"status": "not_ready", "detail": "Database not initialized"}
            )
        return {"status": "ready", "health_system": "disabled"}
    result = await health_system.check_readiness()
    if result["status"] != "ready":
        return JSONResponse(status_code=503, content=result)
    return result
@app.get("/health/canary", summary="Canary Health Check")
async def canary():
    """
    Canary check: does scraping actually work?

    Reports the health system's latest canary result.  A status of "healthy"
    or "unknown" is returned as-is; any other status is sent with HTTP 503 so
    external monitoring can alert on it.  Responds 503 as well when the
    health system is not initialised.
    """
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")
    outcome = await health_system.check_canary()
    acceptable = outcome["status"] in ["healthy", "unknown"]
    if acceptable:
        return outcome
    return JSONResponse(status_code=503, content=outcome)
@app.get("/health/detailed", summary="Detailed Health Status")
async def detailed_health():
    """Return the health system's detailed report, or 503 when it is not initialised."""
    if health_system:
        report = await health_system.get_detailed_health()
        return report
    raise HTTPException(status_code=503, detail="Health system not initialized")
# ==================== Background Job Runner ====================
async def _await_callback_writes(futures: list) -> None:
    """Wait for DB writes scheduled from the scraper thread; log the ones that failed."""
    while futures:
        batch = futures[:]
        del futures[:len(batch)]
        outcomes = await asyncio.gather(
            *(asyncio.wrap_future(f) for f in batch),
            return_exceptions=True
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                log.warning(f"Incremental job update failed: {outcome}")


async def _deliver_job_webhook(job, job_id: UUID, **fields) -> None:
    """Send the job-finished webhook if the job has a webhook_url; never raises."""
    webhook_url = job.get('webhook_url') if job else None
    if not webhook_url:
        return
    try:
        await WebhookManager().send_job_completed_webhook(
            webhook_url=webhook_url,
            job_id=job_id,
            secret=job.get('webhook_secret'),
            db=db,
            **fields
        )
    except Exception:
        # A delivery problem must not change the outcome already recorded for the job.
        log.exception(f"Webhook delivery failed for job {job_id}")


async def _record_unfinished_job(job_id: UUID, job, error_message: str, logs) -> None:
    """Mark a job 'partial' (some reviews saved) or 'failed', then broadcast and notify."""
    job_id_str = str(job_id)
    current_job = await db.get_job(job_id)
    # reviews_count can be stored as NULL, so normalise before comparing.
    partial_count = (current_job.get('reviews_count') or 0) if current_job else 0
    webhook_job = current_job or job
    if partial_count > 0:
        # Mark as partial - we have some reviews saved
        await db.mark_job_partial(
            job_id,
            error_message=error_message,
            scrape_logs=logs
        )
        log.warning(f"Partial job {job_id}: {partial_count} reviews saved before error: {error_message}")
        await broadcast_job_update(job_id_str, "job_partial", {
            "job_id": job_id_str,
            "status": "partial",
            "reviews_count": partial_count,
            "total_reviews": current_job.get('total_reviews'),
            "error_message": error_message,
            "logs": logs or []
        })
        await _deliver_job_webhook(
            webhook_job,
            job_id,
            status='partial',
            reviews_count=partial_count,
            error_message=error_message
        )
        return
    # No reviews saved - mark as failed
    await db.update_job_status(
        job_id,
        JobStatus.FAILED,
        error_message=error_message,
        scrape_logs=logs
    )
    log.error(f"Failed job {job_id}: {error_message}")
    await broadcast_job_update(job_id_str, "job_failed", {
        "job_id": job_id_str,
        "status": "failed",
        "error_message": error_message,
        "logs": logs or []
    })
    await _deliver_job_webhook(
        webhook_job,
        job_id,
        status='failed',
        error_message=error_message
    )


async def run_scraping_job(job_id: UUID):
    """
    Run scraping job in background with concurrency limit.

    The scraper runs in a worker thread.  Its progress and flush callbacks
    schedule database writes back onto the event loop; those writes are
    awaited before the final status is stored so that a late "running"
    update cannot overwrite it.  On success the results are saved, an SSE
    event is broadcast and the webhook (if configured) is sent.  On failure
    the job is marked 'partial' when reviews were already saved, otherwise
    'failed'.

    Exceptions are logged and recorded on the job; they are not propagated
    out of this function.

    Args:
        job_id: Job UUID
    """
    job_id_str = str(job_id)
    async with job_semaphore:  # Limit concurrent Chrome instances
        # Bound before the try block so the error handler can always read them.
        job = None
        log_capture = None
        pending_writes = []
        try:
            # Update status to running
            await db.update_job_status(job_id, JobStatus.RUNNING)
            log.info(f"Starting scraping job {job_id}")
            # Get job details
            job = await db.get_job(job_id)
            if not job:
                log.warning(f"Job {job_id} no longer exists; nothing to scrape")
                return
            url = job['url']
            # Extract browser fingerprint from metadata if available
            browser_fingerprint = None
            metadata = job.get('metadata')
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:  # json.JSONDecodeError is a ValueError
                    metadata = None
            if not isinstance(metadata, dict):
                metadata = {}
            if isinstance(metadata.get('browser_fingerprint'), dict):
                browser_fingerprint = metadata['browser_fingerprint']
                log.info(f"Using user fingerprint: {browser_fingerprint.get('platform')}, {browser_fingerprint.get('timezone')}")
            elif 'geolocation' in metadata:
                browser_fingerprint = {'geolocation': metadata['geolocation']}
                log.info("Using user geolocation only")
            # Broadcast job started via SSE
            await broadcast_job_update(job_id_str, "job_started", {
                "job_id": job_id_str,
                "status": "running",
                "url": url
            })
            # Get the event loop for progress updates from worker thread
            loop = asyncio.get_running_loop()
            # Create log capture instance that we can access for real-time logs
            log_capture = LogCapture()
            # Track total reviews for incremental saves
            total_reviews_seen = [None]
            # Accumulate all reviews for incremental saves (flush_callback receives batches)
            all_reviews_collected = []

            # Progress callback to update job status with current/total counts AND logs
            def progress_callback(current_count: int, total_count: int):
                """Update job progress and logs from worker thread"""
                if total_count:
                    total_reviews_seen[0] = total_count

                async def update():
                    # Get current logs from the shared log_capture
                    current_logs = log_capture.get_logs()
                    await db.update_job_status(
                        job_id,
                        JobStatus.RUNNING,
                        reviews_count=current_count,
                        total_reviews=total_count,
                        scrape_logs=current_logs
                    )
                    # Broadcast progress via SSE
                    await broadcast_job_update(job_id_str, "job_progress", {
                        "job_id": job_id_str,
                        "status": "running",
                        "reviews_count": current_count,
                        "total_reviews": total_count,
                        "logs": current_logs
                    })

                # Schedule the coroutine on the event loop and remember it
                pending_writes.append(asyncio.run_coroutine_threadsafe(update(), loop))

            # Flush callback to save reviews incrementally (crash recovery)
            # Note: flush_callback receives batches, so we accumulate them
            def flush_callback(reviews_batch: list):
                """Accumulate and save reviews to DB incrementally from worker thread"""
                all_reviews_collected.extend(reviews_batch)
                # Snapshot the list: the worker thread keeps extending the
                # original while the save runs on the event loop.
                snapshot = list(all_reviews_collected)
                total_so_far = total_reviews_seen[0]

                async def save():
                    await db.save_reviews_incremental(
                        job_id=job_id,
                        reviews=snapshot,  # Save ALL reviews so far
                        total_reviews=total_so_far
                    )

                pending_writes.append(asyncio.run_coroutine_threadsafe(save(), loop))

            # Run scraping with progress callback and shared log capture
            # headless=False because Docker uses Xvfb virtual display
            result = await asyncio.to_thread(
                fast_scrape_reviews,
                url=url,
                headless=False,
                progress_callback=progress_callback,
                log_capture=log_capture,
                flush_callback=flush_callback,
                browser_fingerprint=browser_fingerprint  # Pass user's browser fingerprint
            )
            # Let in-flight progress/flush writes land before the final status.
            await _await_callback_writes(pending_writes)
            if result['success']:
                # Save results to database (including scraper logs and review topics)
                await db.save_job_result(
                    job_id=job_id,
                    reviews=result['reviews'],
                    scrape_time=result['time'],
                    total_reviews=result.get('total_reviews'),
                    scrape_logs=result.get('logs'),
                    review_topics=result.get('review_topics')
                )
                log.info(
                    f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s"
                )
                # Broadcast job completed via SSE
                await broadcast_job_update(job_id_str, "job_completed", {
                    "job_id": job_id_str,
                    "status": "completed",
                    "reviews_count": result['count'],
                    "total_reviews": result.get('total_reviews'),
                    "scrape_time": result['time'],
                    "logs": result.get('logs', [])
                })
                # Send webhook if configured
                api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000')
                await _deliver_job_webhook(
                    job,
                    job_id,
                    status='completed',
                    reviews_count=result['count'],
                    scrape_time=result['time'],
                    reviews_url=f"{api_base_url}/jobs/{job_id}/reviews"
                )
            else:
                # Job failed - partial if reviews were saved, failed otherwise
                await _record_unfinished_job(
                    job_id,
                    job,
                    result.get('error') or 'Unknown error',
                    result.get('logs')
                )
        except Exception as e:
            log.exception(f"Error in scraping job {job_id}: {e}")
            try:
                await _await_callback_writes(pending_writes)
                await _record_unfinished_job(
                    job_id,
                    job,
                    str(e),
                    log_capture.get_logs() if log_capture else None
                )
            except Exception:
                # Nothing more can be done from a background task; keep the traceback.
                log.exception(f"Could not record the failure of job {job_id}")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on PORT (default 8000).
    import uvicorn

    listen_port = int(os.getenv('PORT', 8000))
    log.info(f"Starting production server on port {listen_port}...")
    server_options = dict(
        host="0.0.0.0",
        port=listen_port,
        reload=False,  # Disable reload in production
        log_level="info",
    )
    uvicorn.run("api_server_production:app", **server_options)