Files
Alejandro Gutiérrez 788ef84756 Phases 2-4: Requester support, batches, webhooks, scraper registry
Phase 2 - Requester & Batch Support:
- core/database.py: Added create_job params (requester_*, batch_*, priority, callback_*)
- core/database.py: Added batch methods (create_batch, get_batch, update_batch_progress, get_batches)
- core/database.py: Added update_job_callback for tracking webhook delivery
- api/routes/batches.py: New endpoints:
  - POST /api/scrape/google-reviews/batch (submit batch)
  - GET /api/batches (list batches)
  - GET /api/batches/{id} (batch detail)
  - DELETE /api/batches/{id} (cancel batch)
- api_server_production.py: Updated /api/scrape with requester, priority, callback fields
- api_server_production.py: New primary endpoint POST /api/scrape/google-reviews

Phase 3 - Webhooks:
- services/job_callback_service.py: New service with:
  - JobCallbackService: send_job_callback, send_batch_callback, retry_failed_callbacks
  - JobCallbackDispatcher: Background worker for callback monitoring
  - Payload formats per spec (job.completed, job.failed, batch.completed)
  - Exponential backoff for retries
  - Error classification for failure payloads

Phase 4 - Scraper Registry:
- scrapers/registry.py: Database-backed version routing:
  - get_scraper(): Version, variant, and A/B-test routing
  - _get_weighted_scraper(): Traffic-weighted random selection
  - 60-second TTL cache for performance
  - register_scraper, deprecate_scraper, update_traffic_allocation
  - LegacyScraperRegistry preserved for backwards compatibility

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:35:58 +00:00

673 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Batch submission API for ReviewIQ Phase 2.
Enables submitting multiple URLs as a batch with:
- Batch-level tracking and callback
- Individual job creation for each URL
- Batch status aggregation
- Batch cancellation
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID, uuid4
from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, HttpUrl, Field, validator
from core.database import DatabaseManager
from core.enums import JobStatus
# Module-level logger shared by the helpers and endpoints below.
log = logging.getLogger(__name__)
# Create router: every route below is served under the "/api" prefix and tagged "batches".
router = APIRouter(prefix="/api", tags=["batches"])
# ==================== Pydantic Models ====================
class RequesterModel(BaseModel):
    """Requester information for batch tracking.

    ``create_batch`` stores ``client_id`` in its own ``batches.client_id``
    column (used for filtering) and the whole model, serialized with
    ``.dict()``, as JSON in ``batches.requester``; ``get_batch_details``
    rebuilds the model from that JSON.
    """
    # Required: identifies the client (e.g. 'veritas_123').
    client_id: str = Field(..., description="Client identifier (e.g., 'veritas_123')")
    # Required: where the request originated (e.g. 'veritasreview.com').
    source: str = Field(..., description="Source of the request (e.g., 'veritasreview.com')")
    # Optional free-text purpose of the batch (e.g. 'prospect_screening').
    purpose: Optional[str] = Field(None, description="Purpose of the batch (e.g., 'prospect_screening')")
    # Arbitrary extra key/value data; default_factory gives each instance its own empty dict.
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional requester metadata")
class BatchSubmitRequest(BaseModel):
    """Payload accepted by the batch submission endpoint.

    Every URL in ``urls`` becomes its own scrape job; the batch groups those
    jobs for aggregate status tracking and an optional completion webhook.
    """
    name: str = Field(..., description="Batch name for identification", min_length=1, max_length=255)
    urls: List[HttpUrl] = Field(..., description="List of Google Maps URLs to scrape", min_items=1, max_items=1000)
    requester: RequesterModel = Field(..., description="Requester information")
    priority: int = Field(default=0, description="Priority level (higher = more urgent)", ge=-10, le=10)
    callback_url: Optional[HttpUrl] = Field(None, description="Webhook URL called when ALL jobs complete")

    @validator('urls')
    def validate_unique_urls(cls, v):
        """Reject the batch when any URL appears more than once.

        URLs are compared by their string form; the validated list is
        returned unchanged when every entry is distinct.
        """
        distinct_urls = {str(url) for url in v}
        if len(distinct_urls) == len(v):
            return v
        raise ValueError('Duplicate URLs are not allowed in a batch')
