Phase 2 - Requester & Batch Support:
- core/database.py: Added create_job params (requester_*, batch_*, priority, callback_*)
- core/database.py: Added batch methods (create_batch, get_batch, update_batch_progress, get_batches)
- core/database.py: Added update_job_callback for tracking webhook delivery
- api/routes/batches.py: New endpoints:
- POST /api/scrape/google-reviews/batch (submit batch)
- GET /api/batches (list batches)
- GET /api/batches/{id} (batch detail)
- DELETE /api/batches/{id} (cancel batch)
- api_server_production.py: Updated /api/scrape with requester, priority, callback fields
- api_server_production.py: New primary endpoint POST /api/scrape/google-reviews
Phase 3 - Webhooks:
- services/job_callback_service.py: New service with:
- JobCallbackService: send_job_callback, send_batch_callback, retry_failed_callbacks
- JobCallbackDispatcher: Background worker for callback monitoring
- Payload formats per spec (job.completed, job.failed, batch.completed)
- Exponential backoff for retries
- Error classification for failure payloads
Phase 4 - Scraper Registry:
- scrapers/registry.py: Database-backed version routing:
- get_scraper(): Version/variant/A/B routing
- _get_weighted_scraper(): Traffic-weighted random selection
- 60-second TTL cache for performance
- register_scraper, deprecate_scraper, update_traffic_allocation
- LegacyScraperRegistry preserved for backwards compatibility
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
673 lines
22 KiB
Python
673 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Batch submission API for ReviewIQ Phase 2.
|
|
|
|
Enables submitting multiple URLs as a batch with:
|
|
- Batch-level tracking and callback
|
|
- Individual job creation for each URL
|
|
- Batch status aggregation
|
|
- Batch cancellation
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from uuid import UUID, uuid4
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Depends
|
|
from pydantic import BaseModel, HttpUrl, Field, validator
|
|
|
|
from core.database import DatabaseManager
|
|
from core.enums import JobStatus
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Create router
|
|
router = APIRouter(prefix="/api", tags=["batches"])
|
|
|
|
|
|
# ==================== Pydantic Models ====================
|
|
|
|
class RequesterModel(BaseModel):
|
|
"""Requester information for batch tracking"""
|
|
client_id: str = Field(..., description="Client identifier (e.g., 'veritas_123')")
|
|
source: str = Field(..., description="Source of the request (e.g., 'veritasreview.com')")
|
|
purpose: Optional[str] = Field(None, description="Purpose of the batch (e.g., 'prospect_screening')")
|
|
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional requester metadata")
|
|
|
|
|
|
class BatchSubmitRequest(BaseModel):
|
|
"""Request model for submitting a batch of URLs"""
|
|
name: str = Field(..., description="Batch name for identification", min_length=1, max_length=255)
|
|
urls: List[HttpUrl] = Field(..., description="List of Google Maps URLs to scrape", min_items=1, max_items=1000)
|
|
requester: RequesterModel = Field(..., description="Requester information")
|
|
priority: int = Field(default=0, description="Priority level (higher = more urgent)", ge=-10, le=10)
|
|
callback_url: Optional[HttpUrl] = Field(None, description="Webhook URL called when ALL jobs complete")
|
|
|
|
@validator('urls')
|
|
def validate_unique_urls(cls, v):
|
|
"""Ensure all URLs are unique"""
|
|
unique_urls = list(set(str(url) for url in v))
|
|
if len(unique_urls) != len(v):
|
|
raise ValueError('Duplicate URLs are not allowed in a batch')
|
|
return v
|
|
|
|
|
|
class BatchSubmitResponse(BaseModel):
|
|
"""Response model for batch submission"""
|
|
batch_id: str = Field(..., description="Unique batch identifier")
|
|
job_ids: List[str] = Field(..., description="List of created job IDs")
|
|
total_jobs: int = Field(..., description="Total number of jobs in the batch")
|
|
|
|
|
|
class BatchJobSummary(BaseModel):
|
|
"""Summary of a job within a batch"""
|
|
job_id: str
|
|
url: str
|
|
status: str
|
|
reviews_count: Optional[int] = None
|
|
error_message: Optional[str] = None
|
|
created_at: str
|
|
completed_at: Optional[str] = None
|
|
|
|
|
|
class BatchSummary(BaseModel):
|
|
"""Summary model for batch listing"""
|
|
batch_id: str
|
|
name: str
|
|
client_id: str
|
|
status: str # pending, running, completed, partial, failed
|
|
total_jobs: int
|
|
pending_jobs: int
|
|
running_jobs: int
|
|
completed_jobs: int
|
|
failed_jobs: int
|
|
total_reviews: int
|
|
created_at: str
|
|
updated_at: Optional[str] = None
|
|
completed_at: Optional[str] = None
|
|
|
|
|
|
class BatchDetailResponse(BaseModel):
|
|
"""Detailed batch information with job list"""
|
|
batch_id: str
|
|
name: str
|
|
requester: RequesterModel
|
|
priority: int
|
|
callback_url: Optional[str] = None
|
|
status: str
|
|
total_jobs: int
|
|
pending_jobs: int
|
|
running_jobs: int
|
|
completed_jobs: int
|
|
failed_jobs: int
|
|
total_reviews: int
|
|
created_at: str
|
|
updated_at: Optional[str] = None
|
|
completed_at: Optional[str] = None
|
|
jobs: List[BatchJobSummary]
|
|
|
|
|
|
class BatchCancelResponse(BaseModel):
|
|
"""Response model for batch cancellation"""
|
|
batch_id: str
|
|
cancelled_jobs: int
|
|
already_completed: int
|
|
message: str
|
|
|
|
|
|
# ==================== Database Helper Functions ====================
|
|
# These will be added to core.database in a future refactor,
|
|
# but for now we implement them here for the batch functionality.
|
|
|
|
async def create_batch(
|
|
db: DatabaseManager,
|
|
name: str,
|
|
requester: RequesterModel,
|
|
priority: int = 0,
|
|
callback_url: Optional[str] = None
|
|
) -> UUID:
|
|
"""
|
|
Create a new batch record in the database.
|
|
|
|
Returns:
|
|
UUID of created batch
|
|
"""
|
|
async with db.pool.acquire() as conn:
|
|
# First ensure batch table exists
|
|
await conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS batches (
|
|
batch_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
|
name VARCHAR(255) NOT NULL,
|
|
client_id VARCHAR(255) NOT NULL,
|
|
requester JSONB NOT NULL,
|
|
priority INTEGER NOT NULL DEFAULT 0,
|
|
callback_url TEXT,
|
|
callback_delivered BOOLEAN DEFAULT FALSE,
|
|
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMP,
|
|
completed_at TIMESTAMP
|
|
);
|
|
""")
|
|
|
|
# Create index on client_id if not exists
|
|
await conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_batches_client_id ON batches(client_id);
|
|
""")
|
|
await conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_batches_created_at ON batches(created_at DESC);
|
|
""")
|
|
|
|
# Ensure batch_id column exists in jobs table
|
|
await conn.execute("""
|
|
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS batch_id UUID REFERENCES batches(batch_id) ON DELETE SET NULL;
|
|
""")
|
|
await conn.execute("""
|
|
CREATE INDEX IF NOT EXISTS idx_jobs_batch_id ON jobs(batch_id) WHERE batch_id IS NOT NULL;
|
|
""")
|
|
|
|
batch_id = await conn.fetchval("""
|
|
INSERT INTO batches (name, client_id, requester, priority, callback_url)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING batch_id
|
|
""", name, requester.client_id, json.dumps(requester.dict()), priority, callback_url)
|
|
|
|
log.info(f"Created batch {batch_id}: '{name}' for client {requester.client_id}")
|
|
return batch_id
|
|
|
|
|
|
async def create_job_in_batch(
|
|
db: DatabaseManager,
|
|
batch_id: UUID,
|
|
url: str,
|
|
priority: int = 0,
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
) -> UUID:
|
|
"""
|
|
Create a job that belongs to a batch.
|
|
|
|
Returns:
|
|
UUID of created job
|
|
"""
|
|
job_metadata = metadata or {}
|
|
job_metadata['batch_id'] = str(batch_id)
|
|
job_metadata['priority'] = priority
|
|
|
|
async with db.pool.acquire() as conn:
|
|
job_id = await conn.fetchval("""
|
|
INSERT INTO jobs (url, batch_id, metadata)
|
|
VALUES ($1, $2, $3)
|
|
RETURNING job_id
|
|
""", url, batch_id, json.dumps(job_metadata))
|
|
|
|
return job_id
|
|
|
|
|
|
async def get_batch(db: DatabaseManager, batch_id: UUID) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get batch by ID with aggregated job stats.
|
|
|
|
Returns:
|
|
Batch dictionary with job stats or None if not found
|
|
"""
|
|
async with db.pool.acquire() as conn:
|
|
# Get batch basic info
|
|
batch_row = await conn.fetchrow("""
|
|
SELECT
|
|
batch_id,
|
|
name,
|
|
client_id,
|
|
requester,
|
|
priority,
|
|
callback_url,
|
|
callback_delivered,
|
|
created_at,
|
|
updated_at,
|
|
completed_at
|
|
FROM batches
|
|
WHERE batch_id = $1
|
|
""", batch_id)
|
|
|
|
if not batch_row:
|
|
return None
|
|
|
|
# Get job stats for this batch
|
|
stats = await conn.fetchrow("""
|
|
SELECT
|
|
COUNT(*) as total_jobs,
|
|
COUNT(*) FILTER (WHERE status = 'pending') as pending_jobs,
|
|
COUNT(*) FILTER (WHERE status = 'running') as running_jobs,
|
|
COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
|
|
COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed_jobs,
|
|
COALESCE(SUM(reviews_count), 0) as total_reviews
|
|
FROM jobs
|
|
WHERE batch_id = $1
|
|
""", batch_id)
|
|
|
|
result = dict(batch_row)
|
|
result.update(dict(stats))
|
|
|
|
# Calculate batch status
|
|
total = stats['total_jobs']
|
|
completed = stats['completed_jobs']
|
|
failed = stats['failed_jobs']
|
|
running = stats['running_jobs']
|
|
|
|
if total == 0:
|
|
result['status'] = 'pending'
|
|
elif completed + failed == total:
|
|
if failed == total:
|
|
result['status'] = 'failed'
|
|
elif failed > 0:
|
|
result['status'] = 'partial'
|
|
else:
|
|
result['status'] = 'completed'
|
|
elif running > 0 or completed > 0:
|
|
result['status'] = 'running'
|
|
else:
|
|
result['status'] = 'pending'
|
|
|
|
return result
|
|
|
|
|
|
async def get_batch_jobs(db: DatabaseManager, batch_id: UUID) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all jobs in a batch.
|
|
|
|
Returns:
|
|
List of job dictionaries
|
|
"""
|
|
async with db.pool.acquire() as conn:
|
|
rows = await conn.fetch("""
|
|
SELECT
|
|
job_id,
|
|
url,
|
|
status,
|
|
reviews_count,
|
|
error_message,
|
|
created_at,
|
|
completed_at
|
|
FROM jobs
|
|
WHERE batch_id = $1
|
|
ORDER BY created_at ASC
|
|
""", batch_id)
|
|
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def list_batches(
|
|
db: DatabaseManager,
|
|
client_id: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
limit: int = 100,
|
|
offset: int = 0
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
List batches with optional filtering.
|
|
|
|
Returns:
|
|
List of batch summary dictionaries
|
|
"""
|
|
async with db.pool.acquire() as conn:
|
|
# Build query dynamically based on filters
|
|
where_clauses = []
|
|
params = []
|
|
param_idx = 1
|
|
|
|
if client_id:
|
|
where_clauses.append(f"b.client_id = ${param_idx}")
|
|
params.append(client_id)
|
|
param_idx += 1
|
|
|
|
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
|
|
|
query = f"""
|
|
SELECT
|
|
b.batch_id,
|
|
b.name,
|
|
b.client_id,
|
|
b.created_at,
|
|
b.updated_at,
|
|
b.completed_at,
|
|
COUNT(j.job_id) as total_jobs,
|
|
COUNT(j.job_id) FILTER (WHERE j.status = 'pending') as pending_jobs,
|
|
COUNT(j.job_id) FILTER (WHERE j.status = 'running') as running_jobs,
|
|
COUNT(j.job_id) FILTER (WHERE j.status = 'completed') as completed_jobs,
|
|
COUNT(j.job_id) FILTER (WHERE j.status IN ('failed', 'partial')) as failed_jobs,
|
|
COALESCE(SUM(j.reviews_count), 0) as total_reviews
|
|
FROM batches b
|
|
LEFT JOIN jobs j ON j.batch_id = b.batch_id
|
|
{where_sql}
|
|
GROUP BY b.batch_id
|
|
ORDER BY b.created_at DESC
|
|
LIMIT ${param_idx} OFFSET ${param_idx + 1}
|
|
"""
|
|
params.extend([limit, offset])
|
|
|
|
rows = await conn.fetch(query, *params)
|
|
|
|
results = []
|
|
for row in rows:
|
|
batch = dict(row)
|
|
|
|
# Calculate status
|
|
total = batch['total_jobs']
|
|
completed = batch['completed_jobs']
|
|
failed = batch['failed_jobs']
|
|
running = batch['running_jobs']
|
|
|
|
if total == 0:
|
|
batch['status'] = 'pending'
|
|
elif completed + failed == total:
|
|
if failed == total:
|
|
batch['status'] = 'failed'
|
|
elif failed > 0:
|
|
batch['status'] = 'partial'
|
|
else:
|
|
batch['status'] = 'completed'
|
|
elif running > 0 or completed > 0:
|
|
batch['status'] = 'running'
|
|
else:
|
|
batch['status'] = 'pending'
|
|
|
|
# Filter by status if specified
|
|
if status and batch['status'] != status:
|
|
continue
|
|
|
|
results.append(batch)
|
|
|
|
return results
|
|
|
|
|
|
async def cancel_batch_jobs(db: DatabaseManager, batch_id: UUID) -> tuple:
|
|
"""
|
|
Cancel all pending jobs in a batch.
|
|
|
|
Returns:
|
|
Tuple of (cancelled_count, already_completed_count)
|
|
"""
|
|
async with db.pool.acquire() as conn:
|
|
# Count already completed/failed jobs
|
|
already_done = await conn.fetchval("""
|
|
SELECT COUNT(*)
|
|
FROM jobs
|
|
WHERE batch_id = $1 AND status IN ('completed', 'failed', 'partial')
|
|
""", batch_id)
|
|
|
|
# Cancel pending and running jobs
|
|
result = await conn.execute("""
|
|
UPDATE jobs
|
|
SET status = 'cancelled', completed_at = NOW()
|
|
WHERE batch_id = $1 AND status IN ('pending', 'running')
|
|
""", batch_id)
|
|
|
|
cancelled_count = int(result.split()[-1])
|
|
|
|
# Update batch completed_at if all jobs are now done
|
|
await conn.execute("""
|
|
UPDATE batches
|
|
SET updated_at = NOW(),
|
|
completed_at = CASE
|
|
WHEN NOT EXISTS (
|
|
SELECT 1 FROM jobs
|
|
WHERE batch_id = $1 AND status IN ('pending', 'running')
|
|
) THEN NOW()
|
|
ELSE completed_at
|
|
END
|
|
WHERE batch_id = $1
|
|
""", batch_id)
|
|
|
|
log.info(f"Cancelled {cancelled_count} jobs in batch {batch_id}")
|
|
return cancelled_count, already_done or 0
|
|
|
|
|
|
# ==================== Dependency Injection ====================
|
|
|
|
# Database instance will be injected from the main app
|
|
# This is a placeholder that should be overridden when including the router
|
|
_db: Optional[DatabaseManager] = None
|
|
|
|
|
|
def set_database(db: DatabaseManager):
|
|
"""Set the database instance for the router"""
|
|
global _db
|
|
_db = db
|
|
|
|
|
|
def get_db() -> DatabaseManager:
|
|
"""Dependency to get database instance"""
|
|
if _db is None:
|
|
raise HTTPException(status_code=500, detail="Database not initialized")
|
|
return _db
|
|
|
|
|
|
# ==================== API Endpoints ====================
|
|
|
|
@router.post(
|
|
"/scrape/google-reviews/batch",
|
|
response_model=BatchSubmitResponse,
|
|
summary="Submit Batch of URLs",
|
|
description="Submit multiple Google Maps URLs as a batch for scraping"
|
|
)
|
|
async def submit_batch(
|
|
request: BatchSubmitRequest,
|
|
db: DatabaseManager = Depends(get_db)
|
|
):
|
|
"""
|
|
Submit multiple URLs as a batch for scraping.
|
|
|
|
Each URL becomes an individual job that can be tracked separately,
|
|
while the batch provides aggregate status and an optional callback
|
|
when all jobs complete.
|
|
|
|
- **name**: Descriptive name for the batch (e.g., "Q1 Prospects")
|
|
- **urls**: List of Google Maps URLs (1-1000 URLs)
|
|
- **requester**: Client identification for tracking
|
|
- **priority**: Higher priority batches may be processed first (default: 0)
|
|
- **callback_url**: Webhook called when ALL jobs in the batch complete
|
|
|
|
Returns batch_id and list of individual job_ids for tracking.
|
|
"""
|
|
try:
|
|
# Create the batch record
|
|
batch_id = await create_batch(
|
|
db=db,
|
|
name=request.name,
|
|
requester=request.requester,
|
|
priority=request.priority,
|
|
callback_url=str(request.callback_url) if request.callback_url else None
|
|
)
|
|
|
|
# Create individual jobs for each URL
|
|
job_ids = []
|
|
for url in request.urls:
|
|
job_id = await create_job_in_batch(
|
|
db=db,
|
|
batch_id=batch_id,
|
|
url=str(url),
|
|
priority=request.priority,
|
|
metadata={
|
|
'requester': request.requester.dict(),
|
|
'batch_name': request.name
|
|
}
|
|
)
|
|
job_ids.append(str(job_id))
|
|
|
|
log.info(f"Created batch {batch_id} with {len(job_ids)} jobs for client {request.requester.client_id}")
|
|
|
|
return BatchSubmitResponse(
|
|
batch_id=str(batch_id),
|
|
job_ids=job_ids,
|
|
total_jobs=len(job_ids)
|
|
)
|
|
|
|
except Exception as e:
|
|
log.error(f"Error creating batch: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to create batch: {str(e)}")
|
|
|
|
|
|
@router.get(
|
|
"/batches",
|
|
response_model=List[BatchSummary],
|
|
summary="List Batches",
|
|
description="List batches with optional filtering by client_id and status"
|
|
)
|
|
async def list_batches_endpoint(
|
|
client_id: Optional[str] = Query(None, description="Filter by client ID"),
|
|
status: Optional[str] = Query(None, description="Filter by batch status (pending, running, completed, partial, failed)"),
|
|
limit: int = Query(100, description="Maximum number of batches to return", ge=1, le=1000),
|
|
offset: int = Query(0, description="Number of batches to skip", ge=0),
|
|
db: DatabaseManager = Depends(get_db)
|
|
):
|
|
"""
|
|
List batches with optional filters.
|
|
|
|
- **client_id**: Filter to only show batches for a specific client
|
|
- **status**: Filter by batch status:
|
|
- pending: No jobs have started yet
|
|
- running: At least one job is running or completed
|
|
- completed: All jobs completed successfully
|
|
- partial: All jobs done, but some failed
|
|
- failed: All jobs failed
|
|
- **limit**: Maximum results (default: 100)
|
|
- **offset**: Skip first N results for pagination
|
|
"""
|
|
try:
|
|
batches = await list_batches(
|
|
db=db,
|
|
client_id=client_id,
|
|
status=status,
|
|
limit=limit,
|
|
offset=offset
|
|
)
|
|
|
|
return [
|
|
BatchSummary(
|
|
batch_id=str(b['batch_id']),
|
|
name=b['name'],
|
|
client_id=b['client_id'],
|
|
status=b['status'],
|
|
total_jobs=b['total_jobs'],
|
|
pending_jobs=b['pending_jobs'],
|
|
running_jobs=b['running_jobs'],
|
|
completed_jobs=b['completed_jobs'],
|
|
failed_jobs=b['failed_jobs'],
|
|
total_reviews=b['total_reviews'],
|
|
created_at=b['created_at'].isoformat(),
|
|
updated_at=b['updated_at'].isoformat() if b.get('updated_at') else None,
|
|
completed_at=b['completed_at'].isoformat() if b.get('completed_at') else None
|
|
)
|
|
for b in batches
|
|
]
|
|
|
|
except Exception as e:
|
|
log.error(f"Error listing batches: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to list batches: {str(e)}")
|
|
|
|
|
|
@router.get(
|
|
"/batches/{batch_id}",
|
|
response_model=BatchDetailResponse,
|
|
summary="Get Batch Details",
|
|
description="Get detailed batch information including all jobs"
|
|
)
|
|
async def get_batch_details(
|
|
batch_id: UUID,
|
|
db: DatabaseManager = Depends(get_db)
|
|
):
|
|
"""
|
|
Get detailed information about a specific batch.
|
|
|
|
Returns batch metadata, aggregate statistics, and a list of all jobs
|
|
with their individual statuses.
|
|
"""
|
|
try:
|
|
batch = await get_batch(db, batch_id)
|
|
if not batch:
|
|
raise HTTPException(status_code=404, detail="Batch not found")
|
|
|
|
jobs = await get_batch_jobs(db, batch_id)
|
|
|
|
# Parse requester from JSONB
|
|
requester_data = batch['requester']
|
|
if isinstance(requester_data, str):
|
|
requester_data = json.loads(requester_data)
|
|
|
|
return BatchDetailResponse(
|
|
batch_id=str(batch['batch_id']),
|
|
name=batch['name'],
|
|
requester=RequesterModel(**requester_data),
|
|
priority=batch['priority'],
|
|
callback_url=batch.get('callback_url'),
|
|
status=batch['status'],
|
|
total_jobs=batch['total_jobs'],
|
|
pending_jobs=batch['pending_jobs'],
|
|
running_jobs=batch['running_jobs'],
|
|
completed_jobs=batch['completed_jobs'],
|
|
failed_jobs=batch['failed_jobs'],
|
|
total_reviews=batch['total_reviews'],
|
|
created_at=batch['created_at'].isoformat(),
|
|
updated_at=batch['updated_at'].isoformat() if batch.get('updated_at') else None,
|
|
completed_at=batch['completed_at'].isoformat() if batch.get('completed_at') else None,
|
|
jobs=[
|
|
BatchJobSummary(
|
|
job_id=str(j['job_id']),
|
|
url=j['url'],
|
|
status=j['status'],
|
|
reviews_count=j.get('reviews_count'),
|
|
error_message=j.get('error_message'),
|
|
created_at=j['created_at'].isoformat(),
|
|
completed_at=j['completed_at'].isoformat() if j.get('completed_at') else None
|
|
)
|
|
for j in jobs
|
|
]
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
log.error(f"Error getting batch {batch_id}: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to get batch: {str(e)}")
|
|
|
|
|
|
@router.delete(
|
|
"/batches/{batch_id}",
|
|
response_model=BatchCancelResponse,
|
|
summary="Cancel Batch",
|
|
description="Cancel all pending jobs in a batch"
|
|
)
|
|
async def cancel_batch(
|
|
batch_id: UUID,
|
|
db: DatabaseManager = Depends(get_db)
|
|
):
|
|
"""
|
|
Cancel all pending and running jobs in a batch.
|
|
|
|
Already completed or failed jobs cannot be cancelled and will remain
|
|
with their current status.
|
|
|
|
Returns the count of cancelled jobs and already-completed jobs.
|
|
"""
|
|
try:
|
|
# Verify batch exists
|
|
batch = await get_batch(db, batch_id)
|
|
if not batch:
|
|
raise HTTPException(status_code=404, detail="Batch not found")
|
|
|
|
# Cancel the jobs
|
|
cancelled, already_done = await cancel_batch_jobs(db, batch_id)
|
|
|
|
return BatchCancelResponse(
|
|
batch_id=str(batch_id),
|
|
cancelled_jobs=cancelled,
|
|
already_completed=already_done,
|
|
message=f"Cancelled {cancelled} jobs. {already_done} jobs were already completed/failed."
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
log.error(f"Error cancelling batch {batch_id}: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to cancel batch: {str(e)}")
|