#!/usr/bin/env python3
"""
Batch submission API for ReviewIQ Phase 2.

Enables submitting multiple URLs as a batch with:
- Batch-level tracking and callback
- Individual job creation for each URL
- Batch status aggregation
- Batch cancellation
"""

import asyncio
import json
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID, uuid4

from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, HttpUrl, Field, validator

from core.database import DatabaseManager
from core.enums import JobStatus

log = logging.getLogger(__name__)

# Create router
router = APIRouter(prefix="/api", tags=["batches"])


# ==================== Pydantic Models ====================

class RequesterModel(BaseModel):
    """Requester information for batch tracking"""
    client_id: str = Field(..., description="Client identifier (e.g., 'veritas_123')")
    source: str = Field(..., description="Source of the request (e.g., 'veritasreview.com')")
    purpose: Optional[str] = Field(None, description="Purpose of the batch (e.g., 'prospect_screening')")
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional requester metadata")


class BatchSubmitRequest(BaseModel):
    """Request model for submitting a batch of URLs"""
    name: str = Field(..., description="Batch name for identification", min_length=1, max_length=255)
    urls: List[HttpUrl] = Field(..., description="List of Google Maps URLs to scrape", min_items=1, max_items=1000)
    requester: RequesterModel = Field(..., description="Requester information")
    priority: int = Field(default=0, description="Priority level (higher = more urgent)", ge=-10, le=10)
    callback_url: Optional[HttpUrl] = Field(None, description="Webhook URL called when ALL jobs complete")

    @validator('urls')
    def validate_unique_urls(cls, v):
        """Ensure all URLs are unique (compared by their string form)."""
        if len({str(url) for url in v}) != len(v):
            raise ValueError('Duplicate URLs are not allowed in a batch')
        return v


class BatchSubmitResponse(BaseModel):
    """Response model for batch submission"""
    batch_id: str = Field(..., description="Unique batch identifier")
    job_ids: List[str] = Field(..., description="List of created job IDs")
    total_jobs: int = Field(..., description="Total number of jobs in the batch")


class BatchJobSummary(BaseModel):
    """Summary of a job within a batch"""
    job_id: str
    url: str
    status: str
    reviews_count: Optional[int] = None
    error_message: Optional[str] = None
    created_at: str
    completed_at: Optional[str] = None


class BatchSummary(BaseModel):
    """Summary model for batch listing"""
    batch_id: str
    name: str
    client_id: str
    status: str  # pending, running, completed, partial, failed
    total_jobs: int
    pending_jobs: int
    running_jobs: int
    completed_jobs: int
    failed_jobs: int
    total_reviews: int
    created_at: str
    updated_at: Optional[str] = None
    completed_at: Optional[str] = None


class BatchDetailResponse(BaseModel):
    """Detailed batch information with job list"""
    batch_id: str
    name: str
    requester: RequesterModel
    priority: int
    callback_url: Optional[str] = None
    status: str
    total_jobs: int
    pending_jobs: int
    running_jobs: int
    completed_jobs: int
    failed_jobs: int
    total_reviews: int
    created_at: str
    updated_at: Optional[str] = None
    completed_at: Optional[str] = None
    jobs: List[BatchJobSummary]


class BatchCancelResponse(BaseModel):
    """Response model for batch cancellation"""
    batch_id: str
    cancelled_jobs: int
    already_completed: int
    message: str


# ==================== Database Helper Functions ====================
# These will be added to core.database in a future refactor,
# but for now we implement them here for the batch functionality.

# True once the batch DDL has been applied through the currently configured
# database. Reset by set_database() so a newly injected database is set up too.
_schema_ready: bool = False


