- Task #3: Update SSE stream to emit structured log events (type: "log" for entries, type: "metrics" every 5s, ?format=legacy for backward compat) - Task #10: Create crash pattern analyzer module (6 patterns: memory_exhaustion, dom_bloat, rate_limited, consent_loop, scroll_timeout, element_stale) (confidence scoring, auto-fix params, summarize_crash_patterns for recurring issues) - Task #13: Capture session fingerprint in backend (user_agent, platform, timezone, webgl, canvas, bot_detection_tests) (saved on success and failure for debugging) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1366 lines
52 KiB
Python
1366 lines
52 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Production Google Reviews Scraper API Server with Phase 1 features:
|
|
- PostgreSQL storage with JSONB
|
|
- Webhook delivery with retries
|
|
- Smart health checks with canary testing
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from typing import Optional, List, Dict, Any
|
|
from uuid import UUID
|
|
|
|
from fastapi import FastAPI, HTTPException, Query, Header
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, HttpUrl, Field
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
|
from modules.database import DatabaseManager, JobStatus
|
|
from modules.webhooks import WebhookDispatcher, WebhookManager
|
|
from modules.health_checks import HealthCheckSystem
|
|
from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper
|
|
from modules.structured_logger import StructuredLogger, LogEntry
|
|
from modules.chrome_pool import (
|
|
start_worker_pools,
|
|
stop_worker_pools,
|
|
get_validation_worker,
|
|
release_validation_worker,
|
|
get_scraping_worker,
|
|
release_scraping_worker,
|
|
get_pool_stats
|
|
)
|
|
|
|
# Process-wide logging setup: timestamp, logger name, level, message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
log = logging.getLogger("api_server")

# Shared service handles. They start out as None; lifespan() assigns db and
# webhook_dispatcher at startup (health_system stays None while the health
# check system is disabled there).
db: Optional[DatabaseManager] = None
webhook_dispatcher: Optional[WebhookDispatcher] = None
health_system: Optional[HealthCheckSystem] = None

# Cap on concurrently running jobs (prevents too many Chrome instances).
# Overridable through the MAX_CONCURRENT_JOBS environment variable.
MAX_CONCURRENT_JOBS = int(os.environ.get('MAX_CONCURRENT_JOBS', '5'))
job_semaphore = asyncio.Semaphore(value=MAX_CONCURRENT_JOBS)

# SSE subscriber registry used to broadcast job updates to connected clients:
#   {job_id: [asyncio.Queue, ...]}  -> job-specific streams
#   {"all":  [asyncio.Queue, ...]}  -> all-jobs stream
job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []}
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for startup and shutdown.

    Startup connects the database and initializes its schema, launches the
    webhook dispatcher as a background task and starts the Chrome worker
    pools. Shutdown stops the dispatcher and the worker pools and then
    disconnects the database.

    The shutdown steps live in ``finally`` blocks so they also run when the
    application body exits with an exception, and so the database is
    disconnected even if stopping the dispatcher or the pools fails.
    """
    global db, webhook_dispatcher, health_system

    # Startup
    log.info("Starting Google Reviews Scraper API Server (Production)")

    # Get database URL from environment
    database_url = os.getenv(
        'DATABASE_URL',
        'postgresql://scraper:scraper@localhost:5432/scraper'
    )

    # Initialize database
    db = DatabaseManager(database_url)
    await db.connect()
    await db.initialize_schema()
    log.info("Database initialized")

    # Initialize health check system with canary monitoring
    # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting
    # health_system = HealthCheckSystem(db)
    # await health_system.start()
    log.info("Health check system DISABLED (canary tests disabled to avoid rate limiting)")

    # Initialize webhook dispatcher.
    # The task object is kept in a local for the lifetime of the application:
    # the event loop only holds weak references to tasks, so an unreferenced
    # task may be garbage-collected before it finishes.
    webhook_dispatcher = WebhookDispatcher(db, interval_seconds=30)
    dispatcher_task = asyncio.create_task(webhook_dispatcher.start())  # noqa: F841
    log.info("Webhook dispatcher started")

    # Start Chrome worker pools (1 for validation, 2 for scraping)
    # These pre-warm Chrome instances for instant availability
    # headless=False because Docker uses Xvfb virtual display for better compatibility
    await asyncio.to_thread(
        start_worker_pools,
        validation_size=1,
        scraping_size=2,
        headless=False
    )
    log.info("Chrome worker pools started (1 validation + 2 scraping)")

    try:
        yield
    finally:
        # Shutdown
        log.info("Shutting down Google Reviews Scraper API Server")
        try:
            if webhook_dispatcher:
                webhook_dispatcher.stop()
            # if health_system:
            #     health_system.stop()

            # Stop worker pools
            await asyncio.to_thread(stop_worker_pools)
            log.info("Chrome worker pools stopped")
        finally:
            if db:
                await db.disconnect()
|
|
|
|
|
|
# Initialize FastAPI app
app = FastAPI(
    title="Google Reviews Scraper API - Production",
    description="Production-ready REST API for Google Maps review scraping with webhooks and health monitoring",
    version="2.0.0",
    lifespan=lifespan
)

# CORS configuration.
# Allowed origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable; when it is unset or empty the previous allow-all
# behaviour ("*") is kept.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation - set CORS_ALLOW_ORIGINS to
# the real front-end origins in production.
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv('CORS_ALLOW_ORIGINS', '*').split(',')
    if origin.strip()
] or ["*"]

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# ==================== Request/Response Models ====================
|
|
|
|
class GeolocationModel(BaseModel):
    """Geolocation coordinates"""
    # Both coordinates are required (no default is supplied to Field).
    lat: float = Field(description="Latitude")
    lng: float = Field(description="Longitude")
|
|
|
|
|
|
class ViewportModel(BaseModel):
    """Browser viewport size"""
    # Both dimensions are required (no default is supplied to Field).
    width: int = Field(description="Viewport width")
    height: int = Field(description="Viewport height")
|
|
|
|
|
|
class BrowserFingerprintModel(BaseModel):
    """Browser fingerprint to replicate user's browser"""
    # Every attribute is optional and defaults to None.
    geolocation: Optional[GeolocationModel] = None
    userAgent: Optional[str] = Field(default=None, description="User agent string")
    viewport: Optional[ViewportModel] = Field(default=None, description="Screen resolution")
    timezone: Optional[str] = Field(default=None, description="Timezone (e.g., Europe/Madrid)")
    language: Optional[str] = Field(default=None, description="Browser language (e.g., en-US)")
    platform: Optional[str] = Field(default=None, description="Platform (e.g., MacIntel, Win32)")
