#!/usr/bin/env python3
"""
Webhook delivery system with retry logic and security.

Outgoing webhooks are JSON POST requests. When a secret is configured they are
signed with HMAC-SHA256 and the hex digest is sent in the
``X-Webhook-Signature`` header as ``sha256=<digest>``; receivers can check it
with :func:`verify_webhook_signature`.
"""

import asyncio
import hashlib
import hmac
import json
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union
from uuid import UUID

import httpx

log = logging.getLogger(__name__)

_SIGNATURE_PREFIX = "sha256="


class WebhookDeliveryError(Exception):
    """Raised when webhook delivery fails after all retries"""
    pass


def _compute_signature(payload: Union[str, bytes], secret: str) -> str:
    """Return the hex HMAC-SHA256 digest of *payload* keyed with *secret*.

    ``str`` payloads and the secret are UTF-8 encoded; ``bytes`` payloads are
    used unchanged so a receiver can sign the exact raw request body.
    """
    body = payload.encode("utf-8") if isinstance(payload, str) else payload
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


class WebhookManager:
    """
    Manages webhook delivery with retry logic and security.

    Features:
    - Exponential backoff retry (3 attempts by default)
    - HMAC signature for security
    - Timeout handling
    - Async delivery
    - Logging of all attempts
    """

    def __init__(
        self,
        max_retries: int = 3,
        timeout: float = 10.0,
        initial_retry_delay: float = 2.0,
    ):
        """
        Initialize webhook manager.

        Args:
            max_retries: Maximum number of delivery attempts (not retries:
                the first attempt counts towards this limit)
            timeout: Request timeout in seconds
            initial_retry_delay: Delay before the second attempt; it doubles
                after each further failed attempt (exponential backoff)
        """
        self.max_retries = max_retries
        self.timeout = timeout
        self.initial_retry_delay = initial_retry_delay

    def generate_signature(self, payload: str, secret: str) -> str:
        """
        Generate HMAC-SHA256 signature for webhook payload.

        Args:
            payload: JSON string payload
            secret: Webhook secret

        Returns:
            Hex-encoded signature
        """
        return _compute_signature(payload, secret)

    @staticmethod
    def _is_success(status_code: int) -> bool:
        """Return True for any 2xx status code (the receiver accepted the webhook)."""
        return 200 <= status_code < 300

    def _build_headers(self, payload_json: str, secret: Optional[str]) -> Dict[str, str]:
        """Build request headers, adding signature headers when *secret* is truthy."""
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "GoogleReviewsScraper-Webhook/1.0",
        }
        if secret:
            signature = self.generate_signature(payload_json, secret)
            headers["X-Webhook-Signature"] = f"{_SIGNATURE_PREFIX}{signature}"
            # NOTE(review): the timestamp is not part of the signed data, so a
            # receiver cannot rely on it for replay protection as-is.
            headers["X-Webhook-Timestamp"] = str(int(time.time()))
        return headers

    async def _log_attempt(self, db, job_id: Optional[UUID], **fields: Any) -> None:
        """Record one delivery attempt via ``db.log_webhook_attempt``; never raises.

        Recording is best-effort. A database error must not be mistaken for a
        delivery failure, because that would re-send an already-delivered
        webhook. It also must not abort the retry loop.
        """
        if not (db and job_id):
            return
        try:
            await db.log_webhook_attempt(job_id=job_id, **fields)
        except Exception:
            log.exception(f"Failed to record webhook attempt for job {job_id}")

    async def send_webhook(
        self,
        webhook_url: str,
        payload: Dict[str, Any],
        secret: Optional[str] = None,
        job_id: Optional[UUID] = None,
        db=None,
    ) -> bool:
        """
        Send webhook with retry logic.

        Makes up to ``max_retries`` POST attempts. Any 2xx response counts as
        success. Non-2xx responses, timeouts and other request errors are
        retried after an exponentially growing delay. Every attempt is
        recorded through ``db`` when both ``db`` and ``job_id`` are given;
        recording failures are logged and otherwise ignored.

        Args:
            webhook_url: URL to send webhook to
            payload: Webhook payload dictionary (non-JSON values are stringified)
            secret: Optional webhook secret for HMAC signature
            job_id: Optional job ID for logging attempts
            db: Optional database manager for logging

        Returns:
            True if delivery succeeded, False otherwise
        """
        payload_json = json.dumps(payload, default=str)

        for attempt in range(1, self.max_retries + 1):
            try:
                headers = self._build_headers(payload_json, secret)
                started = time.perf_counter()  # monotonic, unaffected by clock changes
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        webhook_url,
                        content=payload_json,
                        headers=headers,
                        timeout=self.timeout,
                    )
                response_time_ms = (time.perf_counter() - started) * 1000
                status_code = response.status_code
                delivered = self._is_success(status_code)
                error_msg = None if delivered else f"HTTP {status_code}: {response.text[:200]}"
            except httpx.TimeoutException:
                error_msg = f"Timeout after {self.timeout}s"
                log.warning(
                    f"Webhook delivery timeout to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    error_message=error_msg,
                )
            except Exception as e:
                error_msg = f"{type(e).__name__}: {str(e)}"
                log.error(
                    f"Webhook delivery error to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    error_message=error_msg,
                )
            else:
                # DB logging is deliberately outside the ``try`` above, so a
                # failure to record the attempt can never trigger a re-send.
                if delivered:
                    log.info(
                        f"Webhook delivered successfully to {webhook_url} "
                        f"(attempt {attempt}, {response_time_ms:.0f}ms, status {status_code})"
                    )
                    await self._log_attempt(
                        db, job_id,
                        attempt_number=attempt,
                        success=True,
                        status_code=status_code,
                        response_time_ms=response_time_ms,
                    )
                    return True

                log.warning(
                    f"Webhook delivery failed to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    status_code=status_code,
                    error_message=error_msg,
                    response_time_ms=response_time_ms,
                )

            # Retry with exponential backoff: initial, 2x, 4x, ...
            if attempt < self.max_retries:
                retry_delay = self.initial_retry_delay * (2 ** (attempt - 1))
                log.info(f"Retrying in {retry_delay:.1f}s...")
                await asyncio.sleep(retry_delay)

        # All retries failed
        log.error(
            f"Webhook delivery failed to {webhook_url} after {self.max_retries} attempts"
        )
        return False

    async def send_job_completed_webhook(
        self,
        webhook_url: str,
        job_id: UUID,
        status: str,
        reviews_count: Optional[int] = None,
        scrape_time: Optional[float] = None,
        error_message: Optional[str] = None,
        reviews_url: Optional[str] = None,
        secret: Optional[str] = None,
        db=None,
    ) -> bool:
        """
        Send job completion webhook.

        The payload always carries ``event`` (``job.<status>``), ``job_id``,
        ``status`` and a UTC ``timestamp`` ending in ``Z``. It adds result
        fields for ``status == "completed"`` and ``error_message`` for
        ``status == "failed"``.

        Args:
            webhook_url: URL to send webhook to
            job_id: Job UUID
            status: Job status ('completed' or 'failed')
            reviews_count: Number of reviews scraped
            scrape_time: Time taken in seconds
            error_message: Error message if failed
            reviews_url: URL to retrieve reviews
            secret: Webhook secret
            db: Database manager for logging

        Returns:
            True if delivery succeeded
        """
        # Timezone-aware "now" (utcnow() is deprecated); the output format is
        # unchanged: ISO-8601 with a trailing "Z" instead of "+00:00".
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        payload: Dict[str, Any] = {
            "event": f"job.{status}",
            "job_id": str(job_id),
            "status": status,
            "timestamp": timestamp,
        }

        if status == "completed":
            payload.update({
                "reviews_count": reviews_count,
                "scrape_time": scrape_time,
                "reviews_url": reviews_url,
            })
        elif status == "failed":
            payload["error_message"] = error_message

        return await self.send_webhook(
            webhook_url=webhook_url,
            payload=payload,
            secret=secret,
            job_id=job_id,
            db=db,
        )


