#!/usr/bin/env python3
"""
Job Callback Service for webhook delivery on job and batch completion.

This service handles sending webhooks when jobs complete or fail,
as well as batch-level completion callbacks.
"""

import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from uuid import UUID

import httpx

from services.webhook_service import WebhookManager

log = logging.getLogger(__name__)

# Scraper version (should match the deployed scraper)
SCRAPER_VERSION = "1.0.0"

# Job statuses for which a job-level callback is sent.
_TERMINAL_STATUSES = ('completed', 'failed', 'partial')

# Job statuses that count as a failure in callback payloads.
_FAILURE_STATUSES = ('failed', 'partial')

# Job statuses that mean a batch still has work outstanding.
_UNFINISHED_STATUSES = ('pending', 'running')


def _coerce_datetime(value: Any) -> Optional[datetime]:
    """
    Convert a datetime or ISO-8601 string into a datetime.

    Args:
        value: A datetime, an ISO-8601 string (a trailing 'Z' is accepted),
            or anything else

    Returns:
        The datetime, or None when the value is missing or cannot be parsed
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            # fromisoformat() only accepts a literal 'Z' suffix on Python 3.11+
            return datetime.fromisoformat(value.replace('Z', '+00:00'))
        except ValueError:
            return None
    return None


def _as_utc(moment: datetime) -> datetime:
    """
    Return a timezone-aware UTC datetime.

    Naive datetimes are treated as already being in UTC, which is what the
    'Z' suffix on outgoing timestamps has always implied.
    """
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)


def _format_timestamp(value: Any) -> Any:
    """
    Render a timestamp as an ISO-8601 UTC string ending in 'Z'.

    Args:
        value: A datetime or ISO-8601 string

    Returns:
        The formatted string, or the value unchanged when it cannot be
        interpreted as a timestamp
    """
    moment = _coerce_datetime(value)
    if moment is None:
        return value
    # Drop tzinfo after converting so isoformat() does not emit '+00:00'
    # in front of the 'Z' suffix.
    return _as_utc(moment).replace(tzinfo=None).isoformat() + 'Z'


class CallbackStatus:
    """Callback status constants"""
    PENDING = "pending"
    SENT = "sent"
    FAILED = "failed"


class JobCallbackService:
    """
    Handles webhook callbacks for job and batch completion.

    This service is responsible for:
    - Sending callbacks when individual jobs complete or fail
    - Sending callbacks when entire batches complete
    - Retrying failed callbacks with exponential backoff
    - Tracking callback status and attempts in the database
    """

    def __init__(
        self,
        db,
        max_retries: int = 3,
        timeout: float = 10.0,
        initial_retry_delay: float = 2.0
    ):
        """
        Initialize the job callback service.

        Args:
            db: DatabaseManager instance
            max_retries: Maximum number of delivery attempts per callback
            timeout: HTTP request timeout in seconds
            initial_retry_delay: Initial delay between retries (exponential backoff)
        """
        self.db = db
        self.webhook_manager = WebhookManager(
            max_retries=max_retries,
            timeout=timeout,
            initial_retry_delay=initial_retry_delay
        )
        self.max_retries = max_retries
        self.timeout = timeout
        self.initial_retry_delay = initial_retry_delay

    async def send_job_callback(self, job_id: UUID) -> bool:
        """
        Send callback for completed/failed job.

        This method:
        - Fetches the job from the database
        - Builds the appropriate payload based on job status
        - POSTs to the callback_url
        - Updates callback_status in the database

        Args:
            job_id: UUID of the job

        Returns:
            True if callback was sent successfully (or no callback_url is
            configured), False otherwise
        """
        # Fetch job from database
        job = await self.db.get_job(job_id)
        if not job:
            log.error(f"Job {job_id} not found for callback")
            return False

        callback_url = job.get('callback_url')
        if not callback_url:
            log.debug(f"Job {job_id} has no callback_url configured")
            return True  # No callback needed, consider success

        status = job.get('status')
        if status not in _TERMINAL_STATUSES:
            log.warning(f"Job {job_id} has status '{status}', not sending callback")
            return False

        # Build payload based on status
        payload = self._build_job_payload(job)

        # Get webhook secret if available (reuse webhook_secret for callbacks)
        secret = job.get('webhook_secret')

        # Send the callback
        log.info(f"Sending job callback to {callback_url} for job {job_id} (status: {status})")
        success = await self._send_callback(
            url=callback_url,
            payload=payload,
            secret=secret,
            job_id=job_id
        )

        # Update callback status in database
        await self._update_callback_status(job_id, success)

        if success:
            log.info(f"Job callback sent successfully for job {job_id}")
        else:
            log.error(f"Job callback failed for job {job_id}")

        return success

    async def send_batch_callback(self, batch_id: UUID) -> bool:
        """
        Send callback when batch completes.

        This method:
        - Fetches all jobs in the batch from the database
        - Checks if all jobs have completed (success or failure)
        - Builds a summary payload
        - POSTs to the batch callback_url
        - Updates callback_status

        Args:
            batch_id: UUID of the batch

        Returns:
            True if callback was sent successfully (or no callback_url is
            configured), False otherwise
        """
        # Get batch info (aggregated from the jobs with this batch_id)
        batch_info = await self._get_batch_info(batch_id)
        if not batch_info:
            log.error(f"Batch {batch_id} not found or has no jobs")
            return False

        callback_url = batch_info.get('callback_url')
        if not callback_url:
            log.debug(f"Batch {batch_id} has no callback_url configured")
            return True  # No callback needed

        # Check if batch is complete
        if not batch_info.get('is_complete'):
            log.debug(f"Batch {batch_id} is not yet complete")
            return False

        # Build batch payload
        payload = self._build_batch_payload(batch_info)

        # Get webhook secret (from the job that supplied the callback_url)
        secret = batch_info.get('webhook_secret')

        # Send the callback
        log.info(f"Sending batch callback to {callback_url} for batch {batch_id}")
        success = await self._send_callback(
            url=callback_url,
            payload=payload,
            secret=secret,
            job_id=None  # Batch callback, no single job_id
        )

        # Update batch callback status (on all jobs in the batch)
        await self._update_batch_callback_status(batch_id, success)

        if success:
            log.info(f"Batch callback sent successfully for batch {batch_id}")
        else:
            log.error(f"Batch callback failed for batch {batch_id}")

        return success

    async def retry_failed_callbacks(self, max_attempts: int = 5) -> Dict[str, int]:
        """
        Find jobs with callback_status='failed' and attempts < max.
        Retry sending callbacks with exponential backoff.

        Args:
            max_attempts: Maximum number of total attempts before giving up

        Returns:
            Dict with counts: {'retried': n, 'succeeded': n, 'failed': n}
        """
        # Get jobs with failed callbacks that haven't exceeded max attempts
        jobs = await self._get_failed_callbacks(max_attempts)

        results = {
            'retried': 0,
            'succeeded': 0,
            'failed': 0
        }

        if not jobs:
            log.debug("No failed callbacks to retry")
            return results

        log.info(f"Retrying {len(jobs)} failed callbacks")

        # NOTE(review): the backoff sleep runs inline, so the caller is
        # delayed by the sum of every job's delay. Rows marked 'failed' by
        # _update_batch_callback_status are also retried here as job-level
        # callbacks - confirm that both are intended.
        for job in jobs:
            job_id = job['job_id']
            # The column may be NULL, in which case the key holds None
            attempts = job.get('callback_attempts') or 0

            # Calculate delay based on attempt number (exponential backoff)
            delay = self.initial_retry_delay * (2 ** attempts)

            log.info(f"Retrying callback for job {job_id} (attempt {attempts + 1}), delay: {delay:.1f}s")

            # Wait with backoff
            await asyncio.sleep(delay)

            # Retry the callback; one raising job must not abort the rest
            try:
                success = await self.send_job_callback(job_id)
            except Exception:
                log.exception(f"Error retrying callback for job {job_id}")
                success = False

            results['retried'] += 1
            if success:
                results['succeeded'] += 1
            else:
                results['failed'] += 1

        log.info(f"Callback retry complete: {results}")
        return results

    async def check_and_send_batch_callbacks(self) -> Dict[str, int]:
        """
        Check for completed batches and send their callbacks.

        This should be called periodically to detect when batches complete.

        Returns:
            Dict with counts: {'checked': n, 'sent': n, 'failed': n}
        """
        # Get distinct batch_ids that might be complete
        batch_ids = await self._get_potentially_complete_batches()

        results = {
            'checked': 0,
            'sent': 0,
            'failed': 0
        }

        for batch_id in batch_ids:
            results['checked'] += 1
            # One raising batch must not prevent the others being checked
            try:
                success = await self.send_batch_callback(batch_id)
            except Exception:
                log.exception(f"Error sending batch callback for batch {batch_id}")
                success = False

            if success:
                results['sent'] += 1
            else:
                results['failed'] += 1

        return results

    def _build_job_payload(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build webhook payload for job completion/failure.

        Args:
            job: Job dictionary from database

        Returns:
            Webhook payload dictionary
        """
        status = job.get('status')

        # Base payload; `or` is used so NULL columns also get the default
        payload = {
            "job_id": str(job.get('job_id')),
            "job_type": job.get('job_type') or 'google_reviews',
            "status": status,
            "url": job.get('url'),
            "scraper_version": job.get('scraper_version') or SCRAPER_VERSION,
        }

        completed_raw = job.get('completed_at')

        if status == 'completed':
            # Completed job payload
            payload["event"] = "job.completed"
            payload["result_summary"] = {
                "item_count": job.get('reviews_count') or 0,
                "primary_metric": self._average_rating(job.get('reviews_data'))
            }

            # Duration: prefer the recorded timestamps, fall back to scrape_time
            started = _coerce_datetime(job.get('started_at'))
            completed = _coerce_datetime(completed_raw)
            if started is not None and completed is not None:
                # Normalise both sides so naive and aware values can be mixed
                duration = (_as_utc(completed) - _as_utc(started)).total_seconds()
                payload["duration_seconds"] = round(duration, 2)
            elif job.get('scrape_time'):
                payload["duration_seconds"] = round(job.get('scrape_time'), 2)

            # Completed timestamp
            if completed_raw:
                payload["completed_at"] = _format_timestamp(completed_raw)

        elif status in _FAILURE_STATUSES:
            # Failed job payload
            payload["event"] = "job.failed"

            error_message = job.get('error_message') or 'Unknown error'
            payload["error"] = {
                "type": self._classify_error(error_message),
                "message": error_message
            }

            # Include partial results info if applicable
            if status == 'partial':
                payload["result_summary"] = {
                    "item_count": job.get('reviews_count') or 0,
                    "primary_metric": None
                }

            # Failed timestamp
            if completed_raw:
                payload["completed_at"] = _format_timestamp(completed_raw)

        return payload

    def _average_rating(self, reviews_data: Any) -> Optional[float]:
        """
        Compute the average rating across a job's reviews.

        Args:
            reviews_data: List of review dicts, or a JSON string encoding one

        Returns:
            Mean of the truthy 'rating' values rounded to 2 places, or None
            when there is nothing usable to average
        """
        if isinstance(reviews_data, str):
            try:
                reviews_data = json.loads(reviews_data)
            except json.JSONDecodeError:
                return None

        if not isinstance(reviews_data, list):
            return None

        # Skip malformed entries and reviews without a (non-zero) rating
        ratings = [
            review['rating']
            for review in reviews_data
            if isinstance(review, dict) and review.get('rating')
        ]
        if not ratings:
            return None
        return round(sum(ratings) / len(ratings), 2)

    def _build_batch_payload(self, batch_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build webhook payload for batch completion.

        Args:
            batch_info: Batch info dictionary with job summaries

        Returns:
            Webhook payload dictionary
        """
        batch_id = str(batch_info.get('batch_id'))
        jobs = batch_info.get('jobs') or []

        # Count successes and failures
        succeeded = sum(1 for j in jobs if j.get('status') == 'completed')
        failed_job_ids = [
            str(j.get('job_id'))
            for j in jobs
            if j.get('status') in _FAILURE_STATUSES
        ]

        # Find latest completed_at; normalise to UTC so values of different
        # types (str / naive / aware) can be compared safely
        parsed_times = (_coerce_datetime(j.get('completed_at')) for j in jobs)
        completed_times = [_as_utc(t) for t in parsed_times if t is not None]
        latest_completed = (
            max(completed_times) if completed_times else datetime.now(timezone.utc)
        )

        payload = {
            "event": "batch.completed",
            "batch_id": batch_id,
            # The 'name' key may be present but None, so `or` supplies the default
            "name": batch_info.get('name') or f'Batch {batch_id[:8]}',
            "total_jobs": len(jobs),
            "succeeded": succeeded,
            "failed": len(failed_job_ids),
            "completed_at": _format_timestamp(latest_completed),
            "failed_job_ids": failed_job_ids
        }

        return payload

    def _classify_error(self, error_message: str) -> str:
        """
        Classify error message into a type category.

        The checks run in order and the first match wins.

        Args:
            error_message: Error message string

        Returns:
            Error type string
        """
        if not error_message:
            return "unknown"

        error_lower = str(error_message).lower()

        if 'rate' in error_lower and 'limit' in error_lower:
            return "rate_limited"
        if 'timeout' in error_lower:
            return "timeout"
        if 'captcha' in error_lower or 'robot' in error_lower:
            return "captcha_detected"
        if 'blocked' in error_lower or 'denied' in error_lower:
            return "blocked"
        if 'network' in error_lower or 'connection' in error_lower:
            return "network_error"
        if 'not found' in error_lower or '404' in error_lower:
            return "not_found"
        if 'invalid' in error_lower:
            return "invalid_input"
        if 'element' in error_lower or 'selector' in error_lower:
            return "scrape_error"
        return "unknown"

    async def _send_callback(
        self,
        url: str,
        payload: Dict[str, Any],
        secret: Optional[str] = None,
        job_id: Optional[UUID] = None
    ) -> bool:
        """
        Send a callback to the specified URL.

        Delegates delivery to the WebhookManager.

        Args:
            url: Callback URL
            payload: Payload dictionary
            secret: Optional secret forwarded to the WebhookManager
            job_id: Optional job ID forwarded to the WebhookManager

        Returns:
            True if sent successfully
        """
        return await self.webhook_manager.send_webhook(
            webhook_url=url,
            payload=payload,
            secret=secret,
            job_id=job_id,
            db=self.db
        )

    async def _update_callback_status(self, job_id: UUID, success: bool):
        """
        Update the callback_status and callback_attempts for a job.

        Args:
            job_id: Job UUID
            success: Whether the callback was sent successfully
        """
        status = CallbackStatus.SENT if success else CallbackStatus.FAILED
        async with self.db.pool.acquire() as conn:
            await conn.execute("""
                UPDATE jobs
                SET callback_status = $2,
                    callback_attempts = COALESCE(callback_attempts, 0) + 1
                WHERE job_id = $1
            """, job_id, status)

    async def _update_batch_callback_status(self, batch_id: UUID, success: bool):
        """
        Update callback status for all jobs in a batch.

        Args:
            batch_id: Batch UUID
            success: Whether the callback was sent successfully
        """
        status = CallbackStatus.SENT if success else CallbackStatus.FAILED
        async with self.db.pool.acquire() as conn:
            await conn.execute("""
                UPDATE jobs
                SET callback_status = $2,
                    callback_attempts = COALESCE(callback_attempts, 0) + 1
                WHERE batch_id = $1
            """, batch_id, status)

    async def _get_failed_callbacks(self, max_attempts: int) -> List[Dict[str, Any]]:
        """
        Get jobs with failed callbacks that can be retried.

        Returns at most 100 jobs, oldest completed_at first.

        Args:
            max_attempts: Maximum attempts before giving up

        Returns:
            List of job dictionaries
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT job_id, status, callback_url, callback_status,
                       callback_attempts, webhook_secret
                FROM jobs
                WHERE callback_url IS NOT NULL
                  AND callback_status = 'failed'
                  AND COALESCE(callback_attempts, 0) < $1
                  AND status IN ('completed', 'failed', 'partial')
                ORDER BY completed_at ASC
                LIMIT 100
            """, max_attempts)

        return [dict(row) for row in rows]

    async def _get_batch_info(self, batch_id: UUID) -> Optional[Dict[str, Any]]:
        """
        Get batch information including all jobs.

        Args:
            batch_id: Batch UUID

        Returns:
            Batch info dictionary with jobs list, or None if not found
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT job_id, status, batch_index, callback_url, callback_status,
                       webhook_secret, completed_at, reviews_count, error_message,
                       metadata
                FROM jobs
                WHERE batch_id = $1
                ORDER BY batch_index ASC
            """, batch_id)

        if not rows:
            return None

        jobs = [dict(row) for row in rows]

        # Determine if batch is complete (all jobs finished)
        is_complete = all(
            j.get('status') not in _UNFINISHED_STATUSES for j in jobs
        )

        # Get batch name from first job's metadata if available
        batch_name = None
        first_metadata = jobs[0].get('metadata')
        if isinstance(first_metadata, str):
            try:
                first_metadata = json.loads(first_metadata)
            except json.JSONDecodeError:
                first_metadata = None
        # Metadata that is not a JSON object carries no batch name
        if isinstance(first_metadata, dict):
            batch_name = first_metadata.get('batch_name')

        # Take the callback settings from the first job that has a
        # callback_url: _get_potentially_complete_batches selects a batch as
        # soon as any of its jobs has one, not only the first job.
        callback_job = next(
            (j for j in jobs if j.get('callback_url')),
            jobs[0]
        )

        return {
            'batch_id': batch_id,
            'name': batch_name,
            'jobs': jobs,
            'is_complete': is_complete,
            'callback_url': callback_job.get('callback_url'),
            'webhook_secret': callback_job.get('webhook_secret')
        }

    async def _get_potentially_complete_batches(self) -> List[UUID]:
        """
        Get batch IDs that might have recently completed.

        Returns:
            List of batch UUIDs to check (at most 100)
        """
        async with self.db.pool.acquire() as conn:
            # Find batches where:
            # 1. At least one job has callback_url set
            # 2. callback_status is null or pending (not yet sent)
            # 3. No jobs are still pending or running
            rows = await conn.fetch("""
                SELECT DISTINCT batch_id
                FROM jobs
                WHERE batch_id IS NOT NULL
                  AND callback_url IS NOT NULL
                  AND COALESCE(callback_status, 'pending') = 'pending'
                  AND batch_id NOT IN (
                      SELECT DISTINCT batch_id
                      FROM jobs
                      WHERE batch_id IS NOT NULL
                        AND status IN ('pending', 'running')
                  )
                LIMIT 100
            """)

        return [row['batch_id'] for row in rows]


class JobCallbackDispatcher:
    """
    Background dispatcher that monitors for jobs needing callbacks.

    Runs in background and processes callbacks for completed jobs.
    """

    def __init__(
        self,
        db,
        interval_seconds: int = 30,
        retry_interval_seconds: int = 300
    ):
        """
        Initialize the callback dispatcher.

        Args:
            db: DatabaseManager instance
            interval_seconds: How often to check for pending callbacks
            retry_interval_seconds: How often to retry failed callbacks
        """
        self.db = db
        self.interval = interval_seconds
        self.retry_interval = retry_interval_seconds
        self.callback_service = JobCallbackService(db)
        self.running = False
        self._last_retry = datetime.now(timezone.utc)

    async def start(self):
        """Start the background callback dispatcher"""
        self.running = True
        log.info("Job callback dispatcher started")

        while self.running:
            try:
                # Process pending job callbacks
                # NOTE(review): this moves callback_status off 'pending', the
                # same column _get_potentially_complete_batches filters on,
                # so the batch check below may find nothing for batches whose
                # job callbacks were just processed. Confirm whether a
                # separate batch-level status is intended.
                await self._process_pending_callbacks()

                # Check for completed batches
                await self.callback_service.check_and_send_batch_callbacks()

                # Periodically retry failed callbacks
                now = datetime.now(timezone.utc)
                if (now - self._last_retry).total_seconds() >= self.retry_interval:
                    await self.callback_service.retry_failed_callbacks(max_attempts=5)
                    self._last_retry = now

            except Exception as e:
                # Keep the loop alive, but record the traceback
                log.exception(f"Error in callback dispatcher: {e}")

            await asyncio.sleep(self.interval)

    def stop(self):
        """Stop the background callback dispatcher"""
        self.running = False
        log.info("Job callback dispatcher stopped")

    async def _process_pending_callbacks(self):
        """
        Process all pending callbacks.

        Fetches jobs with callback_url set and callback_status null/pending
        (at most 100 per call, oldest completed_at first).
        """
        async with self.db.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT job_id
                FROM jobs
                WHERE callback_url IS NOT NULL
                  AND COALESCE(callback_status, 'pending') = 'pending'
                  AND status IN ('completed', 'failed', 'partial')
                ORDER BY completed_at ASC
                LIMIT 100
            """)

        if not rows:
            return

        log.info(f"Processing {len(rows)} pending job callbacks")

        # The connection above is released before sending, because
        # send_job_callback acquires its own connection from the pool.
        for row in rows:
            job_id = row['job_id']
            try:
                await self.callback_service.send_job_callback(job_id)
            except Exception as e:
                log.exception(f"Error sending callback for job {job_id}: {e}")

        log.info(f"Processed {len(rows)} callbacks")