2385 lines
93 KiB
Python
2385 lines
93 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Production Google Reviews Scraper API Server with Phase 1 & 2 features:
|
|
- PostgreSQL storage with JSONB
|
|
- Webhook delivery with retries
|
|
- Smart health checks with canary testing
|
|
- Phase 2: Requester tracking (client_id, source, purpose)
|
|
- Phase 2: Job priority for queue ordering
|
|
- Phase 2: Callback URL alternative to webhooks
|
|
- Phase 2: Scraper version/variant selection for A/B testing
|
|
- Phase 2: Explicit job type endpoint (/api/scrape/google-reviews)
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Dict, Any
|
|
from uuid import UUID
|
|
|
|
from fastapi import FastAPI, HTTPException, Query, Header
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, HttpUrl, Field
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
|
from core.database import DatabaseManager, JobStatus
|
|
from services.webhook_service import WebhookDispatcher, WebhookManager
|
|
from utils.health_checks import HealthCheckSystem
|
|
from scrapers.google_reviews.v1_0_0 import fast_scrape_reviews as fast_scrape_v1_0_0, LogCapture, get_business_card_info
|
|
from scrapers.google_reviews.v1_1_0 import fast_scrape_reviews as fast_scrape_v1_1_0
|
|
|
|
# Scraper version registry: every accepted version string, both with and
# without the "v" prefix, mapped to the scraper function implementing it.
SCRAPER_VERSIONS = {
    "1.0.0": fast_scrape_v1_0_0,
    "v1.0.0": fast_scrape_v1_0_0,
    "1.1.0": fast_scrape_v1_1_0,
    "v1.1.0": fast_scrape_v1_1_0,
}

# Registry key used when a caller names no version or an unregistered one.
DEFAULT_SCRAPER_VERSION = "1.1.0"  # Latest version as default
|
|
|
|
def get_scraper_for_version(version: str = None):
    """Resolve a version string to its scraper function.

    Args:
        version: A key of ``SCRAPER_VERSIONS`` (for example ``"1.1.0"`` or
            ``"v1.1.0"``). ``None``, an empty string, or a key that is not
            registered selects the default scraper.

    Returns:
        A ``(scraper_function, version)`` tuple. The second element is the
        string that was passed in when it was recognized, otherwise
        ``DEFAULT_SCRAPER_VERSION``.
    """
    recognized = bool(version) and version in SCRAPER_VERSIONS
    resolved = version if recognized else DEFAULT_SCRAPER_VERSION
    return SCRAPER_VERSIONS[resolved], resolved
|
|
from utils.crash_analyzer import analyze_crash, summarize_crash_patterns, apply_auto_fix
|
|
from utils.logger import StructuredLogger, LogEntry
|
|
from workers.chrome_pool import (
|
|
start_worker_pools,
|
|
stop_worker_pools,
|
|
get_validation_worker,
|
|
release_validation_worker,
|
|
get_scraping_worker,
|
|
release_scraping_worker,
|
|
get_pool_stats
|
|
)
|
|
from api.routes import (
|
|
batches_router, set_batches_db,
|
|
dashboard_router, set_dashboard_db,
|
|
admin_router, set_admin_db,
|
|
pipelines_router, set_pipelines_db,
|
|
reviewiq_analytics_router, set_reviewiq_analytics_db,
|
|
)
|
|
from api.routes.sessions import router as sessions_router
|
|
|
|
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
log = logging.getLogger("api_server")

# Global instances. All start as None; lifespan() assigns db and
# webhook_dispatcher during startup (the health_system assignment there is
# currently commented out, so it stays None).
db: Optional[DatabaseManager] = None
webhook_dispatcher: Optional[WebhookDispatcher] = None
health_system: Optional[HealthCheckSystem] = None

# Concurrent job limiter (prevent too many Chrome instances).
# Overridable through the MAX_CONCURRENT_JOBS environment variable.
MAX_CONCURRENT_JOBS = int(os.getenv('MAX_CONCURRENT_JOBS', '5'))
job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)

# SSE: Store for broadcasting job updates to connected clients
# Format: {job_id: [asyncio.Queue, ...]} for job-specific streams
# Format: {"all": [asyncio.Queue, ...]} for all-jobs stream
job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []}
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup before serving and shutdown afterwards.

    Startup, in order: connect to PostgreSQL and initialize the schema, hand
    the database (or its raw pool) to the route modules, start the webhook
    dispatcher as a background task, and start the Chrome worker pools in a
    worker thread.

    Shutdown, in order: stop the webhook dispatcher, stop the Chrome worker
    pools, disconnect the database. Shutdown runs in a ``finally`` block so it
    still happens when an exception is thrown into the generator at ``yield``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global db, webhook_dispatcher, health_system

    # Startup
    log.info("Starting Google Reviews Scraper API Server (Production)")

    # Get database URL from environment
    database_url = os.getenv(
        'DATABASE_URL',
        'postgresql://scraper:scraper@localhost:5432/scraper'
    )

    # Initialize database
    db = DatabaseManager(database_url)
    await db.connect()
    await db.initialize_schema()
    log.info("Database initialized")

    # Inject database into route modules
    set_batches_db(db)
    set_dashboard_db(db)
    set_admin_db(db)
    set_pipelines_db(db.pool)  # Pipeline router uses raw asyncpg pool
    set_reviewiq_analytics_db(db.pool)  # ReviewIQ analytics uses raw asyncpg pool

    # Initialize health check system with canary monitoring
    # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting
    # health_system = HealthCheckSystem(db)
    # await health_system.start()
    log.info("Health check system DISABLED (canary tests disabled to avoid rate limiting)")

    # Initialize webhook dispatcher.
    # The event loop holds only a weak reference to tasks, so the task object
    # is bound to a local that lives as long as this generator frame; without
    # it the dispatcher could be garbage-collected while still running.
    webhook_dispatcher = WebhookDispatcher(db, interval_seconds=30)
    _dispatcher_task = asyncio.create_task(webhook_dispatcher.start())
    log.info("Webhook dispatcher started")

    # Start Chrome worker pools (1 for validation, 2 for scraping)
    # These pre-warm Chrome instances for instant availability
    # In Docker: headless=False with Xvfb virtual display for better compatibility
    # Locally: use CHROME_HEADLESS env var to control (default: headed for scraping)
    is_docker = os.path.exists("/.dockerenv") or os.environ.get("DOCKER_CONTAINER", "false").lower() == "true"
    chrome_headless = os.environ.get("CHROME_HEADLESS", "false").lower() == "true"
    await asyncio.to_thread(
        start_worker_pools,
        validation_size=1,
        scraping_size=2,
        headless=chrome_headless if not is_docker else False
    )
    log.info("Chrome worker pools started (1 validation + 2 scraping)")

    try:
        yield
    finally:
        # Shutdown
        log.info("Shutting down Google Reviews Scraper API Server")
        if webhook_dispatcher:
            webhook_dispatcher.stop()
        # if health_system:
        #     health_system.stop()

        # Stop worker pools
        await asyncio.to_thread(stop_worker_pools)
        log.info("Chrome worker pools stopped")

        if db:
            await db.disconnect()
|
|
|
|
|
|
# Initialize FastAPI app; startup/shutdown work is delegated to lifespan().
app = FastAPI(
    title="Google Reviews Scraper API - Production",
    description="Production-ready REST API for Google Maps review scraping with webhooks and health monitoring",
    version="2.0.0",
    lifespan=lifespan,
)

# Add CORS middleware.
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled; the inline comment below already marks this as something to
# configure before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers from api/routes/
for _router in (
    batches_router,
    dashboard_router,
    admin_router,
    pipelines_router,
    reviewiq_analytics_router,
    sessions_router,  # Session handoff for validation → scraping
):
    app.include_router(_router)
|
|
|
|
|
|
# ==================== Request/Response Models ====================
|
|
|
|
class GeolocationModel(BaseModel):
    """Geolocation coordinates"""
    # Both coordinates are required; no range constraints are declared here.
    lat: float = Field(default=..., description="Latitude")
    lng: float = Field(default=..., description="Longitude")
|
|
|
|
|
|
class ViewportModel(BaseModel):
    """Browser viewport size"""
    # Both dimensions are required integers.
    width: int = Field(default=..., description="Viewport width")
    height: int = Field(default=..., description="Viewport height")
|
|
|
|
|
|
class BrowserFingerprintModel(BaseModel):
    """Browser fingerprint to replicate user's browser"""
    # Every attribute is optional and defaults to None.
    geolocation: Optional[GeolocationModel] = None
    userAgent: Optional[str] = Field(default=None, description="User agent string")
    viewport: Optional[ViewportModel] = Field(default=None, description="Screen resolution")
    timezone: Optional[str] = Field(default=None, description="Timezone (e.g., Europe/Madrid)")
    language: Optional[str] = Field(default=None, description="Browser language (e.g., en-US)")
    platform: Optional[str] = Field(default=None, description="Platform (e.g., MacIntel, Win32)")
|
|
|
|
|
|
class RequesterModel(BaseModel):
    """Information about the requester of a scrape job"""
    # All attributes are optional; _create_google_reviews_job copies them
    # verbatim into the job metadata under the 'requester' key.
    client_id: Optional[str] = Field(default=None, description="Client identifier")
    source: Optional[str] = Field(default=None, description="Source of the request (e.g., 'web', 'api', 'internal')")
    purpose: Optional[str] = Field(default=None, description="Purpose of the scrape (e.g., 'competitor_analysis', 'review_monitoring')")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional requester metadata")