|
|
|
|
|
|
class ScrapeRequest(BaseModel):
    """Request model for starting a scrape job"""
    # Only `url` is required; everything else defaults to None.
    url: HttpUrl = Field(description="Google Maps URL to scrape")
    webhook_url: Optional[HttpUrl] = Field(default=None, description="Webhook URL for async notifications")
    webhook_secret: Optional[str] = Field(default=None, description="Secret for webhook HMAC signature")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Optional custom metadata")
    geolocation: Optional[GeolocationModel] = Field(default=None, description="User's geolocation for Chrome")
    browser_fingerprint: Optional[BrowserFingerprintModel] = Field(
        default=None,
        description="User's browser fingerprint",
    )
|
|
|
|
|
|
class JobResponse(BaseModel):
    """Response model for job information"""
    # --- Identity and lifecycle (required) ---
    job_id: str
    status: str
    url: str
    created_at: str

    # --- Timestamps filled in as the job progresses ---
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    # Last update time for progress tracking
    updated_at: Optional[str] = None

    # --- Results ---
    reviews_count: Optional[int] = None
    # Total reviews available for this place
    total_reviews: Optional[int] = None
    scrape_time: Optional[float] = None
    error_message: Optional[str] = None
    webhook_url: Optional[str] = None

    # --- Business metadata ---
    business_name: Optional[str] = None
    business_address: Optional[str] = None
    # Category (e.g., "Barber shop")
    business_category: Optional[str] = None
    # Topic filters with mention counts
    review_topics: Optional[List[Dict[str, Any]]] = None
|
|
|
|
|
|
class ReviewsResponse(BaseModel):
    """Response model for reviews data"""
    # Identifier of the job the reviews belong to.
    job_id: str
    # Review records as plain dictionaries.
    reviews: List[Dict[str, Any]]
    # Number of entries in `reviews` (see get_job_reviews, which passes len(reviews)).
    count: int
|
|
|
|
|
|
class StatsResponse(BaseModel):
    """Response model for statistics"""
    # Overall number of jobs, followed by one counter per job status.
    total_jobs: int
    pending: int
    running: int
    completed: int
    failed: int
    cancelled: int

    # Aggregates that may be absent.
    avg_scrape_time: Optional[float] = None
    total_reviews: Optional[int] = None
|
|
|
|
|
|
# ==================== API Endpoints ====================
|
|
|
|
@app.get("/", summary="API Health Check")
async def root():
    """Basic health check endpoint"""
    # Static payload: no database or worker state is consulted here.
    payload = dict(
        message="Google Reviews Scraper API (Production)",
        status="healthy",
        version="2.0.0",
        features=["postgresql", "webhooks", "canary-testing"],
    )
    return payload
|
|
|
|
|
|
# Strong references to in-flight scraping tasks. The event loop only keeps weak
# references to tasks, so a fire-and-forget task could otherwise be
# garbage-collected before it finishes. Tasks remove themselves when done.
_scrape_tasks = set()


@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job")
async def start_scrape(request: ScrapeRequest):
    """
    Start a new scraping job.

    The job runs asynchronously in the background. You can:
    - Poll GET /jobs/{job_id} for status
    - Provide webhook_url for automatic notification when complete

    Returns the job ID for tracking.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    try:
        # Work on a copy so the dict owned by the request model is not mutated.
        metadata = dict(request.metadata or {})

        # Merge browser fingerprint into metadata if provided; a bare
        # geolocation is only used when no fingerprint was sent.
        if request.browser_fingerprint:
            fp = request.browser_fingerprint
            fingerprint = {
                "userAgent": fp.userAgent,
                "timezone": fp.timezone,
                "language": fp.language,
                "platform": fp.platform,
            }
            if fp.viewport:
                fingerprint['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height}
            if fp.geolocation:
                fingerprint['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
            metadata['browser_fingerprint'] = fingerprint
        elif request.geolocation:
            metadata['geolocation'] = {
                'lat': request.geolocation.lat,
                'lng': request.geolocation.lng
            }

        # Create job in database
        job_id = await db.create_job(
            url=str(request.url),
            webhook_url=str(request.webhook_url) if request.webhook_url else None,
            webhook_secret=request.webhook_secret,
            metadata=metadata
        )

        # Start scraping job in background, keeping a reference until it ends.
        task = asyncio.create_task(run_scraping_job(job_id))
        _scrape_tasks.add(task)
        task.add_done_callback(_scrape_tasks.discard)

        log.info(f"Created and started job {job_id}")

        return {
            "job_id": str(job_id),
            "status": "started",
            "message": "Scraping job started successfully"
        }

    except Exception as e:
        # log.exception keeps the traceback that log.error would drop.
        log.exception(f"Error creating scraping job: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") from e
|
|
|
|
|
|
# The ":uuid" path convertor makes this route match only UUID-shaped segments.
# Routes are matched in declaration order, and with a plain "{job_id}" this
# route (declared before "/jobs/stream") captured GET /jobs/stream and rejected
# it with a validation error, making the all-jobs stream unreachable.
# Side effect: a malformed job id now gets 404 (no route) instead of 422.
@app.get("/jobs/{job_id:uuid}", response_model=JobResponse, summary="Get Job Status")
async def get_job(job_id: UUID):
    """Get detailed information about a specific job"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Parse review_topics if it's a string (JSONB might be returned as string)
    review_topics = job.get('review_topics')
    if isinstance(review_topics, str):
        try:
            review_topics = json.loads(review_topics)
        except ValueError:  # json.JSONDecodeError is a ValueError
            review_topics = None

    # Extract business info from metadata if available
    metadata = job.get('metadata')
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = None
    # Anything that is not a JSON object carries no business info.
    if not isinstance(metadata, dict):
        metadata = {}

    return JobResponse(
        job_id=str(job['job_id']),
        status=job['status'],
        url=job['url'],
        created_at=job['created_at'].isoformat(),
        started_at=job['started_at'].isoformat() if job['started_at'] else None,
        completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
        updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None,
        reviews_count=job['reviews_count'],
        total_reviews=job.get('total_reviews'),
        scrape_time=job['scrape_time'],
        error_message=job['error_message'],
        webhook_url=job.get('webhook_url'),
        business_name=metadata.get('business_name'),
        # Populated here as well so the response matches what list_jobs returns.
        business_address=metadata.get('business_address'),
        business_category=metadata.get('business_category'),
        review_topics=review_topics
    )