class BatchSubmitResponse(BaseModel):
    """Response model for batch submission (POST /api/scrape/google-reviews/batch)."""
    # Batch UUID rendered as a string.
    batch_id: str = Field(..., description="Unique batch identifier")
    # One job UUID string per submitted URL, in the same order as the request's ``urls``.
    job_ids: List[str] = Field(..., description="List of created job IDs")
    # Number of jobs created (len(job_ids)).
    total_jobs: int = Field(..., description="Total number of jobs in the batch")
class BatchJobSummary(BaseModel):
    """Summary of a job within a batch (one element of ``BatchDetailResponse.jobs``)."""
    # Job UUID rendered as a string.
    job_id: str
    url: str
    # Raw ``jobs.status`` value (e.g. 'pending', 'running', 'completed', 'failed', 'partial', 'cancelled').
    status: str
    # Copied from the job row; may be None when the column is unset.
    reviews_count: Optional[int] = None
    error_message: Optional[str] = None
    # ISO-8601 timestamp strings produced with ``isoformat()``.
    created_at: str
    completed_at: Optional[str] = None
class BatchSummary(BaseModel):
    """Summary model for one row of the batch listing (GET /api/batches)."""
    # Batch UUID rendered as a string.
    batch_id: str
    name: str
    client_id: str
    status: str  # derived from the job counts, e.g. pending, running, completed, partial, failed
    # Per-status job counts for the batch; failed_jobs also counts jobs in 'partial' state.
    total_jobs: int
    pending_jobs: int
    running_jobs: int
    completed_jobs: int
    failed_jobs: int
    # Sum of reviews_count over the batch's jobs (0 when there are none).
    total_reviews: int
    # ISO-8601 timestamp strings; updated_at/completed_at are None until set.
    created_at: str
    updated_at: Optional[str] = None
    completed_at: Optional[str] = None
class BatchDetailResponse(BaseModel):
    """Detailed batch information with job list (GET /api/batches/{batch_id})."""
    # Batch UUID rendered as a string.
    batch_id: str
    name: str
    # Requester info stored with the batch when it was submitted.
    requester: RequesterModel
    priority: int
    # Webhook URL stored with the batch, if any.
    callback_url: Optional[str] = None
    # Status derived from the batch's job counts.
    status: str
    # Per-status job counts; failed_jobs also counts jobs in 'partial' state.
    total_jobs: int
    pending_jobs: int
    running_jobs: int
    completed_jobs: int
    failed_jobs: int
    # Sum of reviews_count over the batch's jobs (0 when there are none).
    total_reviews: int
    # ISO-8601 timestamp strings; updated_at/completed_at are None until set.
    created_at: str
    updated_at: Optional[str] = None
    completed_at: Optional[str] = None
    # One entry per job in the batch, oldest job first.
    jobs: List[BatchJobSummary]
class BatchCancelResponse(BaseModel):
    """Response model for batch cancellation (DELETE /api/batches/{batch_id})."""
    # Batch UUID rendered as a string.
    batch_id: str
    # Number of jobs moved from 'pending'/'running' to 'cancelled' by this request.
    cancelled_jobs: int
    # Number of jobs that were already in 'completed', 'failed' or 'partial' state.
    already_completed: int
    # Human-readable summary of the two counts.
    message: str
# ==================== Database Helper Functions ====================
# These will be added to core.database in a future refactor,
# but for now we implement them here for the batch functionality.
# True once ``_ensure_batch_schema`` has completed successfully in this process.
_batch_schema_ready = False