|
|
|
|
|
|
class ScrapeRequest(BaseModel):
    """Request model for starting a scrape job (legacy endpoint, routes to google-reviews)"""
    # Only `url` is required; every other field is optional.
    url: HttpUrl = Field(default=..., description="Google Maps URL to scrape")
    webhook_url: Optional[HttpUrl] = Field(default=None, description="Webhook URL for async notifications")
    webhook_secret: Optional[str] = Field(default=None, description="Secret for webhook HMAC signature")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Optional custom metadata")
    geolocation: Optional[GeolocationModel] = Field(default=None, description="User's geolocation for Chrome")
    browser_fingerprint: Optional[BrowserFingerprintModel] = Field(default=None, description="User's browser fingerprint")

    # Phase 2: New optional fields for enhanced job tracking
    requester: Optional[RequesterModel] = Field(default=None, description="Information about who requested this job")
    priority: Optional[int] = Field(default=0, ge=0, le=100, description="Job priority (higher = more important)")
    callback_url: Optional[HttpUrl] = Field(default=None, description="URL to call when job completes (alternative to webhook)")
    scraper_version: Optional[str] = Field(default=None, description="Specific scraper version to use")
    scraper_variant: Optional[str] = Field(default=None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')")

    # Testing options
    max_reviews: Optional[int] = Field(default=None, ge=1, le=10000, description="Maximum reviews to collect (for testing, default: unlimited)")

    # Session handoff (v1.2.0) - reuse browser from validation
    session_id: Optional[str] = Field(default=None, description="Session ID from /sessions/validate for browser reuse")
|
|
|
|
|
|
class GoogleReviewsScrapeRequest(BaseModel):
    """Request model for Google Reviews scraping - explicit job type endpoint"""
    # Field-for-field the same shape as ScrapeRequest; keep the two in sync.
    # Only `url` is required.
    url: HttpUrl = Field(default=..., description="Google Maps URL to scrape")
    webhook_url: Optional[HttpUrl] = Field(default=None, description="Webhook URL for async notifications")
    webhook_secret: Optional[str] = Field(default=None, description="Secret for webhook HMAC signature")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Optional custom metadata")
    geolocation: Optional[GeolocationModel] = Field(default=None, description="User's geolocation for Chrome")
    browser_fingerprint: Optional[BrowserFingerprintModel] = Field(default=None, description="User's browser fingerprint")

    # Phase 2: New optional fields for enhanced job tracking
    requester: Optional[RequesterModel] = Field(default=None, description="Information about who requested this job")
    priority: Optional[int] = Field(default=0, ge=0, le=100, description="Job priority (higher = more important)")
    callback_url: Optional[HttpUrl] = Field(default=None, description="URL to call when job completes (alternative to webhook)")
    scraper_version: Optional[str] = Field(default=None, description="Specific scraper version to use")
    scraper_variant: Optional[str] = Field(default=None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')")

    # Testing options
    max_reviews: Optional[int] = Field(default=None, ge=1, le=10000, description="Maximum reviews to collect (for testing, default: unlimited)")

    # Session handoff (v1.2.0) - reuse browser from validation
    session_id: Optional[str] = Field(default=None, description="Session ID from /sessions/validate for browser reuse")
|
|
|
|
|
|
class JobResponse(BaseModel):
    """Response model for job information"""
    # Identity and lifecycle. Timestamps are carried as strings; get_job
    # fills them with datetime.isoformat() output.
    job_id: str
    status: str
    url: str
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    updated_at: Optional[str] = None  # Last update time for progress tracking

    # Scrape results
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # Total reviews available for this place
    scrape_time: Optional[float] = None
    error_message: Optional[str] = None
    webhook_url: Optional[str] = None

    # Business metadata
    business_name: Optional[str] = None
    business_address: Optional[str] = None
    business_category: Optional[str] = None  # Category (e.g., "Barber shop")
    review_topics: Optional[List[Dict[str, Any]]] = None  # Topic filters with mention counts
|
|
|
|
|
|
class ReviewsResponse(BaseModel):
    """Response model for reviews data"""
    job_id: str
    reviews: List[Dict[str, Any]]  # one free-form dict per review
    count: int
|
|
|
|
|
|
class StatsResponse(BaseModel):
    """Response model for statistics"""
    # Required job counters: an overall total plus one per status.
    total_jobs: int
    pending: int
    running: int
    completed: int
    failed: int
    cancelled: int

    # Optional aggregates
    avg_scrape_time: Optional[float] = None
    total_reviews: Optional[int] = None
|
|
|
|
|
|
class CrashAnalysisModel(BaseModel):
    """Crash analysis details"""
    # Everything except auto_fix_params is required.
    pattern: str = Field(default=..., description="Identified crash pattern type")
    confidence: float = Field(default=..., description="Confidence score 0.0 to 1.0")
    description: str = Field(default=..., description="Description of the crash cause")
    suggested_fix: str = Field(default=..., description="Recommended fix action")
    auto_fix_params: Optional[Dict[str, Any]] = Field(default=None, description="Parameters for auto-fix")
|
|
|
|
|
|
class CrashReportResponse(BaseModel):
    """Response model for crash report"""
    # Required identification of the crash and the job it belongs to
    crash_id: str
    job_id: str
    crash_type: str

    # Optional diagnostic detail
    error_message: Optional[str] = None
    analysis: Optional[CrashAnalysisModel] = None
    metrics_history: Optional[List[Dict[str, Any]]] = None
    logs_before_crash: Optional[List[Dict[str, Any]]] = None
    screenshot_url: Optional[str] = None

    # Required creation timestamp, carried as a string
    created_at: str
|
|
|
|
|
|
class RetryJobResponse(BaseModel):
    """Response model for retry job"""
    job_id: str
    status: str
    message: str
    # Optional mapping describing fixes applied to the retried job
    applied_fixes: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class CrashPatternStats(BaseModel):
    """Statistics for a single crash pattern"""
    # All three values are required.
    count: int
    percentage: float
    avg_confidence: float
|
|
|
|
|
|
class CrashStatsResponse(BaseModel):
    """Response model for aggregate crash statistics"""
    total_crashes: int
    patterns: Dict[str, CrashPatternStats]  # keyed by pattern name
    most_common: Optional[str] = None
    recommendations: List[Dict[str, Any]]
|
|
|
|
|
|
# ==================== API Endpoints ====================
|
|
|
|
@app.get("/", summary="API Health Check")
async def root():
    """Basic health check endpoint"""
    # Static payload: neither the database nor the worker pools are consulted,
    # so this only shows that the process is answering requests.
    payload = {
        "message": "Google Reviews Scraper API (Production)",
        "status": "healthy",
        "version": "2.0.0",
        "features": ["postgresql", "webhooks", "canary-testing"],
    }
    return payload
|
|
|
|
|
|
# Strong references to in-flight scraping tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes. Each task removes itself when done.
_background_scrape_tasks: set = set()


async def _create_google_reviews_job(
    url: str,
    webhook_url: Optional[str] = None,
    webhook_secret: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    browser_fingerprint: Optional[BrowserFingerprintModel] = None,
    geolocation: Optional[GeolocationModel] = None,
    requester: Optional[RequesterModel] = None,
    priority: int = 0,
    callback_url: Optional[str] = None,
    scraper_version: Optional[str] = None,
    scraper_variant: Optional[str] = None,
    job_type: str = "google-reviews"
) -> Dict[str, str]:
    """
    Core logic for creating a Google Reviews scraping job.

    This is the shared implementation used by both /scrape and
    /api/scrape/google-reviews endpoints. It merges the optional inputs into a
    single metadata dict, stores the job through ``db.create_job`` and
    schedules ``run_scraping_job`` as a background task.

    Args:
        url: Google Maps URL to scrape.
        webhook_url: Optional webhook target, passed to ``db.create_job``.
        webhook_secret: Optional webhook secret, passed to ``db.create_job``.
        metadata: Caller-supplied metadata. It is shallow-copied, never
            mutated; the keys written below overwrite same-named caller keys.
        browser_fingerprint: Stored under ``'browser_fingerprint'``. When it
            is given, the separate ``geolocation`` argument is ignored.
        geolocation: Stored under ``'geolocation'``, only when no browser
            fingerprint is given.
        requester: Stored under ``'requester'``.
        priority: Always stored under ``'priority'``.
        callback_url: Stored under ``'callback_url'`` when truthy.
        scraper_version: Stored under ``'scraper_version'`` when truthy.
        scraper_variant: Stored under ``'scraper_variant'`` when truthy.
        job_type: Always stored under ``'job_type'`` and echoed in the result.

    Returns:
        Dict with job_id, status, message and job_type.

    Raises:
        HTTPException: 500 when the database is not initialized or when any
            step of job creation fails.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    try:
        # Build metadata with all Phase 2 fields
        job_metadata = metadata.copy() if metadata else {}

        # Add browser fingerprint if provided
        if browser_fingerprint:
            fp = browser_fingerprint
            fingerprint_data = {
                "userAgent": fp.userAgent,
                "timezone": fp.timezone,
                "language": fp.language,
                "platform": fp.platform,
            }
            if fp.viewport:
                fingerprint_data['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height}
            if fp.geolocation:
                fingerprint_data['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
            job_metadata['browser_fingerprint'] = fingerprint_data
        elif geolocation:
            job_metadata['geolocation'] = {
                'lat': geolocation.lat,
                'lng': geolocation.lng
            }

        # Phase 2: Add requester info if provided
        if requester:
            job_metadata['requester'] = {
                'client_id': requester.client_id,
                'source': requester.source,
                'purpose': requester.purpose,
                'metadata': requester.metadata
            }

        # Phase 2: Add job type for multi-scraper support
        job_metadata['job_type'] = job_type

        # Phase 2: Add priority for job queue ordering
        job_metadata['priority'] = priority

        # Phase 2: Add callback_url (alternative to webhook)
        if callback_url:
            job_metadata['callback_url'] = callback_url

        # Phase 2: Add scraper version/variant for A/B testing and version control
        if scraper_version:
            job_metadata['scraper_version'] = scraper_version
        if scraper_variant:
            job_metadata['scraper_variant'] = scraper_variant

        # Create job in database
        job_id = await db.create_job(
            url=url,
            webhook_url=webhook_url,
            webhook_secret=webhook_secret,
            metadata=job_metadata
        )

        # Start scraping job in background, holding a reference until it
        # completes so it cannot be collected mid-run.
        scrape_task = asyncio.create_task(run_scraping_job(job_id))
        _background_scrape_tasks.add(scrape_task)
        scrape_task.add_done_callback(_background_scrape_tasks.discard)

        log.info(f"Created and started job {job_id} (type={job_type}, priority={priority})")

        return {
            "job_id": str(job_id),
            "status": "started",
            "message": "Scraping job started successfully",
            "job_type": job_type
        }

    except Exception as e:
        # log.exception records the traceback along with the message.
        log.exception(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") from e
|
|
|
|
|
|
@app.post("/api/scrape/google-reviews", response_model=Dict[str, str], summary="Start Google Reviews Scraping Job")
async def scrape_google_reviews(request: GoogleReviewsScrapeRequest):
    """
    Start a new Google Reviews scraping job.

    This is the primary endpoint for Phase 2 onwards. It explicitly creates a job
    of type 'google-reviews' with full support for all Phase 2 features:
    - Requester tracking (client_id, source, purpose)
    - Job priority for queue ordering
    - Callback URL (alternative to webhook)
    - Scraper version/variant selection for A/B testing

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete
    - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates

    Returns the job ID for tracking.
    """
    # URL fields arrive as HttpUrl objects; the job helper takes plain
    # strings, with None standing for "not provided".
    target_url = str(request.url)
    webhook_target = str(request.webhook_url) if request.webhook_url else None
    callback_target = str(request.callback_url) if request.callback_url else None

    # NOTE(review): request.max_reviews and request.session_id are validated
    # by the model but are not forwarded to the job helper here - confirm
    # whether dropping them is intended.
    return await _create_google_reviews_job(
        url=target_url,
        webhook_url=webhook_target,
        webhook_secret=request.webhook_secret,
        metadata=request.metadata,
        browser_fingerprint=request.browser_fingerprint,
        geolocation=request.geolocation,
        requester=request.requester,
        priority=request.priority or 0,  # an explicit null priority becomes 0
        callback_url=callback_target,
        scraper_version=request.scraper_version,
        scraper_variant=request.scraper_variant,
        job_type="google-reviews",
    )
|
|
|
|
|
|
@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job (Legacy)")
async def start_scrape(request: ScrapeRequest):
    """
    Start a new scraping job (legacy endpoint, routes to google-reviews).

    **NOTE**: This endpoint is maintained for backwards compatibility.
    For new integrations, use POST /api/scrape/google-reviews instead.

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete

    Returns the job ID for tracking.
    """
    # URL fields arrive as HttpUrl objects; the job helper takes plain
    # strings, with None standing for "not provided".
    target_url = str(request.url)
    webhook_target = str(request.webhook_url) if request.webhook_url else None
    callback_target = str(request.callback_url) if request.callback_url else None

    # NOTE(review): request.max_reviews and request.session_id are validated
    # by the model but are not forwarded to the job helper here - confirm
    # whether dropping them is intended.
    return await _create_google_reviews_job(
        url=target_url,
        webhook_url=webhook_target,
        webhook_secret=request.webhook_secret,
        metadata=request.metadata,
        browser_fingerprint=request.browser_fingerprint,
        geolocation=request.geolocation,
        requester=request.requester,
        priority=request.priority or 0,  # an explicit null priority becomes 0
        callback_url=callback_target,
        scraper_version=request.scraper_version,
        scraper_variant=request.scraper_variant,
        job_type="google-reviews",
    )
|
|
|
|
|
|
@app.post("/api/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def api_start_scrape(request: ScrapeRequest):
    """
    Start a new scraping job via the /api/scrape endpoint.

    This endpoint accepts the same request body as /scrape and routes to google-reviews.
    For explicit job type control, use POST /api/scrape/google-reviews instead.

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete
    - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates

    Returns the job ID for tracking.
    """
    # URL fields arrive as HttpUrl objects; the job helper takes plain
    # strings, with None standing for "not provided".
    target_url = str(request.url)
    webhook_target = str(request.webhook_url) if request.webhook_url else None
    callback_target = str(request.callback_url) if request.callback_url else None

    # NOTE(review): request.max_reviews and request.session_id are validated
    # by the model but are not forwarded to the job helper here - confirm
    # whether dropping them is intended.
    return await _create_google_reviews_job(
        url=target_url,
        webhook_url=webhook_target,
        webhook_secret=request.webhook_secret,
        metadata=request.metadata,
        browser_fingerprint=request.browser_fingerprint,
        geolocation=request.geolocation,
        requester=request.requester,
        priority=request.priority or 0,  # an explicit null priority becomes 0
        callback_url=callback_target,
        scraper_version=request.scraper_version,
        scraper_variant=request.scraper_variant,
        job_type="google-reviews",
    )
|
|
|
|
|
|
@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: UUID):
    """Get detailed information about a specific job"""
    # Raises HTTPException 500 when the database is not initialized and 404
    # when db.get_job returns nothing for this id.
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Parse review_topics if it's a string (JSONB might be returned as string).
    # Only a decode failure is tolerated; json.JSONDecodeError is a ValueError.
    review_topics = job.get('review_topics')
    if isinstance(review_topics, str):
        try:
            review_topics = json.loads(review_topics)
        except ValueError:
            review_topics = None

    # Read business info from dedicated columns (with fallback to metadata for older jobs)
    business_name = job.get('business_name')
    business_category = job.get('business_category')

    # Fallback to metadata for jobs created before migration.
    # Only business_name has a metadata fallback (business_category was not
    # previously stored in metadata), so the lookup is needed only when the
    # name column is empty.
    if not business_name:
        metadata = job.get('metadata')
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except ValueError:
                metadata = None
        # Decoded JSON may be a list or scalar; only a dict can hold the name.
        if isinstance(metadata, dict):
            business_name = metadata.get('business_name')

    # NOTE(review): JobResponse declares business_address, but nothing here
    # populates it, so it is always null in this response - confirm intent.
    return JobResponse(
        job_id=str(job['job_id']),
        status=job['status'],
        url=job['url'],
        created_at=job['created_at'].isoformat(),
        started_at=job['started_at'].isoformat() if job['started_at'] else None,
        completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
        updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None,
        reviews_count=job['reviews_count'],
        total_reviews=job.get('total_reviews'),
        scrape_time=job['scrape_time'],
        error_message=job['error_message'],
        webhook_url=job.get('webhook_url'),
        business_name=business_name,
        business_category=business_category,
        review_topics=review_topics
    )
|
|
|
|
|
|
@app.get("/jobs/{job_id}/logs", summary="Get Job Logs")
async def get_job_logs(job_id: UUID):
    """
    Get the scraper logs for a job.

    Returns logs from both successful and failed jobs.
    Useful for debugging scraping issues.
    """
    # Raises HTTPException 500 when the database is not initialized and 404
    # when db.get_job returns nothing for this id.
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Get scrape_logs from job
    scrape_logs = job.get('scrape_logs')

    # Parse if string (asyncpg might return JSONB as string).
    # Only a decode failure is tolerated; json.JSONDecodeError is a ValueError.
    if isinstance(scrape_logs, str):
        try:
            scrape_logs = json.loads(scrape_logs)
        except ValueError:
            scrape_logs = None

    # Missing, empty or undecodable logs are reported as an empty list.
    return {
        "job_id": str(job_id),
        "status": job['status'],
        "error_message": job.get('error_message'),
        "logs": scrape_logs or [],
        "log_count": len(scrape_logs) if scrape_logs else 0
    }
|
|
|
|
|
|
# ==================== SSE Streaming Endpoints ====================
|
|
|
|
def format_sse_event(event_type: str, data: dict, use_structured_format: bool = True) -> str:
    """
    Render one Server-Sent Events message.

    Args:
        event_type: The SSE event type (e.g., 'log', 'metrics', 'status');
            written to the ``event:`` line.
        data: The data payload; it must be JSON-serializable.
        use_structured_format: If True, the ``data:`` line carries
            {"type": event_type, "data": data}. If False, it carries ``data``
            as-is (legacy format for backward compatibility).

    Returns:
        The message text: an ``event:`` line, a ``data:`` line holding the
        JSON payload, and the blank line that terminates the event.
    """
    body = {"type": event_type, "data": data} if use_structured_format else data
    encoded = json.dumps(body)
    return f"event: {event_type}\ndata: {encoded}\n\n"
|
|
|
|
|
|
def format_structured_log_event(log_entry: dict) -> str:
    """
    Render a structured log entry as an SSE "log" event.

    Args:
        log_entry: The log entry as a dict (expected to come from
            StructuredLogger.get_logs()).

    Returns:
        Formatted SSE message with type "log", in the structured
        {"type": ..., "data": ...} envelope.
    """
    sse_message = format_sse_event("log", log_entry, use_structured_format=True)
    return sse_message
|
|
|
|
|
|
def format_metrics_event(metrics: dict) -> str:
    """
    Render a metrics snapshot as an SSE "metrics" event.

    Args:
        metrics: Dict containing reviews_extracted, scroll_count, memory_mb,
            extraction_rate. The dict is passed through unvalidated.

    Returns:
        Formatted SSE message with type "metrics", in the structured
        {"type": ..., "data": ...} envelope.
    """
    sse_message = format_sse_event("metrics", metrics, use_structured_format=True)
    return sse_message
|
|
|
|
|
|
async def broadcast_job_update(job_id: str, event_type: str, data: dict):
    """Broadcast an update to all subscribers of a job stream and the all-jobs stream.

    The message uses the legacy SSE shape: ``data`` is serialized directly,
    without the {"type": ..., "data": ...} envelope.

    Args:
        job_id: Key into ``job_update_queues`` for the job-specific subscribers.
        event_type: Value written to the SSE ``event:`` line.
        data: JSON-serializable payload for the ``data:`` line.

    Delivery is best effort: a failure on one subscriber queue is logged and
    the remaining subscribers are still served. Task cancellation is not
    swallowed, because asyncio.CancelledError is not an ``Exception``.
    """
    message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

    # Job-specific subscribers first, then the all-jobs subscribers. Both
    # lists are snapshotted so a subscriber that registers or unregisters
    # while we await cannot disturb the iteration.
    subscribers = list(job_update_queues.get(job_id, ()))
    subscribers.extend(job_update_queues.get("all", ()))

    for queue in subscribers:
        try:
            await queue.put(message)
        except Exception:
            log.warning(
                "Failed to deliver '%s' event for job %s to an SSE subscriber",
                event_type, job_id, exc_info=True,
            )
|
|
|
|
|
|
async def broadcast_structured_log(job_id: str, log_entry: dict):
    """Broadcast a structured log entry to job subscribers.

    Only queues registered under ``job_id`` receive the message; the all-jobs
    stream is not notified.

    Args:
        job_id: Key into ``job_update_queues``.
        log_entry: Log entry dict, rendered by ``format_structured_log_event``.

    Delivery is best effort: a failure on one subscriber queue is logged and
    the remaining subscribers are still served. Task cancellation is not
    swallowed, because asyncio.CancelledError is not an ``Exception``.
    """
    message = format_structured_log_event(log_entry)

    # Snapshot the list so a subscriber that registers or unregisters while
    # we await cannot disturb the iteration.
    for queue in list(job_update_queues.get(job_id, ())):
        try:
            await queue.put(message)
        except Exception:
            log.warning(
                "Failed to deliver log event for job %s to an SSE subscriber",
                job_id, exc_info=True,
            )
|
|
|
|
|
|
async def broadcast_metrics(job_id: str, metrics: dict):
    """Broadcast metrics update to job subscribers.

    Only queues registered under ``job_id`` receive the message; the all-jobs
    stream is not notified.

    Args:
        job_id: Key into ``job_update_queues``.
        metrics: Metrics dict, rendered by ``format_metrics_event``.

    Delivery is best effort: a failure on one subscriber queue is logged and
    the remaining subscribers are still served. Task cancellation is not
    swallowed, because asyncio.CancelledError is not an ``Exception``.
    """
    message = format_metrics_event(metrics)

    # Snapshot the list so a subscriber that registers or unregisters while
    # we await cannot disturb the iteration.
    for queue in list(job_update_queues.get(job_id, ())):
        try:
            await queue.put(message)
        except Exception:
            log.warning(
                "Failed to deliver metrics event for job %s to an SSE subscriber",
                job_id, exc_info=True,
            )
|
|
|
|
|
|
# Statuses after which a job produces no further updates, so the stream can be
# closed. 'partial' is included because the rest of this API treats partial
# jobs as finished (they have crash reports and can be retried).
_SSE_TERMINAL_STATUSES = ('completed', 'failed', 'cancelled', 'partial')


def _sse_decode_logs(raw_logs):
    """Return a job's scrape logs as a list-like value, never ``None``.

    The stored value may be a JSON string or an already-decoded object.
    Undecodable strings and empty values yield ``[]``.
    """
    if isinstance(raw_logs, str):
        try:
            raw_logs = json.loads(raw_logs)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return []
    return raw_logs or []


def _sse_job_snapshot(job_id_str, job_data, logs):
    """Build the payload shared by the ``init`` and final ``complete`` events."""
    return {
        "job_id": job_id_str,
        "status": job_data['status'],
        "reviews_count": job_data.get('reviews_count'),
        "total_reviews": job_data.get('total_reviews'),
        "scrape_time": job_data.get('scrape_time'),
        "error_message": job_data.get('error_message'),
        "logs": logs
    }


def _sse_metrics_payload(job_data, logs, reviews_extracted):
    """Derive the periodic ``metrics`` payload from the job row and its logs."""
    # Extraction rate in reviews per second of recorded scrape time
    elapsed = job_data.get('scrape_time') or 0
    extraction_rate = (reviews_extracted / elapsed) if elapsed > 0 else 0

    # Scroll count = number of log entries whose message mentions "scroll"
    scroll_count = 0
    for entry in logs:
        text = entry.get('message') if isinstance(entry, dict) else entry
        if 'scroll' in str(text or '').lower():
            scroll_count += 1

    # Memory comes from the most recent log entry that carries metrics
    memory_mb = 0
    for entry in reversed(logs):
        if isinstance(entry, dict) and entry.get('metrics'):
            memory_mb = entry['metrics'].get('memory_mb', 0)
            break

    return {
        "reviews_extracted": reviews_extracted,
        "scroll_count": scroll_count,
        "memory_mb": memory_mb,
        "extraction_rate": round(extraction_rate, 2)
    }


@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
async def stream_job_updates(
    job_id: UUID,
    format: str = Query("structured", description="Event format: 'structured' (new) or 'legacy' (backward compatible)")
):
    """
    Server-Sent Events stream for real-time job updates.

    Event types (structured format):
    - init: Initial job state with all logs
    - log: Individual structured log entry {"type": "log", "data": {"timestamp": "...", "level": "INFO", ...}}
    - metrics: Periodic metrics {"type": "metrics", "data": {"reviews_extracted": 150, "scroll_count": 45, ...}}
    - status: Job status changes
    - complete: Job finished (completed/failed/cancelled/partial)

    Query parameters:
    - format: 'structured' (default) for new format, 'legacy' for backward compatibility

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/{job_id}/stream');
    es.addEventListener('log', (e) => {
        const event = JSON.parse(e.data);
        console.log('Log:', event.data);  // Structured log entry
    });
    es.addEventListener('metrics', (e) => {
        const event = JSON.parse(e.data);
        console.log('Metrics:', event.data);  // {reviews_extracted, scroll_count, memory_mb, extraction_rate}
    });
    ```

    Raises:
        HTTPException: 500 when the database is not initialized, 404 when the
            job does not exist.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Verify job exists
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    job_id_str = str(job_id)
    use_structured = format.lower() != "legacy"

    # Per-client queue that receives broadcast messages for this job
    queue: asyncio.Queue = asyncio.Queue()

    async def event_generator():
        # Register inside the generator so the ``finally`` below is guaranteed
        # to undo it; registering before the generator starts would leak the
        # queue if the response is never iterated.
        job_update_queues.setdefault(job_id_str, []).append(queue)
        try:
            # Send initial state
            job_data = await db.get_job(job_id)
            if not job_data:
                # Job disappeared between verification and streaming
                return

            logs = _sse_decode_logs(job_data.get('scrape_logs'))
            initial = _sse_job_snapshot(job_id_str, job_data, logs)
            yield f"event: init\ndata: {json.dumps(initial)}\n\n"

            # If job is already finished, send complete event and close
            if job_data['status'] in _SSE_TERMINAL_STATUSES:
                yield f"event: complete\ndata: {json.dumps({'status': job_data['status']})}\n\n"
                return

            last_log_count = len(logs)
            # Normalize None to 0 so it compares equal to the polled value
            last_reviews_count = job_data.get('reviews_count') or 0
            last_metrics_time = time.time()
            metrics_interval = 5.0  # Emit metrics every 5 seconds

            while True:
                try:
                    # Wait for a broadcast message; on timeout send a keepalive
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=2.0)
                        yield message
                    except asyncio.TimeoutError:
                        yield ": keepalive\n\n"

                    # Also poll database for updates (backup in case broadcast missed)
                    job_data = await db.get_job(job_id)
                    if not job_data:
                        continue

                    logs = _sse_decode_logs(job_data.get('scrape_logs'))

                    if job_data['status'] in _SSE_TERMINAL_STATUSES:
                        final = _sse_job_snapshot(job_id_str, job_data, logs)
                        yield f"event: complete\ndata: {json.dumps(final)}\n\n"
                        return

                    current_log_count = len(logs)
                    current_reviews = job_data.get('reviews_count') or 0
                    current_time = time.time()

                    # Emit individual structured log events for new logs
                    if use_structured and current_log_count > last_log_count:
                        for log_entry in logs[last_log_count:]:
                            yield format_structured_log_event(log_entry)

                    # Emit metrics event every 5 seconds during job execution
                    if (use_structured
                            and job_data['status'] == 'running'
                            and current_time - last_metrics_time >= metrics_interval):
                        yield format_metrics_event(
                            _sse_metrics_payload(job_data, logs, current_reviews)
                        )
                        last_metrics_time = current_time

                    # Track progress; legacy clients get a full update event
                    if current_log_count > last_log_count or current_reviews != last_reviews_count:
                        if not use_structured:
                            # Legacy format: send all logs in update event
                            update = {
                                "job_id": job_id_str,
                                "status": job_data['status'],
                                "reviews_count": current_reviews,
                                "total_reviews": job_data.get('total_reviews'),
                                "logs": logs
                            }
                            yield f"event: update\ndata: {json.dumps(update)}\n\n"
                        last_log_count = current_log_count
                        last_reviews_count = current_reviews

                except Exception as e:
                    log.error(f"Error in SSE stream for job {job_id}: {e}")
                    break

        finally:
            # Unregister subscriber and drop the job entry once it is empty
            subscribers = job_update_queues.get(job_id_str)
            if subscribers is not None:
                try:
                    subscribers.remove(queue)
                except ValueError:
                    pass  # already removed
                if not subscribers:
                    job_update_queues.pop(job_id_str, None)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
|
|
|
|
|
|
@app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)")
async def stream_all_jobs():
    """
    Server-Sent Events stream for all job updates.

    Streams:
    - job_created: New job was created
    - job_updated: Job status/progress changed
    - job_completed: Job finished

    An ``init`` event carrying up to 100 jobs is sent first; afterwards every
    message placed on this client's queue is forwarded, with a keepalive
    comment every 5 seconds of silence.

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/stream');
    es.addEventListener('job_updated', (e) => console.log('Update:', JSON.parse(e.data)));
    ```

    Raises:
        HTTPException: 500 when the database is not initialized.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Per-client queue that receives broadcast messages for all jobs
    queue: asyncio.Queue = asyncio.Queue()

    async def event_generator():
        # Register inside the generator so the ``finally`` below always undoes
        # it; ``setdefault`` avoids a KeyError if the "all" list is missing.
        job_update_queues.setdefault("all", []).append(queue)
        try:
            # Send initial jobs list
            jobs = await db.list_jobs(limit=100)
            jobs_data = [
                {
                    "job_id": str(j['job_id']),
                    "status": j['status'],
                    "url": j['url'],
                    "created_at": j['created_at'].isoformat(),
                    "completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None,
                    "reviews_count": j.get('reviews_count'),
                    "scrape_time": j.get('scrape_time'),
                    "error_message": j.get('error_message')
                }
                for j in jobs
            ]
            yield f"event: init\ndata: {json.dumps({'jobs': jobs_data})}\n\n"

            # Keep connection alive and send updates
            while True:
                try:
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=5.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"
                except Exception as e:
                    log.error(f"Error in all-jobs SSE stream: {e}")
                    break

        finally:
            # Unregister subscriber; the shared "all" list itself is kept
            try:
                job_update_queues["all"].remove(queue)
            except (KeyError, ValueError):
                pass  # list missing or queue already removed

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
|
|
|
|
|
|
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: UUID):
    """
    Get reviews data for a job.

    Returns reviews for completed, partial, or running jobs (if reviews have been collected).

    Error responses when no reviews are stored:
    - 404 if the job does not exist
    - 400 if the job is still pending
    - 400 if the job failed without saving any reviews
    - 404 otherwise (no reviews data available yet)
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Get reviews (includes completed, running, and partial jobs)
    reviews = await db.get_job_reviews(job_id, include_partial=True)
    if reviews is not None:
        return ReviewsResponse(
            job_id=str(job_id),
            reviews=reviews,
            count=len(reviews)
        )

    # No reviews stored: look at the job itself to explain why
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    job_status = job['status']
    if job_status == 'pending':
        raise HTTPException(
            status_code=400,
            detail="Job has not started yet"
        )
    if job_status == 'failed':
        # ``or`` (not a .get default) so a stored None does not render as "None"
        reason = job.get('error_message') or 'Unknown error'
        raise HTTPException(
            status_code=400,
            detail=f"Job failed without saving any reviews: {reason}"
        )
    raise HTTPException(status_code=404, detail="No reviews data available yet")
|
|
|
|
|
|
@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[str] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of jobs to skip", ge=0)
):
    """List all jobs, optionally filtered by status.

    Raises:
        HTTPException: 500 when the database is not initialized, 400 when
            ``status`` is not a valid ``JobStatus`` value.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Validate status if provided
    job_status = None
    if status:
        try:
            job_status = JobStatus(status.lower())
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status. Must be one of: {[s.value for s in JobStatus]}"
            ) from None

    jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset)

    result = []
    for job in jobs:
        # Read business info from dedicated columns first
        business_name = job.get('business_name')
        business_address = job.get('business_address')
        business_category = job.get('business_category')

        # Fallback to metadata for jobs created before migration
        if not business_name:
            metadata = job.get('metadata')
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    metadata = None
            # Only a JSON object can carry the fallback fields
            if isinstance(metadata, dict):
                business_name = business_name or metadata.get('business_name')
                business_address = business_address or metadata.get('business_address')

        # Parse review_topics if it's a string
        review_topics = job.get('review_topics')
        if isinstance(review_topics, str):
            try:
                review_topics = json.loads(review_topics)
            except ValueError:
                review_topics = None

        result.append(JobResponse(
            job_id=str(job['job_id']),
            status=job['status'],
            url=job['url'],
            created_at=job['created_at'].isoformat(),
            completed_at=job['completed_at'].isoformat() if job.get('completed_at') else None,
            reviews_count=job.get('reviews_count'),
            total_reviews=job.get('total_reviews'),
            scrape_time=job.get('scrape_time'),
            error_message=job.get('error_message'),
            business_name=business_name,
            business_address=business_address,
            business_category=business_category,
            review_topics=review_topics
        ))

    return result
