- Add fast_scrape_reviews() wrapper to scraper_clean.py for API compatibility
- Set window size (1200x900) in wrapper to ensure proper Google Maps rendering
- Update job_manager.py to import from scraper_clean instead of fast_scraper
- Production now uses clean scraper with:
  - Hard refresh recovery when stuck after 8+ soft recovery attempts
  - API interception + DOM parsing for complete data collection
  - Automatic deduplication across refreshes

Tested: 589/589 reviews collected in 55s

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
407 lines
14 KiB
Python
407 lines
14 KiB
Python
"""
|
|
Background job manager for Google Reviews Scraper.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Dict, Any, Optional, List
|
|
from dataclasses import dataclass, asdict
|
|
|
|
from modules.config import load_config
|
|
from modules.scraper import GoogleReviewsScraper
|
|
from modules.scraper_clean import fast_scrape_reviews # Updated to use clean scraper with hard refresh recovery
|
|
from modules.chrome_pool import get_scraping_worker, release_scraping_worker
|
|
|
|
log = logging.getLogger("scraper")
|
|
|
|
|
|
class JobStatus(str, Enum):
    """Lifecycle states a scraping job can be in.

    The ``str`` mixin makes each member compare equal to its plain string
    value (``JobStatus.PENDING == "pending"``).
    """

    # Initial state of a newly created job.
    PENDING = "pending"
    # The job has been started.
    RUNNING = "running"
    # The job finished successfully.
    COMPLETED = "completed"
    # The job ended with an error.
    FAILED = "failed"
    # The job was cancelled on request.
    CANCELLED = "cancelled"
|
|
|
|
|
|
@dataclass
class ScrapingJob:
    """State record for a single scraping job.

    Holds the job's identity, its configuration, lifecycle timestamps,
    progress information and, once available, the scraped reviews.
    """

    job_id: str
    status: JobStatus
    url: str
    config: Dict[str, Any]
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None  # Last update time (for progress tracking)
    error_message: Optional[str] = None
    reviews_count: Optional[int] = None
    total_reviews: Optional[int] = None  # Total reviews available (from page counter)
    images_count: Optional[int] = None
    progress: Optional[Dict[str, Any]] = None  # {"stage": ..., "message": ...} style dict
    reviews_data: Optional[List[Dict[str, Any]]] = None  # Store actual review data
    scrape_time: Optional[float] = None  # Time taken to scrape

    def to_dict(self, include_reviews: bool = False) -> Dict[str, Any]:
        """
        Convert job to dictionary for JSON serialization.

        Args:
            include_reviews: Whether to include the full reviews data (default: False)

        Returns:
            A dict copy of the job's fields in which every datetime field
            that is set has been replaced by its ISO-8601 string.
        """
        data = asdict(self)

        # Convert every datetime field to an ISO string. ``updated_at`` must be
        # included too, otherwise the result is not JSON-serializable once the
        # job has been started.
        for name in ("created_at", "started_at", "completed_at", "updated_at"):
            value = data[name]
            if value is not None:
                data[name] = value.isoformat()

        # Exclude reviews_data by default (can be large)
        if not include_reviews:
            data.pop("reviews_data", None)

        return data
|
|
|
|
|
|
class JobManager:
    """Manager for background scraping jobs.

    Jobs are kept in an in-memory dict keyed by job ID and executed on a
    thread pool. Reads and writes of ``self.jobs`` and of job fields are
    performed while holding ``self.lock``.
    """

    def __init__(self, max_concurrent_jobs: int = 3):
        """
        Initialize job manager.

        Args:
            max_concurrent_jobs: Maximum number of jobs allowed in the RUNNING
                state at once; also the size of the thread pool.
        """
        self.max_concurrent_jobs = max_concurrent_jobs
        self.jobs: Dict[str, ScrapingJob] = {}
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent_jobs)
        self.lock = threading.Lock()

    def create_job(self, url: str, config_overrides: Optional[Dict[str, Any]] = None) -> str:
        """
        Create a new scraping job.

        The job is registered in the PENDING state; it does not run until
        ``start_job`` is called for it.

        Args:
            url: Google Maps URL to scrape
            config_overrides: Optional config overrides

        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())

        # Load base config
        config = load_config()

        # Apply URL
        config["url"] = url

        # Apply any overrides (applied last, so they win over the base config)
        if config_overrides:
            config.update(config_overrides)

        job = ScrapingJob(
            job_id=job_id,
            status=JobStatus.PENDING,
            url=url,
            config=config,
            created_at=datetime.now(),
            progress={"stage": "created", "message": "Job created and queued"},
        )

        with self.lock:
            self.jobs[job_id] = job

        log.info(f"Created scraping job {job_id} for URL: {url}")
        return job_id

    def start_job(self, job_id: str) -> bool:
        """
        Start a pending job.

        Args:
            job_id: Job ID to start

        Returns:
            True if job was started, False otherwise (unknown job, job not
            PENDING, or the concurrent-job limit is already reached)
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job is None or job.status != JobStatus.PENDING:
                return False

            # Check if we can start more jobs
            running_count = sum(1 for j in self.jobs.values() if j.status == JobStatus.RUNNING)
            if running_count >= self.max_concurrent_jobs:
                return False

            now = datetime.now()
            job.status = JobStatus.RUNNING
            job.started_at = now
            job.updated_at = now
            job.progress = {"stage": "starting", "message": "Initializing scraper"}

        # Submit job to thread pool. The returned future is not kept:
        # _run_scraping_job records its outcome on the job itself.
        self.executor.submit(self._run_scraping_job, job_id)

        log.info(f"Started scraping job {job_id}")
        return True

    def _run_scraping_job(self, job_id: str):
        """
        Run the actual scraping job in background thread.

        The job's outcome is only written while the job is still RUNNING, so
        a status set by ``cancel_job`` in the meantime is never overwritten.
        The Chrome worker is always handed back to the pool: recycled after
        an exception, released as-is otherwise.

        Args:
            job_id: Job ID to run
        """
        def progress_callback(current_count: int, total_count: int):
            """Update job progress during scraping"""
            with self.lock:
                job = self.jobs.get(job_id)
                # Ignore updates for jobs that were deleted or are no longer
                # running (e.g. cancelled) so their final state is preserved.
                if job is None or job.status != JobStatus.RUNNING:
                    return
                job.reviews_count = current_count
                job.total_reviews = total_count
                job.updated_at = datetime.now()  # Update last update time
                # Calculate percentage for better UX; a missing or zero total
                # is reported as 0% instead of raising.
                if total_count and total_count > 0:
                    percentage = int(current_count / total_count * 100)
                else:
                    percentage = 0
                job.progress = {
                    "stage": "scraping",
                    "message": f"Collecting reviews: {current_count} / {total_count} ({percentage}%)",
                    "percentage": percentage,
                }

        worker = None
        try:
            with self.lock:
                job = self.jobs.get(job_id)
                if job is None or job.status != JobStatus.RUNNING:
                    # Deleted or cancelled before this thread picked it up.
                    log.info(f"Job {job_id}: no longer runnable; skipping")
                    return
                job.progress = {"stage": "initializing", "message": "Acquiring Chrome worker from pool"}

                # Get config
                url = job.config.get("url")
                headless = job.config.get("headless", True)  # Default to headless
                # Effectively unlimited - relies on idle detection
                max_scrolls = job.config.get("max_scrolls", 999999)

            # Get a worker from the scraping pool
            worker = get_scraping_worker(timeout=30)
            if not worker:
                raise RuntimeError("No Chrome workers available. Pool may be at capacity.")

            log.info(f"Job {job_id}: Acquired worker {worker.worker_id} from pool")

            with self.lock:
                if job.status != JobStatus.RUNNING:
                    # Cancelled while waiting for a worker; the finally
                    # clause hands the untouched worker back to the pool.
                    log.info(f"Job {job_id}: cancelled before scraping started; skipping")
                    return
                job.progress = {
                    "stage": "scraping",
                    "message": f"Scraping reviews with {worker.worker_id} (fast mode)",
                }

            # Run the FAST scraping with progress callback using pooled worker
            result = fast_scrape_reviews(
                url=url,
                headless=headless,
                max_scrolls=max_scrolls,
                progress_callback=progress_callback,
                driver=worker.driver,  # Use worker's driver
                return_driver=True,  # Don't close the driver
            )

            # Pop the driver from result before storing
            result.pop("driver", None)

            # Mark job as completed or failed
            with self.lock:
                now = datetime.now()
                if job.status != JobStatus.RUNNING:
                    # Cancelled while scraping: keep the cancelled state.
                    log.info(f"Job {job_id}: finished after cancellation; discarding result")
                elif result["success"]:
                    job.status = JobStatus.COMPLETED
                    job.completed_at = now
                    job.updated_at = now
                    job.reviews_count = result["count"]
                    job.total_reviews = result.get("total_reviews")  # Store total review count from page
                    job.reviews_data = result["reviews"]  # Store the actual reviews
                    job.scrape_time = result["time"]
                    job.progress = {
                        "stage": "completed",
                        "message": f"Scraping completed successfully in {result['time']:.1f}s",
                        "scroll_time": result.get("scroll_time"),
                        "extract_time": result.get("extract_time"),
                    }
                    log.info(f"Completed scraping job {job_id}: {result['count']} reviews in {result['time']:.1f}s")
                else:
                    job.status = JobStatus.FAILED
                    job.completed_at = now
                    job.updated_at = now
                    job.error_message = result.get("error", "Unknown error")
                    job.progress = {"stage": "failed", "message": f"Job failed: {result.get('error')}"}
                    log.error(f"Failed scraping job {job_id}: {result.get('error')}")

        except Exception as e:
            # Top-level boundary of the worker thread: log with traceback and
            # record the failure on the job instead of letting it vanish
            # inside the discarded future.
            log.exception(f"Error in scraping job {job_id}: {e}")

            with self.lock:
                # The job may have been deleted meanwhile, so never index.
                job = self.jobs.get(job_id)
                if job is not None and job.status == JobStatus.RUNNING:
                    now = datetime.now()
                    job.status = JobStatus.FAILED
                    job.completed_at = now
                    job.updated_at = now
                    job.error_message = str(e)
                    job.progress = {"stage": "failed", "message": f"Job failed: {str(e)}"}

            # Recycle worker on error
            if worker:
                # Clear the reference first so the finally clause cannot
                # release the same worker a second time.
                failed_worker, worker = worker, None
                log.info(f"Job {job_id}: Recycling worker {failed_worker.worker_id} due to error")
                release_scraping_worker(failed_worker, recycle=True)

        finally:
            # Release worker back to pool if not already released
            if worker:
                log.info(f"Job {job_id}: Releasing worker {worker.worker_id} back to pool")
                release_scraping_worker(worker, recycle=False)

    def get_job(self, job_id: str) -> Optional[ScrapingJob]:
        """
        Get job by ID.

        Args:
            job_id: Job ID

        Returns:
            Job object or None if not found
        """
        with self.lock:
            return self.jobs.get(job_id)

    def get_job_reviews(self, job_id: str) -> Optional[List[Dict[str, Any]]]:
        """
        Get reviews data for a specific job.

        Args:
            job_id: Job ID

        Returns:
            List of reviews or None if not found/not completed
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job and job.status == JobStatus.COMPLETED:
                return job.reviews_data
            return None

    def list_jobs(self, status: Optional[JobStatus] = None, limit: int = 100) -> List[ScrapingJob]:
        """
        List jobs, optionally filtered by status.

        Args:
            status: Optional status filter
            limit: Maximum number of jobs to return

        Returns:
            List of jobs, newest first
        """
        with self.lock:
            jobs = list(self.jobs.values())

        if status is not None:
            jobs = [job for job in jobs if job.status == status]

        # Sort by creation time (newest first)
        jobs.sort(key=lambda job: job.created_at, reverse=True)

        return jobs[:limit]

    def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a pending or running job.

        This only marks the job as cancelled; a scrape that is already in
        progress is not interrupted, but its result is discarded and the
        cancelled state is kept.

        Args:
            job_id: Job ID to cancel

        Returns:
            True if job was cancelled, False otherwise
        """
        with self.lock:
            job = self.jobs.get(job_id)
            if job is None:
                return False

            if job.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED):
                return False

            now = datetime.now()
            job.status = JobStatus.CANCELLED
            job.completed_at = now
            job.updated_at = now
            job.progress = {"stage": "cancelled", "message": "Job was cancelled"}

        log.info(f"Cancelled scraping job {job_id}")
        return True

    def delete_job(self, job_id: str) -> bool:
        """
        Delete a job from the manager.

        Args:
            job_id: Job ID to delete

        Returns:
            True if job was deleted, False otherwise
        """
        with self.lock:
            if self.jobs.pop(job_id, None) is None:
                return False

        log.info(f"Deleted scraping job {job_id}")
        return True

    def get_stats(self) -> Dict[str, Any]:
        """
        Get job manager statistics.

        Returns:
            Statistics dictionary with the total job count, a per-status
            count, the number of running jobs and the concurrency limit
        """
        with self.lock:
            jobs = list(self.jobs.values())

        by_status = {
            status.value: sum(1 for job in jobs if job.status == status)
            for status in JobStatus
        }

        return {
            "total_jobs": len(jobs),
            "by_status": by_status,
            "running_jobs": by_status.get(JobStatus.RUNNING.value, 0),
            "max_concurrent_jobs": self.max_concurrent_jobs,
        }

    def cleanup_old_jobs(self, max_age_hours: int = 24):
        """
        Clean up old completed/failed/cancelled jobs.

        Args:
            max_age_hours: Maximum age in hours before cleanup
        """
        cutoff_time = datetime.now().timestamp() - (max_age_hours * 3600)
        finished = (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)

        with self.lock:
            # Collect IDs first; the dict must not change size while iterating.
            to_delete = [
                job_id
                for job_id, job in self.jobs.items()
                if job.status in finished
                and job.completed_at
                and job.completed_at.timestamp() < cutoff_time
            ]

            for job_id in to_delete:
                del self.jobs[job_id]

        if to_delete:
            log.info(f"Cleaned up {len(to_delete)} old jobs")

    def shutdown(self):
        """Shutdown the job manager, waiting for submitted jobs to finish."""
        log.info("Shutting down job manager")
        self.executor.shutdown(wait=True)