From dddf388422c2db53f5cc84bb33c04a0fda5dac00 Mon Sep 17 00:00:00 2001 From: George Khananaev <106206490+georgekhananaev@users.noreply.github.com> Date: Tue, 3 Jun 2025 00:23:22 +0700 Subject: [PATCH] Added api support, now the scrapper can be triggered from 3rd party services --- README.md | 44 +++++- api_server.py | 269 +++++++++++++++++++++++++++++++++++ modules/job_manager.py | 311 +++++++++++++++++++++++++++++++++++++++++ requirements.txt | 6 +- 4 files changed, 628 insertions(+), 2 deletions(-) create mode 100644 api_server.py create mode 100644 modules/job_manager.py diff --git a/README.md b/README.md index 42ad1d8..4fa4d80 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ - Hoards local paths or swaps URLs to your domain like a boss - Multi-threaded downloading that would make NASA jealous - **S3 Cloud Storage**: Auto-upload images to AWS S3 with custom folder structure +- **REST API Server**: Trigger scraping jobs via HTTP endpoints with background processing - **Time-Bending Magic**: Transforms Google's vague "2 weeks ago" garbage into precise ISO timestamps - **Sort Any Damn Way**: Newest, highest, lowest, relevance - we've got you covered - **Metadata on Steroids**: Inject custom parameters into every review record @@ -126,13 +127,54 @@ custom_params: ## 🖥️ Unleashing Hell -### No-Frills, Get-It-Done Usage +### Command Line Usage ```bash python start.py --url "https://maps.app.goo.gl/YOUR_URL" # Boom. That's it. Now go grab a coffee while the magic happens. ``` +### 🚀 API Server Mode (NEW!) + +Want to trigger scraping jobs via REST API? We've got you covered: + +```bash +# Start the API server +python api_server.py +# Server runs on http://localhost:8000 +``` + +#### API Endpoints: + +**Start a scraping job:** +```bash +curl -X POST "http://localhost:8000/scrape" \ + -H "Content-Type: application/json" \ + -d '{ + "url": "https://maps.app.goo.gl/YOUR_URL", + "headless": true, + "sort_by": "newest", + "download_images": true + }' +``` + +**Check job status:** +```bash +curl "http://localhost:8000/jobs/{job_id}" +``` + +**List all jobs:** +```bash +curl "http://localhost:8000/jobs" +``` + +**Get job statistics:** +```bash +curl "http://localhost:8000/stats" +``` + +**Interactive API docs available at:** `http://localhost:8000/docs` + ### Battle-Tested Recipes 1. Stealth Mode + Fresh Stuff First: diff --git a/api_server.py b/api_server.py new file mode 100644 index 0000000..2eb257c --- /dev/null +++ b/api_server.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +""" +FastAPI server for Google Reviews Scraper. +Provides REST API endpoints to trigger and manage scraping jobs. +""" + +import logging +import asyncio +from contextlib import asynccontextmanager +from typing import Dict, Any, List, Optional + +from fastapi import FastAPI, HTTPException, BackgroundTasks, Query +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, HttpUrl, Field + +from modules.job_manager import JobManager, JobStatus, ScrapingJob + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +log = logging.getLogger("api_server") + +# Global job manager instance +job_manager: Optional[JobManager] = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Lifespan context manager for startup and shutdown""" + global job_manager + + # Startup + log.info("Starting Google Reviews Scraper API Server") + job_manager = JobManager(max_concurrent_jobs=3) + + # Start auto-cleanup task + asyncio.create_task(cleanup_jobs_periodically()) + + yield + + # Shutdown + log.info("Shutting down Google Reviews Scraper API Server") + if job_manager: + job_manager.shutdown() + + +# Initialize FastAPI app +app = FastAPI( + title="Google Reviews Scraper API", + description="REST API for triggering and managing Google Maps review scraping jobs", + version="1.0.0", + lifespan=lifespan +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Configure appropriately for production + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +# Pydantic models for API +class ScrapeRequest(BaseModel): + """Request model for starting a scrape job""" + url: HttpUrl = Field(..., description="Google Maps URL to scrape") + headless: Optional[bool] = Field(None, description="Run Chrome in headless mode") + sort_by: Optional[str] = Field(None, description="Sort order: newest, highest, lowest, relevance") + stop_on_match: Optional[bool] = Field(None, description="Stop when first already-seen review is encountered") + overwrite_existing: Optional[bool] = Field(None, description="Overwrite existing reviews instead of appending") + download_images: Optional[bool] = Field(None, description="Download images from reviews") + use_s3: Optional[bool] = Field(None, description="Upload images to S3") + custom_params: Optional[Dict[str, Any]] = Field(None, description="Custom parameters to add to each document") + + +class JobResponse(BaseModel): + """Response model for job information""" + job_id: str + status: JobStatus + url: str + created_at: str + started_at: Optional[str] = None + completed_at: Optional[str] = None + error_message: Optional[str] = None + reviews_count: Optional[int] = None + images_count: Optional[int] = None + progress: Optional[Dict[str, Any]] = None + + +class JobStatsResponse(BaseModel): + """Response model for job statistics""" + total_jobs: int + by_status: Dict[str, int] + running_jobs: int + max_concurrent_jobs: int + + +# Background task for periodic cleanup +async def cleanup_jobs_periodically(): + """Periodically clean up old jobs""" + while True: + await asyncio.sleep(3600) # Run every hour + if job_manager: + job_manager.cleanup_old_jobs(max_age_hours=24) + + +# API Endpoints + +@app.get("/", summary="API Health Check") +async def root(): + """Health check endpoint""" + return { + "message": "Google Reviews Scraper API is running", + "status": "healthy", + "version": "1.0.0" + } + + +@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job") +async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks): + """ + Start a new scraping job in the background. + + Returns the job ID that can be used to check status. + """ + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + # Prepare config overrides + config_overrides = {} + + # Only include non-None values + for field, value in request.dict().items(): + if value is not None and field != "url": + config_overrides[field] = value + + # Convert URL to string + url = str(request.url) + + try: + # Create job + job_id = job_manager.create_job(url, config_overrides) + + # Start job immediately if possible + started = job_manager.start_job(job_id) + + log.info(f"Created scraping job {job_id} for URL: {url}") + + return { + "job_id": job_id, + "status": "started" if started else "queued", + "message": f"Scraping job {'started' if started else 'queued'} successfully" + } + + except Exception as e: + log.error(f"Error creating scraping job: {e}") + raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") + + +@app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status") +async def get_job(job_id: str): + """Get detailed information about a specific job""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + job = job_manager.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + return JobResponse(**job.to_dict()) + + +@app.get("/jobs", response_model=List[JobResponse], summary="List Jobs") +async def list_jobs( + status: Optional[JobStatus] = Query(None, description="Filter by job status"), + limit: int = Query(100, description="Maximum number of jobs to return", ge=1, le=1000) +): + """List all jobs, optionally filtered by status""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + jobs = job_manager.list_jobs(status=status, limit=limit) + return [JobResponse(**job.to_dict()) for job in jobs] + + +@app.post("/jobs/{job_id}/start", summary="Start Pending Job") +async def start_job(job_id: str): + """Start a pending job manually""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + started = job_manager.start_job(job_id) + if not started: + job = job_manager.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + if job.status != JobStatus.PENDING: + raise HTTPException(status_code=400, detail=f"Job is not pending (current status: {job.status})") + + raise HTTPException(status_code=429, detail="Maximum concurrent jobs reached") + + return {"message": "Job started successfully"} + + +@app.post("/jobs/{job_id}/cancel", summary="Cancel Job") +async def cancel_job(job_id: str): + """Cancel a pending or running job""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + cancelled = job_manager.cancel_job(job_id) + if not cancelled: + job = job_manager.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + raise HTTPException(status_code=400, detail="Job cannot be cancelled (already completed, failed, or cancelled)") + + return {"message": "Job cancelled successfully"} + + +@app.delete("/jobs/{job_id}", summary="Delete Job") +async def delete_job(job_id: str): + """Delete a job from the system""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + deleted = job_manager.delete_job(job_id) + if not deleted: + raise HTTPException(status_code=404, detail="Job not found") + + return {"message": "Job deleted successfully"} + + +@app.get("/stats", response_model=JobStatsResponse, summary="Get Job Statistics") +async def get_stats(): + """Get job manager statistics""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + stats = job_manager.get_stats() + return JobStatsResponse(**stats) + + +@app.post("/cleanup", summary="Manual Job Cleanup") +async def cleanup_jobs(max_age_hours: int = Query(24, description="Maximum age in hours", ge=1)): + """Manually trigger cleanup of old completed/failed jobs""" + if not job_manager: + raise HTTPException(status_code=500, detail="Job manager not initialized") + + job_manager.cleanup_old_jobs(max_age_hours=max_age_hours) + return {"message": f"Cleaned up jobs older than {max_age_hours} hours"} + + +if __name__ == "__main__": + import uvicorn + + log.info("Starting FastAPI server...") + uvicorn.run( + "api_server:app", + host="0.0.0.0", + port=8000, + reload=True, + log_level="info" + ) \ No newline at end of file diff --git a/modules/job_manager.py b/modules/job_manager.py new file mode 100644 index 0000000..f082a7b --- /dev/null +++ b/modules/job_manager.py @@ -0,0 +1,311 @@ +""" +Background job manager for Google Reviews Scraper. +""" + +import asyncio +import logging +import threading +import time +import uuid +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from enum import Enum +from typing import Dict, Any, Optional, List +from dataclasses import dataclass, asdict + +from modules.config import load_config +from modules.scraper import GoogleReviewsScraper + +log = logging.getLogger("scraper") + + +class JobStatus(str, Enum): + """Job status enumeration""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class ScrapingJob: + """Scraping job data class""" + job_id: str + status: JobStatus + url: str + config: Dict[str, Any] + created_at: datetime + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + error_message: Optional[str] = None + reviews_count: Optional[int] = None + images_count: Optional[int] = None + progress: Dict[str, Any] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert job to dictionary for JSON serialization""" + data = asdict(self) + # Convert datetime objects to ISO strings + for field in ['created_at', 'started_at', 'completed_at']: + if data[field]: + data[field] = data[field].isoformat() + return data + + +class JobManager: + """Manager for background scraping jobs""" + + def __init__(self, max_concurrent_jobs: int = 3): + """Initialize job manager""" + self.max_concurrent_jobs = max_concurrent_jobs + self.jobs: Dict[str, ScrapingJob] = {} + self.executor = ThreadPoolExecutor(max_workers=max_concurrent_jobs) + self.lock = threading.Lock() + + def create_job(self, url: str, config_overrides: Dict[str, Any] = None) -> str: + """ + Create a new scraping job. + + Args: + url: Google Maps URL to scrape + config_overrides: Optional config overrides + + Returns: + Job ID + """ + job_id = str(uuid.uuid4()) + + # Load base config + config = load_config() + + # Apply URL + config["url"] = url + + # Apply any overrides + if config_overrides: + config.update(config_overrides) + + job = ScrapingJob( + job_id=job_id, + status=JobStatus.PENDING, + url=url, + config=config, + created_at=datetime.now(), + progress={"stage": "created", "message": "Job created and queued"} + ) + + with self.lock: + self.jobs[job_id] = job + + log.info(f"Created scraping job {job_id} for URL: {url}") + return job_id + + def start_job(self, job_id: str) -> bool: + """ + Start a pending job. + + Args: + job_id: Job ID to start + + Returns: + True if job was started, False otherwise + """ + with self.lock: + if job_id not in self.jobs: + return False + + job = self.jobs[job_id] + if job.status != JobStatus.PENDING: + return False + + # Check if we can start more jobs + running_count = sum(1 for j in self.jobs.values() if j.status == JobStatus.RUNNING) + if running_count >= self.max_concurrent_jobs: + return False + + job.status = JobStatus.RUNNING + job.started_at = datetime.now() + job.progress = {"stage": "starting", "message": "Initializing scraper"} + + # Submit job to thread pool + future = self.executor.submit(self._run_scraping_job, job_id) + + log.info(f"Started scraping job {job_id}") + return True + + def _run_scraping_job(self, job_id: str): + """ + Run the actual scraping job in background thread. + + Args: + job_id: Job ID to run + """ + try: + with self.lock: + job = self.jobs[job_id] + job.progress = {"stage": "initializing", "message": "Setting up scraper"} + + # Create scraper with job config + scraper = GoogleReviewsScraper(job.config) + + # Hook into scraper progress (if available) + # This would require modifying the scraper to report progress + + with self.lock: + job.progress = {"stage": "scraping", "message": "Scraping reviews in progress"} + + # Run the scraping + scraper.scrape() + + # Mark job as completed + with self.lock: + job.status = JobStatus.COMPLETED + job.completed_at = datetime.now() + job.progress = {"stage": "completed", "message": "Scraping completed successfully"} + + # Try to get results count if available + # This would require scraper to return results + job.reviews_count = getattr(scraper, 'total_reviews', None) + job.images_count = getattr(scraper, 'total_images', None) + + log.info(f"Completed scraping job {job_id}") + + except Exception as e: + log.error(f"Error in scraping job {job_id}: {e}") + with self.lock: + job = self.jobs[job_id] + job.status = JobStatus.FAILED + job.completed_at = datetime.now() + job.error_message = str(e) + job.progress = {"stage": "failed", "message": f"Job failed: {str(e)}"} + + def get_job(self, job_id: str) -> Optional[ScrapingJob]: + """ + Get job by ID. + + Args: + job_id: Job ID + + Returns: + Job object or None if not found + """ + with self.lock: + return self.jobs.get(job_id) + + def list_jobs(self, status: Optional[JobStatus] = None, limit: int = 100) -> List[ScrapingJob]: + """ + List jobs, optionally filtered by status. + + Args: + status: Optional status filter + limit: Maximum number of jobs to return + + Returns: + List of jobs + """ + with self.lock: + jobs = list(self.jobs.values()) + + if status: + jobs = [job for job in jobs if job.status == status] + + # Sort by creation time (newest first) + jobs.sort(key=lambda x: x.created_at, reverse=True) + + return jobs[:limit] + + def cancel_job(self, job_id: str) -> bool: + """ + Cancel a pending or running job. + + Args: + job_id: Job ID to cancel + + Returns: + True if job was cancelled, False otherwise + """ + with self.lock: + if job_id not in self.jobs: + return False + + job = self.jobs[job_id] + if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + return False + + job.status = JobStatus.CANCELLED + job.completed_at = datetime.now() + job.progress = {"stage": "cancelled", "message": "Job was cancelled"} + + log.info(f"Cancelled scraping job {job_id}") + return True + + def delete_job(self, job_id: str) -> bool: + """ + Delete a job from the manager. + + Args: + job_id: Job ID to delete + + Returns: + True if job was deleted, False otherwise + """ + with self.lock: + if job_id not in self.jobs: + return False + del self.jobs[job_id] + + log.info(f"Deleted scraping job {job_id}") + return True + + def get_stats(self) -> Dict[str, Any]: + """ + Get job manager statistics. + + Returns: + Statistics dictionary + """ + with self.lock: + jobs = list(self.jobs.values()) + + stats = { + "total_jobs": len(jobs), + "by_status": {}, + "running_jobs": 0, + "max_concurrent_jobs": self.max_concurrent_jobs + } + + for status in JobStatus: + count = sum(1 for job in jobs if job.status == status) + stats["by_status"][status.value] = count + + stats["running_jobs"] = stats["by_status"].get(JobStatus.RUNNING.value, 0) + + return stats + + def cleanup_old_jobs(self, max_age_hours: int = 24): + """ + Clean up old completed/failed jobs. + + Args: + max_age_hours: Maximum age in hours before cleanup + """ + cutoff_time = datetime.now().timestamp() - (max_age_hours * 3600) + + with self.lock: + to_delete = [] + for job_id, job in self.jobs.items(): + if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + if job.completed_at and job.completed_at.timestamp() < cutoff_time: + to_delete.append(job_id) + + for job_id in to_delete: + del self.jobs[job_id] + + if to_delete: + log.info(f"Cleaned up {len(to_delete)} old jobs") + + def shutdown(self): + """Shutdown the job manager""" + log.info("Shutting down job manager") + self.executor.shutdown(wait=True) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 728ee14..2f56ca0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,8 @@ certifi==2024.7.4 webdriver-manager==4.0.2 setuptools==79.0.1 boto3==1.35.1 -pytest==7.4.3 \ No newline at end of file +pytest==7.4.3 +fastapi==0.104.1 +uvicorn==0.24.0 +botocore~=1.35.99 +pydantic~=2.11.5 \ No newline at end of file