|
|
|
|
|
|
@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: UUID):
    """Remove the job identified by ``job_id``.

    Responds with a confirmation message, 404 when the database reports that
    nothing was deleted, or 500 when the database is not initialized.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    if await db.delete_job(job_id):
        return {"message": "Job deleted successfully"}

    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
|
|
def _check_reviews_fingerprint(request):
    """Translate the request's fingerprint/geolocation into the scraper's dict form.

    Returns ``None`` when the request carries neither a browser fingerprint
    nor a geolocation.
    """
    fp = request.browser_fingerprint
    if fp:
        fingerprint = {
            "userAgent": fp.userAgent,
            "timezone": fp.timezone,
            "language": fp.language,
            "platform": fp.platform,
        }
        if fp.viewport:
            fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height}
        if fp.geolocation:
            fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
        log.info(f"Creating Chrome with user fingerprint: {fp.platform}, {fp.timezone}")
        return fingerprint

    if request.geolocation:
        log.info("Creating Chrome with geolocation only")
        return {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}}

    log.info("Creating Chrome with default settings")
    return None


@app.post("/check-reviews", summary="Check if Reviews Exist")
async def check_reviews(request: ScrapeRequest):
    """
    Get business card information from Google Maps.
    Returns business name, address, rating, and review count, plus a
    ``has_reviews`` flag.

    Creates a fresh Chrome instance for reliable results (same as full scraper).
    This is used to show the business confirmation card in the UI.

    Any exception is reported in the response body (``success: false`` with an
    ``error`` string) rather than as an HTTP error.
    """
    try:
        url = str(request.url)

        # Use the SAME scraper algorithm with validation_only=True for early return
        # Creates a fresh Chrome instance (same as full scraper) to avoid stale browser state
        # Pooled browsers can have cookies/state that cause Google to render pages differently
        fingerprint = _check_reviews_fingerprint(request)

        result = await asyncio.to_thread(
            fast_scrape_v1_0_0,  # Use v1.0.0 for validation (fastest)
            url=url,
            headless=False,  # Use Xvfb display
            validation_only=True,  # Return early after getting total_reviews
            browser_fingerprint=fingerprint  # Pass user's browser fingerprint
        )

        # Extract validation info from the result
        validation_info = result.get('validation_info', {})
        total_reviews = validation_info.get('total_reviews') or result.get('total_reviews') or 0
        name = validation_info.get('name')

        # Has reviews if we found a business with the Reviews tab (indicated by total_reviews > 0)
        has_reviews = bool(name and total_reviews > 0)

        return {
            "has_reviews": has_reviews,  # True if business has reviews
            "total_reviews": total_reviews,
            "name": name,
            "address": validation_info.get('address'),
            "rating": validation_info.get('rating'),
            "category": validation_info.get('category'),
            "success": result.get('success', True),
            "error": result.get('error')
        }

    except Exception as e:
        log.error(f"Error checking reviews: {e}")

        # Same keys as the success payload so clients can read one shape;
        # ``review_count`` is kept for clients that relied on the old error key.
        return {
            "has_reviews": False,
            "total_reviews": 0,
            "review_count": 0,
            "name": None,
            "address": None,
            "rating": None,
            "category": None,
            "success": False,
            "error": str(e)
        }
|
|
|
|
|
|
@app.get("/stats", response_model=StatsResponse, summary="Get Statistics")
async def get_stats():
    """Return the job statistics reported by the database layer."""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    raw_stats = await db.get_stats()
    response = StatsResponse(**raw_stats)
    return response
|
|
|
|
|
|
# ==================== GBP Categories Endpoints ====================
|
|
|
|
@app.get("/categories", summary="Get GBP Categories")
async def get_categories(
    search: Optional[str] = Query(None, description="Search term for category name"),
    parent: Optional[str] = Query(None, description="Parent path (ltree) to filter children"),
    level: Optional[int] = Query(None, description="Category level (1-4)", ge=1, le=4),
    limit: int = Query(5000, description="Maximum number of results", ge=1, le=10000),
    offset: int = Query(0, description="Offset for pagination", ge=0),
):
    """
    Get Google Business Profile categories.

    Supports filtering by:
    - search: Text search in category name
    - parent: Get children of a specific path
    - level: Filter by hierarchy level (1=Sector, 2=Business Type, 3=Sub-category, 4=Category)

    The response carries the matching page of categories (ordered by path),
    the total number of matches, and the limit/offset that were applied.
    """
    if not db or not db.pool:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Each active filter is a (SQL template, bound value) pair; ``{i}`` in the
    # template is replaced by the filter's positional parameter number.
    active_filters = []
    if search:
        active_filters.append(("name ILIKE ${i}", f"%{search}%"))
    if parent:
        active_filters.append(("path <@ ${i}::ltree AND path != ${i}::ltree", parent))
    if level:
        active_filters.append(("level = ${i}", level))

    clauses = [
        template.format(i=position)
        for position, (template, _) in enumerate(active_filters, start=1)
    ]
    params = [value for _, value in active_filters]
    where_clause = " AND ".join(clauses) or "TRUE"

    # LIMIT and OFFSET take the next two positional parameters
    limit_idx = len(params) + 1
    offset_idx = limit_idx + 1

    async with db.pool.acquire() as conn:
        # Total number of matches, ignoring pagination
        total = await conn.fetchval(
            f"SELECT COUNT(*) FROM gbp_categories WHERE {where_clause}",
            *params,
        )

        # Requested page of matches
        page_query = f"""
            SELECT id, name, slug, path::text as path, level, parent_id, category_count
            FROM gbp_categories
            WHERE {where_clause}
            ORDER BY path
            LIMIT ${limit_idx} OFFSET ${offset_idx}
        """
        rows = await conn.fetch(page_query, *params, limit, offset)

    return {
        "categories": [dict(row) for row in rows],
        "total": total,
        "limit": limit,
        "offset": offset,
    }
|
|
|
|
|
|
def _build_category_tree(categories, root, max_depth):
    """Nest flat category rows into a tree using their dotted ``path``.

    Args:
        categories: Row dicts ordered by ``path``; each has a dotted ``path``.
        root: Path the rows were filtered by, or ``None`` for the whole table.
        max_depth: Number of levels to include, counting top-level nodes as 1.

    Returns:
        A list of top-level nodes. Each node is the row dict plus a
        ``children`` key holding a list of nodes, or ``None`` when there are
        no children within ``max_depth``.
    """
    # Top-level nodes sit at the root's own depth (single-segment paths when
    # no root is given). Looking only for single-segment paths would yield an
    # empty tree for any multi-segment root.
    top_segments = len(root.split('.')) if root else 1

    # Index rows by parent path once so the tree is built in a single pass
    top_level = []
    children_by_parent = {}
    for cat in categories:
        cat_path = cat['path']
        if cat_path.count('.') + 1 == top_segments:
            top_level.append(cat)
        else:
            parent_path = cat_path.rpartition('.')[0]
            children_by_parent.setdefault(parent_path, []).append(cat)

    def attach(cat, depth):
        children = []
        if depth < max_depth:
            children = [
                attach(child, depth + 1)
                for child in children_by_parent.get(cat['path'], [])
            ]
        return {**cat, 'children': children if children else None}

    return [attach(cat, 1) for cat in top_level]


@app.get("/categories/tree", summary="Get GBP Categories Tree")
async def get_categories_tree(
    root: Optional[str] = Query(None, description="Root path to start the tree from"),
    max_depth: int = Query(4, description="Maximum depth of the tree", ge=1, le=4),
):
    """
    Get categories as a hierarchical tree structure.

    Returns nested categories starting from root (or all roots if not specified).
    ``total`` is the number of rows fetched, including rows below ``max_depth``.
    """
    if not db or not db.pool:
        raise HTTPException(status_code=500, detail="Database not initialized")

    async with db.pool.acquire() as conn:
        if root:
            # Get subtree starting from root
            query = """
                SELECT id, name, slug, path::text as path, level, parent_id, category_count
                FROM gbp_categories
                WHERE path <@ $1::ltree
                ORDER BY path
            """
            rows = await conn.fetch(query, root)
        else:
            # Get all categories
            query = """
                SELECT id, name, slug, path::text as path, level, parent_id, category_count
                FROM gbp_categories
                ORDER BY path
            """
            rows = await conn.fetch(query)

    categories = [dict(row) for row in rows]
    tree = _build_category_tree(categories, root, max_depth)

    return {
        "tree": tree,
        "total": len(categories),
    }
|
|
|
|
|
|
@app.get("/categories/{path:path}", summary="Get Category by Path")
async def get_category_by_path(path: str):
    """
    Look up one category by its ltree path.

    The response contains the category itself, its ancestors (ordered by
    path) and its direct children (ordered by name). Responds with 404 when no
    category has that path.
    """
    if not db or not db.pool:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Column list and table shared by the three lookups below. Plain
    # concatenation is used because the child pattern contains literal braces.
    select_from = (
        "SELECT id, name, slug, path::text as path, level, parent_id, category_count "
        "FROM gbp_categories "
    )

    async with db.pool.acquire() as conn:
        # The category itself
        row = await conn.fetchrow(select_from + "WHERE path = $1::ltree", path)
        if not row:
            raise HTTPException(status_code=404, detail="Category not found")

        # Every strict ancestor of the path
        ancestor_rows = await conn.fetch(
            select_from + "WHERE path @> $1::ltree AND path != $1::ltree ORDER BY path",
            path,
        )

        # Exactly one label below the path
        child_rows = await conn.fetch(
            select_from + "WHERE path ~ ($1 || '.*{1}')::lquery ORDER BY name",
            path,
        )

    return {
        "category": dict(row),
        "ancestors": [dict(ancestor) for ancestor in ancestor_rows],
        "children": [dict(child) for child in child_rows],
    }
|
|
|
|
|
|
@app.get("/pool-stats", summary="Get Worker Pool Statistics")
async def pool_stats():
    """Report Chrome worker pool statistics, collected in a worker thread."""
    stats = await asyncio.to_thread(get_pool_stats)
    return stats
|
|
|
|
|
|
# ==================== Crash Report Endpoints ====================
|
|
|
|
def _crash_json_field(value, fallback):
    """Decode ``value`` when it is a JSON string; return other values unchanged.

    ``fallback`` is returned when the string cannot be decoded.
    """
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return fallback
    return value


@app.get("/jobs/{job_id}/crash-report", response_model=CrashReportResponse, summary="Get Crash Report")
async def get_crash_report(job_id: UUID):
    """
    Get the crash report for a failed or partial job.

    Returns detailed crash analysis including:
    - Crash pattern identification (memory_exhaustion, rate_limited, etc.)
    - Confidence score for the pattern match
    - Suggested fixes and auto-fix parameters
    - Metrics history and logs before the crash

    When no crash report is stored for the job, one is generated from the job
    row itself. When a stored report has no analysis, the analysis is computed
    on the fly.

    Raises:
        HTTPException: 500 when the database is not initialized, 404 when the
            job does not exist, 400 when the job is neither failed nor partial.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Verify job exists
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Only failed or partial jobs have crash reports
    if job['status'] not in ['failed', 'partial']:
        raise HTTPException(
            status_code=400,
            detail=f"Job status is '{job['status']}' - crash reports only available for failed or partial jobs"
        )

    # Get crash report from database
    crash_report = await db.get_crash_report(str(job_id))

    if not crash_report:
        # No stored crash report - generate one from job data
        scrape_logs = _crash_json_field(job.get('scrape_logs'), [])
        metrics_history = _crash_json_field(job.get('metrics_history'), [])

        crash_data = {
            # ``or`` so that a stored None falls back to the default as well
            'error_message': job.get('error_message') or 'Unknown error',
            'metrics_history': metrics_history or [],
            'logs_before_crash': scrape_logs or [],
            'state': {
                'reviews_extracted': job.get('reviews_count') or 0,
                'total_reviews': job.get('total_reviews')
            }
        }

        # Analyze the crash
        analysis = analyze_crash(crash_data)

        # Build response from job data and analysis
        finished_at = job['completed_at'] if job.get('completed_at') else job['created_at']
        return CrashReportResponse(
            crash_id=str(job_id),  # Use job_id as crash_id when no stored report
            job_id=str(job_id),
            crash_type=analysis.pattern,
            error_message=job.get('error_message'),
            analysis=CrashAnalysisModel(
                pattern=analysis.pattern,
                confidence=analysis.confidence,
                description=analysis.description,
                suggested_fix=analysis.suggested_fix,
                auto_fix_params=analysis.auto_fix_params
            ),
            metrics_history=metrics_history,
            logs_before_crash=scrape_logs,
            screenshot_url=None,
            created_at=finished_at.isoformat()
        )

    # Parse JSONB fields if needed
    metrics_history = _crash_json_field(crash_report.get('metrics_history'), [])
    logs_before_crash = _crash_json_field(crash_report.get('logs_before_crash'), [])
    stored_analysis = _crash_json_field(crash_report.get('analysis'), None)

    # If no analysis stored, generate one
    if not stored_analysis:
        crash_data = {
            'error_message': crash_report.get('error_message') or '',
            'metrics_history': metrics_history or [],
            'logs_before_crash': logs_before_crash or [],
            'crash_type': crash_report.get('crash_type'),
            # Decoded like the other JSON fields so the analyzer gets a dict
            'state': _crash_json_field(crash_report.get('state'), {}) or {}
        }
        analysis = analyze_crash(crash_data)
        stored_analysis = {
            'pattern': analysis.pattern,
            'confidence': analysis.confidence,
            'description': analysis.description,
            'suggested_fix': analysis.suggested_fix,
            'auto_fix_params': analysis.auto_fix_params
        }

    return CrashReportResponse(
        # Stringified to match the generated-report path above
        crash_id=str(crash_report['crash_id']),
        job_id=str(crash_report['job_id']),
        crash_type=crash_report['crash_type'],
        error_message=crash_report.get('error_message'),
        analysis=CrashAnalysisModel(**stored_analysis) if stored_analysis else None,
        metrics_history=metrics_history,
        logs_before_crash=logs_before_crash,
        screenshot_url=crash_report.get('screenshot_url'),
        created_at=crash_report['created_at'].isoformat()
    )