async def _ensure_batch_schema(conn) -> None:
    """Create the batches table, its indexes and jobs.batch_id if missing.

    The statements are idempotent, but they are only executed once per
    process: issuing ALTER TABLE on ``jobs`` for every submission would take
    a table lock on each request.
    """
    global _schema_ready
    if _schema_ready:
        return

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS batches (
            batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(255) NOT NULL,
            client_id VARCHAR(255) NOT NULL,
            requester JSONB NOT NULL,
            priority INTEGER NOT NULL DEFAULT 0,
            callback_url TEXT,
            callback_delivered BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP,
            completed_at TIMESTAMP
        );
    """)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_batches_client_id ON batches(client_id);
    """)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_batches_created_at ON batches(created_at DESC);
    """)
    # Ensure batch_id column exists in jobs table
    await conn.execute("""
        ALTER TABLE jobs
        ADD COLUMN IF NOT EXISTS batch_id UUID
        REFERENCES batches(batch_id) ON DELETE SET NULL;
    """)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_jobs_batch_id
        ON jobs(batch_id) WHERE batch_id IS NOT NULL;
    """)

    # Only flag success after every statement ran without raising.
    _schema_ready = True


def _derive_batch_status(total: int, pending: int, running: int, completed: int) -> str:
    """Map per-status job counts to an aggregate batch status.

    A batch counts as finished when no job is pending or running, which is
    the same rule cancel_batch_jobs() uses to stamp ``completed_at``. Jobs in
    any other non-completed state (failed, partial, cancelled) are treated as
    unsuccessful, so a cancelled batch no longer stays 'pending'/'running'.

    Returns one of: 'pending', 'running', 'completed', 'partial', 'failed'.
    """
    if total == 0:
        return 'pending'

    if pending == 0 and running == 0:
        if completed == total:
            return 'completed'
        # NOTE(review): a batch whose jobs were all cancelled is reported as
        # 'failed' to stay within the documented status vocabulary.
        if completed == 0:
            return 'failed'
        return 'partial'

    if running > 0 or completed > 0:
        return 'running'
    return 'pending'


def _iso_or_none(value) -> Optional[str]:
    """Return ``value.isoformat()`` for a set timestamp, otherwise None."""
    return value.isoformat() if value else None


async def create_batch(
    db: DatabaseManager,
    name: str,
    requester: RequesterModel,
    priority: int = 0,
    callback_url: Optional[str] = None
) -> UUID:
    """
    Create a new batch record in the database.

    Ensures the batch schema exists (first call only), then inserts the row.
    The requester model is stored as JSON in the ``requester`` column and its
    ``client_id`` is also stored in a dedicated column.

    Returns:
        UUID of created batch
    """
    async with db.pool.acquire() as conn:
        await _ensure_batch_schema(conn)

        batch_id = await conn.fetchval("""
            INSERT INTO batches (name, client_id, requester, priority, callback_url)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING batch_id
        """, name, requester.client_id, json.dumps(requester.dict()), priority, callback_url)

    log.info("Created batch %s: '%s' for client %s", batch_id, name, requester.client_id)
    return batch_id


async def create_job_in_batch(
    db: DatabaseManager,
    batch_id: UUID,
    url: str,
    priority: int = 0,
    metadata: Optional[Dict[str, Any]] = None
) -> UUID:
    """
    Create a job that belongs to a batch.

    The job's metadata is the caller's ``metadata`` plus ``batch_id`` and
    ``priority`` keys. The caller's dict is copied, never modified.

    Returns:
        UUID of created job
    """
    job_metadata: Dict[str, Any] = dict(metadata) if metadata else {}
    job_metadata['batch_id'] = str(batch_id)
    job_metadata['priority'] = priority

    async with db.pool.acquire() as conn:
        job_id = await conn.fetchval("""
            INSERT INTO jobs (url, batch_id, metadata)
            VALUES ($1, $2, $3)
            RETURNING job_id
        """, url, batch_id, json.dumps(job_metadata))

    return job_id


