Remove old scraper files - consolidate to scraper_clean

Production (api_server_production.py) only uses:
- modules/scraper_clean.py - main scraping logic
- modules/fast_scraper.py - validation helpers
- modules/database.py, webhooks.py, health_checks.py, chrome_pool.py

Deleted 33 unused Python files including:
- Old API server (api_server.py)
- 14 start*.py experimental scrapers
- 7 *_scraper.py variants
- Old modules: scraper.py, api_interceptor.py, job_manager.py, cli.py
- Various debug/test/utility scripts

Saves ~11,000 lines of unmaintained code.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-23 17:25:00 +00:00
parent 80e7771c00
commit 8ccf72a489
37 changed files with 859 additions and 11116 deletions

View File

@@ -6,6 +6,7 @@ Production Google Reviews Scraper API Server with Phase 1 features:
- Smart health checks with canary testing
"""
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
@@ -15,12 +16,12 @@ from uuid import UUID
from fastapi import FastAPI, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl, Field
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from modules.database import DatabaseManager, JobStatus
from modules.webhooks import WebhookDispatcher, WebhookManager
from modules.health_checks import HealthCheckSystem
from modules.scraper_clean import fast_scrape_reviews # Clean scraper with hard refresh recovery
from modules.scraper_clean import fast_scrape_reviews, LogCapture # Clean scraper with hard refresh recovery
from modules.fast_scraper import check_reviews_available, get_business_card_info # Helper functions
from modules.chrome_pool import (
start_worker_pools,
@@ -48,6 +49,11 @@ health_system: Optional[HealthCheckSystem] = None
MAX_CONCURRENT_JOBS = int(os.getenv('MAX_CONCURRENT_JOBS', '5'))
job_semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
# SSE: Store for broadcasting job updates to connected clients
# Format: {job_id: [asyncio.Queue, ...]} for job-specific streams
# Format: {"all": [asyncio.Queue, ...]} for all-jobs stream
job_update_queues: Dict[str, List[asyncio.Queue]] = {"all": []}
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -82,11 +88,12 @@ async def lifespan(app: FastAPI):
# Start Chrome worker pools (1 for validation, 2 for scraping)
# These pre-warm Chrome instances for instant availability
# headless=False because Docker uses Xvfb virtual display for better compatibility
await asyncio.to_thread(
start_worker_pools,
validation_size=1,
scraping_size=2,
headless=True
headless=False
)
log.info("Chrome worker pools started (1 validation + 2 scraping)")
@@ -148,6 +155,9 @@ class JobResponse(BaseModel):
scrape_time: Optional[float] = None
error_message: Optional[str] = None
webhook_url: Optional[str] = None
# Business metadata
business_name: Optional[str] = None
business_address: Optional[str] = None
class ReviewsResponse(BaseModel):
@@ -239,12 +249,296 @@ async def get_job(job_id: UUID):
started_at=job['started_at'].isoformat() if job['started_at'] else None,
completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
reviews_count=job['reviews_count'],
total_reviews=job.get('total_reviews'),
scrape_time=job['scrape_time'],
error_message=job['error_message'],
webhook_url=job.get('webhook_url')
)
@app.get("/jobs/{job_id}/logs", summary="Get Job Logs")
async def get_job_logs(job_id: UUID):
"""
Get the scraper logs for a job.
Returns logs from both successful and failed jobs.
Useful for debugging scraping issues.
"""
if not db:
raise HTTPException(status_code=500, detail="Database not initialized")
job = await db.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
# Get scrape_logs from job
scrape_logs = job.get('scrape_logs')
# Parse if string (asyncpg might return JSONB as string)
if isinstance(scrape_logs, str):
try:
scrape_logs = json.loads(scrape_logs)
except:
scrape_logs = None
return {
"job_id": str(job_id),
"status": job['status'],
"error_message": job.get('error_message'),
"logs": scrape_logs or [],
"log_count": len(scrape_logs) if scrape_logs else 0
}
# ==================== SSE Streaming Endpoints ====================
async def broadcast_job_update(job_id: str, event_type: str, data: dict):
"""Broadcast an update to all subscribers of a job stream and the all-jobs stream."""
message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
# Send to job-specific subscribers
if job_id in job_update_queues:
for queue in job_update_queues[job_id]:
try:
await queue.put(message)
except:
pass
# Send to all-jobs subscribers
for queue in job_update_queues.get("all", []):
try:
await queue.put(message)
except:
pass
@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
async def stream_job_updates(job_id: UUID):
"""
Server-Sent Events stream for real-time job updates.
Streams:
- status: Job status changes
- progress: Review count and progress updates
- logs: New log entries
- complete: Job finished (completed/failed)
Connect with EventSource in the browser:
```javascript
const es = new EventSource('/jobs/{job_id}/stream');
es.onmessage = (e) => console.log(JSON.parse(e.data));
es.addEventListener('logs', (e) => console.log('Logs:', JSON.parse(e.data)));
```
"""
if not db:
raise HTTPException(status_code=500, detail="Database not initialized")
# Verify job exists
job = await db.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
job_id_str = str(job_id)
# Create queue for this client
queue: asyncio.Queue = asyncio.Queue()
# Register subscriber
if job_id_str not in job_update_queues:
job_update_queues[job_id_str] = []
job_update_queues[job_id_str].append(queue)
async def event_generator():
try:
# Send initial state
job_data = await db.get_job(job_id)
if job_data:
scrape_logs = job_data.get('scrape_logs')
if isinstance(scrape_logs, str):
try:
scrape_logs = json.loads(scrape_logs)
except:
scrape_logs = []
initial = {
"job_id": job_id_str,
"status": job_data['status'],
"reviews_count": job_data.get('reviews_count'),
"total_reviews": job_data.get('total_reviews'),
"scrape_time": job_data.get('scrape_time'),
"error_message": job_data.get('error_message'),
"logs": scrape_logs or []
}
yield f"event: init\ndata: {json.dumps(initial)}\n\n"
# If job is already complete, send complete event and close
if job_data and job_data['status'] in ['completed', 'failed', 'cancelled']:
yield f"event: complete\ndata: {json.dumps({'status': job_data['status']})}\n\n"
return
# Keep connection alive and send updates
last_log_count = len(scrape_logs) if scrape_logs else 0
last_reviews_count = job_data.get('reviews_count') if job_data else 0
while True:
try:
# Wait for update with timeout (for keepalive)
try:
message = await asyncio.wait_for(queue.get(), timeout=2.0)
yield message
except asyncio.TimeoutError:
# Send keepalive comment
yield ": keepalive\n\n"
# Also poll database for updates (backup in case broadcast missed)
job_data = await db.get_job(job_id)
if job_data:
# Check for status change
if job_data['status'] in ['completed', 'failed', 'cancelled']:
scrape_logs = job_data.get('scrape_logs')
if isinstance(scrape_logs, str):
try:
scrape_logs = json.loads(scrape_logs)
except:
scrape_logs = []
final = {
"job_id": job_id_str,
"status": job_data['status'],
"reviews_count": job_data.get('reviews_count'),
"total_reviews": job_data.get('total_reviews'),
"scrape_time": job_data.get('scrape_time'),
"error_message": job_data.get('error_message'),
"logs": scrape_logs or []
}
yield f"event: complete\ndata: {json.dumps(final)}\n\n"
return
# Check for new logs or progress
scrape_logs = job_data.get('scrape_logs')
if isinstance(scrape_logs, str):
try:
scrape_logs = json.loads(scrape_logs)
except:
scrape_logs = []
current_log_count = len(scrape_logs) if scrape_logs else 0
current_reviews = job_data.get('reviews_count') or 0
if current_log_count > last_log_count or current_reviews != last_reviews_count:
update = {
"job_id": job_id_str,
"status": job_data['status'],
"reviews_count": current_reviews,
"total_reviews": job_data.get('total_reviews'),
"logs": scrape_logs or []
}
yield f"event: update\ndata: {json.dumps(update)}\n\n"
last_log_count = current_log_count
last_reviews_count = current_reviews
except Exception as e:
log.error(f"Error in SSE stream for job {job_id}: {e}")
break
finally:
# Unregister subscriber
if job_id_str in job_update_queues:
try:
job_update_queues[job_id_str].remove(queue)
if not job_update_queues[job_id_str]:
del job_update_queues[job_id_str]
except:
pass
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
@app.get("/jobs/stream", summary="Stream All Jobs Updates (SSE)")
async def stream_all_jobs():
"""
Server-Sent Events stream for all job updates.
Streams:
- job_created: New job was created
- job_updated: Job status/progress changed
- job_completed: Job finished
Connect with EventSource in the browser:
```javascript
const es = new EventSource('/jobs/stream');
es.addEventListener('job_updated', (e) => console.log('Update:', JSON.parse(e.data)));
```
"""
if not db:
raise HTTPException(status_code=500, detail="Database not initialized")
# Create queue for this client
queue: asyncio.Queue = asyncio.Queue()
# Register subscriber to all-jobs stream
job_update_queues["all"].append(queue)
async def event_generator():
try:
# Send initial jobs list
jobs = await db.list_jobs(limit=100)
jobs_data = [
{
"job_id": str(j['job_id']),
"status": j['status'],
"url": j['url'],
"created_at": j['created_at'].isoformat(),
"completed_at": j['completed_at'].isoformat() if j.get('completed_at') else None,
"reviews_count": j.get('reviews_count'),
"scrape_time": j.get('scrape_time'),
"error_message": j.get('error_message')
}
for j in jobs
]
yield f"event: init\ndata: {json.dumps({'jobs': jobs_data})}\n\n"
# Keep connection alive and send updates
while True:
try:
# Wait for update with timeout (for keepalive)
try:
message = await asyncio.wait_for(queue.get(), timeout=5.0)
yield message
except asyncio.TimeoutError:
# Send keepalive comment
yield ": keepalive\n\n"
except Exception as e:
log.error(f"Error in all-jobs SSE stream: {e}")
break
finally:
# Unregister subscriber
try:
job_update_queues["all"].remove(queue)
except:
pass
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
async def get_job_reviews(job_id: UUID):
"""
@@ -298,19 +592,34 @@ async def list_jobs(
jobs = await db.list_jobs(status=job_status, limit=limit, offset=offset)
return [
JobResponse(
result = []
for job in jobs:
# Extract business info from metadata if available
metadata = job.get('metadata')
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except:
metadata = None
business_name = metadata.get('business_name') if metadata else None
business_address = metadata.get('business_address') if metadata else None
result.append(JobResponse(
job_id=str(job['job_id']),
status=job['status'],
url=job['url'],
created_at=job['created_at'].isoformat(),
completed_at=job['completed_at'].isoformat() if job.get('completed_at') else None,
reviews_count=job.get('reviews_count'),
total_reviews=job.get('total_reviews'),
scrape_time=job.get('scrape_time'),
error_message=job.get('error_message')
)
for job in jobs
]
error_message=job.get('error_message'),
business_name=business_name,
business_address=business_address
))
return result
@app.delete("/jobs/{job_id}", summary="Delete Job")
@@ -370,11 +679,11 @@ async def check_reviews(request: ScrapeRequest):
# SIMPLIFIED VALIDATION: If we found a business (name + rating), assume it has reviews
# Let the actual scraper determine if reviews exist
has_business = result.get('name') and result.get('rating')
has_business = bool(result.get('name') and result.get('rating'))
return {
"has_reviews": has_business, # Assume true if business exists
"total_reviews": result['total_reviews'] or 0, # Show 0 if unknown
"has_reviews": has_business, # Boolean: true if business exists
"total_reviews": result.get('total_reviews') or 0, # Show 0 if unknown
"name": result.get('name'),
"address": result.get('address'),
"rating": result.get('rating'),
@@ -488,6 +797,8 @@ async def run_scraping_job(job_id: UUID):
Args:
job_id: Job UUID
"""
job_id_str = str(job_id)
async with job_semaphore: # Limit concurrent Chrome instances
try:
# Update status to running
@@ -498,44 +809,79 @@ async def run_scraping_job(job_id: UUID):
job = await db.get_job(job_id)
url = job['url']
# Broadcast job started via SSE
await broadcast_job_update(job_id_str, "job_started", {
"job_id": job_id_str,
"status": "running",
"url": url
})
# Get the event loop for progress updates from worker thread
loop = asyncio.get_running_loop()
# Progress callback to update job status with current/total counts
# Create log capture instance that we can access for real-time logs
log_capture = LogCapture()
# Progress callback to update job status with current/total counts AND logs
def progress_callback(current_count: int, total_count: int):
"""Update job progress from worker thread"""
"""Update job progress and logs from worker thread"""
async def update():
# Get current logs from the shared log_capture
current_logs = log_capture.get_logs()
await db.update_job_status(
job_id,
JobStatus.RUNNING,
reviews_count=current_count,
total_reviews=total_count
total_reviews=total_count,
scrape_logs=current_logs
)
# Broadcast progress via SSE
await broadcast_job_update(job_id_str, "job_progress", {
"job_id": job_id_str,
"status": "running",
"reviews_count": current_count,
"total_reviews": total_count,
"logs": current_logs
})
# Schedule the coroutine on the event loop
asyncio.run_coroutine_threadsafe(update(), loop)
# Run scraping with progress callback
# Run scraping with progress callback and shared log capture
# headless=False because Docker uses Xvfb virtual display
result = await asyncio.to_thread(
fast_scrape_reviews,
url=url,
headless=True,
progress_callback=progress_callback
headless=False,
progress_callback=progress_callback,
log_capture=log_capture
)
if result['success']:
# Save results to database
# Save results to database (including scraper logs)
await db.save_job_result(
job_id=job_id,
reviews=result['reviews'],
scrape_time=result['time'],
total_reviews=result.get('total_reviews')
total_reviews=result.get('total_reviews'),
scrape_logs=result.get('logs')
)
log.info(
f"Completed job {job_id}: {result['count']} reviews in {result['time']:.1f}s"
)
# Broadcast job completed via SSE
await broadcast_job_update(job_id_str, "job_completed", {
"job_id": job_id_str,
"status": "completed",
"reviews_count": result['count'],
"total_reviews": result.get('total_reviews'),
"scrape_time": result['time'],
"logs": result.get('logs', [])
})
# Send webhook if configured
if job.get('webhook_url'):
webhook_manager = WebhookManager()
@@ -553,15 +899,24 @@ async def run_scraping_job(job_id: UUID):
)
else:
# Job failed
# Job failed - save logs for debugging
await db.update_job_status(
job_id,
JobStatus.FAILED,
error_message=result.get('error', 'Unknown error')
error_message=result.get('error', 'Unknown error'),
scrape_logs=result.get('logs')
)
log.error(f"Failed job {job_id}: {result.get('error')}")
# Broadcast job failed via SSE
await broadcast_job_update(job_id_str, "job_failed", {
"job_id": job_id_str,
"status": "failed",
"error_message": result.get('error'),
"logs": result.get('logs', [])
})
# Send failure webhook if configured
if job.get('webhook_url'):
webhook_manager = WebhookManager()
@@ -585,6 +940,14 @@ async def run_scraping_job(job_id: UUID):
error_message=str(e)
)
# Broadcast job failed via SSE
await broadcast_job_update(job_id_str, "job_failed", {
"job_id": job_id_str,
"status": "failed",
"error_message": str(e),
"logs": []
})
# Send failure webhook
job = await db.get_job(job_id)
if job and job.get('webhook_url'):