|
|
|
|
|
|
@app.get("/jobs/{job_id}/logs", summary="Get Job Logs")
async def get_job_logs(job_id: UUID):
    """
    Get the scraper logs for a job.

    Returns logs from both successful and failed jobs.
    Useful for debugging scraping issues.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    # Get scrape_logs from job; parse if string (asyncpg might return JSONB as string)
    scrape_logs = job.get('scrape_logs')
    if isinstance(scrape_logs, str):
        try:
            scrape_logs = json.loads(scrape_logs)
        except ValueError:  # json.JSONDecodeError is a ValueError
            scrape_logs = None

    # Missing, empty or undecodable logs are all reported as an empty list.
    logs = scrape_logs or []

    return {
        "job_id": str(job_id),
        "status": job['status'],
        "error_message": job.get('error_message'),
        "logs": logs,
        "log_count": len(logs)
    }
|
|
|
|
|
|
# ==================== SSE Streaming Endpoints ====================
|
|
|
|
def format_sse_event(event_type: str, data: dict, use_structured_format: bool = True) -> str:
    """
    Build one Server-Sent Events frame.

    Args:
        event_type: Name written on the ``event:`` line (e.g., 'log', 'metrics', 'status')
        data: Payload to serialize as JSON on the ``data:`` line
        use_structured_format: When True the payload is wrapped as
            {"type": event_type, "data": data}; when False ``data`` is sent
            as-is (legacy format for backward compatibility)

    Returns:
        The frame text, terminated by the blank line that ends an SSE event
    """
    body = {"type": event_type, "data": data} if use_structured_format else data
    # Two trailing empty items yield the "\n\n" terminator after the data line.
    frame_lines = (f"event: {event_type}", f"data: {json.dumps(body)}", "", "")
    return "\n".join(frame_lines)
|
|
|
|
|
|
def format_structured_log_event(log_entry: dict) -> str:
    """
    Render a structured log entry as an SSE frame of type "log".

    ``log_entry`` is forwarded unchanged as the event data; it is expected to
    already be a dict from StructuredLogger.get_logs().

    Returns:
        The SSE frame produced by format_sse_event in structured format
    """
    # Structured wrapping is format_sse_event's default, so no flag is passed.
    frame = format_sse_event("log", log_entry)
    return frame
|
|
|
|
|
|
def format_metrics_event(metrics: dict) -> str:
    """
    Render a metrics snapshot as an SSE frame of type "metrics".

    Args:
        metrics: Dict containing reviews_extracted, scroll_count, memory_mb, extraction_rate

    Returns:
        The SSE frame produced by format_sse_event in structured format
    """
    # Structured wrapping is format_sse_event's default, so no flag is passed.
    frame = format_sse_event("metrics", metrics)
    return frame
|
|
|
|
|
|
async def _deliver_to_subscribers(subscribers, message: str) -> None:
    """Put ``message`` on every queue in ``subscribers``, isolating per-queue failures."""
    # Iterate over a snapshot: a stream handler may unregister its queue from
    # the live list while this coroutine is suspended.
    for queue in list(subscribers):
        try:
            await queue.put(message)
        except Exception as exc:
            # Delivery is best-effort, but only ordinary errors are absorbed.
            # A bare ``except:`` here would also swallow asyncio.CancelledError
            # and keep a cancelled broadcaster running.
            log.debug(f"Failed to deliver SSE message to a subscriber: {exc}")


async def broadcast_job_update(job_id: str, event_type: str, data: dict):
    """Broadcast an update to all subscribers of a job stream and the all-jobs stream."""
    # Legacy (unwrapped) frame: the JSON body is ``data`` itself.
    message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

    # Send to job-specific subscribers
    await _deliver_to_subscribers(job_update_queues.get(job_id, ()), message)

    # Send to all-jobs subscribers
    await _deliver_to_subscribers(job_update_queues.get("all", ()), message)


async def broadcast_structured_log(job_id: str, log_entry: dict):
    """Broadcast a structured log entry to job subscribers."""
    message = format_structured_log_event(log_entry)
    await _deliver_to_subscribers(job_update_queues.get(job_id, ()), message)


async def broadcast_metrics(job_id: str, metrics: dict):
    """Broadcast metrics update to job subscribers."""
    message = format_metrics_event(metrics)
    await _deliver_to_subscribers(job_update_queues.get(job_id, ()), message)
|
|
|
|
|
|
@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
async def stream_job_updates(
    job_id: UUID,
    format: str = Query("structured", description="Event format: 'structured' (new) or 'legacy' (backward compatible)")
):
    """
    Server-Sent Events stream for real-time job updates.

    Event types (structured format):
    - init: Initial job state with all logs
    - log: Individual structured log entry {"type": "log", "data": {"timestamp": "...", "level": "INFO", ...}}
    - metrics: Periodic metrics {"type": "metrics", "data": {"reviews_extracted": 150, "scroll_count": 45, ...}}
    - status: Job status changes
    - complete: Job finished (completed/failed)

    Query parameters:
    - format: 'structured' (default) for new format, 'legacy' for backward compatibility

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/{job_id}/stream');
    es.addEventListener('log', (e) => {
        const event = JSON.parse(e.data);
        console.log('Log:', event.data);  // Structured log entry
    });
    es.addEventListener('metrics', (e) => {
        const event = JSON.parse(e.data);
        console.log('Metrics:', event.data);  // {reviews_extracted, scroll_count, memory_mb, extraction_rate}
    });
    ```
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Verify job exists
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    job_id_str = str(job_id)
    use_structured = format.lower() != "legacy"
    terminal_statuses = ('completed', 'failed', 'cancelled')
    metrics_interval = 5.0  # Emit metrics every 5 seconds

    def decode_logs(raw):
        """Return the stored scrape_logs, decoding them first when they arrive as a JSON string."""
        if isinstance(raw, str):
            try:
                return json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError
                return []
        return raw

    def job_snapshot(job_data, logs):
        """Full job state used as the body of the init and final complete events."""
        return {
            "job_id": job_id_str,
            "status": job_data['status'],
            "reviews_count": job_data.get('reviews_count'),
            "total_reviews": job_data.get('total_reviews'),
            "scrape_time": job_data.get('scrape_time'),
            "error_message": job_data.get('error_message'),
            "logs": logs or []
        }

    def collect_metrics(job_data, logs, reviews_extracted):
        """Derive the periodic metrics payload from the job row and its logs."""
        # Extraction rate (reviews per second)
        elapsed = job_data.get('scrape_time') or 0
        extraction_rate = (reviews_extracted / elapsed) if elapsed > 0 else 0

        scroll_count = 0
        memory_mb = 0
        if logs:
            # Count log entries whose message mentions scrolling
            for entry in logs:
                text = str(entry.get('message') or '') if isinstance(entry, dict) else str(entry)
                if 'scroll' in text.lower():
                    scroll_count += 1
            # Memory comes from the latest log entry that carries metrics
            for entry in reversed(logs):
                if isinstance(entry, dict) and entry.get('metrics'):
                    memory_mb = entry['metrics'].get('memory_mb', 0)
                    break

        return {
            "reviews_extracted": reviews_extracted,
            "scroll_count": scroll_count,
            "memory_mb": memory_mb,
            "extraction_rate": round(extraction_rate, 2)
        }

    async def event_generator():
        # Register the subscriber only once the response is actually being
        # iterated. Registering before the generator starts leaks the queue
        # when the stream is never consumed, because ``finally`` below only
        # runs for a generator that has been started.
        queue: asyncio.Queue = asyncio.Queue()
        job_update_queues.setdefault(job_id_str, []).append(queue)

        try:
            # Send initial state
            job_data = await db.get_job(job_id)
            scrape_logs = []
            if job_data:
                scrape_logs = decode_logs(job_data.get('scrape_logs'))
                yield f"event: init\ndata: {json.dumps(job_snapshot(job_data, scrape_logs))}\n\n"

                # If job is already complete, send complete event and close
                if job_data['status'] in terminal_statuses:
                    yield f"event: complete\ndata: {json.dumps({'status': job_data['status']})}\n\n"
                    return

            # Baselines for change detection. reviews_count is normalized with
            # ``or 0`` exactly like the per-iteration value, so a NULL count
            # does not register as a change on the first poll.
            last_log_count = len(scrape_logs) if scrape_logs else 0
            last_reviews_count = (job_data.get('reviews_count') or 0) if job_data else 0
            last_metrics_time = time.time()

            # Keep connection alive and send updates
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=2.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"

                    # Also poll database for updates (backup in case broadcast missed)
                    job_data = await db.get_job(job_id)
                    if not job_data:
                        continue

                    scrape_logs = decode_logs(job_data.get('scrape_logs'))

                    # Terminal status: send the final state and close the stream
                    if job_data['status'] in terminal_statuses:
                        yield f"event: complete\ndata: {json.dumps(job_snapshot(job_data, scrape_logs))}\n\n"
                        return

                    # Check for new logs or progress
                    current_log_count = len(scrape_logs) if scrape_logs else 0
                    current_reviews = job_data.get('reviews_count') or 0
                    current_time = time.time()

                    # Emit individual structured log events for new logs
                    if use_structured and current_log_count > last_log_count:
                        for log_entry in scrape_logs[last_log_count:]:
                            yield format_structured_log_event(log_entry)

                    # Emit metrics event every 5 seconds during job execution
                    if (
                        use_structured
                        and job_data['status'] == 'running'
                        and current_time - last_metrics_time >= metrics_interval
                    ):
                        yield format_metrics_event(
                            collect_metrics(job_data, scrape_logs, current_reviews)
                        )
                        last_metrics_time = current_time

                    # Send legacy update event if reviews or logs changed
                    if current_log_count > last_log_count or current_reviews != last_reviews_count:
                        if not use_structured:
                            # Legacy format: send all logs in update event
                            update = {
                                "job_id": job_id_str,
                                "status": job_data['status'],
                                "reviews_count": current_reviews,
                                "total_reviews": job_data.get('total_reviews'),
                                "logs": scrape_logs or []
                            }
                            yield f"event: update\ndata: {json.dumps(update)}\n\n"
                        last_log_count = current_log_count
                        last_reviews_count = current_reviews

                except Exception as e:
                    log.error(f"Error in SSE stream for job {job_id}: {e}")
                    break

        finally:
            # Unregister subscriber and drop the per-job list once it is empty
            subscribers = job_update_queues.get(job_id_str)
            if subscribers is not None:
                try:
                    subscribers.remove(queue)
                except ValueError:
                    pass
                if not subscribers:
                    job_update_queues.pop(job_id_str, None)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
|
|
|
|
|
|
# NOTE(review): routes are matched in declaration order. A GET route with a
# plain "/jobs/{job_id}" parameter declared earlier in this module would
# capture "/jobs/stream" before this handler is reached - confirm the
# ordering / path convertors of the sibling job routes.
@app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)")
async def stream_all_jobs():
    """
    Server-Sent Events stream for all job updates.

    Streams:
    - job_created: New job was created
    - job_updated: Job status/progress changed
    - job_completed: Job finished

    Connect with EventSource in the browser:
    ```javascript
    const es = new EventSource('/jobs/stream');
    es.addEventListener('job_updated', (e) => console.log('Update:', JSON.parse(e.data)));
    ```
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    async def event_generator():
        # Register the subscriber only once the response is actually being
        # iterated. Registering before the generator starts leaks the queue
        # when the stream is never consumed, because ``finally`` below only
        # runs for a generator that has been started.
        queue: asyncio.Queue = asyncio.Queue()
        job_update_queues.setdefault("all", []).append(queue)

        try:
            # Send initial jobs list
            jobs = await db.list_jobs(limit=100)
            jobs_data = [
                {
                    "job_id": str(j['job_id']),
                    "status": j['status'],
                    "url": j['url'],
                    "created_at": j['created_at'].isoformat(),
                    "completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None,
                    "reviews_count": j.get('reviews_count'),
                    "scrape_time": j.get('scrape_time'),
                    "error_message": j.get('error_message')
                }
                for j in jobs
            ]
            yield f"event: init\ndata: {json.dumps({'jobs': jobs_data})}\n\n"

            # Keep connection alive and send updates
            while True:
                try:
                    # Wait for update with timeout (for keepalive)
                    try:
                        message = await asyncio.wait_for(queue.get(), timeout=5.0)
                        yield message
                    except asyncio.TimeoutError:
                        # Send keepalive comment
                        yield ": keepalive\n\n"

                except Exception as e:
                    log.error(f"Error in all-jobs SSE stream: {e}")
                    break

        finally:
            # Unregister subscriber; ValueError means it was already removed
            try:
                job_update_queues.get("all", []).remove(queue)
            except ValueError:
                pass

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
|
|
|
|
|
|
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: UUID):
    """
    Get reviews data for a job.

    Returns reviews for completed, partial, or running jobs (if reviews have been collected).
    Returns 404 if job not found or no reviews available yet.
    """
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Get reviews (includes completed, running, and partial jobs)
    reviews = await db.get_job_reviews(job_id, include_partial=True)

    if reviews is None:
        # No reviews stored: look at the job itself to explain why.
        job = await db.get_job(job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")

        status = job['status']
        if status == 'pending':
            raise HTTPException(status_code=400, detail="Job has not started yet")
        if status == 'failed':
            # ``or`` (rather than a .get default) also covers a key that is
            # present but null, which would otherwise be rendered as "None".
            reason = job.get('error_message') or 'Unknown error'
            raise HTTPException(
                status_code=400,
                detail=f"Job failed without saving any reviews: {reason}"
            )
        raise HTTPException(status_code=404, detail="No reviews data available yet")

    return ReviewsResponse(
        job_id=str(job_id),
        reviews=reviews,
        count=len(reviews)
    )
|
|
|
|
|
|
@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs")
async def list_jobs(
    status: Optional[str] = Query(None, description="Filter by job status"),
    limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of jobs to skip", ge=0)
):
    """List all jobs, optionally filtered by status"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # Validate status if provided
    job_status = None
    if status:
        try:
            job_status = JobStatus(status.lower())
        except ValueError:
            # ``from None``: the enum lookup failure is an implementation
            # detail, not a cause worth chaining onto the client error.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status. Must be one of: {[s.value for s in JobStatus]}"
            ) from None

    jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset)

    def isoformat_or_none(value):
        """Format a timestamp for the response, passing missing values through as None."""
        return value.isoformat() if value else None

    result = []
    for job in jobs:
        # Extract business info from metadata if available
        metadata = job.get('metadata')
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except ValueError:  # json.JSONDecodeError is a ValueError
                metadata = None
        # Anything that is not a JSON object carries no business info.
        if not isinstance(metadata, dict):
            metadata = {}

        # Parse review_topics if it's a string
        review_topics = job.get('review_topics')
        if isinstance(review_topics, str):
            try:
                review_topics = json.loads(review_topics)
            except ValueError:
                review_topics = None

        result.append(JobResponse(
            job_id=str(job['job_id']),
            status=job['status'],
            url=job['url'],
            created_at=job['created_at'].isoformat(),
            # started_at / updated_at / webhook_url are filled in here as well
            # so list entries carry the same fields as GET /jobs/{job_id}.
            started_at=isoformat_or_none(job.get('started_at')),
            completed_at=isoformat_or_none(job.get('completed_at')),
            updated_at=isoformat_or_none(job.get('updated_at')),
            reviews_count=job.get('reviews_count'),
            total_reviews=job.get('total_reviews'),
            scrape_time=job.get('scrape_time'),
            error_message=job.get('error_message'),
            webhook_url=job.get('webhook_url'),
            business_name=metadata.get('business_name'),
            business_address=metadata.get('business_address'),
            business_category=metadata.get('business_category'),
            review_topics=review_topics
        ))

    return result
|
|
|
|
|
|
@app.delete("/jobs/{job_id}", summary="Delete Job")
async def delete_job(job_id: UUID):
    """Delete a job from the system"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # A falsy result from the database layer is reported as 404.
    if await db.delete_job(job_id):
        return {"message": "Job deleted successfully"}

    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
|
|
@app.post("/check-reviews", summary="Check if Reviews Exist")
async def check_reviews(request: ScrapeRequest):
    """
    Get business card information from Google Maps.
    Returns business name, address, rating, and review count.

    Creates a fresh Chrome instance for reliable results (same as full scraper).
    This is used to show the business confirmation card in the UI.
    """
    try:
        url = str(request.url)

        # Use the SAME scraper algorithm with validation_only=True for early return
        # Creates a fresh Chrome instance (same as full scraper) to avoid stale browser state
        # Pooled browsers can have cookies/state that cause Google to render pages differently

        # Build fingerprint dict from request
        fingerprint = None
        if request.browser_fingerprint:
            fp = request.browser_fingerprint
            fingerprint = {
                "userAgent": fp.userAgent,
                "timezone": fp.timezone,
                "language": fp.language,
                "platform": fp.platform,
            }
            if fp.viewport:
                fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height}
            if fp.geolocation:
                fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
            log.info(f"Creating Chrome with user fingerprint: {fp.platform}, {fp.timezone}")
        elif request.geolocation:
            fingerprint = {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}}
            log.info("Creating Chrome with geolocation only")
        else:
            log.info("Creating Chrome with default settings")

        # The scraper is blocking, so it runs in a worker thread.
        result = await asyncio.to_thread(
            fast_scrape_reviews,
            url=url,
            headless=False,  # Use Xvfb display
            validation_only=True,  # Return early after getting total_reviews
            browser_fingerprint=fingerprint  # Pass user's browser fingerprint
        )

        # Extract validation info from the result
        validation_info = result.get('validation_info', {})
        total_reviews = validation_info.get('total_reviews') or result.get('total_reviews') or 0
        name = validation_info.get('name')
        rating = validation_info.get('rating')
        category = validation_info.get('category')
        address = validation_info.get('address')

        # Has reviews if we found a business with the Reviews tab (indicated by total_reviews > 0)
        has_reviews = bool(name and total_reviews > 0)

        return {
            "has_reviews": has_reviews,  # True if business has reviews
            "total_reviews": total_reviews,
            "name": name,
            "address": address,
            "rating": rating,
            "category": category,
            "success": result.get('success', True),
            "error": result.get('error')
        }

    except Exception as e:
        # log.exception keeps the traceback that log.error would drop.
        log.exception(f"Error checking reviews: {e}")

        return {
            "has_reviews": False,
            # Same key as the success payload, so clients can read it unconditionally.
            "total_reviews": 0,
            # Kept for clients that already read the old error-only key.
            "review_count": 0,
            "success": False,
            "error": str(e)
        }
|
|
|
|
|
|
@app.get("/stats", response_model=StatsResponse, summary="Get Statistics")
async def get_stats():
    """Get job statistics"""
    if not db:
        raise HTTPException(status_code=500, detail="Database not initialized")

    # The database layer returns a mapping whose keys become the model's fields.
    return StatsResponse(**(await db.get_stats()))
|
|
|
|
|
|
@app.get("/pool-stats", summary="Get Worker Pool Statistics")
async def pool_stats():
    """Get Chrome worker pool statistics"""
    # get_pool_stats is called in a worker thread rather than on the event loop.
    stats = await asyncio.to_thread(get_pool_stats)
    return stats
|
|
|
|
|
|
# ==================== Health Check Endpoints ====================
|
|
|
|
@app.get("/health/live", summary="Liveness Probe")
async def liveness():
    """
    Liveness check: Is the server alive?

    Use this for Kubernetes liveness probe - restart container if fails.
    Responds with HTTP 503 while the health system is not initialized.
    """
    if health_system:
        return await health_system.check_liveness()

    raise HTTPException(status_code=503, detail="Health system not initialized")
|
|
|
|
|
|
@app.get("/health/ready", summary="Readiness Probe")
async def readiness():
    """
    Readiness check: Can the server handle traffic?

    Use this for Kubernetes readiness probe - remove from load balancer if fails.
    Responds with HTTP 503 when the health system is not initialized or the
    reported status is anything other than "ready".
    """
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")

    readiness_report = await health_system.check_readiness()

    # Only an explicit "ready" status is served with the default 200.
    if readiness_report["status"] == "ready":
        return readiness_report

    return JSONResponse(status_code=503, content=readiness_report)
|
|
|
|
|
|
@app.get("/health/canary", summary="Canary Health Check")
async def canary():
    """
    Canary check: Does scraping actually work?

    Returns the latest canary test result (runs every 4 hours in background).
    Use this for external monitoring (PagerDuty, DataDog) - alerts if fails.
    Responds with HTTP 503 when the health system is not initialized or the
    reported status is neither "healthy" nor "unknown".
    """
    if not health_system:
        raise HTTPException(status_code=503, detail="Health system not initialized")

    canary_report = await health_system.check_canary()

    # "unknown" is served as a 200 alongside "healthy"; every other status
    # is reported as 503.
    if canary_report["status"] in ["healthy", "unknown"]:
        return canary_report

    return JSONResponse(status_code=503, content=canary_report)
|
|
|
|
|
|
@app.get("/health/detailed", summary="Detailed Health Status")
async def detailed_health():
    """Get detailed health status of all components.

    Responds with HTTP 503 while the health system is not initialized.
    """
    if health_system:
        return await health_system.get_detailed_health()

    raise HTTPException(status_code=503, detail="Health system not initialized")
|
|
|
|
|
|
# ==================== Background Job Runner ====================
|
|
|
|
def _fingerprint_from_metadata(metadata):
    """Pick the browser fingerprint to hand to the scraper from job metadata.

    Args:
        metadata: The job's ``metadata`` value - a mapping, a JSON string
            encoding one, or ``None``.

    Returns:
        ``metadata['browser_fingerprint']`` when that key is present; else a
        dict carrying only ``geolocation`` when that key is present; else
        ``None`` (also when the metadata is empty or is unparseable JSON).
    """
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError. Unparseable metadata
            # is treated as absent rather than failing the whole job.
            metadata = None

    if not metadata:
        return None

    if 'browser_fingerprint' in metadata:
        browser_fingerprint = metadata['browser_fingerprint']
        log.info(f"Using user fingerprint: {browser_fingerprint.get('platform')}, {browser_fingerprint.get('timezone')}")
        return browser_fingerprint

    if 'geolocation' in metadata:
        log.info("Using user geolocation only")
        return {'geolocation': metadata['geolocation']}

    return None


def _metrics_from_logs(current_logs):
    """Derive ``(scroll_count, memory_mb)`` from the captured log entries.

    ``scroll_count`` is the number of entries whose message contains "scroll"
    (case-insensitive). ``memory_mb`` is taken from the most recent dict entry
    that has a truthy ``metrics`` value, defaulting to 0.
    """
    scroll_count = 0
    for entry in current_logs:
        msg = entry.get('message', '') if isinstance(entry, dict) else str(entry)
        if 'scroll' in msg.lower():
            scroll_count += 1

    memory_mb = 0
    for entry in reversed(current_logs):
        if isinstance(entry, dict) and entry.get('metrics'):
            memory_mb = entry['metrics'].get('memory_mb', 0)
            break

    return scroll_count, memory_mb


async def run_scraping_job(job_id: UUID):
    """
    Run scraping job in background with concurrency limit.

    The job is marked RUNNING, the scraper is executed in a worker thread
    (via ``asyncio.to_thread``) and, depending on the outcome, the job is
    stored as completed, partial (some reviews were saved before the error)
    or failed. Every state change is broadcast over SSE and, when the job
    has a ``webhook_url``, delivered as a webhook.

    While the scraper runs, its callbacks schedule coroutines back onto the
    event loop to persist progress/logs, stream new structured log events,
    emit a metrics event at most every 5 seconds, and save reviews
    incrementally.

    Args:
        job_id: Job UUID
    """
    job_id_str = str(job_id)

    # Bound before the try block so the exception handler can reference it
    # even when the failure happens before the LogCapture is created.
    log_capture = None

    async with job_semaphore:  # Limit concurrent Chrome instances
        try:
            # Update status to running
            await db.update_job_status(job_id, JobStatus.RUNNING)
            log.info(f"Starting scraping job {job_id}")

            # Get job details
            job = await db.get_job(job_id)
            url = job['url']

            # Extract browser fingerprint from metadata if available
            browser_fingerprint = _fingerprint_from_metadata(job.get('metadata'))

            # Broadcast job started via SSE
            await broadcast_job_update(job_id_str, "job_started", {
                "job_id": job_id_str,
                "status": "running",
                "url": url
            })

            # Get the event loop for progress updates from worker thread
            loop = asyncio.get_running_loop()

            # Create log capture instance that we can access for real-time logs
            log_capture = LogCapture()

            # Single-element lists act as mutable cells shared with the
            # callbacks below.
            # Track total reviews for incremental saves
            total_reviews_seen = [None]
            # Accumulate all reviews for incremental saves (flush_callback receives batches)
            all_reviews_collected = []
            # Track last broadcasted log count for streaming new logs
            last_broadcasted_log_count = [0]
            # Track last metrics broadcast time and the review count at that time
            last_metrics_broadcast_time = [time.time()]
            last_metrics_review_count = [0]
            metrics_broadcast_interval = 5.0  # Emit metrics every 5 seconds

            # Progress callback to update job status with current/total counts AND logs
            def progress_callback(current_count: int, total_count: int):
                """Update job progress and logs from worker thread"""
                if total_count:
                    total_reviews_seen[0] = total_count

                async def update():
                    # Get current logs from the shared log_capture
                    current_logs = log_capture.get_logs()
                    await db.update_job_status(
                        job_id,
                        JobStatus.RUNNING,
                        reviews_count=current_count,
                        total_reviews=total_count,
                        scrape_logs=current_logs
                    )

                    # Broadcast individual structured log events for new logs.
                    # The cursor is advanced BEFORE awaiting so that another
                    # update() interleaving at an await cannot re-send the
                    # same entries.
                    current_log_count = len(current_logs)
                    first_new_index = last_broadcasted_log_count[0]
                    if current_log_count > first_new_index:
                        last_broadcasted_log_count[0] = current_log_count
                        for log_entry in current_logs[first_new_index:]:
                            await broadcast_structured_log(job_id_str, log_entry)

                    # Broadcast metrics event every 5 seconds
                    current_time = time.time()
                    elapsed = current_time - last_metrics_broadcast_time[0]
                    if elapsed >= metrics_broadcast_interval:
                        # Rate over the interval since the previous metrics
                        # event: reviews gained in that interval / its length.
                        # Clamped at 0 in case updates complete out of order.
                        extracted_since_last = max(
                            current_count - last_metrics_review_count[0], 0
                        )
                        extraction_rate = (extracted_since_last / elapsed) if elapsed > 0 else 0

                        # Claim this metrics tick before awaiting so that
                        # overlapping updates do not emit duplicates.
                        last_metrics_broadcast_time[0] = current_time
                        last_metrics_review_count[0] = current_count

                        scroll_count, memory_mb = _metrics_from_logs(current_logs)

                        metrics_data = {
                            "reviews_extracted": current_count,
                            "scroll_count": scroll_count,
                            "memory_mb": memory_mb,
                            "extraction_rate": round(extraction_rate, 2)
                        }
                        await broadcast_metrics(job_id_str, metrics_data)

                    # Broadcast progress via SSE (legacy format for backward compatibility)
                    await broadcast_job_update(job_id_str, "job_progress", {
                        "job_id": job_id_str,
                        "status": "running",
                        "reviews_count": current_count,
                        "total_reviews": total_count,
                        "logs": current_logs
                    })

                # Schedule the coroutine on the event loop
                # NOTE(review): the returned future is not awaited, so an
                # in-flight update may still finish after the job's final
                # status has been written, and its exceptions go unobserved.
                asyncio.run_coroutine_threadsafe(update(), loop)

            # Flush callback to save reviews incrementally (crash recovery)
            # Note: flush_callback receives batches, so we accumulate them
            def flush_callback(reviews_batch: list):
                """Accumulate and save reviews to DB incrementally from worker thread"""
                # Extend our collection with the new batch
                all_reviews_collected.extend(reviews_batch)

                # Snapshot in the worker thread so the event loop never reads
                # the list while this thread is extending it.
                reviews_snapshot = list(all_reviews_collected)

                async def save():
                    await db.save_reviews_incremental(
                        job_id=job_id,
                        reviews=reviews_snapshot,  # ALL reviews collected so far
                        total_reviews=total_reviews_seen[0]
                    )

                # Schedule the coroutine on the event loop
                asyncio.run_coroutine_threadsafe(save(), loop)

            # Run scraping with progress callback and shared log capture
            # headless=False because Docker uses Xvfb virtual display
            result = await asyncio.to_thread(
                fast_scrape_reviews,
                url=url,
                headless=False,
                progress_callback=progress_callback,
                log_capture=log_capture,
                flush_callback=flush_callback,
                browser_fingerprint=browser_fingerprint  # Pass user's browser fingerprint
            )

            if result['success']:
                # Save session fingerprint if captured
                if result.get('session_fingerprint'):
                    await db.update_session_fingerprint(job_id, result['session_fingerprint'])
                    log.info(f"Saved session fingerprint for job {job_id}")

                # Save results to database (including scraper logs and review topics)
                await db.save_job_result(
                    job_id=job_id,
                    reviews=result['reviews'],
                    scrape_time=result['time'],
                    total_reviews=result.get('total_reviews'),
                    scrape_logs=result.get('logs'),
                    review_topics=result.get('review_topics')
                )

                log.info(
                    f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s"
                )

                # Broadcast job completed via SSE
                await broadcast_job_update(job_id_str, "job_completed", {
                    "job_id": job_id_str,
                    "status": "completed",
                    "reviews_count": result['count'],
                    "total_reviews": result.get('total_reviews'),
                    "scrape_time": result['time'],
                    "logs": result.get('logs', [])
                })

                # Send webhook if configured
                if job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000')

                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=job['webhook_url'],
                        job_id=job_id,
                        status='completed',
                        reviews_count=result['count'],
                        scrape_time=result['time'],
                        reviews_url=f"{api_base_url}/jobs/{job_id}/reviews",
                        secret=job.get('webhook_secret'),
                        db=db
                    )

            else:
                # Save session fingerprint even on failure (useful for debugging bot detection)
                if result.get('session_fingerprint'):
                    await db.update_session_fingerprint(job_id, result['session_fingerprint'])
                    log.info(f"Saved session fingerprint for failed job {job_id}")

                # Job failed - check if we have partial reviews saved
                current_job = await db.get_job(job_id)
                # `or 0` also covers a stored NULL/None count, which would
                # otherwise make the `> 0` comparison raise TypeError.
                partial_count = (current_job.get('reviews_count') or 0) if current_job else 0

                if partial_count > 0:
                    # Mark as partial - we have some reviews saved
                    await db.mark_job_partial(
                        job_id,
                        error_message=result.get('error', 'Unknown error'),
                        scrape_logs=result.get('logs')
                    )

                    log.warning(f"Partial job {job_id}: {partial_count} reviews saved before error: {result.get('error')}")

                    # Broadcast job partial via SSE
                    await broadcast_job_update(job_id_str, "job_partial", {
                        "job_id": job_id_str,
                        "status": "partial",
                        "reviews_count": partial_count,
                        "total_reviews": current_job.get('total_reviews'),
                        "error_message": result.get('error'),
                        "logs": result.get('logs', [])
                    })

                    # Send partial webhook if configured
                    if job.get('webhook_url'):
                        webhook_manager = WebhookManager()
                        await webhook_manager.send_job_completed_webhook(
                            webhook_url=job['webhook_url'],
                            job_id=job_id,
                            status='partial',
                            reviews_count=partial_count,
                            error_message=result.get('error'),
                            secret=job.get('webhook_secret'),
                            db=db
                        )
                else:
                    # No reviews saved - mark as failed
                    await db.update_job_status(
                        job_id,
                        JobStatus.FAILED,
                        error_message=result.get('error', 'Unknown error'),
                        scrape_logs=result.get('logs')
                    )

                    log.error(f"Failed job {job_id}: {result.get('error')}")

                    # Broadcast job failed via SSE
                    await broadcast_job_update(job_id_str, "job_failed", {
                        "job_id": job_id_str,
                        "status": "failed",
                        "error_message": result.get('error'),
                        "logs": result.get('logs', [])
                    })

                    # Send failure webhook if configured
                    if job.get('webhook_url'):
                        webhook_manager = WebhookManager()
                        await webhook_manager.send_job_completed_webhook(
                            webhook_url=job['webhook_url'],
                            job_id=job_id,
                            status='failed',
                            error_message=result.get('error'),
                            secret=job.get('webhook_secret'),
                            db=db
                        )

        except Exception as e:
            # Top-level boundary for the background job: record the traceback
            # through the logger and convert the exception into a job state.
            log.exception(f"Error in scraping job {job_id}: {e}")

            # Check if we have partial reviews saved
            current_job = await db.get_job(job_id)
            partial_count = (current_job.get('reviews_count') or 0) if current_job else 0

            if partial_count > 0:
                # Mark as partial - we have some reviews saved
                await db.mark_job_partial(
                    job_id,
                    error_message=str(e),
                    scrape_logs=log_capture.get_logs() if log_capture else None
                )

                log.warning(f"Partial job {job_id}: {partial_count} reviews saved before exception: {e}")

                # Broadcast job partial via SSE
                await broadcast_job_update(job_id_str, "job_partial", {
                    "job_id": job_id_str,
                    "status": "partial",
                    "reviews_count": partial_count,
                    "total_reviews": current_job.get('total_reviews'),
                    "error_message": str(e),
                    "logs": log_capture.get_logs() if log_capture else []
                })

                # Send partial webhook
                if current_job and current_job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=current_job['webhook_url'],
                        job_id=job_id,
                        status='partial',
                        reviews_count=partial_count,
                        error_message=str(e),
                        secret=current_job.get('webhook_secret'),
                        db=db
                    )
            else:
                # No reviews saved - mark as failed
                await db.update_job_status(
                    job_id,
                    JobStatus.FAILED,
                    error_message=str(e)
                )

                # Broadcast job failed via SSE
                await broadcast_job_update(job_id_str, "job_failed", {
                    "job_id": job_id_str,
                    "status": "failed",
                    "error_message": str(e),
                    "logs": []
                })

                # Send failure webhook
                if current_job and current_job.get('webhook_url'):
                    webhook_manager = WebhookManager()
                    await webhook_manager.send_job_completed_webhook(
                        webhook_url=current_job['webhook_url'],
                        job_id=job_id,
                        status='failed',
                        error_message=str(e),
                        secret=current_job.get('webhook_secret'),
                        db=db
                    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when run directly.
    import uvicorn

    # PORT comes from the environment; 8000 is used when it is unset.
    port = int(os.environ.get('PORT', 8000))

    log.info(f"Starting production server on port {port}...")

    server_options = {
        "host": "0.0.0.0",
        "port": port,
        "reload": False,  # Disable reload in production
        "log_level": "info",
    }
    uvicorn.run("api_server_production:app", **server_options)
|