class WebhookDispatcher:
    """
    Background webhook dispatcher that processes pending webhooks.

    Runs in background and delivers webhooks for completed jobs.
    """

    def __init__(self, db, interval_seconds: int = 30):
        """
        Initialize webhook dispatcher.

        Args:
            db: Database manager instance
            interval_seconds: How often to check for pending webhooks
        """
        self.db = db
        self.interval = interval_seconds
        self.webhook_manager = WebhookManager()
        self.running = False

    async def start(self):
        """Start the background webhook dispatcher.

        Loops until :meth:`stop` is called, polling every ``interval`` seconds.
        Errors from one polling cycle are logged and do not stop the loop.
        """
        self.running = True
        log.info("Webhook dispatcher started")

        while self.running:
            try:
                await self.process_pending_webhooks()
            except Exception as e:
                # log.exception keeps the traceback for diagnosis.
                log.exception(f"Error in webhook dispatcher: {e}")

            await asyncio.sleep(self.interval)

    def stop(self):
        """Stop the background webhook dispatcher.

        Only clears the flag; a running :meth:`start` loop exits after its
        current sleep finishes.
        """
        self.running = False
        log.info("Webhook dispatcher stopped")

    async def process_pending_webhooks(self):
        """
        Process all pending webhooks.

        Fetches up to 100 jobs with pending webhooks and delivers them one by
        one. A failure for one job is logged and does not affect the others.
        """
        # Get jobs with pending webhooks
        jobs = await self.db.get_pending_jobs_with_webhooks(limit=100)

        if not jobs:
            return

        log.info(f"Processing {len(jobs)} pending webhooks...")

        # NOTE(review): nothing here marks a job's webhook as sent. Unless
        # get_pending_jobs_with_webhooks / log_webhook_attempt handles that,
        # the same jobs will be re-delivered every interval. Verify in the DB
        # layer.

        # Read once per cycle; strip a trailing slash to avoid "//jobs".
        api_base_url = os.getenv("API_BASE_URL", "http://localhost:8000").rstrip("/")
        delivered = 0

        for job in jobs:
            try:
                job_id = job['job_id']
                status = job['status']
                reviews_url = f"{api_base_url}/jobs/{job_id}/reviews"

                ok = await self.webhook_manager.send_job_completed_webhook(
                    webhook_url=job['webhook_url'],
                    job_id=job_id,
                    status=status,
                    reviews_count=job.get('reviews_count'),
                    scrape_time=job.get('scrape_time'),
                    error_message=job.get('error_message'),
                    reviews_url=reviews_url if status == 'completed' else None,
                    secret=job.get('webhook_secret'),
                    db=self.db,
                )
                if ok:
                    delivered += 1
            except Exception as e:
                # .get(): a job missing 'job_id' must not raise again here and
                # abort the remaining jobs in the batch.
                log.error(f"Error processing webhook for job {job.get('job_id')}: {e}")

        log.info(f"Processed {len(jobs)} webhooks ({delivered} delivered)")