async def get_batch(db: DatabaseManager, batch_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Get batch by ID with aggregated job stats.

    The result contains the batch columns, the per-status job counts,
    ``total_reviews`` and a derived ``status`` key.

    Returns:
        Batch dictionary with job stats or None if not found
    """
    async with db.pool.acquire() as conn:
        # Get batch basic info
        batch_row = await conn.fetchrow("""
            SELECT batch_id, name, client_id, requester, priority, callback_url,
                   callback_delivered, created_at, updated_at, completed_at
            FROM batches
            WHERE batch_id = $1
        """, batch_id)

        if not batch_row:
            return None

        # Get job stats for this batch
        stats = await conn.fetchrow("""
            SELECT
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'pending') as pending_jobs,
                COUNT(*) FILTER (WHERE status = 'running') as running_jobs,
                COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
                COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed_jobs,
                COALESCE(SUM(reviews_count), 0) as total_reviews
            FROM jobs
            WHERE batch_id = $1
        """, batch_id)

    result = dict(batch_row)
    result.update(dict(stats))
    result['status'] = _derive_batch_status(
        total=stats['total_jobs'],
        pending=stats['pending_jobs'],
        running=stats['running_jobs'],
        completed=stats['completed_jobs'],
    )
    return result


async def get_batch_jobs(db: DatabaseManager, batch_id: UUID) -> List[Dict[str, Any]]:
    """
    Get all jobs in a batch, oldest first.

    Returns:
        List of job dictionaries
    """
    async with db.pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT job_id, url, status, reviews_count, error_message,
                   created_at, completed_at
            FROM jobs
            WHERE batch_id = $1
            ORDER BY created_at ASC
        """, batch_id)

    return [dict(row) for row in rows]


async def list_batches(
    db: DatabaseManager,
    client_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0
) -> List[Dict[str, Any]]:
    """
    List batches (newest first) with optional filtering.

    ``client_id`` is filtered in SQL. ``status`` is a value derived from the
    job counts, so it is filtered in Python; in that case ``limit``/``offset``
    are applied after the status filter so that pages are not short or empty
    while matching batches exist.

    Returns:
        List of batch summary dictionaries
    """
    filters: List[str] = []
    params: List[Any] = []

    if client_id:
        params.append(client_id)
        filters.append(f"b.client_id = ${len(params)}")

    where_sql = f"WHERE {' AND '.join(filters)}" if filters else ""

    query = f"""
        SELECT
            b.batch_id,
            b.name,
            b.client_id,
            b.created_at,
            b.updated_at,
            b.completed_at,
            COUNT(j.job_id) as total_jobs,
            COUNT(j.job_id) FILTER (WHERE j.status = 'pending') as pending_jobs,
            COUNT(j.job_id) FILTER (WHERE j.status = 'running') as running_jobs,
            COUNT(j.job_id) FILTER (WHERE j.status = 'completed') as completed_jobs,
            COUNT(j.job_id) FILTER (WHERE j.status IN ('failed', 'partial')) as failed_jobs,
            COALESCE(SUM(j.reviews_count), 0) as total_reviews
        FROM batches b
        LEFT JOIN jobs j ON j.batch_id = b.batch_id
        {where_sql}
        GROUP BY b.batch_id
        ORDER BY b.created_at DESC
    """

    if not status:
        # No derived-status filter: let the database paginate.
        query += f" LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}"
        params.extend([limit, offset])

    async with db.pool.acquire() as conn:
        rows = await conn.fetch(query, *params)

    batches = []
    for row in rows:
        batch = dict(row)
        batch['status'] = _derive_batch_status(
            total=batch['total_jobs'],
            pending=batch['pending_jobs'],
            running=batch['running_jobs'],
            completed=batch['completed_jobs'],
        )
        batches.append(batch)

    if not status:
        return batches

    # Filter first, paginate second. This reads every batch for the client
    # filter; acceptable here, but a SQL-side status would scale better.
    matching = [batch for batch in batches if batch['status'] == status]
    return matching[offset:offset + limit]


async def cancel_batch_jobs(db: DatabaseManager, batch_id: UUID) -> tuple:
    """
    Cancel all pending and running jobs in a batch.

    Jobs already in 'completed', 'failed' or 'partial' state are left as they
    are. The batch's ``updated_at`` is refreshed, and ``completed_at`` is set
    when no pending/running job remains afterwards.

    Returns:
        Tuple of (cancelled_count, already_completed_count)
    """
    async with db.pool.acquire() as conn:
        # Count already completed/failed jobs
        already_done = await conn.fetchval("""
            SELECT COUNT(*) FROM jobs
            WHERE batch_id = $1 AND status IN ('completed', 'failed', 'partial')
        """, batch_id)

        # Cancel pending and running jobs
        command_tag = await conn.execute("""
            UPDATE jobs
            SET status = 'cancelled', completed_at = NOW()
            WHERE batch_id = $1 AND status IN ('pending', 'running')
        """, batch_id)
        # The driver returns a status string such as "UPDATE 3"; the last
        # token is the number of affected rows.
        cancelled_count = int(command_tag.split()[-1])

        # Update batch completed_at if all jobs are now done
        await conn.execute("""
            UPDATE batches
            SET updated_at = NOW(),
                completed_at = CASE
                    WHEN NOT EXISTS (
                        SELECT 1 FROM jobs
                        WHERE batch_id = $1 AND status IN ('pending', 'running')
                    ) THEN NOW()
                    ELSE completed_at
                END
            WHERE batch_id = $1
        """, batch_id)

    log.info("Cancelled %s jobs in batch %s", cancelled_count, batch_id)
    return cancelled_count, already_done or 0