|
|
|
|
|
|
# Available sort orders for retry strategy
SORT_ORDERS = ["newest", "lowest", "highest", "relevant"]

# Fingerprint rotation for retry - realistic browser profiles to avoid bot detection
import random

FINGERPRINT_PROFILES = [
    {
        "platform": "MacIntel",
        "timezone": "Europe/Madrid",
        "language": "es-ES",
        "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "viewport": {"width": 1440, "height": 900}
    },
    {
        "platform": "Win32",
        "timezone": "Europe/London",
        "language": "en-GB",
        "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "viewport": {"width": 1920, "height": 1080}
    },
    {
        "platform": "MacIntel",
        "timezone": "America/New_York",
        "language": "en-US",
        "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "viewport": {"width": 1680, "height": 1050}
    },
    {
        "platform": "Win32",
        "timezone": "Europe/Paris",
        "language": "fr-FR",
        "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "viewport": {"width": 1366, "height": 768}
    },
    {
        "platform": "MacIntel",
        "timezone": "Europe/Berlin",
        "language": "de-DE",
        "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
        "viewport": {"width": 1512, "height": 982}
    },
]


def get_rotated_fingerprint(retry_attempt: int = 0, previous_fingerprints: Optional[list] = None) -> dict:
    """
    Get a fingerprint profile for retry, avoiding previously used ones.

    Args:
        retry_attempt: Which retry attempt this is (0-indexed)
        previous_fingerprints: List of previously used fingerprint platforms

    Returns:
        A fingerprint profile dict. It is an independent copy (including the
        nested ``viewport`` dict), so callers may modify it without altering
        ``FINGERPRINT_PROFILES``.

    Note:
        Profiles are excluded by ``platform``. Only two distinct platforms are
        defined, so once both have been used every profile counts as used and
        selection cycles over the full list again.
    """
    previous_fingerprints = previous_fingerprints or []

    # Filter out previously used profiles
    available = [fp for fp in FINGERPRINT_PROFILES
                 if fp["platform"] not in previous_fingerprints]

    # If all used, cycle back
    if not available:
        available = FINGERPRINT_PROFILES

    # Select based on retry attempt (deterministic but varied)
    selected = available[retry_attempt % len(available)]

    # Copy nested dicts too; a shallow copy would share "viewport" with the
    # module-level profile and let callers corrupt it.
    return {
        key: dict(value) if isinstance(value, dict) else value
        for key, value in selected.items()
    }