# Webhook verification helper for client implementations
def verify_webhook_signature(
    payload: Union[str, bytes],
    signature: Optional[str],
    secret: str,
) -> bool:
    """
    Verify webhook signature (for client-side verification).

    Args:
        payload: Raw request body, either as received (``bytes``) or decoded
            as UTF-8 (``str``)
        signature: Signature from X-Webhook-Signature header (format: "sha256=...")
        secret: Webhook secret

    Returns:
        True if signature is valid; False for a missing, malformed or
        mismatching signature (never raises on attacker-controlled input)

    Example:
        @app.post("/webhook")
        async def handle_webhook(request: Request):
            payload = await request.body()
            signature = request.headers.get("X-Webhook-Signature")

            if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
                raise HTTPException(status_code=401, detail="Invalid signature")

            # Process webhook...
    """
    if not signature or not signature.startswith(_SIGNATURE_PREFIX):
        return False

    expected_signature = signature[len(_SIGNATURE_PREFIX):]
    computed_signature = _compute_signature(payload, secret)

    # Compare as bytes: hmac.compare_digest raises TypeError for non-ASCII
    # str arguments, and the header value is attacker-controlled.
    return hmac.compare_digest(
        expected_signature.encode("utf-8"),
        computed_signature.encode("ascii"),
    )