# ==================== Dependency Injection ====================

# Database instance will be injected from the main app
# This is a placeholder that should be overridden when including the router
_db: Optional[DatabaseManager] = None


def set_database(db: DatabaseManager):
    """Set the database instance for the router"""
    global _db, _schema_ready
    _db = db
    # A different database may not have the batch schema yet.
    _schema_ready = False


def get_db() -> DatabaseManager:
    """Dependency to get database instance

    Raises:
        HTTPException: 500 if set_database() has not been called yet.
    """
    if _db is None:
        raise HTTPException(status_code=500, detail="Database not initialized")
    return _db


# ==================== API Endpoints ====================

@router.post(
    "/scrape/google-reviews/batch",
    response_model=BatchSubmitResponse,
    summary="Submit Batch of URLs",
    description="Submit multiple Google Maps URLs as a batch for scraping"
)
async def submit_batch(
    request: BatchSubmitRequest,
    db: DatabaseManager = Depends(get_db)
):
    """
    Submit multiple URLs as a batch for scraping.

    Each URL becomes an individual job that can be tracked separately,
    while the batch provides aggregate status and an optional callback
    when all jobs complete.

    - **name**: Descriptive name for the batch (e.g., "Q1 Prospects")
    - **urls**: List of Google Maps URLs (1-1000 URLs)
    - **requester**: Client identification for tracking
    - **priority**: Higher priority batches may be processed first (default: 0)
    - **callback_url**: Webhook called when ALL jobs in the batch complete

    Returns batch_id and list of individual job_ids for tracking.
    """
    try:
        # Create the batch record
        batch_id = await create_batch(
            db=db,
            name=request.name,
            requester=request.requester,
            priority=request.priority,
            callback_url=str(request.callback_url) if request.callback_url else None
        )

        # NOTE(review): the batch row and its jobs are written in separate
        # statements without a transaction, so a failure part-way through
        # leaves a batch with only some of its jobs.
        requester_info = request.requester.dict()

        # Create individual jobs for each URL
        job_ids = []
        for url in request.urls:
            job_id = await create_job_in_batch(
                db=db,
                batch_id=batch_id,
                url=str(url),
                priority=request.priority,
                metadata={
                    'requester': requester_info,
                    'batch_name': request.name
                }
            )
            job_ids.append(str(job_id))

        log.info(
            "Created batch %s with %s jobs for client %s",
            batch_id, len(job_ids), request.requester.client_id
        )

        return BatchSubmitResponse(
            batch_id=str(batch_id),
            job_ids=job_ids,
            total_jobs=len(job_ids)
        )

    except Exception as e:
        # log.exception keeps the traceback; the response detail is unchanged.
        log.exception("Error creating batch: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to create batch: {str(e)}") from e


