From 788ef84756a6f14723da379b6ad9a699a7a459bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 15:35:58 +0000 Subject: [PATCH] Phases 2-4: Requester support, batches, webhooks, scraper registry Phase 2 - Requester & Batch Support: - core/database.py: Added create_job params (requester_*, batch_*, priority, callback_*) - core/database.py: Added batch methods (create_batch, get_batch, update_batch_progress, get_batches) - core/database.py: Added update_job_callback for tracking webhook delivery - api/routes/batches.py: New endpoints: - POST /api/scrape/google-reviews/batch (submit batch) - GET /api/batches (list batches) - GET /api/batches/{id} (batch detail) - DELETE /api/batches/{id} (cancel batch) - api_server_production.py: Updated /api/scrape with requester, priority, callback fields - api_server_production.py: New primary endpoint POST /api/scrape/google-reviews Phase 3 - Webhooks: - services/job_callback_service.py: New service with: - JobCallbackService: send_job_callback, send_batch_callback, retry_failed_callbacks - JobCallbackDispatcher: Background worker for callback monitoring - Payload formats per spec (job.completed, job.failed, batch.completed) - Exponential backoff for retries - Error classification for failure payloads Phase 4 - Scraper Registry: - scrapers/registry.py: Database-backed version routing: - get_scraper(): Version/variant/A/B routing - _get_weighted_scraper(): Traffic-weighted random selection - 60-second TTL cache for performance - register_scraper, deprecate_scraper, update_traffic_allocation - LegacyScraperRegistry preserved for backwards compatibility Co-Authored-By: Claude Opus 4.5 --- .artifacts/CONTEXT-KEEPER.md | 2 +- api/routes/__init__.py | 11 + api/routes/batches.py | 672 +++++++++++++++++++++++++++++ api_server_production.py | 267 ++++++++++-- core/database.py | 391 +++++++++++++++-- scrapers/registry.py | 512 +++++++++++++++++++++- services/__init__.py | 29 ++ services/job_callback_service.py | 717 +++++++++++++++++++++++++++++++ 8 files changed, 2503 insertions(+), 98 deletions(-) create mode 100644 api/routes/batches.py create mode 100644 services/job_callback_service.py diff --git a/.artifacts/CONTEXT-KEEPER.md b/.artifacts/CONTEXT-KEEPER.md index 4caad8f..be97e14 100644 --- a/.artifacts/CONTEXT-KEEPER.md +++ b/.artifacts/CONTEXT-KEEPER.md @@ -104,7 +104,7 @@ reviewiq/ # Will rename from google-reviews-scraper-pro | Phase | Description | Status | |-------|-------------|--------| | 0 | Project restructure (move files to new locations) | ✅ COMPLETE | -| 1 | Database migrations (new fields + tables) | Not started | +| 1 | Database migrations (new fields + tables) | ✅ COMPLETE | | 2 | Requester & batch support | Not started | | 3 | Webhooks | Not started | | 4 | Scraper versioning & registry | Not started | diff --git a/api/routes/__init__.py b/api/routes/__init__.py index e69de29..ec6842a 100644 --- a/api/routes/__init__.py +++ b/api/routes/__init__.py @@ -0,0 +1,11 @@ +""" +API Routes for ReviewIQ. + +This module exports all route modules for easy import into the main server. +""" +from api.routes.batches import router as batches_router, set_database as set_batches_db + +__all__ = [ + 'batches_router', + 'set_batches_db', +] diff --git a/api/routes/batches.py b/api/routes/batches.py new file mode 100644 index 0000000..a07ffab --- /dev/null +++ b/api/routes/batches.py @@ -0,0 +1,672 @@ +#!/usr/bin/env python3 +""" +Batch submission API for ReviewIQ Phase 2. + +Enables submitting multiple URLs as a batch with: +- Batch-level tracking and callback +- Individual job creation for each URL +- Batch status aggregation +- Batch cancellation +""" +import asyncio +import json +import logging +from datetime import datetime +from typing import Optional, List, Dict, Any +from uuid import UUID, uuid4 + +from fastapi import APIRouter, HTTPException, Query, Depends +from pydantic import BaseModel, HttpUrl, Field, validator + +from core.database import DatabaseManager +from core.enums import JobStatus + +log = logging.getLogger(__name__) + +# Create router +router = APIRouter(prefix="/api", tags=["batches"]) + + +# ==================== Pydantic Models ==================== + +class RequesterModel(BaseModel): + """Requester information for batch tracking""" + client_id: str = Field(..., description="Client identifier (e.g., 'veritas_123')") + source: str = Field(..., description="Source of the request (e.g., 'veritasreview.com')") + purpose: Optional[str] = Field(None, description="Purpose of the batch (e.g., 'prospect_screening')") + metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional requester metadata") + + +class BatchSubmitRequest(BaseModel): + """Request model for submitting a batch of URLs""" + name: str = Field(..., description="Batch name for identification", min_length=1, max_length=255) + urls: List[HttpUrl] = Field(..., description="List of Google Maps URLs to scrape", min_items=1, max_items=1000) + requester: RequesterModel = Field(..., description="Requester information") + priority: int = Field(default=0, description="Priority level (higher = more urgent)", ge=-10, le=10) + callback_url: Optional[HttpUrl] = Field(None, description="Webhook URL called when ALL jobs complete") + + @validator('urls') + def validate_unique_urls(cls, v): + """Ensure all URLs are unique""" + unique_urls = list(set(str(url) for url in v)) + if len(unique_urls) != len(v): + raise ValueError('Duplicate URLs are not allowed in a batch') + return v + + +class BatchSubmitResponse(BaseModel): + """Response model for batch submission""" + batch_id: str = Field(..., description="Unique batch identifier") + job_ids: List[str] = Field(..., description="List of created job IDs") + total_jobs: int = Field(..., description="Total number of jobs in the batch") + + +class BatchJobSummary(BaseModel): + """Summary of a job within a batch""" + job_id: str + url: str + status: str + reviews_count: Optional[int] = None + error_message: Optional[str] = None + created_at: str + completed_at: Optional[str] = None + + +class BatchSummary(BaseModel): + """Summary model for batch listing""" + batch_id: str + name: str + client_id: str + status: str # pending, running, completed, partial, failed + total_jobs: int + pending_jobs: int + running_jobs: int + completed_jobs: int + failed_jobs: int + total_reviews: int + created_at: str + updated_at: Optional[str] = None + completed_at: Optional[str] = None + + +class BatchDetailResponse(BaseModel): + """Detailed batch information with job list""" + batch_id: str + name: str + requester: RequesterModel + priority: int + callback_url: Optional[str] = None + status: str + total_jobs: int + pending_jobs: int + running_jobs: int + completed_jobs: int + failed_jobs: int + total_reviews: int + created_at: str + updated_at: Optional[str] = None + completed_at: Optional[str] = None + jobs: List[BatchJobSummary] + + +class BatchCancelResponse(BaseModel): + """Response model for batch cancellation""" + batch_id: str + cancelled_jobs: int + already_completed: int + message: str + + +# ==================== Database Helper Functions ==================== +# These will be added to core.database in a future refactor, +# but for now we implement them here for the batch functionality. + +async def create_batch( + db: DatabaseManager, + name: str, + requester: RequesterModel, + priority: int = 0, + callback_url: Optional[str] = None +) -> UUID: + """ + Create a new batch record in the database. + + Returns: + UUID of created batch + """ + async with db.pool.acquire() as conn: + # First ensure batch table exists + await conn.execute(""" + CREATE TABLE IF NOT EXISTS batches ( + batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(255) NOT NULL, + client_id VARCHAR(255) NOT NULL, + requester JSONB NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + callback_url TEXT, + callback_delivered BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP, + completed_at TIMESTAMP + ); + """) + + # Create index on client_id if not exists + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_batches_client_id ON batches(client_id); + """) + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_batches_created_at ON batches(created_at DESC); + """) + + # Ensure batch_id column exists in jobs table + await conn.execute(""" + ALTER TABLE jobs ADD COLUMN IF NOT EXISTS batch_id UUID REFERENCES batches(batch_id) ON DELETE SET NULL; + """) + await conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_jobs_batch_id ON jobs(batch_id) WHERE batch_id IS NOT NULL; + """) + + batch_id = await conn.fetchval(""" + INSERT INTO batches (name, client_id, requester, priority, callback_url) + VALUES ($1, $2, $3, $4, $5) + RETURNING batch_id + """, name, requester.client_id, json.dumps(requester.dict()), priority, callback_url) + + log.info(f"Created batch {batch_id}: '{name}' for client {requester.client_id}") + return batch_id + + +async def create_job_in_batch( + db: DatabaseManager, + batch_id: UUID, + url: str, + priority: int = 0, + metadata: Optional[Dict[str, Any]] = None +) -> UUID: + """ + Create a job that belongs to a batch. + + Returns: + UUID of created job + """ + job_metadata = metadata or {} + job_metadata['batch_id'] = str(batch_id) + job_metadata['priority'] = priority + + async with db.pool.acquire() as conn: + job_id = await conn.fetchval(""" + INSERT INTO jobs (url, batch_id, metadata) + VALUES ($1, $2, $3) + RETURNING job_id + """, url, batch_id, json.dumps(job_metadata)) + + return job_id + + +async def get_batch(db: DatabaseManager, batch_id: UUID) -> Optional[Dict[str, Any]]: + """ + Get batch by ID with aggregated job stats. + + Returns: + Batch dictionary with job stats or None if not found + """ + async with db.pool.acquire() as conn: + # Get batch basic info + batch_row = await conn.fetchrow(""" + SELECT + batch_id, + name, + client_id, + requester, + priority, + callback_url, + callback_delivered, + created_at, + updated_at, + completed_at + FROM batches + WHERE batch_id = $1 + """, batch_id) + + if not batch_row: + return None + + # Get job stats for this batch + stats = await conn.fetchrow(""" + SELECT + COUNT(*) as total_jobs, + COUNT(*) FILTER (WHERE status = 'pending') as pending_jobs, + COUNT(*) FILTER (WHERE status = 'running') as running_jobs, + COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs, + COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed_jobs, + COALESCE(SUM(reviews_count), 0) as total_reviews + FROM jobs + WHERE batch_id = $1 + """, batch_id) + + result = dict(batch_row) + result.update(dict(stats)) + + # Calculate batch status + total = stats['total_jobs'] + completed = stats['completed_jobs'] + failed = stats['failed_jobs'] + running = stats['running_jobs'] + + if total == 0: + result['status'] = 'pending' + elif completed + failed == total: + if failed == total: + result['status'] = 'failed' + elif failed > 0: + result['status'] = 'partial' + else: + result['status'] = 'completed' + elif running > 0 or completed > 0: + result['status'] = 'running' + else: + result['status'] = 'pending' + + return result + + +async def get_batch_jobs(db: DatabaseManager, batch_id: UUID) -> List[Dict[str, Any]]: + """ + Get all jobs in a batch. + + Returns: + List of job dictionaries + """ + async with db.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT + job_id, + url, + status, + reviews_count, + error_message, + created_at, + completed_at + FROM jobs + WHERE batch_id = $1 + ORDER BY created_at ASC + """, batch_id) + + return [dict(row) for row in rows] + + +async def list_batches( + db: DatabaseManager, + client_id: Optional[str] = None, + status: Optional[str] = None, + limit: int = 100, + offset: int = 0 +) -> List[Dict[str, Any]]: + """ + List batches with optional filtering. + + Returns: + List of batch summary dictionaries + """ + async with db.pool.acquire() as conn: + # Build query dynamically based on filters + where_clauses = [] + params = [] + param_idx = 1 + + if client_id: + where_clauses.append(f"b.client_id = ${param_idx}") + params.append(client_id) + param_idx += 1 + + where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else "" + + query = f""" + SELECT + b.batch_id, + b.name, + b.client_id, + b.created_at, + b.updated_at, + b.completed_at, + COUNT(j.job_id) as total_jobs, + COUNT(j.job_id) FILTER (WHERE j.status = 'pending') as pending_jobs, + COUNT(j.job_id) FILTER (WHERE j.status = 'running') as running_jobs, + COUNT(j.job_id) FILTER (WHERE j.status = 'completed') as completed_jobs, + COUNT(j.job_id) FILTER (WHERE j.status IN ('failed', 'partial')) as failed_jobs, + COALESCE(SUM(j.reviews_count), 0) as total_reviews + FROM batches b + LEFT JOIN jobs j ON j.batch_id = b.batch_id + {where_sql} + GROUP BY b.batch_id + ORDER BY b.created_at DESC + LIMIT ${param_idx} OFFSET ${param_idx + 1} + """ + params.extend([limit, offset]) + + rows = await conn.fetch(query, *params) + + results = [] + for row in rows: + batch = dict(row) + + # Calculate status + total = batch['total_jobs'] + completed = batch['completed_jobs'] + failed = batch['failed_jobs'] + running = batch['running_jobs'] + + if total == 0: + batch['status'] = 'pending' + elif completed + failed == total: + if failed == total: + batch['status'] = 'failed' + elif failed > 0: + batch['status'] = 'partial' + else: + batch['status'] = 'completed' + elif running > 0 or completed > 0: + batch['status'] = 'running' + else: + batch['status'] = 'pending' + + # Filter by status if specified + if status and batch['status'] != status: + continue + + results.append(batch) + + return results + + +async def cancel_batch_jobs(db: DatabaseManager, batch_id: UUID) -> tuple: + """ + Cancel all pending jobs in a batch. + + Returns: + Tuple of (cancelled_count, already_completed_count) + """ + async with db.pool.acquire() as conn: + # Count already completed/failed jobs + already_done = await conn.fetchval(""" + SELECT COUNT(*) + FROM jobs + WHERE batch_id = $1 AND status IN ('completed', 'failed', 'partial') + """, batch_id) + + # Cancel pending and running jobs + result = await conn.execute(""" + UPDATE jobs + SET status = 'cancelled', completed_at = NOW() + WHERE batch_id = $1 AND status IN ('pending', 'running') + """, batch_id) + + cancelled_count = int(result.split()[-1]) + + # Update batch completed_at if all jobs are now done + await conn.execute(""" + UPDATE batches + SET updated_at = NOW(), + completed_at = CASE + WHEN NOT EXISTS ( + SELECT 1 FROM jobs + WHERE batch_id = $1 AND status IN ('pending', 'running') + ) THEN NOW() + ELSE completed_at + END + WHERE batch_id = $1 + """, batch_id) + + log.info(f"Cancelled {cancelled_count} jobs in batch {batch_id}") + return cancelled_count, already_done or 0 + + +# ==================== Dependency Injection ==================== + +# Database instance will be injected from the main app +# This is a placeholder that should be overridden when including the router +_db: Optional[DatabaseManager] = None + + +def set_database(db: DatabaseManager): + """Set the database instance for the router""" + global _db + _db = db + + +def get_db() -> DatabaseManager: + """Dependency to get database instance""" + if _db is None: + raise HTTPException(status_code=500, detail="Database not initialized") + return _db + + +# ==================== API Endpoints ==================== + +@router.post( + "/scrape/google-reviews/batch", + response_model=BatchSubmitResponse, + summary="Submit Batch of URLs", + description="Submit multiple Google Maps URLs as a batch for scraping" +) +async def submit_batch( + request: BatchSubmitRequest, + db: DatabaseManager = Depends(get_db) +): + """ + Submit multiple URLs as a batch for scraping. + + Each URL becomes an individual job that can be tracked separately, + while the batch provides aggregate status and an optional callback + when all jobs complete. + + - **name**: Descriptive name for the batch (e.g., "Q1 Prospects") + - **urls**: List of Google Maps URLs (1-1000 URLs) + - **requester**: Client identification for tracking + - **priority**: Higher priority batches may be processed first (default: 0) + - **callback_url**: Webhook called when ALL jobs in the batch complete + + Returns batch_id and list of individual job_ids for tracking. + """ + try: + # Create the batch record + batch_id = await create_batch( + db=db, + name=request.name, + requester=request.requester, + priority=request.priority, + callback_url=str(request.callback_url) if request.callback_url else None + ) + + # Create individual jobs for each URL + job_ids = [] + for url in request.urls: + job_id = await create_job_in_batch( + db=db, + batch_id=batch_id, + url=str(url), + priority=request.priority, + metadata={ + 'requester': request.requester.dict(), + 'batch_name': request.name + } + ) + job_ids.append(str(job_id)) + + log.info(f"Created batch {batch_id} with {len(job_ids)} jobs for client {request.requester.client_id}") + + return BatchSubmitResponse( + batch_id=str(batch_id), + job_ids=job_ids, + total_jobs=len(job_ids) + ) + + except Exception as e: + log.error(f"Error creating batch: {e}") + raise HTTPException(status_code=500, detail=f"Failed to create batch: {str(e)}") + + +@router.get( + "/batches", + response_model=List[BatchSummary], + summary="List Batches", + description="List batches with optional filtering by client_id and status" +) +async def list_batches_endpoint( + client_id: Optional[str] = Query(None, description="Filter by client ID"), + status: Optional[str] = Query(None, description="Filter by batch status (pending, running, completed, partial, failed)"), + limit: int = Query(100, description="Maximum number of batches to return", ge=1, le=1000), + offset: int = Query(0, description="Number of batches to skip", ge=0), + db: DatabaseManager = Depends(get_db) +): + """ + List batches with optional filters. + + - **client_id**: Filter to only show batches for a specific client + - **status**: Filter by batch status: + - pending: No jobs have started yet + - running: At least one job is running or completed + - completed: All jobs completed successfully + - partial: All jobs done, but some failed + - failed: All jobs failed + - **limit**: Maximum results (default: 100) + - **offset**: Skip first N results for pagination + """ + try: + batches = await list_batches( + db=db, + client_id=client_id, + status=status, + limit=limit, + offset=offset + ) + + return [ + BatchSummary( + batch_id=str(b['batch_id']), + name=b['name'], + client_id=b['client_id'], + status=b['status'], + total_jobs=b['total_jobs'], + pending_jobs=b['pending_jobs'], + running_jobs=b['running_jobs'], + completed_jobs=b['completed_jobs'], + failed_jobs=b['failed_jobs'], + total_reviews=b['total_reviews'], + created_at=b['created_at'].isoformat(), + updated_at=b['updated_at'].isoformat() if b.get('updated_at') else None, + completed_at=b['completed_at'].isoformat() if b.get('completed_at') else None + ) + for b in batches + ] + + except Exception as e: + log.error(f"Error listing batches: {e}") + raise HTTPException(status_code=500, detail=f"Failed to list batches: {str(e)}") + + +@router.get( + "/batches/{batch_id}", + response_model=BatchDetailResponse, + summary="Get Batch Details", + description="Get detailed batch information including all jobs" +) +async def get_batch_details( + batch_id: UUID, + db: DatabaseManager = Depends(get_db) +): + """ + Get detailed information about a specific batch. + + Returns batch metadata, aggregate statistics, and a list of all jobs + with their individual statuses. + """ + try: + batch = await get_batch(db, batch_id) + if not batch: + raise HTTPException(status_code=404, detail="Batch not found") + + jobs = await get_batch_jobs(db, batch_id) + + # Parse requester from JSONB + requester_data = batch['requester'] + if isinstance(requester_data, str): + requester_data = json.loads(requester_data) + + return BatchDetailResponse( + batch_id=str(batch['batch_id']), + name=batch['name'], + requester=RequesterModel(**requester_data), + priority=batch['priority'], + callback_url=batch.get('callback_url'), + status=batch['status'], + total_jobs=batch['total_jobs'], + pending_jobs=batch['pending_jobs'], + running_jobs=batch['running_jobs'], + completed_jobs=batch['completed_jobs'], + failed_jobs=batch['failed_jobs'], + total_reviews=batch['total_reviews'], + created_at=batch['created_at'].isoformat(), + updated_at=batch['updated_at'].isoformat() if batch.get('updated_at') else None, + completed_at=batch['completed_at'].isoformat() if batch.get('completed_at') else None, + jobs=[ + BatchJobSummary( + job_id=str(j['job_id']), + url=j['url'], + status=j['status'], + reviews_count=j.get('reviews_count'), + error_message=j.get('error_message'), + created_at=j['created_at'].isoformat(), + completed_at=j['completed_at'].isoformat() if j.get('completed_at') else None + ) + for j in jobs + ] + ) + + except HTTPException: + raise + except Exception as e: + log.error(f"Error getting batch {batch_id}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to get batch: {str(e)}") + + +@router.delete( + "/batches/{batch_id}", + response_model=BatchCancelResponse, + summary="Cancel Batch", + description="Cancel all pending jobs in a batch" +) +async def cancel_batch( + batch_id: UUID, + db: DatabaseManager = Depends(get_db) +): + """ + Cancel all pending and running jobs in a batch. + + Already completed or failed jobs cannot be cancelled and will remain + with their current status. + + Returns the count of cancelled jobs and already-completed jobs. + """ + try: + # Verify batch exists + batch = await get_batch(db, batch_id) + if not batch: + raise HTTPException(status_code=404, detail="Batch not found") + + # Cancel the jobs + cancelled, already_done = await cancel_batch_jobs(db, batch_id) + + return BatchCancelResponse( + batch_id=str(batch_id), + cancelled_jobs=cancelled, + already_completed=already_done, + message=f"Cancelled {cancelled} jobs. {already_done} jobs were already completed/failed." + ) + + except HTTPException: + raise + except Exception as e: + log.error(f"Error cancelling batch {batch_id}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to cancel batch: {str(e)}") diff --git a/api_server_production.py b/api_server_production.py index 4e3ec92..995247f 100644 --- a/api_server_production.py +++ b/api_server_production.py @@ -1,9 +1,14 @@ #!/usr/bin/env python3 """ -Production Google Reviews Scraper API Server with Phase 1 features: +Production Google Reviews Scraper API Server with Phase 1 & 2 features: - PostgreSQL storage with JSONB - Webhook delivery with retries - Smart health checks with canary testing +- Phase 2: Requester tracking (client_id, source, purpose) +- Phase 2: Job priority for queue ordering +- Phase 2: Callback URL alternative to webhooks +- Phase 2: Scraper version/variant selection for A/B testing +- Phase 2: Explicit job type endpoint (/api/scrape/google-reviews) """ import asyncio import json @@ -35,6 +40,7 @@ from workers.chrome_pool import ( release_scraping_worker, get_pool_stats ) +from api.routes import batches_router, set_batches_db # Configure logging logging.basicConfig( @@ -78,6 +84,9 @@ async def lifespan(app: FastAPI): await db.initialize_schema() log.info("Database initialized") + # Inject database into route modules + set_batches_db(db) + # Initialize health check system with canary monitoring # DISABLED: Canary tests consume Google Maps requests and trigger rate limiting # health_system = HealthCheckSystem(db) @@ -134,6 +143,9 @@ app.add_middleware( allow_headers=["*"], ) +# Include routers from api/routes/ +app.include_router(batches_router) + # ==================== Request/Response Models ==================== @@ -159,14 +171,44 @@ class BrowserFingerprintModel(BaseModel): platform: Optional[str] = Field(None, description="Platform (e.g., MacIntel, Win32)") +class RequesterModel(BaseModel): + """Information about the requester of a scrape job""" + client_id: Optional[str] = Field(None, description="Client identifier") + source: Optional[str] = Field(None, description="Source of the request (e.g., 'web', 'api', 'internal')") + purpose: Optional[str] = Field(None, description="Purpose of the scrape (e.g., 'competitor_analysis', 'review_monitoring')") + metadata: Optional[Dict[str, Any]] = Field(None, description="Additional requester metadata") + + class ScrapeRequest(BaseModel): - """Request model for starting a scrape job""" + """Request model for starting a scrape job (legacy endpoint, routes to google-reviews)""" url: HttpUrl = Field(..., description="Google Maps URL to scrape") webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications") webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature") metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata") geolocation: Optional[GeolocationModel] = Field(None, description="User's geolocation for Chrome") browser_fingerprint: Optional[BrowserFingerprintModel] = Field(None, description="User's browser fingerprint") + # Phase 2: New optional fields for enhanced job tracking + requester: Optional[RequesterModel] = Field(None, description="Information about who requested this job") + priority: Optional[int] = Field(0, description="Job priority (higher = more important)", ge=0, le=100) + callback_url: Optional[HttpUrl] = Field(None, description="URL to call when job completes (alternative to webhook)") + scraper_version: Optional[str] = Field(None, description="Specific scraper version to use") + scraper_variant: Optional[str] = Field(None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')") + + +class GoogleReviewsScrapeRequest(BaseModel): + """Request model for Google Reviews scraping - explicit job type endpoint""" + url: HttpUrl = Field(..., description="Google Maps URL to scrape") + webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications") + webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature") + metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata") + geolocation: Optional[GeolocationModel] = Field(None, description="User's geolocation for Chrome") + browser_fingerprint: Optional[BrowserFingerprintModel] = Field(None, description="User's browser fingerprint") + # Phase 2: New optional fields for enhanced job tracking + requester: Optional[RequesterModel] = Field(None, description="Information about who requested this job") + priority: Optional[int] = Field(0, description="Job priority (higher = more important)", ge=0, le=100) + callback_url: Optional[HttpUrl] = Field(None, description="URL to call when job completes (alternative to webhook)") + scraper_version: Optional[str] = Field(None, description="Specific scraper version to use") + scraper_variant: Optional[str] = Field(None, description="Scraper variant (e.g., 'fast', 'thorough', 'stealth')") class JobResponse(BaseModel): @@ -267,10 +309,146 @@ async def root(): } -@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job") +async def _create_google_reviews_job( + url: str, + webhook_url: Optional[str] = None, + webhook_secret: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + browser_fingerprint: Optional[BrowserFingerprintModel] = None, + geolocation: Optional[GeolocationModel] = None, + requester: Optional[RequesterModel] = None, + priority: int = 0, + callback_url: Optional[str] = None, + scraper_version: Optional[str] = None, + scraper_variant: Optional[str] = None, + job_type: str = "google-reviews" +) -> Dict[str, str]: + """ + Core logic for creating a Google Reviews scraping job. + + This is the shared implementation used by both /scrape and /api/scrape/google-reviews endpoints. + + Returns: + Dict with job_id, status, and message + """ + if not db: + raise HTTPException(status_code=500, detail="Database not initialized") + + try: + # Build metadata with all Phase 2 fields + job_metadata = metadata.copy() if metadata else {} + + # Add browser fingerprint if provided + if browser_fingerprint: + fp = browser_fingerprint + job_metadata['browser_fingerprint'] = { + "userAgent": fp.userAgent, + "timezone": fp.timezone, + "language": fp.language, + "platform": fp.platform, + } + if fp.viewport: + job_metadata['browser_fingerprint']['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height} + if fp.geolocation: + job_metadata['browser_fingerprint']['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng} + elif geolocation: + job_metadata['geolocation'] = { + 'lat': geolocation.lat, + 'lng': geolocation.lng + } + + # Phase 2: Add requester info if provided + if requester: + job_metadata['requester'] = { + 'client_id': requester.client_id, + 'source': requester.source, + 'purpose': requester.purpose, + 'metadata': requester.metadata + } + + # Phase 2: Add job type for multi-scraper support + job_metadata['job_type'] = job_type + + # Phase 2: Add priority for job queue ordering + job_metadata['priority'] = priority + + # Phase 2: Add callback_url (alternative to webhook) + if callback_url: + job_metadata['callback_url'] = callback_url + + # Phase 2: Add scraper version/variant for A/B testing and version control + if scraper_version: + job_metadata['scraper_version'] = scraper_version + if scraper_variant: + job_metadata['scraper_variant'] = scraper_variant + + # Create job in database + job_id = await db.create_job( + url=url, + webhook_url=webhook_url, + webhook_secret=webhook_secret, + metadata=job_metadata + ) + + # Start scraping job in background + asyncio.create_task(run_scraping_job(job_id)) + + log.info(f"Created and started job {job_id} (type={job_type}, priority={priority})") + + return { + "job_id": str(job_id), + "status": "started", + "message": "Scraping job started successfully", + "job_type": job_type + } + + except Exception as e: + log.error(f"Error creating scraping job: {e}") + raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") + + +@app.post("/api/scrape/google-reviews", response_model=Dict[str, str], summary="Start Google Reviews Scraping Job") +async def scrape_google_reviews(request: GoogleReviewsScrapeRequest): + """ + Start a new Google Reviews scraping job. + + This is the primary endpoint for Phase 2 onwards. It explicitly creates a job + of type 'google-reviews' with full support for all Phase 2 features: + - Requester tracking (client_id, source, purpose) + - Job priority for queue ordering + - Callback URL (alternative to webhook) + - Scraper version/variant selection for A/B testing + + The job runs asynchronously in the background. You can: + - Poll GET /jobs/{job_id} for status + - Provide webhook_url for automatic notification when complete + - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates + + Returns the job ID for tracking. + """ + return await _create_google_reviews_job( + url=str(request.url), + webhook_url=str(request.webhook_url) if request.webhook_url else None, + webhook_secret=request.webhook_secret, + metadata=request.metadata, + browser_fingerprint=request.browser_fingerprint, + geolocation=request.geolocation, + requester=request.requester, + priority=request.priority or 0, + callback_url=str(request.callback_url) if request.callback_url else None, + scraper_version=request.scraper_version, + scraper_variant=request.scraper_variant, + job_type="google-reviews" + ) + + +@app.post("/scrape", response_model=Dict[str, str], summary="Start Scraping Job (Legacy)") async def start_scrape(request: ScrapeRequest): """ - Start a new scraping job. + Start a new scraping job (legacy endpoint, routes to google-reviews). + + **NOTE**: This endpoint is maintained for backwards compatibility. + For new integrations, use POST /api/scrape/google-reviews instead. The job runs asynchronously in the background. You can: - Poll GET /jobs/{job_id} for status @@ -278,52 +456,51 @@ async def start_scrape(request: ScrapeRequest): Returns the job ID for tracking. """ - if not db: - raise HTTPException(status_code=500, detail="Database not initialized") + return await _create_google_reviews_job( + url=str(request.url), + webhook_url=str(request.webhook_url) if request.webhook_url else None, + webhook_secret=request.webhook_secret, + metadata=request.metadata, + browser_fingerprint=request.browser_fingerprint, + geolocation=request.geolocation, + requester=request.requester, + priority=request.priority or 0, + callback_url=str(request.callback_url) if request.callback_url else None, + scraper_version=request.scraper_version, + scraper_variant=request.scraper_variant, + job_type="google-reviews" + ) - try: - # Merge browser fingerprint into metadata if provided - metadata = request.metadata or {} - if request.browser_fingerprint: - fp = request.browser_fingerprint - metadata['browser_fingerprint'] = { - "userAgent": fp.userAgent, - "timezone": fp.timezone, - "language": fp.language, - "platform": fp.platform, - } - if fp.viewport: - metadata['browser_fingerprint']['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height} - if fp.geolocation: - metadata['browser_fingerprint']['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng} - elif request.geolocation: - metadata['geolocation'] = { - 'lat': request.geolocation.lat, - 'lng': request.geolocation.lng - } - # Create job in database - job_id = await db.create_job( - url=str(request.url), - webhook_url=str(request.webhook_url) if request.webhook_url else None, - webhook_secret=request.webhook_secret, - metadata=metadata - ) +@app.post("/api/scrape", response_model=Dict[str, str], summary="Start Scraping Job") +async def api_start_scrape(request: ScrapeRequest): + """ + Start a new scraping job via the /api/scrape endpoint. - # Start scraping job in background - asyncio.create_task(run_scraping_job(job_id)) + This endpoint accepts the same request body as /scrape and routes to google-reviews. + For explicit job type control, use POST /api/scrape/google-reviews instead. - log.info(f"Created and started job {job_id}") + The job runs asynchronously in the background. You can: + - Poll GET /jobs/{job_id} for status + - Provide webhook_url for automatic notification when complete + - Subscribe to SSE at /jobs/{job_id}/stream for real-time updates - return { - "job_id": str(job_id), - "status": "started", - "message": "Scraping job started successfully" - } - - except Exception as e: - log.error(f"Error creating scraping job: {e}") - raise HTTPException(status_code=500, detail=f"Failed to create scraping job: {str(e)}") + Returns the job ID for tracking. + """ + return await _create_google_reviews_job( + url=str(request.url), + webhook_url=str(request.webhook_url) if request.webhook_url else None, + webhook_secret=request.webhook_secret, + metadata=request.metadata, + browser_fingerprint=request.browser_fingerprint, + geolocation=request.geolocation, + requester=request.requester, + priority=request.priority or 0, + callback_url=str(request.callback_url) if request.callback_url else None, + scraper_version=request.scraper_version, + scraper_variant=request.scraper_variant, + job_type="google-reviews" + ) @app.get("/jobs/{job_id}", response_model=JobResponse, summary="Get Job Status") diff --git a/core/database.py b/core/database.py index aaeb842..ce3cae5 100644 --- a/core/database.py +++ b/core/database.py @@ -189,7 +189,22 @@ class DatabaseManager: url: str, webhook_url: Optional[str] = None, webhook_secret: Optional[str] = None, - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None, + # Phase 2: New fields for requester tracking + requester_client_id: Optional[str] = None, + requester_source: Optional[str] = None, + scrape_purpose: Optional[str] = None, + requester_metadata: Optional[Dict[str, Any]] = None, + # Phase 2: Batch support + batch_id: Optional[UUID] = None, + batch_index: Optional[int] = None, + # Phase 2: Job configuration + job_type: str = 'google_reviews', + priority: int = 0, + callback_url: Optional[str] = None, + # Phase 2: Scraper versioning + scraper_version: Optional[str] = None, + scraper_variant: Optional[str] = None ) -> UUID: """ Create a new scraping job. @@ -199,16 +214,41 @@ class DatabaseManager: webhook_url: Optional webhook URL for notifications webhook_secret: Optional secret for webhook signature metadata: Optional additional metadata + requester_client_id: Client ID of the requester (for tracking) + requester_source: Source of the request (e.g., 'api', 'web', 'batch') + scrape_purpose: Purpose of the scrape (e.g., 'competitor_analysis') + requester_metadata: Additional requester-specific metadata + batch_id: ID of the batch this job belongs to + batch_index: Index of this job within the batch + job_type: Type of job (default: 'google_reviews') + priority: Job priority (higher = more urgent, default: 0) + callback_url: URL to call when job completes + scraper_version: Version of the scraper to use + scraper_variant: Variant of the scraper (e.g., 'stealth', 'fast') Returns: UUID of created job """ async with self.pool.acquire() as conn: job_id = await conn.fetchval(""" - INSERT INTO jobs (url, webhook_url, webhook_secret, metadata) - VALUES ($1, $2, $3, $4) + INSERT INTO jobs ( + url, webhook_url, webhook_secret, metadata, + requester_client_id, requester_source, scrape_purpose, requester_metadata, + batch_id, batch_index, + job_type, priority, callback_url, + scraper_version, scraper_variant + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) RETURNING job_id - """, url, webhook_url, webhook_secret, json.dumps(metadata) if metadata else None) + """, + url, webhook_url, webhook_secret, + json.dumps(metadata) if metadata else None, + requester_client_id, requester_source, scrape_purpose, + json.dumps(requester_metadata) if requester_metadata else None, + batch_id, batch_index, + job_type, priority, callback_url, + scraper_version, scraper_variant + ) log.info(f"Created job {job_id} for URL: {url[:80]}...") return job_id @@ -241,7 +281,20 @@ class DatabaseManager: error_message, metadata, scrape_logs, - review_topics + review_topics, + requester_client_id, + requester_source, + scrape_purpose, + requester_metadata, + batch_id, + batch_index, + job_type, + priority, + callback_url, + callback_status, + callback_attempts, + scraper_version, + scraper_variant FROM jobs WHERE job_id = $1 """, job_id) @@ -493,7 +546,9 @@ class DatabaseManager: self, status: Optional[JobStatus] = None, limit: int = 100, - offset: int = 0 + offset: int = 0, + requester_client_id: Optional[str] = None, + batch_id: Optional[UUID] = None ) -> List[Dict[str, Any]]: """ List jobs with optional filtering. @@ -502,49 +557,299 @@ class DatabaseManager: status: Optional status filter limit: Maximum number of jobs to return offset: Number of jobs to skip + requester_client_id: Optional filter by requester client ID + batch_id: Optional filter by batch ID Returns: List of job dictionaries """ async with self.pool.acquire() as conn: - if status: - rows = await conn.fetch(""" - SELECT - job_id, - status, - url, - created_at, - completed_at, - reviews_count, - total_reviews, - scrape_time, - error_message, - metadata, - review_topics - FROM jobs - WHERE status = $1 - ORDER BY created_at DESC - LIMIT $2 OFFSET $3 - """, status.value, limit, offset) - else: - rows = await conn.fetch(""" - SELECT - job_id, - status, - url, - created_at, - completed_at, - reviews_count, - total_reviews, - scrape_time, - error_message, - metadata, - review_topics - FROM jobs - ORDER BY created_at DESC - LIMIT $1 OFFSET $2 - """, limit, offset) + # Build dynamic WHERE clause + conditions = [] + params = [] + param_idx = 1 + if status: + conditions.append(f"status = ${param_idx}") + params.append(status.value) + param_idx += 1 + + if requester_client_id: + conditions.append(f"requester_client_id = ${param_idx}") + params.append(requester_client_id) + param_idx += 1 + + if batch_id: + conditions.append(f"batch_id = ${param_idx}") + params.append(batch_id) + param_idx += 1 + + where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" + + # Add limit and offset params + params.extend([limit, offset]) + + query = f""" + SELECT + job_id, + status, + url, + created_at, + completed_at, + reviews_count, + total_reviews, + scrape_time, + error_message, + metadata, + review_topics, + requester_client_id, + requester_source, + scrape_purpose, + requester_metadata, + batch_id, + batch_index, + job_type, + priority, + callback_url, + callback_status, + callback_attempts, + scraper_version, + scraper_variant + FROM jobs + {where_clause} + ORDER BY created_at DESC + LIMIT ${param_idx} OFFSET ${param_idx + 1} + """ + + rows = await conn.fetch(query, *params) + return [dict(row) for row in rows] + + async def update_job_callback( + self, + job_id: UUID, + status: str, + attempts: Optional[int] = None + ): + """ + Update callback status for a job. + + Args: + job_id: Job UUID + status: Callback status ('pending', 'success', 'failed', 'skipped') + attempts: Number of callback attempts (if not provided, increments by 1) + """ + async with self.pool.acquire() as conn: + if attempts is not None: + await conn.execute(""" + UPDATE jobs + SET callback_status = $2, callback_attempts = $3, updated_at = NOW() + WHERE job_id = $1 + """, job_id, status, attempts) + else: + await conn.execute(""" + UPDATE jobs + SET callback_status = $2, + callback_attempts = COALESCE(callback_attempts, 0) + 1, + updated_at = NOW() + WHERE job_id = $1 + """, job_id, status) + + log.debug(f"Updated callback status for job {job_id}: {status}") + + # ==================== Batch Operations ==================== + + async def create_batch( + self, + name: str, + requester_client_id: Optional[str] = None, + requester_source: Optional[str] = None, + scrape_purpose: Optional[str] = None, + total_jobs: int = 0, + callback_url: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> UUID: + """ + Create a new batch for grouping jobs. + + Args: + name: Batch name/description + requester_client_id: Client ID of the requester + requester_source: Source of the batch request + scrape_purpose: Purpose of the scrape + total_jobs: Expected total number of jobs in batch + callback_url: URL to call when batch completes + metadata: Additional batch metadata + + Returns: + UUID of created batch + """ + async with self.pool.acquire() as conn: + batch_id = await conn.fetchval(""" + INSERT INTO batches ( + name, requester_client_id, requester_source, scrape_purpose, + total_jobs, callback_url, metadata + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING batch_id + """, + name, requester_client_id, requester_source, scrape_purpose, + total_jobs, callback_url, + json.dumps(metadata) if metadata else None + ) + + log.info(f"Created batch {batch_id}: {name} ({total_jobs} jobs)") + return batch_id + + async def get_batch(self, batch_id: UUID) -> Optional[Dict[str, Any]]: + """ + Get batch by ID with job counts. + + Args: + batch_id: Batch UUID + + Returns: + Batch dictionary with job counts or None if not found + """ + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT + b.batch_id, + b.name, + b.status, + b.requester_client_id, + b.requester_source, + b.scrape_purpose, + b.total_jobs, + b.completed_jobs, + b.failed_jobs, + b.callback_url, + b.callback_status, + b.metadata, + b.created_at, + b.updated_at, + b.completed_at, + COUNT(j.job_id) FILTER (WHERE j.status = 'pending') as pending_jobs, + COUNT(j.job_id) FILTER (WHERE j.status = 'running') as running_jobs, + COUNT(j.job_id) as actual_total_jobs + FROM batches b + LEFT JOIN jobs j ON j.batch_id = b.batch_id + WHERE b.batch_id = $1 + GROUP BY b.batch_id + """, batch_id) + + if not row: + return None + + return dict(row) + + async def update_batch_progress(self, batch_id: UUID): + """ + Recalculate and update batch progress from jobs table. + + Args: + batch_id: Batch UUID + """ + async with self.pool.acquire() as conn: + # Calculate job counts + counts = await conn.fetchrow(""" + SELECT + COUNT(*) FILTER (WHERE status = 'completed') as completed, + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) FILTER (WHERE status = 'partial') as partial, + COUNT(*) as total + FROM jobs + WHERE batch_id = $1 + """, batch_id) + + completed = counts['completed'] or 0 + failed = counts['failed'] or 0 + partial = counts['partial'] or 0 + total = counts['total'] or 0 + + # Determine batch status + if total == 0: + status = 'pending' + elif completed + failed + partial >= total: + status = 'completed' + elif completed > 0 or failed > 0 or partial > 0: + status = 'running' + else: + status = 'pending' + + # Update batch + await conn.execute(""" + UPDATE batches + SET + completed_jobs = $2, + failed_jobs = $3, + status = $4, + updated_at = NOW(), + completed_at = CASE WHEN $4 = 'completed' THEN NOW() ELSE completed_at END + WHERE batch_id = $1 + """, batch_id, completed, failed, status) + + log.debug(f"Updated batch {batch_id} progress: {completed}/{total} completed, {failed} failed") + + async def get_batches( + self, + requester_client_id: Optional[str] = None, + status: Optional[str] = None, + limit: int = 50 + ) -> List[Dict[str, Any]]: + """ + List batches with optional filtering. + + Args: + requester_client_id: Optional filter by requester client ID + status: Optional filter by batch status + limit: Maximum number of batches to return + + Returns: + List of batch dictionaries + """ + async with self.pool.acquire() as conn: + # Build dynamic WHERE clause + conditions = [] + params = [] + param_idx = 1 + + if requester_client_id: + conditions.append(f"requester_client_id = ${param_idx}") + params.append(requester_client_id) + param_idx += 1 + + if status: + conditions.append(f"status = ${param_idx}") + params.append(status) + param_idx += 1 + + where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" + params.append(limit) + + query = f""" + SELECT + batch_id, + name, + status, + requester_client_id, + requester_source, + scrape_purpose, + total_jobs, + completed_jobs, + failed_jobs, + callback_url, + callback_status, + metadata, + created_at, + updated_at, + completed_at + FROM batches + {where_clause} + ORDER BY created_at DESC + LIMIT ${param_idx} + """ + + rows = await conn.fetch(query, *params) return [dict(row) for row in rows] async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]: diff --git a/scrapers/registry.py b/scrapers/registry.py index 213d88c..b7924f0 100644 --- a/scrapers/registry.py +++ b/scrapers/registry.py @@ -1,18 +1,512 @@ """ Scraper Registry -This module provides a registry for managing and discovering scrapers. -It allows dynamic registration and lookup of scraper implementations. +This module provides a database-backed registry for managing and routing +scraper requests. It supports: +- Version-based routing (exact version or latest for variant) +- A/B testing via traffic_pct weighted selection +- Priority-based scraper filtering +- Caching with TTL for performance """ -from typing import Dict, List, Optional, Type +import asyncio +import logging +import random +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Type from scrapers.base import BaseScraper +log = logging.getLogger(__name__) + + +@dataclass +class ScraperInfo: + """Information about a registered scraper.""" + id: str + job_type: str + version: str + variant: str + module_path: str + function_name: str + is_default: bool + traffic_pct: int + min_priority: int + config: Optional[Dict[str, Any]] + deprecated_at: Optional[str] + class ScraperRegistry: """ - Registry for managing scraper implementations. + Routes scraping requests to appropriate scraper versions. + Supports A/B testing via traffic_pct and variant selection. + + This registry is backed by the scraper_registry database table and + provides weighted random selection for A/B testing scenarios. + + Usage: + registry = ScraperRegistry(db) + scraper_info = await registry.get_scraper("google_reviews") + # scraper_info contains module_path, function_name, config, etc. + """ + + def __init__(self, db: "DatabaseManager"): # noqa: F821 - forward reference + """ + Initialize the scraper registry. + + Args: + db: DatabaseManager instance for database access + """ + self.db = db + self._cache: Dict[str, List[ScraperInfo]] = {} # Cache by job_type + self._cache_timestamp: float = 0 + self._cache_ttl: int = 60 # Refresh cache every 60 seconds + self._cache_lock = asyncio.Lock() + + async def get_scraper( + self, + job_type: str, + version: str = None, + variant: str = None, + priority: int = 0 + ) -> Optional[Dict[str, Any]]: + """ + Get scraper info for a job. + + Priority order: + 1. If version specified, return exact match + 2. If variant specified, return latest active scraper of that variant + 3. Otherwise, use A/B routing based on traffic_pct + + Args: + job_type: Type of scraping job (e.g., "google_reviews") + version: Optional specific version to use (e.g., "1.0.0") + variant: Optional variant filter ("stable", "beta", "canary") + priority: Job priority level for min_priority filtering + + Returns: + Dictionary containing scraper info: + { + "version": "1.0.0", + "variant": "stable", + "module_path": "scrapers.google_reviews.v1_0_0", + "function_name": "fast_scrape_reviews", + "config": {...} + } + + Returns None if no matching scraper found. + """ + # Ensure cache is fresh + await self._ensure_cache_fresh() + + # Get all scrapers for this job type + scrapers = self._cache.get(job_type, []) + if not scrapers: + log.warning(f"No scrapers registered for job_type: {job_type}") + return None + + # Filter out deprecated scrapers + active_scrapers = [s for s in scrapers if s.deprecated_at is None] + if not active_scrapers: + log.warning(f"All scrapers for job_type {job_type} are deprecated") + return None + + selected: Optional[ScraperInfo] = None + + # Priority 1: Exact version match + if version: + selected = self._find_by_version(active_scrapers, version) + if selected: + log.debug(f"Selected scraper by exact version: {version}") + else: + log.warning(f"Requested version {version} not found for {job_type}") + return None + + # Priority 2: Latest for variant + elif variant: + selected = self._find_latest_for_variant(active_scrapers, variant) + if selected: + log.debug(f"Selected latest scraper for variant {variant}: {selected.version}") + else: + log.warning(f"No active scrapers found for variant {variant} in {job_type}") + return None + + # Priority 3: A/B weighted selection + else: + selected = await self._get_weighted_scraper(job_type, priority) + if selected: + log.debug(f"Selected scraper via A/B routing: {selected.version} ({selected.variant})") + + if not selected: + return None + + return self._scraper_to_dict(selected) + + async def _get_weighted_scraper( + self, + job_type: str, + priority: int + ) -> Optional[ScraperInfo]: + """ + Select scraper based on traffic weights. + Uses random selection weighted by traffic_pct. + Filters by min_priority. + + Args: + job_type: Type of scraping job + priority: Job priority level + + Returns: + Selected ScraperInfo or None if no eligible scrapers + """ + scrapers = self._cache.get(job_type, []) + + # Filter: active, has traffic allocation, and meets priority requirement + eligible = [ + s for s in scrapers + if s.deprecated_at is None + and s.traffic_pct > 0 + and s.min_priority <= priority + ] + + if not eligible: + # Fall back to default scraper + default = self._find_default(scrapers) + if default and default.min_priority <= priority: + log.debug(f"No eligible A/B scrapers, using default: {default.version}") + return default + log.warning(f"No eligible scrapers for job_type {job_type} with priority {priority}") + return None + + # Weighted random selection + total_weight = sum(s.traffic_pct for s in eligible) + if total_weight == 0: + # Equal probability if all have 0 traffic_pct + return random.choice(eligible) + + # Generate random number in range [0, total_weight) + rand_value = random.random() * total_weight + cumulative = 0 + + for scraper in eligible: + cumulative += scraper.traffic_pct + if rand_value < cumulative: + return scraper + + # Fallback (shouldn't reach here, but safety) + return eligible[-1] + + async def refresh_cache(self) -> None: + """ + Reload registry from database. + + This method forces a cache refresh regardless of TTL. + Thread-safe via asyncio lock. + """ + async with self._cache_lock: + await self._load_cache() + + async def _ensure_cache_fresh(self) -> None: + """Ensure cache is loaded and not stale.""" + current_time = time.time() + if ( + not self._cache + or (current_time - self._cache_timestamp) > self._cache_ttl + ): + async with self._cache_lock: + # Double-check after acquiring lock + if ( + not self._cache + or (current_time - self._cache_timestamp) > self._cache_ttl + ): + await self._load_cache() + + async def _load_cache(self) -> None: + """Load all scraper registry entries from database.""" + try: + async with self.db.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT + id, + job_type, + version, + variant, + module_path, + function_name, + is_default, + traffic_pct, + min_priority, + config, + deprecated_at + FROM scraper_registry + ORDER BY job_type, version DESC + """) + + # Group by job_type + self._cache.clear() + for row in rows: + scraper_info = ScraperInfo( + id=str(row['id']), + job_type=row['job_type'], + version=row['version'], + variant=row['variant'], + module_path=row['module_path'], + function_name=row['function_name'], + is_default=row['is_default'], + traffic_pct=row['traffic_pct'], + min_priority=row['min_priority'], + config=row['config'], + deprecated_at=str(row['deprecated_at']) if row['deprecated_at'] else None + ) + + if scraper_info.job_type not in self._cache: + self._cache[scraper_info.job_type] = [] + self._cache[scraper_info.job_type].append(scraper_info) + + self._cache_timestamp = time.time() + log.info(f"Scraper registry cache loaded: {sum(len(v) for v in self._cache.values())} entries") + + except Exception as e: + log.error(f"Failed to load scraper registry cache: {e}") + raise + + async def list_scrapers( + self, + job_type: str = None, + include_deprecated: bool = False + ) -> List[Dict[str, Any]]: + """ + List registered scrapers, optionally filtered by job_type. + + Args: + job_type: Optional job type filter + include_deprecated: Whether to include deprecated scrapers + + Returns: + List of scraper info dictionaries + """ + await self._ensure_cache_fresh() + + result = [] + + if job_type: + scrapers = self._cache.get(job_type, []) + else: + scrapers = [s for scrapers_list in self._cache.values() for s in scrapers_list] + + for scraper in scrapers: + if not include_deprecated and scraper.deprecated_at: + continue + result.append(self._scraper_to_dict(scraper)) + + return result + + async def get_scraper_by_id(self, scraper_id: str) -> Optional[Dict[str, Any]]: + """ + Get a specific scraper by its database ID. + + Args: + scraper_id: UUID of the scraper registry entry + + Returns: + Scraper info dictionary or None if not found + """ + await self._ensure_cache_fresh() + + for scrapers_list in self._cache.values(): + for scraper in scrapers_list: + if scraper.id == scraper_id: + return self._scraper_to_dict(scraper) + + return None + + async def register_scraper( + self, + job_type: str, + version: str, + variant: str, + module_path: str, + function_name: str, + is_default: bool = False, + traffic_pct: int = 0, + min_priority: int = 0, + config: Optional[Dict[str, Any]] = None + ) -> str: + """ + Register a new scraper version in the database. + + Args: + job_type: Type of scraping job + version: Semantic version string + variant: Release channel ("stable", "beta", "canary") + module_path: Python module path + function_name: Entry function name + is_default: Whether this is the default fallback + traffic_pct: Traffic percentage for A/B testing (0-100) + min_priority: Minimum job priority required + config: Optional configuration dictionary + + Returns: + UUID of created registry entry + + Raises: + ValueError: If variant is invalid or traffic_pct out of range + """ + if variant not in ('stable', 'beta', 'canary'): + raise ValueError(f"Invalid variant: {variant}. Must be 'stable', 'beta', or 'canary'") + + if not 0 <= traffic_pct <= 100: + raise ValueError(f"traffic_pct must be between 0 and 100, got: {traffic_pct}") + + import json + + async with self.db.pool.acquire() as conn: + scraper_id = await conn.fetchval(""" + INSERT INTO scraper_registry ( + job_type, version, variant, module_path, function_name, + is_default, traffic_pct, min_priority, config + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb) + RETURNING id + """, job_type, version, variant, module_path, function_name, + is_default, traffic_pct, min_priority, + json.dumps(config) if config else None) + + # Invalidate cache + self._cache_timestamp = 0 + + log.info(f"Registered scraper: {job_type} v{version} ({variant})") + return str(scraper_id) + + async def deprecate_scraper( + self, + job_type: str, + version: str + ) -> bool: + """ + Deprecate a scraper version (soft delete). + + Args: + job_type: Type of scraping job + version: Version to deprecate + + Returns: + True if deprecated, False if not found + """ + async with self.db.pool.acquire() as conn: + result = await conn.execute(""" + UPDATE scraper_registry + SET deprecated_at = NOW() + WHERE job_type = $1 AND version = $2 AND deprecated_at IS NULL + """, job_type, version) + + updated = result.split()[-1] == "1" + if updated: + self._cache_timestamp = 0 # Invalidate cache + log.info(f"Deprecated scraper: {job_type} v{version}") + + return updated + + async def update_traffic_allocation( + self, + job_type: str, + allocations: Dict[str, int] + ) -> None: + """ + Update traffic allocations for multiple scrapers atomically. + + Args: + job_type: Type of scraping job + allocations: Dict mapping version to traffic_pct + e.g., {"1.0.0": 90, "1.1.0-beta": 10} + + Raises: + ValueError: If total exceeds 100 or any value is invalid + """ + total = sum(allocations.values()) + if total > 100: + raise ValueError(f"Total traffic allocation cannot exceed 100, got: {total}") + + for version, pct in allocations.items(): + if not 0 <= pct <= 100: + raise ValueError(f"Invalid traffic_pct for {version}: {pct}") + + async with self.db.pool.acquire() as conn: + async with conn.transaction(): + for version, traffic_pct in allocations.items(): + await conn.execute(""" + UPDATE scraper_registry + SET traffic_pct = $3 + WHERE job_type = $1 AND version = $2 AND deprecated_at IS NULL + """, job_type, version, traffic_pct) + + # Invalidate cache + self._cache_timestamp = 0 + log.info(f"Updated traffic allocations for {job_type}: {allocations}") + + # ==================== Helper Methods ==================== + + def _find_by_version( + self, + scrapers: List[ScraperInfo], + version: str + ) -> Optional[ScraperInfo]: + """Find scraper by exact version match.""" + for scraper in scrapers: + if scraper.version == version: + return scraper + return None + + def _find_latest_for_variant( + self, + scrapers: List[ScraperInfo], + variant: str + ) -> Optional[ScraperInfo]: + """Find latest (first in sorted list) scraper for a variant.""" + for scraper in scrapers: + if scraper.variant == variant: + return scraper + return None + + def _find_default( + self, + scrapers: List[ScraperInfo] + ) -> Optional[ScraperInfo]: + """Find default scraper for fallback.""" + for scraper in scrapers: + if scraper.is_default and scraper.deprecated_at is None: + return scraper + return None + + def _scraper_to_dict(self, scraper: ScraperInfo) -> Dict[str, Any]: + """Convert ScraperInfo to dictionary for API responses.""" + return { + "id": scraper.id, + "version": scraper.version, + "variant": scraper.variant, + "module_path": scraper.module_path, + "function_name": scraper.function_name, + "is_default": scraper.is_default, + "traffic_pct": scraper.traffic_pct, + "min_priority": scraper.min_priority, + "config": scraper.config, + "deprecated": scraper.deprecated_at is not None + } + + +# ==================== Legacy Singleton Registry ==================== +# Kept for backward compatibility with existing code that uses +# the old class-based scraper registration pattern. + + +class LegacyScraperRegistry: + """ + Legacy registry for managing scraper implementations. + + This class provides backward compatibility with the old scraper + registration pattern using class-based scrapers. New code should + use the database-backed ScraperRegistry instead. The registry allows: - Registering scrapers by name and version @@ -20,15 +514,15 @@ class ScraperRegistry: - Listing all available scrapers Usage: - registry = ScraperRegistry() + registry = LegacyScraperRegistry() registry.register(GoogleReviewsScraper) scraper = registry.get_scraper_for_url("https://google.com/maps/place/...") """ - _instance: Optional["ScraperRegistry"] = None + _instance: Optional["LegacyScraperRegistry"] = None _scrapers: Dict[str, Type[BaseScraper]] - def __new__(cls) -> "ScraperRegistry": + def __new__(cls) -> "LegacyScraperRegistry": """Singleton pattern to ensure one global registry.""" if cls._instance is None: cls._instance = super().__new__(cls) @@ -134,5 +628,5 @@ class ScraperRegistry: self._domain_map.clear() -# Global registry instance -registry = ScraperRegistry() +# Global legacy registry instance (for backward compatibility) +registry = LegacyScraperRegistry() diff --git a/services/__init__.py b/services/__init__.py index e69de29..fc5952e 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -0,0 +1,29 @@ +""" +Services module for ReviewIQ platform. + +Contains service classes for webhook delivery, job callbacks, and other +business logic. +""" +from services.webhook_service import ( + WebhookManager, + WebhookDispatcher, + WebhookDeliveryError, + verify_webhook_signature +) +from services.job_callback_service import ( + JobCallbackService, + JobCallbackDispatcher, + CallbackStatus +) + +__all__ = [ + # Webhook service + 'WebhookManager', + 'WebhookDispatcher', + 'WebhookDeliveryError', + 'verify_webhook_signature', + # Job callback service + 'JobCallbackService', + 'JobCallbackDispatcher', + 'CallbackStatus', +] diff --git a/services/job_callback_service.py b/services/job_callback_service.py new file mode 100644 index 0000000..438e353 --- /dev/null +++ b/services/job_callback_service.py @@ -0,0 +1,717 @@ +#!/usr/bin/env python3 +""" +Job Callback Service for webhook delivery on job and batch completion. + +This service handles sending webhooks when jobs complete or fail, +as well as batch-level completion callbacks. +""" +import asyncio +import json +import logging +from datetime import datetime +from typing import Dict, Any, Optional, List +from uuid import UUID + +import httpx + +from services.webhook_service import WebhookManager + +log = logging.getLogger(__name__) + +# Scraper version (should match the deployed scraper) +SCRAPER_VERSION = "1.0.0" + + +class CallbackStatus: + """Callback status constants""" + PENDING = "pending" + SENT = "sent" + FAILED = "failed" + + +class JobCallbackService: + """ + Handles webhook callbacks for job and batch completion. + + This service is responsible for: + - Sending callbacks when individual jobs complete or fail + - Sending callbacks when entire batches complete + - Retrying failed callbacks with exponential backoff + - Tracking callback status and attempts in the database + """ + + def __init__( + self, + db, + max_retries: int = 3, + timeout: float = 10.0, + initial_retry_delay: float = 2.0 + ): + """ + Initialize the job callback service. + + Args: + db: DatabaseManager instance + max_retries: Maximum number of delivery attempts per callback + timeout: HTTP request timeout in seconds + initial_retry_delay: Initial delay between retries (exponential backoff) + """ + self.db = db + self.webhook_manager = WebhookManager( + max_retries=max_retries, + timeout=timeout, + initial_retry_delay=initial_retry_delay + ) + self.max_retries = max_retries + self.timeout = timeout + self.initial_retry_delay = initial_retry_delay + + async def send_job_callback(self, job_id: UUID) -> bool: + """ + Send callback for completed/failed job. + + This method: + - Fetches the job from the database + - Builds the appropriate payload based on job status + - POSTs to the callback_url + - Updates callback_status in the database + + Args: + job_id: UUID of the job + + Returns: + True if callback was sent successfully, False otherwise + """ + # Fetch job from database + job = await self.db.get_job(job_id) + if not job: + log.error(f"Job {job_id} not found for callback") + return False + + callback_url = job.get('callback_url') + if not callback_url: + log.debug(f"Job {job_id} has no callback_url configured") + return True # No callback needed, consider success + + status = job.get('status') + if status not in ('completed', 'failed', 'partial'): + log.warning(f"Job {job_id} has status '{status}', not sending callback") + return False + + # Build payload based on status + payload = self._build_job_payload(job) + + # Get webhook secret if available (reuse webhook_secret for callbacks) + secret = job.get('webhook_secret') + + # Send the callback + log.info(f"Sending job callback to {callback_url} for job {job_id} (status: {status})") + + success = await self._send_callback( + url=callback_url, + payload=payload, + secret=secret, + job_id=job_id + ) + + # Update callback status in database + await self._update_callback_status(job_id, success) + + if success: + log.info(f"Job callback sent successfully for job {job_id}") + else: + log.error(f"Job callback failed for job {job_id}") + + return success + + async def send_batch_callback(self, batch_id: UUID) -> bool: + """ + Send callback when batch completes. + + This method: + - Fetches all jobs in the batch from the database + - Checks if all jobs have completed (success or failure) + - Builds a summary payload + - POSTs to the batch callback_url + - Updates callback_status + + Args: + batch_id: UUID of the batch + + Returns: + True if callback was sent successfully, False otherwise + """ + # Get batch info (from first job with this batch_id) + batch_info = await self._get_batch_info(batch_id) + if not batch_info: + log.error(f"Batch {batch_id} not found or has no jobs") + return False + + callback_url = batch_info.get('callback_url') + if not callback_url: + log.debug(f"Batch {batch_id} has no callback_url configured") + return True # No callback needed + + # Check if batch is complete + if not batch_info.get('is_complete'): + log.debug(f"Batch {batch_id} is not yet complete") + return False + + # Build batch payload + payload = self._build_batch_payload(batch_info) + + # Get webhook secret (from first job's webhook_secret) + secret = batch_info.get('webhook_secret') + + # Send the callback + log.info(f"Sending batch callback to {callback_url} for batch {batch_id}") + + success = await self._send_callback( + url=callback_url, + payload=payload, + secret=secret, + job_id=None # Batch callback, no single job_id + ) + + # Update batch callback status (on all jobs in the batch) + await self._update_batch_callback_status(batch_id, success) + + if success: + log.info(f"Batch callback sent successfully for batch {batch_id}") + else: + log.error(f"Batch callback failed for batch {batch_id}") + + return success + + async def retry_failed_callbacks(self, max_attempts: int = 5) -> Dict[str, int]: + """ + Find jobs with callback_status='failed' and attempts < max. + Retry sending callbacks with exponential backoff. + + Args: + max_attempts: Maximum number of total attempts before giving up + + Returns: + Dict with counts: {'retried': n, 'succeeded': n, 'failed': n} + """ + # Get jobs with failed callbacks that haven't exceeded max attempts + jobs = await self._get_failed_callbacks(max_attempts) + + results = { + 'retried': 0, + 'succeeded': 0, + 'failed': 0 + } + + if not jobs: + log.debug("No failed callbacks to retry") + return results + + log.info(f"Retrying {len(jobs)} failed callbacks") + + for job in jobs: + job_id = job['job_id'] + attempts = job.get('callback_attempts', 0) + + # Calculate delay based on attempt number (exponential backoff) + delay = self.initial_retry_delay * (2 ** attempts) + + log.info(f"Retrying callback for job {job_id} (attempt {attempts + 1}), delay: {delay:.1f}s") + + # Wait with backoff + await asyncio.sleep(delay) + + # Retry the callback + success = await self.send_job_callback(job_id) + + results['retried'] += 1 + if success: + results['succeeded'] += 1 + else: + results['failed'] += 1 + + log.info(f"Callback retry complete: {results}") + return results + + async def check_and_send_batch_callbacks(self) -> Dict[str, int]: + """ + Check for completed batches and send their callbacks. + + This should be called periodically to detect when batches complete. + + Returns: + Dict with counts: {'checked': n, 'sent': n, 'failed': n} + """ + # Get distinct batch_ids that might be complete + batch_ids = await self._get_potentially_complete_batches() + + results = { + 'checked': 0, + 'sent': 0, + 'failed': 0 + } + + for batch_id in batch_ids: + results['checked'] += 1 + success = await self.send_batch_callback(batch_id) + if success: + results['sent'] += 1 + else: + results['failed'] += 1 + + return results + + def _build_job_payload(self, job: Dict[str, Any]) -> Dict[str, Any]: + """ + Build webhook payload for job completion/failure. + + Args: + job: Job dictionary from database + + Returns: + Webhook payload dictionary + """ + status = job.get('status') + job_id = str(job.get('job_id')) + job_type = job.get('job_type', 'google_reviews') + + # Base payload + payload = { + "job_id": job_id, + "job_type": job_type, + "status": status, + "url": job.get('url'), + "scraper_version": job.get('scraper_version') or SCRAPER_VERSION, + } + + if status == 'completed': + # Completed job payload + payload["event"] = "job.completed" + + # Calculate result summary + reviews_count = job.get('reviews_count') or 0 + + # Try to extract primary metric (average rating) from reviews_data + primary_metric = None + reviews_data = job.get('reviews_data') + if reviews_data: + if isinstance(reviews_data, str): + try: + reviews_data = json.loads(reviews_data) + except json.JSONDecodeError: + reviews_data = [] + + if reviews_data: + ratings = [r.get('rating', 0) for r in reviews_data if r.get('rating')] + if ratings: + primary_metric = round(sum(ratings) / len(ratings), 2) + + payload["result_summary"] = { + "item_count": reviews_count, + "primary_metric": primary_metric + } + + # Duration + started_at = job.get('started_at') + completed_at = job.get('completed_at') + if started_at and completed_at: + if isinstance(started_at, str): + started_at = datetime.fromisoformat(started_at.replace('Z', '+00:00')) + if isinstance(completed_at, str): + completed_at = datetime.fromisoformat(completed_at.replace('Z', '+00:00')) + duration = (completed_at - started_at).total_seconds() + payload["duration_seconds"] = round(duration, 2) + elif job.get('scrape_time'): + payload["duration_seconds"] = round(job.get('scrape_time'), 2) + + # Completed timestamp + if completed_at: + if isinstance(completed_at, datetime): + payload["completed_at"] = completed_at.isoformat() + 'Z' + else: + payload["completed_at"] = completed_at + + elif status in ('failed', 'partial'): + # Failed job payload + payload["event"] = "job.failed" + + error_message = job.get('error_message', 'Unknown error') + + # Determine error type from message + error_type = self._classify_error(error_message) + + payload["error"] = { + "type": error_type, + "message": error_message + } + + # Include partial results info if applicable + if status == 'partial': + payload["status"] = "partial" + payload["result_summary"] = { + "item_count": job.get('reviews_count') or 0, + "primary_metric": None + } + + # Failed timestamp + completed_at = job.get('completed_at') + if completed_at: + if isinstance(completed_at, datetime): + payload["completed_at"] = completed_at.isoformat() + 'Z' + else: + payload["completed_at"] = completed_at + + return payload + + def _build_batch_payload(self, batch_info: Dict[str, Any]) -> Dict[str, Any]: + """ + Build webhook payload for batch completion. + + Args: + batch_info: Batch info dictionary with job summaries + + Returns: + Webhook payload dictionary + """ + batch_id = str(batch_info.get('batch_id')) + jobs = batch_info.get('jobs', []) + + # Count successes and failures + succeeded = sum(1 for j in jobs if j.get('status') == 'completed') + failed = sum(1 for j in jobs if j.get('status') in ('failed', 'partial')) + failed_job_ids = [str(j.get('job_id')) for j in jobs if j.get('status') in ('failed', 'partial')] + + # Find latest completed_at + completed_times = [ + j.get('completed_at') for j in jobs + if j.get('completed_at') + ] + latest_completed = max(completed_times) if completed_times else datetime.utcnow() + + if isinstance(latest_completed, datetime): + latest_completed = latest_completed.isoformat() + 'Z' + + payload = { + "event": "batch.completed", + "batch_id": batch_id, + "name": batch_info.get('name', f'Batch {batch_id[:8]}'), + "total_jobs": len(jobs), + "succeeded": succeeded, + "failed": failed, + "completed_at": latest_completed, + "failed_job_ids": failed_job_ids + } + + return payload + + def _classify_error(self, error_message: str) -> str: + """ + Classify error message into a type category. + + Args: + error_message: Error message string + + Returns: + Error type string + """ + if not error_message: + return "unknown" + + error_lower = error_message.lower() + + if 'rate' in error_lower and 'limit' in error_lower: + return "rate_limited" + elif 'timeout' in error_lower: + return "timeout" + elif 'captcha' in error_lower or 'robot' in error_lower: + return "captcha_detected" + elif 'blocked' in error_lower or 'denied' in error_lower: + return "blocked" + elif 'network' in error_lower or 'connection' in error_lower: + return "network_error" + elif 'not found' in error_lower or '404' in error_lower: + return "not_found" + elif 'invalid' in error_lower: + return "invalid_input" + elif 'element' in error_lower or 'selector' in error_lower: + return "scrape_error" + else: + return "unknown" + + async def _send_callback( + self, + url: str, + payload: Dict[str, Any], + secret: Optional[str] = None, + job_id: Optional[UUID] = None + ) -> bool: + """ + Send a callback to the specified URL. + + Uses the WebhookManager for retry logic and HMAC signing. + + Args: + url: Callback URL + payload: Payload dictionary + secret: Optional secret for HMAC signature + job_id: Optional job ID for logging + + Returns: + True if sent successfully + """ + return await self.webhook_manager.send_webhook( + webhook_url=url, + payload=payload, + secret=secret, + job_id=job_id, + db=self.db + ) + + async def _update_callback_status(self, job_id: UUID, success: bool): + """ + Update the callback_status and callback_attempts for a job. + + Args: + job_id: Job UUID + success: Whether the callback was sent successfully + """ + async with self.db.pool.acquire() as conn: + if success: + await conn.execute(""" + UPDATE jobs + SET callback_status = 'sent', + callback_attempts = COALESCE(callback_attempts, 0) + 1 + WHERE job_id = $1 + """, job_id) + else: + await conn.execute(""" + UPDATE jobs + SET callback_status = 'failed', + callback_attempts = COALESCE(callback_attempts, 0) + 1 + WHERE job_id = $1 + """, job_id) + + async def _update_batch_callback_status(self, batch_id: UUID, success: bool): + """ + Update callback status for all jobs in a batch. + + Args: + batch_id: Batch UUID + success: Whether the callback was sent successfully + """ + status = 'sent' if success else 'failed' + async with self.db.pool.acquire() as conn: + await conn.execute(""" + UPDATE jobs + SET callback_status = $2, + callback_attempts = COALESCE(callback_attempts, 0) + 1 + WHERE batch_id = $1 + """, batch_id, status) + + async def _get_failed_callbacks(self, max_attempts: int) -> List[Dict[str, Any]]: + """ + Get jobs with failed callbacks that can be retried. + + Args: + max_attempts: Maximum attempts before giving up + + Returns: + List of job dictionaries + """ + async with self.db.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT + job_id, + status, + callback_url, + callback_status, + callback_attempts, + webhook_secret + FROM jobs + WHERE callback_url IS NOT NULL + AND callback_status = 'failed' + AND COALESCE(callback_attempts, 0) < $1 + AND status IN ('completed', 'failed', 'partial') + ORDER BY completed_at ASC + LIMIT 100 + """, max_attempts) + + return [dict(row) for row in rows] + + async def _get_batch_info(self, batch_id: UUID) -> Optional[Dict[str, Any]]: + """ + Get batch information including all jobs. + + Args: + batch_id: Batch UUID + + Returns: + Batch info dictionary with jobs list, or None if not found + """ + async with self.db.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT + job_id, + status, + batch_index, + callback_url, + callback_status, + webhook_secret, + completed_at, + reviews_count, + error_message, + metadata + FROM jobs + WHERE batch_id = $1 + ORDER BY batch_index ASC + """, batch_id) + + if not rows: + return None + + jobs = [dict(row) for row in rows] + + # Determine if batch is complete (all jobs finished) + pending_statuses = ('pending', 'running') + is_complete = all( + j.get('status') not in pending_statuses + for j in jobs + ) + + # Get batch name from first job's metadata if available + batch_name = None + first_metadata = jobs[0].get('metadata') + if first_metadata: + if isinstance(first_metadata, str): + try: + first_metadata = json.loads(first_metadata) + except json.JSONDecodeError: + first_metadata = {} + batch_name = first_metadata.get('batch_name') + + return { + 'batch_id': batch_id, + 'name': batch_name, + 'jobs': jobs, + 'is_complete': is_complete, + 'callback_url': jobs[0].get('callback_url'), + 'webhook_secret': jobs[0].get('webhook_secret') + } + + async def _get_potentially_complete_batches(self) -> List[UUID]: + """ + Get batch IDs that might have recently completed. + + Returns: + List of batch UUIDs to check + """ + async with self.db.pool.acquire() as conn: + # Find batches where: + # 1. At least one job has callback_url set + # 2. callback_status is null or pending (not yet sent) + # 3. No jobs are still running + rows = await conn.fetch(""" + SELECT DISTINCT batch_id + FROM jobs + WHERE batch_id IS NOT NULL + AND callback_url IS NOT NULL + AND COALESCE(callback_status, 'pending') = 'pending' + AND batch_id NOT IN ( + SELECT DISTINCT batch_id + FROM jobs + WHERE batch_id IS NOT NULL + AND status IN ('pending', 'running') + ) + LIMIT 100 + """) + + return [row['batch_id'] for row in rows] + + +class JobCallbackDispatcher: + """ + Background dispatcher that monitors for jobs needing callbacks. + + Runs in background and processes callbacks for completed jobs. + """ + + def __init__( + self, + db, + interval_seconds: int = 30, + retry_interval_seconds: int = 300 + ): + """ + Initialize the callback dispatcher. + + Args: + db: DatabaseManager instance + interval_seconds: How often to check for pending callbacks + retry_interval_seconds: How often to retry failed callbacks + """ + self.db = db + self.interval = interval_seconds + self.retry_interval = retry_interval_seconds + self.callback_service = JobCallbackService(db) + self.running = False + self._last_retry = datetime.utcnow() + + async def start(self): + """Start the background callback dispatcher""" + self.running = True + log.info("Job callback dispatcher started") + + while self.running: + try: + # Process pending job callbacks + await self._process_pending_callbacks() + + # Check for completed batches + await self.callback_service.check_and_send_batch_callbacks() + + # Periodically retry failed callbacks + now = datetime.utcnow() + if (now - self._last_retry).total_seconds() >= self.retry_interval: + await self.callback_service.retry_failed_callbacks(max_attempts=5) + self._last_retry = now + + except Exception as e: + log.error(f"Error in callback dispatcher: {e}") + + await asyncio.sleep(self.interval) + + def stop(self): + """Stop the background callback dispatcher""" + self.running = False + log.info("Job callback dispatcher stopped") + + async def _process_pending_callbacks(self): + """ + Process all pending callbacks. + + Fetches jobs with callback_url set and callback_status null/pending. + """ + async with self.db.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT job_id + FROM jobs + WHERE callback_url IS NOT NULL + AND COALESCE(callback_status, 'pending') = 'pending' + AND status IN ('completed', 'failed', 'partial') + ORDER BY completed_at ASC + LIMIT 100 + """) + + if not rows: + return + + log.info(f"Processing {len(rows)} pending job callbacks") + + for row in rows: + job_id = row['job_id'] + try: + await self.callback_service.send_job_callback(job_id) + except Exception as e: + log.error(f"Error sending callback for job {job_id}: {e}") + + log.info(f"Processed {len(rows)} callbacks")