Phase 2 - Requester & Batch Support:
- core/database.py: Added create_job params (requester_*, batch_*, priority, callback_*)
- core/database.py: Added batch methods (create_batch, get_batch, update_batch_progress, get_batches)
- core/database.py: Added update_job_callback for tracking webhook delivery
- api/routes/batches.py: New endpoints:
  - POST /api/scrape/google-reviews/batch (submit batch)
  - GET /api/batches (list batches)
  - GET /api/batches/{id} (batch detail)
  - DELETE /api/batches/{id} (cancel batch)
- api_server_production.py: Updated /api/scrape with requester, priority, callback fields
- api_server_production.py: New primary endpoint POST /api/scrape/google-reviews
Phase 3 - Webhooks:
- services/job_callback_service.py: New service with:
  - JobCallbackService: send_job_callback, send_batch_callback, retry_failed_callbacks
  - JobCallbackDispatcher: Background worker for callback monitoring
  - Payload formats per spec (job.completed, job.failed, batch.completed)
  - Exponential backoff for retries
  - Error classification for failure payloads
Phase 4 - Scraper Registry:
- scrapers/registry.py: Database-backed version routing:
  - get_scraper(): Version/variant/A/B routing
  - _get_weighted_scraper(): Traffic-weighted random selection
  - 60-second TTL cache for performance
  - register_scraper, deprecate_scraper, update_traffic_allocation
  - LegacyScraperRegistry preserved for backwards compatibility
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
718 lines
23 KiB
Python
718 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
Job Callback Service for webhook delivery on job and batch completion.