async def _ensure_batch_schema(conn) -> None:
    """Create the batches table, its indexes and ``jobs.batch_id`` if missing.

    The DDL is executed only until it succeeds once per process. Running it on
    every submission is harmful: PostgreSQL's ``ALTER TABLE`` takes an ACCESS
    EXCLUSIVE lock on ``jobs`` even when ``ADD COLUMN IF NOT EXISTS`` turns out
    to be a no-op, briefly blocking every concurrent reader/writer of jobs.

    NOTE(review): two requests racing on the very first call can still both run
    the DDL (same as before); consider moving this into a startup migration.
    """
    global _batch_schema_ready
    if _batch_schema_ready:
        return
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS batches (
            batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(255) NOT NULL,
            client_id VARCHAR(255) NOT NULL,
            requester JSONB NOT NULL,
            priority INTEGER NOT NULL DEFAULT 0,
            callback_url TEXT,
            callback_delivered BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP,
            completed_at TIMESTAMP
        );
    """)
    # Index on client_id for the per-client listing filter.
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_batches_client_id ON batches(client_id);
    """)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_batches_created_at ON batches(created_at DESC);
    """)
    # Ensure batch_id column exists in jobs table.
    await conn.execute("""
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS batch_id UUID REFERENCES batches(batch_id) ON DELETE SET NULL;
    """)
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_jobs_batch_id ON jobs(batch_id) WHERE batch_id IS NOT NULL;
    """)
    # Only mark ready after every statement succeeded, so a failure is retried next call.
    _batch_schema_ready = True


async def create_batch(
    db: DatabaseManager,
    name: str,
    requester: RequesterModel,
    priority: int = 0,
    callback_url: Optional[str] = None
) -> UUID:
    """
    Create a new batch record in the database.

    Makes sure the batch schema exists (DDL runs once per process), then
    inserts a ``batches`` row holding ``requester.client_id`` in its own column
    (for filtering) and the full requester serialized as JSON.

    Args:
        db: Database manager whose connection pool is used.
        name: Human-readable batch name.
        requester: Requester information; serialized with ``.dict()``.
        priority: Priority stored on the batch row.
        callback_url: Optional webhook URL stored on the batch row.

    Returns:
        UUID of created batch
    """
    async with db.pool.acquire() as conn:
        await _ensure_batch_schema(conn)
        batch_id = await conn.fetchval("""
            INSERT INTO batches (name, client_id, requester, priority, callback_url)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING batch_id
        """, name, requester.client_id, json.dumps(requester.dict()), priority, callback_url)
    log.info(f"Created batch {batch_id}: '{name}' for client {requester.client_id}")
    return batch_id
async def create_job_in_batch(
    db: DatabaseManager,
    batch_id: UUID,
    url: str,
    priority: int = 0,
    metadata: Optional[Dict[str, Any]] = None
) -> UUID:
    """
    Create a job that belongs to a batch.

    The job's ``metadata`` JSON is a copy of ``metadata`` (if given) extended
    with ``batch_id`` (as a string) and ``priority``; the caller's dict is left
    untouched. ``priority`` is recorded only inside that metadata JSON.

    Args:
        db: Database manager whose connection pool is used.
        batch_id: Batch the job is linked to (``jobs.batch_id``).
        url: URL to scrape.
        priority: Priority value stored in the job metadata.
        metadata: Optional extra metadata to store with the job.

    Returns:
        UUID of created job
    """
    # Copy instead of mutating the caller's dict (it may be shared across jobs).
    job_metadata: Dict[str, Any] = dict(metadata) if metadata else {}
    job_metadata['batch_id'] = str(batch_id)
    job_metadata['priority'] = priority
    async with db.pool.acquire() as conn:
        job_id = await conn.fetchval("""
            INSERT INTO jobs (url, batch_id, metadata)
            VALUES ($1, $2, $3)
            RETURNING job_id
        """, url, batch_id, json.dumps(job_metadata))
    return job_id