|
|
|
|
|
|
# Strong references to in-flight retry tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_retry_background_tasks = set()


def _retry_json_field(value, fallback):
    """Decode ``value`` when it is a JSON string; return other values unchanged.

    ``fallback`` is returned when the string cannot be decoded.
    """
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return fallback
    return value


@app.post("/jobs/{job_id}/retry", response_model=RetryJobResponse, summary="Retry Failed Job")
async def retry_job(
    job_id: UUID,
    apply_fix: bool = Query(False, description="Apply auto-fix parameters based on crash analysis"),
    next_sort: bool = Query(False, description="Use a different sort order than the original job (for partial jobs)")
):
    """
    Retry a failed or partial job, optionally applying auto-fix parameters.

    When apply_fix=true:
    - Analyzes the crash pattern from the original job
    - Applies recommended parameter adjustments (e.g., reduced batch size for memory issues)
    - Creates a new job with the adjusted parameters

    When next_sort=true:
    - Uses a different sort order than previously attempted
    - Helps get different reviews when stuck at ~1000 limit
    - Tracks sort_orders_attempted for review merging
    - If the original job's metadata has ``bot_detected`` set, a different
      browser fingerprint is selected as well

    Returns the new job ID for tracking.

    Raises:
        HTTPException: 500 when the database is not initialized, 404 when the
            job does not exist, 400 when the job is neither failed nor partial.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Get original job
    original_job = await db.get_job(job_id)
    if not original_job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Can only retry failed or partial jobs
    if original_job['status'] not in ['failed', 'partial']:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot retry job with status '{original_job['status']}' - only failed or partial jobs can be retried"
        )

    # Parse original metadata; anything that is not a JSON object is discarded
    original_metadata = _retry_json_field(original_job.get('metadata'), {})
    if not isinstance(original_metadata, dict):
        original_metadata = {}

    applied_fixes = None

    if apply_fix:
        # Get crash analysis to determine fixes
        scrape_logs = _retry_json_field(original_job.get('scrape_logs'), [])
        metrics_history = _retry_json_field(original_job.get('metrics_history'), [])

        crash_data = {
            # ``or`` so that a stored None falls back to the default as well
            'error_message': original_job.get('error_message') or 'Unknown error',
            'metrics_history': metrics_history or [],
            'logs_before_crash': scrape_logs or [],
            'state': {
                'reviews_extracted': original_job.get('reviews_count') or 0,
                'total_reviews': original_job.get('total_reviews')
            }
        }

        analysis = analyze_crash(crash_data)

        if analysis.auto_fix_params:
            # Get current scraper params from metadata or use defaults
            current_params = original_metadata.get('scraper_params', {})

            # Apply the auto-fix parameters
            fixed_params = apply_auto_fix(analysis.pattern, current_params)

            # Store applied fixes in metadata
            original_metadata['scraper_params'] = fixed_params
            original_metadata['retry_info'] = {
                'original_job_id': str(job_id),
                'crash_pattern': analysis.pattern,
                'applied_fixes': analysis.auto_fix_params
            }

            applied_fixes = analysis.auto_fix_params
            log.info(f"Applying auto-fix for pattern '{analysis.pattern}': {applied_fixes}")

    # Handle next_sort: use a different sort order than previously attempted
    selected_sort = None
    if next_sort:
        # Get previously attempted sort orders
        sort_orders_attempted = original_metadata.get('sort_orders_attempted', [])

        # If no sort was tracked, assume "newest" was used (default)
        if not sort_orders_attempted:
            initial_sort_used = original_metadata.get('initial_sort_used', 'newest')
            sort_orders_attempted = [initial_sort_used]

        # Find next unused sort order
        selected_sort = next(
            (order for order in SORT_ORDERS if order not in sort_orders_attempted),
            None,
        )

        if selected_sort:
            # Set the new sort strategy
            original_metadata['initial_sort'] = selected_sort
            original_metadata['sort_strategy'] = 'single'  # Don't auto-trigger multi-sort

            # Track all attempted sorts (including this one)
            original_metadata['sort_orders_attempted'] = sort_orders_attempted + [selected_sort]

            # Track retry chain for review merging
            original_metadata.setdefault('retry_chain', []).append(str(job_id))

            retry_info = original_metadata.setdefault('retry_info', {})
            retry_info['original_job_id'] = str(job_id)
            retry_info['retry_reason'] = 'next_sort'
            retry_info['selected_sort'] = selected_sort

            log.info(f"Retry with next_sort: using '{selected_sort}' (previously tried: {sort_orders_attempted})")
        else:
            log.warning(f"All sort orders already attempted: {sort_orders_attempted}")

    # Fingerprint rotation: if bot was detected, use a different fingerprint
    selected_fingerprint = None
    if next_sort and original_metadata.get('bot_detected', False):
        # Get previously used fingerprints
        previous_fingerprints = original_metadata.get('fingerprints_used', [])
        retry_count = len(original_metadata.get('retry_chain', []))

        # Get a rotated fingerprint
        selected_fingerprint = get_rotated_fingerprint(retry_count, previous_fingerprints)

        # Store the fingerprint in metadata
        original_metadata['browser_fingerprint'] = selected_fingerprint

        # Track used fingerprints
        original_metadata.setdefault('fingerprints_used', []).append(selected_fingerprint['platform'])

        # 'retry_info' is absent when no sort was selected above and no fix
        # was applied, so create it on demand instead of indexing directly.
        retry_info = original_metadata.setdefault('retry_info', {})
        retry_info['fingerprint_rotated'] = True
        retry_info['new_fingerprint'] = {
            'platform': selected_fingerprint['platform'],
            'timezone': selected_fingerprint['timezone']
        }

        log.info(f"Fingerprint rotated for retry: {selected_fingerprint['platform']}, {selected_fingerprint['timezone']}")

    # Create new job with same URL and (possibly modified) metadata
    new_job_id = await db.create_job(
        url=original_job['url'],
        webhook_url=original_job.get('webhook_url'),
        webhook_secret=original_job.get('webhook_secret'),
        metadata=original_metadata
    )

    # Start the new scraping job, holding a reference until it completes
    task = asyncio.create_task(run_scraping_job(new_job_id))
    _retry_background_tasks.add(task)
    task.add_done_callback(_retry_background_tasks.discard)

    log.info(f"Created retry job {new_job_id} for original job {job_id}")

    # Build response message and the sort/fingerprint summary
    message = f"Retry job created from original job {job_id}"
    retry_fixes = {}
    if selected_sort:
        message += f" (using sort: {selected_sort})"
        retry_fixes["selected_sort"] = selected_sort
    if selected_fingerprint:
        message += f" (fingerprint: {selected_fingerprint['platform']}/{selected_fingerprint['timezone']})"
        retry_fixes["fingerprint"] = {
            "platform": selected_fingerprint["platform"],
            "timezone": selected_fingerprint["timezone"]
        }

    return RetryJobResponse(
        job_id=str(new_job_id),
        status="started",
        message=message,
        # Auto-fix parameters take precedence over the sort/fingerprint summary
        applied_fixes=applied_fixes if applied_fixes else (retry_fixes if retry_fixes else None)
    )
|
|
|
|
|
|
@app.get("/crashes/stats", response_model=CrashStatsResponse, summary="Get Crash Statistics")
async def get_crash_stats(
    days: int = Query(7, description="Number of days to look back", ge=1, le=90)
):
    """
    Get aggregate crash statistics and pattern analysis.

    Analyzes all crash reports from the specified time period to identify:
    - Most common crash patterns
    - Confidence scores for pattern detection
    - Recommended fixes based on recurring patterns

    Use this to identify systemic issues and optimize scraper configuration.
    """
    # NOTE: the docstring above is kept verbatim because FastAPI publishes it
    # as the endpoint description.
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Get all failed/partial jobs for deeper analysis (up to 500 of each)
    failed_jobs = await db.list_jobs(status=JobStatus.FAILED, limit=500)
    partial_jobs = await db.list_jobs(status=JobStatus.PARTIAL, limit=500)

    lookback = timedelta(days=days)

    def _completed_recently(completed_at) -> bool:
        """Return True when completed_at falls inside the look-back window."""
        if not completed_at:
            return False
        # Build "now" with the timestamp's own tzinfo so naive and
        # timezone-aware values are each compared against a matching datetime
        # (comparing a naive datetime with an aware one raises TypeError).
        now = datetime.now(tz=getattr(completed_at, 'tzinfo', None))
        return completed_at > now - lookback

    # Filter by time here (list_jobs doesn't have date filter)
    recent_crash_jobs = [
        job for job in failed_jobs + partial_jobs
        if _completed_recently(job.get('completed_at'))
    ]

    if not recent_crash_jobs:
        return CrashStatsResponse(
            total_crashes=0,
            patterns={},
            most_common=None,
            recommendations=[]
        )

    # Build crash reports for analysis
    crash_reports = []
    for job in recent_crash_jobs:
        scrape_logs = job.get('scrape_logs')
        if isinstance(scrape_logs, str):
            try:
                scrape_logs = json.loads(scrape_logs)
            except ValueError:
                # json.JSONDecodeError is a ValueError: unparseable logs are
                # treated as "no logs" rather than failing the whole request.
                scrape_logs = []

        crash_reports.append({
            # `or` also covers keys that are present but hold None
            'error_message': job.get('error_message') or '',
            'metrics_history': [],  # Not stored in job list query
            'logs_before_crash': scrape_logs or [],
            'state': {
                'reviews_extracted': job.get('reviews_count') or 0,
                'total_reviews': job.get('total_reviews')
            }
        })

    # Use summarize_crash_patterns for deep analysis
    summary = summarize_crash_patterns(crash_reports)

    # Convert patterns to response model format
    patterns_response = {
        pattern_name: CrashPatternStats(
            count=stats['count'],
            percentage=stats['percentage'],
            avg_confidence=stats['avg_confidence']
        )
        for pattern_name, stats in summary.get('patterns', {}).items()
    }

    return CrashStatsResponse(
        total_crashes=summary.get('total_crashes', 0),
        patterns=patterns_response,
        most_common=summary.get('most_common'),
        recommendations=summary.get('recommendations', [])
    )
|
|
|
|
|
|
# ==================== Health Check Endpoints ====================
|
|
|
|
@app.get("/health/live", summary="Liveness Probe")
async def liveness():
    """
    Liveness check: Is the server alive?

    Use this for Kubernetes liveness probe - restart container if fails.
    """
    if health_system:
        # Health system enabled: let it produce the liveness report.
        return await health_system.check_liveness()

    # Health system disabled: being able to answer at all means we are alive.
    return {"status": "healthy", "message": "Server is alive (health system disabled)"}
|
|
|
|
|
|
@app.get("/health/ready", summary="Readiness Probe")
async def readiness():
    """
    Readiness check: Can the server handle traffic?

    Use this for Kubernetes readiness probe - remove from load balancer if fails.
    """
    if health_system:
        readiness_report = await health_system.check_readiness()
        if readiness_report["status"] == "ready":
            return readiness_report
        # Anything other than "ready" is reported with a 503 status
        return JSONResponse(status_code=503, content=readiness_report)

    # Health system disabled: fall back to checking the DB connection pool
    if not (db and db.pool):
        raise HTTPException(status_code=503, detail="Database not connected")
    return {"status": "ready", "message": "Server is ready (health system disabled)"}
|
|
|
|
|
|
@app.get("/health/canary", summary="Canary Health Check")
async def canary():
    """
    Canary check: Does scraping actually work?

    Returns the latest canary test result (runs every 4 hours in background).
    Use this for external monitoring (PagerDuty, DataDog) - alerts if fails.
    """
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")

    canary_report = await health_system.check_canary()

    # "healthy" and "unknown" are returned as-is; every other status becomes a 503
    acceptable = canary_report["status"] in ("healthy", "unknown")
    return canary_report if acceptable else JSONResponse(status_code=503, content=canary_report)
|
|
|
|
|
|
@app.get("/health/detailed", summary="Detailed Health Status")
async def detailed_health():
    """Get detailed health status of all components"""
    if health_system:
        return await health_system.get_detailed_health()

    # Without a health system there is nothing to report on
    raise HTTPException(status_code=503, detail="Health system not initialized")
|
|
|
|
|
|
# ==================== Background Job Runner ====================
|
|
|
|
async def run_scraping_job(job_id: UUID):
    """
    Run scraping job in background with concurrency limit.

    Marks the job RUNNING, runs the selected scraper in a worker thread while
    streaming progress, logs and metrics over SSE, then records the outcome
    (completed, partial or failed) in the database, broadcasts it over SSE and
    sends a webhook when the job has a webhook_url.

    Exceptions raised while scraping are caught and recorded on the job as
    partial (some reviews already saved) or failed.

    Args:
        job_id: Job UUID
    """
    job_id_str = str(job_id)

    # Bound before the try block: the exception handler reads it, and a
    # failure before the LogCapture is created (e.g. in the first DB call)
    # would otherwise raise UnboundLocalError inside the handler.
    log_capture = None

    async with job_semaphore:  # Limit concurrent Chrome instances
        try:
            # Update status to running
            await db.update_job_status(job_id, JobStatus.RUNNING)
            log.info(f"Starting scraping job {job_id}")

            # Get job details
            job = await db.get_job(job_id)
            url = job['url']

            # Extract browser fingerprint from metadata if available
            browser_fingerprint = None
            metadata = job.get('metadata')
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    # json.JSONDecodeError is a ValueError: unparseable
                    # metadata is treated as absent.
                    metadata = None
            if metadata and 'browser_fingerprint' in metadata:
                browser_fingerprint = metadata['browser_fingerprint']
                log.info(f"Using user fingerprint: {browser_fingerprint.get('platform')}, {browser_fingerprint.get('timezone')}")
            elif metadata and 'geolocation' in metadata:
                browser_fingerprint = {'geolocation': metadata['geolocation']}
                log.info("Using user geolocation only")

            # Broadcast job started via SSE
            await broadcast_job_update(job_id_str, "job_started", {
                "job_id": job_id_str,
                "status": "running",
                "url": url
            })

            # Get the event loop for progress updates from worker thread
            loop = asyncio.get_running_loop()

            # Create log capture instance that we can access for real-time logs
            log_capture = LogCapture()

            # One-element lists act as mutable cells shared with the nested
            # callbacks below.
            # Track total reviews for incremental saves
            total_reviews_seen = [None]
            # Accumulate all reviews for incremental saves (flush_callback receives batches)
            all_reviews_collected = []
            # Track last broadcasted log count for streaming new logs
            last_broadcasted_log_count = [0]
            # Track last metrics broadcast time and the review count at that time
            last_metrics_broadcast_time = [time.time()]
            last_metrics_review_count = [0]
            metrics_broadcast_interval = 5.0  # Emit metrics every 5 seconds

            def report_background_failure(future):
                """Log errors from coroutines scheduled by the worker thread."""
                # Without this callback the returned future is dropped and any
                # DB/broadcast error inside the coroutine disappears silently.
                if future.cancelled():
                    return
                error = future.exception()
                if error is not None:
                    log.error(f"Background update for job {job_id} failed: {error}")

            # Progress callback to update job status with current/total counts AND logs
            def progress_callback(current_count: int, total_count: int):
                """Update job progress and logs from worker thread"""
                if total_count:
                    total_reviews_seen[0] = total_count

                async def update():
                    # Get current logs from the shared log_capture
                    current_logs = log_capture.get_logs()
                    await db.update_job_status(
                        job_id,
                        JobStatus.RUNNING,
                        reviews_count=current_count,
                        total_reviews=total_count,
                        scrape_logs=current_logs
                    )

                    # Broadcast individual structured log events for new logs
                    current_log_count = len(current_logs)
                    if current_log_count > last_broadcasted_log_count[0]:
                        new_logs = current_logs[last_broadcasted_log_count[0]:]
                        for log_entry in new_logs:
                            await broadcast_structured_log(job_id_str, log_entry)
                        last_broadcasted_log_count[0] = current_log_count

                    # Broadcast metrics event every 5 seconds
                    current_time = time.time()
                    elapsed = current_time - last_metrics_broadcast_time[0]
                    if elapsed >= metrics_broadcast_interval:
                        # Rate over this interval only: reviews gained since
                        # the previous metrics event divided by the interval
                        # length. Dividing the cumulative count by the interval
                        # would inflate the rate as the job progresses.
                        new_reviews = max(current_count - last_metrics_review_count[0], 0)
                        extraction_rate = new_reviews / elapsed

                        # Count scroll events from logs
                        scroll_count = 0
                        for entry in current_logs:
                            msg = entry.get('message', '') if isinstance(entry, dict) else str(entry)
                            if 'scroll' in msg.lower():
                                scroll_count += 1

                        # Get memory from latest log entry with metrics
                        memory_mb = next(
                            (
                                entry['metrics'].get('memory_mb', 0)
                                for entry in reversed(current_logs)
                                if isinstance(entry, dict) and entry.get('metrics')
                            ),
                            0,
                        )

                        metrics_data = {
                            "reviews_extracted": current_count,
                            "scroll_count": scroll_count,
                            "memory_mb": memory_mb,
                            "extraction_rate": round(extraction_rate, 2)
                        }
                        await broadcast_metrics(job_id_str, metrics_data)
                        last_metrics_broadcast_time[0] = current_time
                        last_metrics_review_count[0] = current_count

                    # Broadcast progress via SSE (legacy format for backward compatibility)
                    await broadcast_job_update(job_id_str, "job_progress", {
                        "job_id": job_id_str,
                        "status": "running",
                        "reviews_count": current_count,
                        "total_reviews": total_count,
                        "logs": current_logs
                    })

                # Schedule the coroutine on the event loop
                asyncio.run_coroutine_threadsafe(update(), loop).add_done_callback(
                    report_background_failure
                )

            # Flush callback to save reviews incrementally (crash recovery)
            # Note: flush_callback receives batches, so we accumulate them
            def flush_callback(reviews_batch: list):
                """Accumulate and save reviews to DB incrementally from worker thread"""
                # Extend our collection with the new batch
                all_reviews_collected.extend(reviews_batch)

                async def save():
                    await db.save_reviews_incremental(
                        job_id=job_id,
                        reviews=all_reviews_collected,  # Save ALL reviews so far
                        total_reviews=total_reviews_seen[0]
                    )

                # Schedule the coroutine on the event loop
                asyncio.run_coroutine_threadsafe(save(), loop).add_done_callback(
                    report_background_failure
                )

            # Get scraper version from metadata (frontend can specify version)
            requested_version = metadata.get('scraper_version') if metadata else None
            scraper_func, actual_version = get_scraper_for_version(requested_version)
            log.info(f"Using scraper version {actual_version} for job {job_id}")

            # Get sort strategy parameters from metadata (for retry with different sort)
            initial_sort = metadata.get('initial_sort') if metadata else None
            sort_strategy = metadata.get('sort_strategy', 'auto') if metadata else 'auto'
            max_reviews = metadata.get('max_reviews') if metadata else None
            session_id = metadata.get('session_id') if metadata else None
            if initial_sort:
                log.info(f"Using initial_sort={initial_sort}, sort_strategy={sort_strategy} for job {job_id}")
            if max_reviews:
                log.info(f"Using max_reviews={max_reviews} limit for job {job_id} (testing mode)")

            # Check if we have a session_id for browser reuse (session handoff from validation)
            if session_id:
                log.info(f"Using session handoff (session_id={session_id}) for job {job_id} - skipping navigation")
                from scrapers.google_reviews.v1_2_0 import scrape_with_session
                result = await asyncio.to_thread(
                    scrape_with_session,
                    session_id=session_id,
                    max_reviews=max_reviews,
                    progress_callback=progress_callback,
                    flush_callback=flush_callback,
                    sort_strategy=sort_strategy,
                    initial_sort=initial_sort
                )
                # Add logs from session scraping
                if 'logs' in result:
                    for log_entry in result.get('logs', []):
                        log_capture.entries.append(log_entry)
            else:
                # Run scraping with progress callback and shared log capture
                # headless=False because Docker uses Xvfb virtual display
                result = await asyncio.to_thread(
                    scraper_func,
                    url=url,
                    headless=False,
                    progress_callback=progress_callback,
                    log_capture=log_capture,
                    flush_callback=flush_callback,
                    browser_fingerprint=browser_fingerprint,  # Pass user's browser fingerprint
                    initial_sort=initial_sort,  # Sort order for retry strategy
                    sort_strategy=sort_strategy,  # Sort strategy (auto, multi, single)
                    max_reviews=max_reviews  # Optional limit for testing
                )

            # Update job metadata with tracking info from scraper result
            current_sort = result.get('initial_sort_used', 'newest')
            tracking_metadata = {
                'bot_detected': result.get('bot_detected', False),
                'initial_sort_used': current_sort,
                'multi_sort': result.get('multi_sort', {}),
            }
            # Preserve existing sort_orders_attempted and add current sort
            existing_sorts = metadata.get('sort_orders_attempted', []) if metadata else []
            if current_sort not in existing_sorts:
                tracking_metadata['sort_orders_attempted'] = existing_sorts + [current_sort]
            else:
                tracking_metadata['sort_orders_attempted'] = existing_sorts

            # Update metadata in database
            await db.update_job_metadata(job_id, tracking_metadata)
            if result.get('bot_detected'):
                log.warning(f"Bot detection flagged for job {job_id} - sort button was hidden")

            if result['success']:
                # Save session fingerprint if captured
                if result.get('session_fingerprint'):
                    await db.update_session_fingerprint(job_id, result['session_fingerprint'])
                    log.info(f"Saved session fingerprint for job {job_id}")

                # Save business info to dedicated columns (queryable/indexable)
                business_info = result.get('business_info', {})
                if business_info:
                    await db.update_business_info(
                        job_id=job_id,
                        business_name=business_info.get('name'),
                        business_category=business_info.get('category'),
                        business_address=business_info.get('address'),
                        business_rating=business_info.get('rating')
                    )
                    log.info(f"Saved business info for job {job_id}: {business_info.get('name')} ({business_info.get('category')})")

                # Save results to database (including scraper logs and review topics)
                await db.save_job_result(
                    job_id=job_id,
                    reviews=result['reviews'],
                    scrape_time=result['time'],
                    total_reviews=result.get('total_reviews'),
                    scrape_logs=result.get('logs'),
                    review_topics=result.get('review_topics')
                )

                log.info(
                    f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s"
                )

                # Broadcast job completed via SSE
                await broadcast_job_update(job_id_str, "job_completed", {
                    "job_id": job_id_str,
                    "status": "completed",
                    "reviews_count": result['count'],
                    "total_reviews": result.get('total_reviews'),
                    "scrape_time": result['time'],
                    "logs": result.get('logs', [])
                })

                # Send webhook if configured
                if job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000')

                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=job['webhook_url'],
                        job_id=job_id,
                        status='completed',
                        reviews_count=result['count'],
                        scrape_time=result['time'],
                        reviews_url=f"{api_base_url}/jobs/{job_id}/reviews",
                        secret=job.get('webhook_secret'),
                        db=db
                    )

            else:
                # Save session fingerprint even on failure (useful for debugging bot detection)
                if result.get('session_fingerprint'):
                    await db.update_session_fingerprint(job_id, result['session_fingerprint'])
                    log.info(f"Saved session fingerprint for failed job {job_id}")

                # Job failed - check if we have partial reviews saved
                current_job = await db.get_job(job_id)
                partial_count = current_job.get('reviews_count', 0) if current_job else 0

                if partial_count > 0:
                    # Mark as partial - we have some reviews saved
                    await db.mark_job_partial(
                        job_id,
                        error_message=result.get('error', 'Unknown error'),
                        scrape_logs=result.get('logs')
                    )

                    log.warning(f"Partial job {job_id}: {partial_count} reviews saved before error: {result.get('error')}")

                    # Broadcast job partial via SSE
                    await broadcast_job_update(job_id_str, "job_partial", {
                        "job_id": job_id_str,
                        "status": "partial",
                        "reviews_count": partial_count,
                        "total_reviews": current_job.get('total_reviews'),
                        "error_message": result.get('error'),
                        "logs": result.get('logs', [])
                    })

                    # Send partial webhook if configured
                    if job.get('webhook_url'):
                        webhook_manager = WebhookManager()
                        await webhook_manager.send_job_completed_webhook(
                            webhook_url=job['webhook_url'],
                            job_id=job_id,
                            status='partial',
                            reviews_count=partial_count,
                            error_message=result.get('error'),
                            secret=job.get('webhook_secret'),
                            db=db
                        )
                else:
                    # No reviews saved - mark as failed
                    await db.update_job_status(
                        job_id,
                        JobStatus.FAILED,
                        error_message=result.get('error', 'Unknown error'),
                        scrape_logs=result.get('logs')
                    )

                    log.error(f"Failed job {job_id}: {result.get('error')}")

                    # Broadcast job failed via SSE
                    await broadcast_job_update(job_id_str, "job_failed", {
                        "job_id": job_id_str,
                        "status": "failed",
                        "error_message": result.get('error'),
                        "logs": result.get('logs', [])
                    })

                    # Send failure webhook if configured
                    if job.get('webhook_url'):
                        webhook_manager = WebhookManager()
                        await webhook_manager.send_job_completed_webhook(
                            webhook_url=job['webhook_url'],
                            job_id=job_id,
                            status='failed',
                            error_message=result.get('error'),
                            secret=job.get('webhook_secret'),
                            db=db
                        )

        except Exception as e:
            # Top-level boundary for the background task: record the failure
            # on the job instead of letting the task die unobserved.
            log.error(f"Error in scraping job {job_id}: {e}")
            import traceback
            traceback.print_exc()

            # Check if we have partial reviews saved
            current_job = await db.get_job(job_id)
            partial_count = current_job.get('reviews_count', 0) if current_job else 0

            if partial_count > 0:
                # Mark as partial - we have some reviews saved
                await db.mark_job_partial(
                    job_id,
                    error_message=str(e),
                    scrape_logs=log_capture.get_logs() if log_capture else None
                )

                log.warning(f"Partial job {job_id}: {partial_count} reviews saved before exception: {e}")

                # Broadcast job partial via SSE
                await broadcast_job_update(job_id_str, "job_partial", {
                    "job_id": job_id_str,
                    "status": "partial",
                    "reviews_count": partial_count,
                    "total_reviews": current_job.get('total_reviews'),
                    "error_message": str(e),
                    "logs": log_capture.get_logs() if log_capture else []
                })

                # Send partial webhook
                if current_job and current_job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=current_job['webhook_url'],
                        job_id=job_id,
                        status='partial',
                        reviews_count=partial_count,
                        error_message=str(e),
                        secret=current_job.get('webhook_secret'),
                        db=db
                    )
            else:
                # No reviews saved - mark as failed
                await db.update_job_status(
                    job_id,
                    JobStatus.FAILED,
                    error_message=str(e)
                )

                # Broadcast job failed via SSE
                await broadcast_job_update(job_id_str, "job_failed", {
                    "job_id": job_id_str,
                    "status": "failed",
                    "error_message": str(e),
                    "logs": []
                })

                # Send failure webhook
                if current_job and current_job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=current_job['webhook_url'],
                        job_id=job_id,
                        status='failed',
                        error_message=str(e),
                        secret=current_job.get('webhook_secret'),
                        db=db
                    )
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Listening port comes from the PORT environment variable (default 8000)
    port = int(os.getenv('PORT', 8000))

    # Keyword options for uvicorn, gathered in one place
    server_options = {
        "host": "0.0.0.0",
        "port": port,
        "reload": False,  # Disable reload in production
        "log_level": "info",
    }

    log.info(f"Starting production server on port {port}...")
    uvicorn.run("api_server_production:app", **server_options)
|