This service handles sending webhooks when jobs complete or fail,
as well as batch-level completion callbacks.
"""
|
|
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from uuid import UUID

import httpx

from services.webhook_service import WebhookManager
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Scraper version (should match the deployed scraper).
# Used as the fallback "scraper_version" in job payloads when the job row
# does not carry its own value.
SCRAPER_VERSION = "1.0.0"
|
|
|
|
|
|
class CallbackStatus:
    """String constants for the callback delivery state of a job.

    The values match the strings stored in the ``callback_status`` column
    that the queries in this module read and write.
    """

    # Callback has not been attempted yet.
    PENDING = "pending"
    # Callback was delivered successfully.
    SENT = "sent"
    # Last delivery attempt failed.
    FAILED = "failed"
|
|
|
|
|
|
class JobCallbackService:
    """
    Handles webhook callbacks for job and batch completion.

    This service is responsible for:
    - Sending callbacks when individual jobs complete or fail
    - Sending callbacks when entire batches complete
    - Retrying failed callbacks with exponential backoff
    - Tracking callback status and attempts in the database

    NOTE(review): per-job and batch callbacks share the same
    ``callback_status`` / ``callback_attempts`` columns on ``jobs``. A batch
    callback therefore overwrites the per-job state of every job in the
    batch, and a batch whose jobs were all already marked 'sent'/'failed'
    is no longer selected by ``_get_potentially_complete_batches``.
    Confirm this is intended, or track batch callbacks separately.
    """

    # Ordered (error_type, predicate) rules used by _classify_error.
    # The first matching rule wins, so the order is significant.
    _ERROR_RULES = (
        ("rate_limited", lambda m: 'rate' in m and 'limit' in m),
        ("timeout", lambda m: 'timeout' in m),
        ("captcha_detected", lambda m: 'captcha' in m or 'robot' in m),
        ("blocked", lambda m: 'blocked' in m or 'denied' in m),
        ("network_error", lambda m: 'network' in m or 'connection' in m),
        ("not_found", lambda m: 'not found' in m or '404' in m),
        ("invalid_input", lambda m: 'invalid' in m),
        ("scrape_error", lambda m: 'element' in m or 'selector' in m),
    )

    def __init__(
        self,
        db,
        max_retries: int = 3,
        timeout: float = 10.0,
        initial_retry_delay: float = 2.0
    ):
        """
        Initialize the job callback service.

        Args:
            db: DatabaseManager instance
            max_retries: Maximum number of delivery attempts per callback
            timeout: HTTP request timeout in seconds
            initial_retry_delay: Initial delay between retries (exponential backoff)
        """
        self.db = db
        self.webhook_manager = WebhookManager(
            max_retries=max_retries,
            timeout=timeout,
            initial_retry_delay=initial_retry_delay
        )
        self.max_retries = max_retries
        self.timeout = timeout
        self.initial_retry_delay = initial_retry_delay

    async def send_job_callback(self, job_id: UUID) -> bool:
        """
        Send callback for completed/failed job.

        Fetches the job, builds the payload for its status, POSTs it to the
        job's callback_url and records the outcome in the database.

        Args:
            job_id: UUID of the job

        Returns:
            True if the callback was sent successfully, or if the job has no
            callback_url (nothing to do). False if the job is missing, is
            not in a terminal status, or delivery failed.
        """
        job = await self.db.get_job(job_id)
        if not job:
            log.error(f"Job {job_id} not found for callback")
            return False

        callback_url = job.get('callback_url')
        if not callback_url:
            log.debug(f"Job {job_id} has no callback_url configured")
            return True  # No callback needed, consider success

        status = job.get('status')
        if status not in ('completed', 'failed', 'partial'):
            log.warning(f"Job {job_id} has status '{status}', not sending callback")
            return False

        payload = self._build_job_payload(job)

        # The job's webhook_secret is reused for signing callbacks.
        secret = job.get('webhook_secret')

        log.info(f"Sending job callback to {callback_url} for job {job_id} (status: {status})")

        success = await self._send_callback(
            url=callback_url,
            payload=payload,
            secret=secret,
            job_id=job_id
        )

        await self._update_callback_status(job_id, success)

        if success:
            log.info(f"Job callback sent successfully for job {job_id}")
        else:
            log.error(f"Job callback failed for job {job_id}")

        return success

    async def send_batch_callback(self, batch_id: UUID) -> bool:
        """
        Send callback when batch completes.

        Loads all jobs of the batch, and if none is still pending/running,
        POSTs a summary payload to the batch callback_url and records the
        outcome on every job of the batch.

        Args:
            batch_id: UUID of the batch

        Returns:
            True if the callback was sent successfully, or if the batch has
            no callback_url. False if the batch has no jobs, is not yet
            complete, or delivery failed.
        """
        batch_info = await self._get_batch_info(batch_id)
        if not batch_info:
            log.error(f"Batch {batch_id} not found or has no jobs")
            return False

        callback_url = batch_info.get('callback_url')
        if not callback_url:
            log.debug(f"Batch {batch_id} has no callback_url configured")
            return True  # No callback needed

        if not batch_info.get('is_complete'):
            log.debug(f"Batch {batch_id} is not yet complete")
            return False

        payload = self._build_batch_payload(batch_info)
        secret = batch_info.get('webhook_secret')

        log.info(f"Sending batch callback to {callback_url} for batch {batch_id}")

        success = await self._send_callback(
            url=callback_url,
            payload=payload,
            secret=secret,
            job_id=None  # Batch callback, no single job_id
        )

        # Recorded on all jobs in the batch (see class-level review note).
        await self._update_batch_callback_status(batch_id, success)

        if success:
            log.info(f"Batch callback sent successfully for batch {batch_id}")
        else:
            log.error(f"Batch callback failed for batch {batch_id}")

        return success

    async def retry_failed_callbacks(self, max_attempts: int = 5) -> Dict[str, int]:
        """
        Retry callbacks for jobs with callback_status='failed'.

        Jobs are processed one after another; before each retry this method
        sleeps ``initial_retry_delay * 2 ** attempts`` seconds.

        NOTE(review): the sleeps are serial, so a large backlog keeps the
        caller busy for the sum of all delays.

        Args:
            max_attempts: Maximum number of total attempts before giving up

        Returns:
            Dict with counts: {'retried': n, 'succeeded': n, 'failed': n}
        """
        jobs = await self._get_failed_callbacks(max_attempts)

        results = {
            'retried': 0,
            'succeeded': 0,
            'failed': 0
        }

        if not jobs:
            log.debug("No failed callbacks to retry")
            return results

        log.info(f"Retrying {len(jobs)} failed callbacks")

        for job in jobs:
            job_id = job['job_id']
            # The column may be NULL; `.get(key, 0)` would return None then
            # and break the exponent below.
            attempts = job.get('callback_attempts') or 0

            # Exponential backoff based on the number of previous attempts.
            delay = self.initial_retry_delay * (2 ** attempts)

            log.info(f"Retrying callback for job {job_id} (attempt {attempts + 1}), delay: {delay:.1f}s")

            await asyncio.sleep(delay)

            success = await self.send_job_callback(job_id)

            results['retried'] += 1
            if success:
                results['succeeded'] += 1
            else:
                results['failed'] += 1

        log.info(f"Callback retry complete: {results}")
        return results

    async def check_and_send_batch_callbacks(self) -> Dict[str, int]:
        """
        Check for completed batches and send their callbacks.

        This should be called periodically to detect when batches complete.

        Returns:
            Dict with counts: {'checked': n, 'sent': n, 'failed': n}.
            'failed' counts every batch for which send_batch_callback
            returned False, including batches that were not yet complete.
        """
        batch_ids = await self._get_potentially_complete_batches()

        results = {
            'checked': 0,
            'sent': 0,
            'failed': 0
        }

        for batch_id in batch_ids:
            results['checked'] += 1
            if await self.send_batch_callback(batch_id):
                results['sent'] += 1
            else:
                results['failed'] += 1

        return results

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """
        Normalize a timestamp to a naive datetime expressed in UTC.

        Args:
            value: datetime (naive values are treated as UTC, aware values
                are converted to UTC) or an ISO-8601 string (a trailing 'Z'
                is accepted)

        Returns:
            Naive UTC datetime, or None if the value is missing or cannot
            be parsed
        """
        if isinstance(value, str):
            try:
                value = datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                return None
        if not isinstance(value, datetime):
            return None
        if value.tzinfo is not None:
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        return value

    @staticmethod
    def _format_timestamp(value: Any) -> Any:
        """
        Render a datetime as an ISO-8601 string with a single 'Z' suffix.

        Aware datetimes are converted to UTC first so the result never
        carries both an offset and 'Z'. Non-datetime values (e.g. strings
        already stored in the job row) are returned unchanged.
        """
        if isinstance(value, datetime):
            if value.tzinfo is not None:
                value = value.astimezone(timezone.utc).replace(tzinfo=None)
            return value.isoformat() + 'Z'
        return value

    @staticmethod
    def _average_rating(reviews_data: Any) -> Optional[float]:
        """
        Compute the mean of the non-zero numeric 'rating' values.

        Args:
            reviews_data: list of review dicts, or a JSON string encoding one

        Returns:
            Average rounded to 2 decimals, or None when no usable rating
            exists (invalid JSON, non-list data, or no numeric ratings)
        """
        if isinstance(reviews_data, str):
            try:
                reviews_data = json.loads(reviews_data)
            except json.JSONDecodeError:
                return None
        if not isinstance(reviews_data, (list, tuple)):
            return None

        ratings = []
        for review in reviews_data:
            if not isinstance(review, dict):
                continue
            rating = review.get('rating')
            # bool is an int subclass; exclude it along with falsy ratings.
            if isinstance(rating, (int, float)) and not isinstance(rating, bool) and rating:
                ratings.append(rating)

        if not ratings:
            return None
        return round(sum(ratings) / len(ratings), 2)

    def _build_job_payload(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build webhook payload for job completion/failure.

        Args:
            job: Job dictionary from database

        Returns:
            Webhook payload dictionary. 'completed' jobs get event
            "job.completed" with a result summary and duration; 'failed'
            and 'partial' jobs get event "job.failed" with an error object.
            Any other status yields only the base fields.
        """
        status = job.get('status')

        payload = {
            "job_id": str(job.get('job_id')),
            "job_type": job.get('job_type', 'google_reviews'),
            "status": status,
            "url": job.get('url'),
            "scraper_version": job.get('scraper_version') or SCRAPER_VERSION,
        }

        completed_at = job.get('completed_at')

        if status == 'completed':
            payload["event"] = "job.completed"

            payload["result_summary"] = {
                "item_count": job.get('reviews_count') or 0,
                # Primary metric is the average rating, when available.
                "primary_metric": self._average_rating(job.get('reviews_data'))
            }

            # Duration: prefer the timestamps, fall back to scrape_time.
            # Both sides are normalized to naive UTC so naive/aware inputs
            # can be subtracted safely.
            started = self._parse_timestamp(job.get('started_at'))
            finished = self._parse_timestamp(completed_at)
            if started is not None and finished is not None:
                payload["duration_seconds"] = round((finished - started).total_seconds(), 2)
            elif job.get('scrape_time'):
                payload["duration_seconds"] = round(job.get('scrape_time'), 2)

            if completed_at:
                payload["completed_at"] = self._format_timestamp(completed_at)

        elif status in ('failed', 'partial'):
            payload["event"] = "job.failed"

            # `or` (not a .get default) so a NULL column also falls back.
            error_message = job.get('error_message') or 'Unknown error'

            payload["error"] = {
                "type": self._classify_error(error_message),
                "message": error_message
            }

            # Include partial results info if applicable
            if status == 'partial':
                payload["result_summary"] = {
                    "item_count": job.get('reviews_count') or 0,
                    "primary_metric": None
                }

            if completed_at:
                payload["completed_at"] = self._format_timestamp(completed_at)

        return payload

    def _build_batch_payload(self, batch_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build webhook payload for batch completion.

        Args:
            batch_info: Batch info dictionary with job summaries

        Returns:
            Webhook payload dictionary with event "batch.completed",
            success/failure counts, the ids of failed/partial jobs and the
            latest completion time (current UTC time if no job has one).
        """
        batch_id = str(batch_info.get('batch_id'))
        jobs = batch_info.get('jobs', [])

        succeeded = 0
        failed_job_ids = []
        latest_completed = None
        for job in jobs:
            job_status = job.get('status')
            if job_status == 'completed':
                succeeded += 1
            elif job_status in ('failed', 'partial'):
                failed_job_ids.append(str(job.get('job_id')))

            # Compare on normalized values so naive/aware/string timestamps
            # can coexist in one batch.
            finished = self._parse_timestamp(job.get('completed_at'))
            if finished is not None and (latest_completed is None or finished > latest_completed):
                latest_completed = finished

        if latest_completed is None:
            latest_completed = datetime.now(timezone.utc)

        return {
            "event": "batch.completed",
            "batch_id": batch_id,
            # The 'name' key may be present but None, so use `or`.
            "name": batch_info.get('name') or f'Batch {batch_id[:8]}',
            "total_jobs": len(jobs),
            "succeeded": succeeded,
            "failed": len(failed_job_ids),
            "completed_at": self._format_timestamp(latest_completed),
            "failed_job_ids": failed_job_ids
        }

    def _classify_error(self, error_message: str) -> str:
        """
        Classify error message into a type category.

        Matching is case-insensitive substring matching; the first rule in
        ``_ERROR_RULES`` that matches determines the result.

        Args:
            error_message: Error message string

        Returns:
            Error type string, "unknown" when empty or nothing matches
        """
        if not error_message:
            return "unknown"

        lowered = error_message.lower()
        for error_type, matches in self._ERROR_RULES:
            if matches(lowered):
                return error_type
        return "unknown"

    async def _send_callback(
        self,
        url: str,
        payload: Dict[str, Any],
        secret: Optional[str] = None,
        job_id: Optional[UUID] = None
    ) -> bool:
        """
        Send a callback to the specified URL.

        Delegates to WebhookManager.send_webhook, passing the secret,
        job_id and this service's db through.

        Args:
            url: Callback URL
            payload: Payload dictionary
            secret: Optional secret for HMAC signature
            job_id: Optional job ID for logging

        Returns:
            True if sent successfully
        """
        return await self.webhook_manager.send_webhook(
            webhook_url=url,
            payload=payload,
            secret=secret,
            job_id=job_id,
            db=self.db
        )

    async def _update_callback_status(self, job_id: UUID, success: bool):
        """
        Update the callback_status and callback_attempts for a job.

        Args:
            job_id: Job UUID
            success: Whether the callback was sent successfully
        """
        status = CallbackStatus.SENT if success else CallbackStatus.FAILED
        async with self.db.pool.acquire() as conn:
            await conn.execute("""
                UPDATE jobs
                SET callback_status = $2,
                    callback_attempts = COALESCE(callback_attempts, 0) + 1
                WHERE job_id = $1
            """, job_id, status)

    async def _update_batch_callback_status(self, batch_id: UUID, success: bool):
        """
        Update callback status for all jobs in a batch.

        Args:
            batch_id: Batch UUID
            success: Whether the callback was sent successfully
        """
        status = CallbackStatus.SENT if success else CallbackStatus.FAILED
        async with self.db.pool.acquire() as conn:
            await conn.execute("""
                UPDATE jobs
                SET callback_status = $2,
                    callback_attempts = COALESCE(callback_attempts, 0) + 1
                WHERE batch_id = $1
            """, batch_id, status)

    async def _get_failed_callbacks(self, max_attempts: int) -> List[Dict[str, Any]]:
        """
        Get jobs with failed callbacks that can be retried.

        Args:
            max_attempts: Maximum attempts before giving up

        Returns:
            List of job dictionaries (at most 100, oldest completed first)
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    job_id,
                    status,
                    callback_url,
                    callback_status,
                    callback_attempts,
                    webhook_secret
                FROM jobs
                WHERE callback_url IS NOT NULL
                  AND callback_status = 'failed'
                  AND COALESCE(callback_attempts, 0) < $1
                  AND status IN ('completed', 'failed', 'partial')
                ORDER BY completed_at ASC
                LIMIT 100
            """, max_attempts)

        return [dict(row) for row in rows]

    async def _get_batch_info(self, batch_id: UUID) -> Optional[Dict[str, Any]]:
        """
        Get batch information including all jobs.

        Args:
            batch_id: Batch UUID

        Returns:
            Batch info dictionary with keys batch_id, name, jobs,
            is_complete, callback_url and webhook_secret, or None if the
            batch has no jobs
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT
                    job_id,
                    status,
                    batch_index,
                    callback_url,
                    callback_status,
                    webhook_secret,
                    completed_at,
                    reviews_count,
                    error_message,
                    metadata
                FROM jobs
                WHERE batch_id = $1
                ORDER BY batch_index ASC
            """, batch_id)

        if not rows:
            return None

        jobs = [dict(row) for row in rows]

        # The batch is complete once no job is pending or running.
        pending_statuses = ('pending', 'running')
        is_complete = all(j.get('status') not in pending_statuses for j in jobs)

        # Batch name comes from the first job's metadata, if available.
        batch_name = None
        first_metadata = jobs[0].get('metadata')
        if isinstance(first_metadata, str):
            try:
                first_metadata = json.loads(first_metadata)
            except json.JSONDecodeError:
                first_metadata = None
        if isinstance(first_metadata, dict):
            batch_name = first_metadata.get('batch_name')

        # Batches are selected when *any* job carries a callback_url, so
        # take the URL (and that job's secret) from the first job that has
        # one instead of assuming it is the job at index 0.
        callback_job = next((j for j in jobs if j.get('callback_url')), jobs[0])

        return {
            'batch_id': batch_id,
            'name': batch_name,
            'jobs': jobs,
            'is_complete': is_complete,
            'callback_url': callback_job.get('callback_url'),
            'webhook_secret': callback_job.get('webhook_secret')
        }

    async def _get_potentially_complete_batches(self) -> List[UUID]:
        """
        Get batch IDs that might have recently completed.

        Returns:
            List of batch UUIDs to check (at most 100)
        """
        async with self.db.pool.acquire() as conn:
            # Find batches where:
            # 1. At least one job has callback_url set
            # 2. callback_status is null or pending (not yet sent)
            # 3. No jobs are still pending or running
            rows = await conn.fetch("""
                SELECT DISTINCT batch_id
                FROM jobs
                WHERE batch_id IS NOT NULL
                  AND callback_url IS NOT NULL
                  AND COALESCE(callback_status, 'pending') = 'pending'
                  AND batch_id NOT IN (
                      SELECT DISTINCT batch_id
                      FROM jobs
                      WHERE batch_id IS NOT NULL
                        AND status IN ('pending', 'running')
                  )
                LIMIT 100
            """)

        return [row['batch_id'] for row in rows]
|
|
|
|
|
|
class JobCallbackDispatcher:
    """
    Background dispatcher that monitors for jobs needing callbacks.

    Each loop iteration sends pending per-job callbacks, checks for
    completed batches, and (at most once per retry interval) retries
    failed callbacks.
    """

    def __init__(
        self,
        db,
        interval_seconds: int = 30,
        retry_interval_seconds: int = 300
    ):
        """
        Initialize the callback dispatcher.

        Args:
            db: DatabaseManager instance
            interval_seconds: How often to check for pending callbacks
            retry_interval_seconds: How often to retry failed callbacks
        """
        self.db = db
        self.interval = interval_seconds
        self.retry_interval = retry_interval_seconds
        self.callback_service = JobCallbackService(db)
        self.running = False
        # Monotonic clock reading: interval measurement must not be
        # affected by wall-clock adjustments.
        self._last_retry = time.monotonic()

    async def start(self):
        """Run the dispatcher loop until stop() is called.

        Errors raised by an iteration are logged with their traceback and
        do not terminate the loop.
        """
        self.running = True
        log.info("Job callback dispatcher started")

        while self.running:
            try:
                # Process pending job callbacks
                await self._process_pending_callbacks()

                # Check for completed batches
                await self.callback_service.check_and_send_batch_callbacks()

                # Periodically retry failed callbacks
                if time.monotonic() - self._last_retry >= self.retry_interval:
                    await self.callback_service.retry_failed_callbacks(max_attempts=5)
                    # Stamp after the pass: retries sleep between jobs, so
                    # a pass can itself take longer than the interval.
                    self._last_retry = time.monotonic()

            except Exception as e:
                log.exception(f"Error in callback dispatcher: {e}")

            await asyncio.sleep(self.interval)

    def stop(self):
        """Stop the background callback dispatcher.

        Only clears the running flag; the loop exits after its current
        iteration and sleep finish.
        """
        self.running = False
        log.info("Job callback dispatcher stopped")

    async def _process_pending_callbacks(self):
        """
        Process all pending callbacks.

        Fetches up to 100 jobs in a terminal status with callback_url set
        and callback_status null/pending, and sends each callback. A
        failure for one job is logged and does not stop the others.
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT job_id
                FROM jobs
                WHERE callback_url IS NOT NULL
                  AND COALESCE(callback_status, 'pending') = 'pending'
                  AND status IN ('completed', 'failed', 'partial')
                ORDER BY completed_at ASC
                LIMIT 100
            """)

        if not rows:
            return

        log.info(f"Processing {len(rows)} pending job callbacks")

        for row in rows:
            job_id = row['job_id']
            try:
                await self.callback_service.send_job_callback(job_id)
            except Exception as e:
                log.exception(f"Error sending callback for job {job_id}: {e}")

        log.info(f"Processed {len(rows)} callbacks")
|