@router.get(
    "/batches",
    response_model=List[BatchSummary],
    summary="List Batches",
    description="List batches with optional filtering by client_id and status"
)
async def list_batches_endpoint(
    client_id: Optional[str] = Query(None, description="Filter by client ID"),
    status: Optional[str] = Query(None, description="Filter by batch status (pending, running, completed, partial, failed)"),
    limit: int = Query(100, description="Maximum number of batches to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of batches to skip", ge=0),
    db: DatabaseManager = Depends(get_db)
):
    """
    List batches with optional filters.

    - **client_id**: Filter to only show batches for a specific client
    - **status**: Filter by batch status:
        - pending: No jobs have started yet
        - running: At least one job is running or completed
        - completed: All jobs completed successfully
        - partial: All jobs done, but some failed
        - failed: All jobs failed
    - **limit**: Maximum results (default: 100)
    - **offset**: Skip first N results for pagination
    """
    try:
        batches = await list_batches(
            db=db,
            client_id=client_id,
            status=status,
            limit=limit,
            offset=offset
        )

        return [
            BatchSummary(
                batch_id=str(b['batch_id']),
                name=b['name'],
                client_id=b['client_id'],
                status=b['status'],
                total_jobs=b['total_jobs'],
                pending_jobs=b['pending_jobs'],
                running_jobs=b['running_jobs'],
                completed_jobs=b['completed_jobs'],
                failed_jobs=b['failed_jobs'],
                total_reviews=b['total_reviews'],
                created_at=b['created_at'].isoformat(),
                updated_at=_iso_or_none(b.get('updated_at')),
                completed_at=_iso_or_none(b.get('completed_at'))
            )
            for b in batches
        ]

    except Exception as e:
        log.exception("Error listing batches: %s", e)
        raise HTTPException(status_code=500, detail=f"Failed to list batches: {str(e)}") from e


@router.get(
    "/batches/{batch_id}",
    response_model=BatchDetailResponse,
    summary="Get Batch Details",
    description="Get detailed batch information including all jobs"
)
async def get_batch_details(
    batch_id: UUID,
    db: DatabaseManager = Depends(get_db)
):
    """
    Get detailed information about a specific batch.

    Returns batch metadata, aggregate statistics, and a list of all jobs
    with their individual statuses.
    """
    try:
        batch = await get_batch(db, batch_id)
        if not batch:
            raise HTTPException(status_code=404, detail="Batch not found")

        jobs = await get_batch_jobs(db, batch_id)

        # Parse requester from JSONB (it arrives as a string unless the
        # connection decodes JSON itself)
        requester_data = batch['requester']
        if isinstance(requester_data, str):
            requester_data = json.loads(requester_data)

        job_summaries = [
            BatchJobSummary(
                job_id=str(j['job_id']),
                url=j['url'],
                status=j['status'],
                reviews_count=j.get('reviews_count'),
                error_message=j.get('error_message'),
                created_at=j['created_at'].isoformat(),
                completed_at=_iso_or_none(j.get('completed_at'))
            )
            for j in jobs
        ]

        return BatchDetailResponse(
            batch_id=str(batch['batch_id']),
            name=batch['name'],
            requester=RequesterModel(**requester_data),
            priority=batch['priority'],
            callback_url=batch.get('callback_url'),
            status=batch['status'],
            total_jobs=batch['total_jobs'],
            pending_jobs=batch['pending_jobs'],
            running_jobs=batch['running_jobs'],
            completed_jobs=batch['completed_jobs'],
            failed_jobs=batch['failed_jobs'],
            total_reviews=batch['total_reviews'],
            created_at=batch['created_at'].isoformat(),
            updated_at=_iso_or_none(batch.get('updated_at')),
            completed_at=_iso_or_none(batch.get('completed_at')),
            jobs=job_summaries
        )

    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as e:
        log.exception("Error getting batch %s: %s", batch_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to get batch: {str(e)}") from e


@router.delete(
    "/batches/{batch_id}",
    response_model=BatchCancelResponse,
    summary="Cancel Batch",
    description="Cancel all pending jobs in a batch"
)
async def cancel_batch(
    batch_id: UUID,
    db: DatabaseManager = Depends(get_db)
):
    """
    Cancel all pending and running jobs in a batch.

    Already completed or failed jobs cannot be cancelled and will remain
    with their current status.

    Returns the count of cancelled jobs and already-completed jobs.
    """
    try:
        # Verify batch exists
        batch = await get_batch(db, batch_id)
        if not batch:
            raise HTTPException(status_code=404, detail="Batch not found")

        # Cancel the jobs
        cancelled, already_done = await cancel_batch_jobs(db, batch_id)

        return BatchCancelResponse(
            batch_id=str(batch_id),
            cancelled_jobs=cancelled,
            already_completed=already_done,
            message=f"Cancelled {cancelled} jobs. {already_done} jobs were already completed/failed."
        )

    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as e:
        log.exception("Error cancelling batch %s: %s", batch_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to cancel batch: {str(e)}") from e