async def get_batch(db: DatabaseManager, batch_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Get batch by ID with aggregated job stats.

    The result holds the ``batches`` row columns plus ``total_jobs``,
    ``pending_jobs``, ``running_jobs``, ``completed_jobs``, ``failed_jobs``
    (jobs in 'failed' or 'partial' state), ``cancelled_jobs``,
    ``total_reviews`` and a derived ``status``:

    - ``pending``: no jobs, or unfinished jobs with none running/completed
    - ``running``: unfinished jobs remain and at least one is running or completed
    - ``cancelled``: every job finished and at least one was cancelled
    - ``failed`` / ``partial`` / ``completed``: every job finished with no
      cancellations, and all / some / none of them failed

    Returns:
        Batch dictionary with job stats or None if not found
    """
    async with db.pool.acquire() as conn:
        batch_row = await conn.fetchrow("""
            SELECT
                batch_id,
                name,
                client_id,
                requester,
                priority,
                callback_url,
                callback_delivered,
                created_at,
                updated_at,
                completed_at
            FROM batches
            WHERE batch_id = $1
        """, batch_id)
        if not batch_row:
            return None
        stats = await conn.fetchrow("""
            SELECT
                COUNT(*) AS total_jobs,
                COUNT(*) FILTER (WHERE status = 'pending') AS pending_jobs,
                COUNT(*) FILTER (WHERE status = 'running') AS running_jobs,
                COUNT(*) FILTER (WHERE status = 'completed') AS completed_jobs,
                COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) AS failed_jobs,
                COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_jobs,
                COALESCE(SUM(reviews_count), 0) AS total_reviews
            FROM jobs
            WHERE batch_id = $1
        """, batch_id)

    result = dict(batch_row)
    result.update(dict(stats))

    total = stats['total_jobs']
    completed = stats['completed_jobs']
    failed = stats['failed_jobs']
    running = stats['running_jobs']
    # Jobs cancelled via cancel_batch_jobs are terminal too; without counting
    # them a cancelled batch would never leave 'pending'/'running'.
    cancelled = stats['cancelled_jobs']

    if total == 0:
        result['status'] = 'pending'
    elif completed + failed + cancelled == total:
        if cancelled > 0:
            result['status'] = 'cancelled'
        elif failed == total:
            result['status'] = 'failed'
        elif failed > 0:
            result['status'] = 'partial'
        else:
            result['status'] = 'completed'
    elif running > 0 or completed > 0:
        result['status'] = 'running'
    else:
        result['status'] = 'pending'
    return result
async def get_batch_jobs(db: DatabaseManager, batch_id: UUID) -> List[Dict[str, Any]]:
    """
    Fetch every job that belongs to ``batch_id``, oldest first.

    Each job is returned as a plain dict with the keys job_id, url, status,
    reviews_count, error_message, created_at and completed_at.

    Returns:
        List of job dictionaries (empty when the batch has no jobs)
    """
    jobs_query = """
        SELECT
            job_id,
            url,
            status,
            reviews_count,
            error_message,
            created_at,
            completed_at
        FROM jobs
        WHERE batch_id = $1
        ORDER BY created_at ASC
    """
    async with db.pool.acquire() as connection:
        records = await connection.fetch(jobs_query, batch_id)
    return list(map(dict, records))
async def list_batches(
    db: DatabaseManager,
    client_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0
) -> List[Dict[str, Any]]:
    """
    List batches, newest first, with aggregated job counts and a derived status.

    Both filters are applied in SQL *before* LIMIT/OFFSET, so every page holds
    up to ``limit`` matching batches. Filtering by status after paging would
    return short or empty pages and skip matching batches on later pages.

    Derived status:
    - ``pending``: no jobs, or unfinished jobs with none running/completed
    - ``running``: unfinished jobs remain and at least one is running or completed
    - ``cancelled``: every job finished and at least one was cancelled
    - ``failed`` / ``partial`` / ``completed``: every job finished with no
      cancellations, and all / some / none of them failed
      ('partial' jobs count as failed)

    Args:
        db: Database manager whose connection pool is used.
        client_id: Only batches with this client_id (ignored when falsy).
        status: Only batches whose derived status equals this (ignored when falsy).
        limit: Maximum number of rows returned.
        offset: Number of matching rows to skip.

    Returns:
        List of batch summary dictionaries (batch columns, per-status job
        counts including ``cancelled_jobs``, ``total_reviews`` and ``status``)
    """
    where_clauses: List[str] = []
    params: List[Any] = []

    if client_id:
        params.append(client_id)
        where_clauses.append(f"b.client_id = ${len(params)}")
    where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""

    status_sql = ""
    if status:
        params.append(status)
        status_sql = f"WHERE status = ${len(params)}"

    params.extend([limit, offset])
    limit_placeholder = f"${len(params) - 1}"
    offset_placeholder = f"${len(params)}"

    query = f"""
        WITH summary AS (
            SELECT
                b.batch_id,
                b.name,
                b.client_id,
                b.created_at,
                b.updated_at,
                b.completed_at,
                COUNT(j.job_id) AS total_jobs,
                COUNT(j.job_id) FILTER (WHERE j.status = 'pending') AS pending_jobs,
                COUNT(j.job_id) FILTER (WHERE j.status = 'running') AS running_jobs,
                COUNT(j.job_id) FILTER (WHERE j.status = 'completed') AS completed_jobs,
                COUNT(j.job_id) FILTER (WHERE j.status IN ('failed', 'partial')) AS failed_jobs,
                COUNT(j.job_id) FILTER (WHERE j.status = 'cancelled') AS cancelled_jobs,
                COALESCE(SUM(j.reviews_count), 0) AS total_reviews
            FROM batches b
            LEFT JOIN jobs j ON j.batch_id = b.batch_id
            {where_sql}
            GROUP BY b.batch_id
        ),
        classified AS (
            SELECT
                summary.*,
                CASE
                    WHEN total_jobs = 0 THEN 'pending'
                    WHEN completed_jobs + failed_jobs + cancelled_jobs = total_jobs THEN
                        CASE
                            WHEN cancelled_jobs > 0 THEN 'cancelled'
                            WHEN failed_jobs = total_jobs THEN 'failed'
                            WHEN failed_jobs > 0 THEN 'partial'
                            ELSE 'completed'
                        END
                    WHEN running_jobs > 0 OR completed_jobs > 0 THEN 'running'
                    ELSE 'pending'
                END AS status
            FROM summary
        )
        SELECT *
        FROM classified
        {status_sql}
        ORDER BY created_at DESC
        LIMIT {limit_placeholder} OFFSET {offset_placeholder}
    """
    async with db.pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return [dict(row) for row in rows]
async def cancel_batch_jobs(db: DatabaseManager, batch_id: UUID) -> tuple:
    """
    Cancel all pending and running jobs in a batch.

    Jobs in 'pending' or 'running' state are set to 'cancelled' with
    ``completed_at = NOW()``. The batch's ``updated_at`` is refreshed, and once
    no pending/running jobs remain its ``completed_at`` is set (an existing
    value is kept, so repeated cancellations do not move the completion time).
    All statements run in a single transaction, so the job cancellations and
    the batch timestamp update are committed together or not at all.

    Returns:
        Tuple of (cancelled_count, already_completed_count); the second value
        counts jobs already in 'completed', 'failed' or 'partial' state.
    """
    async with db.pool.acquire() as conn:
        async with conn.transaction():
            # Count already completed/failed jobs
            already_done = await conn.fetchval("""
                SELECT COUNT(*)
                FROM jobs
                WHERE batch_id = $1 AND status IN ('completed', 'failed', 'partial')
            """, batch_id)
            # Cancel pending and running jobs
            result = await conn.execute("""
                UPDATE jobs
                SET status = 'cancelled', completed_at = NOW()
                WHERE batch_id = $1 AND status IN ('pending', 'running')
            """, batch_id)
            # execute() returns the command status tag, e.g. "UPDATE 3"; the last token is the row count.
            cancelled_count = int(result.split()[-1])
            # Mark the batch finished if nothing is left to run; COALESCE keeps an earlier completion time.
            await conn.execute("""
                UPDATE batches
                SET updated_at = NOW(),
                    completed_at = CASE
                        WHEN NOT EXISTS (
                            SELECT 1 FROM jobs
                            WHERE batch_id = $1 AND status IN ('pending', 'running')
                        ) THEN COALESCE(completed_at, NOW())
                        ELSE completed_at
                    END
                WHERE batch_id = $1
            """, batch_id)
    log.info(f"Cancelled {cancelled_count} jobs in batch {batch_id}")
    return cancelled_count, already_done or 0
# ==================== Dependency Injection ====================
# Database instance will be injected from the main app
# This is a placeholder that should be overridden when including the router
_db: Optional[DatabaseManager] = None


def set_database(db: DatabaseManager):
    """Register the DatabaseManager that ``get_db`` hands to the routes."""
    global _db
    _db = db


def get_db() -> DatabaseManager:
    """FastAPI dependency returning the registered DatabaseManager.

    Raises:
        HTTPException: status 500 when ``set_database`` has not been called yet.
    """
    registered = _db
    if registered is not None:
        return registered
    raise HTTPException(status_code=500, detail="Database not initialized")
# ==================== API Endpoints ====================
@router.post(
    "/scrape/google-reviews/batch",
    response_model=BatchSubmitResponse,
    summary="Submit Batch of URLs",
    description="Submit multiple Google Maps URLs as a batch for scraping"
)
async def submit_batch(
    request: BatchSubmitRequest,
    db: DatabaseManager = Depends(get_db)
):
    """
    Submit multiple URLs as a batch for scraping.

    Each URL becomes an individual job that can be tracked separately,
    while the batch provides aggregate status and an optional callback
    when all jobs complete.

    - **name**: Descriptive name for the batch (e.g., "Q1 Prospects")
    - **urls**: List of Google Maps URLs (1-1000 URLs)
    - **requester**: Client identification for tracking
    - **priority**: Higher priority batches may be processed first (default: 0)
    - **callback_url**: Webhook called when ALL jobs in the batch complete

    If a failure happens after the batch row was created, the jobs created so
    far are cancelled (best effort) before the 500 is returned. Otherwise the
    client would see an error while those jobs keep running.

    Returns batch_id and list of individual job_ids for tracking.
    """
    batch_id: Optional[UUID] = None
    try:
        # Create the batch record
        batch_id = await create_batch(
            db=db,
            name=request.name,
            requester=request.requester,
            priority=request.priority,
            callback_url=str(request.callback_url) if request.callback_url else None
        )
        # Create individual jobs for each URL, preserving request order in job_ids
        job_ids: List[str] = []
        for url in request.urls:
            job_id = await create_job_in_batch(
                db=db,
                batch_id=batch_id,
                url=str(url),
                priority=request.priority,
                metadata={
                    'requester': request.requester.dict(),
                    'batch_name': request.name
                }
            )
            job_ids.append(str(job_id))
        log.info(f"Created batch {batch_id} with {len(job_ids)} jobs for client {request.requester.client_id}")
        return BatchSubmitResponse(
            batch_id=str(batch_id),
            job_ids=job_ids,
            total_jobs=len(job_ids)
        )
    except Exception as e:
        # log.exception keeps the traceback that log.error would drop.
        log.exception(f"Error creating batch: {e}")
        if batch_id is not None:
            try:
                await cancel_batch_jobs(db, batch_id)
            except Exception:
                # Best effort only: the original error is what the client must see.
                log.exception(f"Failed to cancel jobs of partially created batch {batch_id}")
        raise HTTPException(status_code=500, detail=f"Failed to create batch: {str(e)}") from e
@router.get(
    "/batches",
    response_model=List[BatchSummary],
    summary="List Batches",
    description="List batches with optional filtering by client_id and status"
)
async def list_batches_endpoint(
    client_id: Optional[str] = Query(None, description="Filter by client ID"),
    status: Optional[str] = Query(None, description="Filter by batch status (pending, running, completed, partial, failed)"),
    limit: int = Query(100, description="Maximum number of batches to return", ge=1, le=1000),
    offset: int = Query(0, description="Number of batches to skip", ge=0),
    db: DatabaseManager = Depends(get_db)
):
    """
    Return a page of batch summaries, optionally filtered.

    - **client_id**: restrict the listing to one client's batches
    - **status**: restrict to one derived batch status:
        - pending: no job has started yet
        - running: at least one job is running or has completed
        - completed: every job finished successfully
        - partial: every job finished, but some failed
        - failed: every job failed
    - **limit**: page size (default 100)
    - **offset**: number of results skipped, for pagination

    Any unexpected failure is logged and reported as HTTP 500.
    """
    def as_iso(value):
        # Optional timestamps become ISO-8601 strings, or None when unset.
        return value.isoformat() if value else None

    try:
        rows = await list_batches(
            db=db,
            client_id=client_id,
            status=status,
            limit=limit,
            offset=offset
        )
        summaries = []
        for row in rows:
            summaries.append(
                BatchSummary(
                    batch_id=str(row['batch_id']),
                    name=row['name'],
                    client_id=row['client_id'],
                    status=row['status'],
                    total_jobs=row['total_jobs'],
                    pending_jobs=row['pending_jobs'],
                    running_jobs=row['running_jobs'],
                    completed_jobs=row['completed_jobs'],
                    failed_jobs=row['failed_jobs'],
                    total_reviews=row['total_reviews'],
                    created_at=row['created_at'].isoformat(),
                    updated_at=as_iso(row.get('updated_at')),
                    completed_at=as_iso(row.get('completed_at')),
                )
            )
        return summaries
    except Exception as e:
        log.error(f"Error listing batches: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to list batches: {str(e)}")
@router.get(
    "/batches/{batch_id}",
    response_model=BatchDetailResponse,
    summary="Get Batch Details",
    description="Get detailed batch information including all jobs"
)
async def get_batch_details(
    batch_id: UUID,
    db: DatabaseManager = Depends(get_db)
):
    """
    Return one batch with its requester, aggregate counters and per-job summaries.

    Responds with 404 when the batch does not exist. Any other failure is
    logged and reported as HTTP 500.
    """
    def as_iso(value):
        # Optional timestamps become ISO-8601 strings, or None when unset.
        return value.isoformat() if value else None

    try:
        batch = await get_batch(db, batch_id)
        if not batch:
            raise HTTPException(status_code=404, detail="Batch not found")

        job_rows = await get_batch_jobs(db, batch_id)

        # The requester JSONB value can arrive as a raw JSON string; decode it in that case.
        raw_requester = batch['requester']
        requester_fields = (
            json.loads(raw_requester) if isinstance(raw_requester, str) else raw_requester
        )

        return BatchDetailResponse(
            batch_id=str(batch['batch_id']),
            name=batch['name'],
            requester=RequesterModel(**requester_fields),
            priority=batch['priority'],
            callback_url=batch.get('callback_url'),
            status=batch['status'],
            total_jobs=batch['total_jobs'],
            pending_jobs=batch['pending_jobs'],
            running_jobs=batch['running_jobs'],
            completed_jobs=batch['completed_jobs'],
            failed_jobs=batch['failed_jobs'],
            total_reviews=batch['total_reviews'],
            created_at=batch['created_at'].isoformat(),
            updated_at=as_iso(batch.get('updated_at')),
            completed_at=as_iso(batch.get('completed_at')),
            jobs=[
                BatchJobSummary(
                    job_id=str(job['job_id']),
                    url=job['url'],
                    status=job['status'],
                    reviews_count=job.get('reviews_count'),
                    error_message=job.get('error_message'),
                    created_at=job['created_at'].isoformat(),
                    completed_at=as_iso(job.get('completed_at')),
                )
                for job in job_rows
            ],
        )
    except HTTPException:
        raise
    except Exception as e:
        log.error(f"Error getting batch {batch_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get batch: {str(e)}")
@router.delete(
    "/batches/{batch_id}",
    response_model=BatchCancelResponse,
    summary="Cancel Batch",
    description="Cancel all pending jobs in a batch"
)
async def cancel_batch(
    batch_id: UUID,
    db: DatabaseManager = Depends(get_db)
):
    """
    Cancel every job of a batch that is still pending or running.

    Jobs that have already finished (completed or failed) keep their status.
    The response reports how many jobs were cancelled and how many had already
    finished. Responds with 404 for an unknown batch; other failures are
    logged and reported as HTTP 500.
    """
    try:
        if not await get_batch(db, batch_id):
            raise HTTPException(status_code=404, detail="Batch not found")

        cancelled_count, finished_count = await cancel_batch_jobs(db, batch_id)
        summary_text = (
            f"Cancelled {cancelled_count} jobs. {finished_count} jobs were already completed/failed."
        )
        return BatchCancelResponse(
            batch_id=str(batch_id),
            cancelled_jobs=cancelled_count,
            already_completed=finished_count,
            message=summary_text,
        )
    except HTTPException:
        raise
    except Exception as e:
        log.error(f"Error cancelling batch {batch_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to cancel batch: {str(e)}")