Performance improvements: - Validation speed: 59.71s → 10.96s (5.5x improvement) - Removed 50+ console.log statements from JavaScript extraction - Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting - Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls) Scraping improvements: - Increased idle detection from 6 to 12 consecutive idle scrolls for completeness - Added real-time progress updates every 5 scrolls with percentage calculation - Added crash recovery to extract partial reviews if Chrome crashes - Removed artificial 200-review limit to scrape ALL reviews Timestamp tracking: - Added updated_at field separate from started_at for progress tracking - Frontend now shows both "Started" (fixed) and "Last Update" (dynamic) Robustness improvements: - Added 5 fallback CSS selectors to handle different Google Maps page structures - Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc. - Automatic selector detection logs which selector works for debugging Test results: - Successfully scraped 550 reviews in 150.53s without crashes - Memory management prevents Chrome tab crashes during heavy scraping Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
374 lines
12 KiB
Python
374 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Webhook delivery system with retry logic and security.
|
|
"""
|
|
import asyncio
import hashlib
import hmac
import json
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from uuid import UUID

import httpx
|
|
|
|
# Logger named after this module; WebhookManager and WebhookDispatcher
# write all of their delivery/dispatch messages through it.
log: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class WebhookDeliveryError(Exception):
    """
    Error signalling that a webhook could not be delivered after all retries.

    Adds nothing beyond the standard ``Exception`` message/args.

    Note: ``WebhookManager.send_webhook`` reports a final failure by returning
    False rather than raising this exception.
    """
|
|
|
|
|
|
class WebhookManager:
    """
    Manages webhook delivery with retry logic and security.

    Features:
    - Exponential backoff retry (``max_retries`` attempts in total, 3 by default)
    - HMAC-SHA256 signature for security
    - Timeout handling
    - Async delivery
    - Logging of all attempts (module logger, plus an optional DB manager)
    """

    def __init__(
        self,
        max_retries: int = 3,
        timeout: float = 10.0,
        initial_retry_delay: float = 2.0
    ):
        """
        Initialize webhook manager.

        Args:
            max_retries: Maximum number of delivery attempts in total
                (the first attempt counts towards this limit)
            timeout: Request timeout in seconds
            initial_retry_delay: Delay in seconds after the first failed
                attempt; it doubles after every further failure
        """
        self.max_retries = max_retries
        self.timeout = timeout
        self.initial_retry_delay = initial_retry_delay

    def generate_signature(self, payload: str, secret: str) -> str:
        """
        Generate HMAC-SHA256 signature for webhook payload.

        Args:
            payload: JSON string payload
            secret: Webhook secret

        Returns:
            Hex-encoded signature
        """
        return hmac.new(
            secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

    @staticmethod
    def _is_success_status(status_code: int) -> bool:
        """Return True for any 2xx HTTP status code."""
        return 200 <= status_code < 300

    async def _log_attempt(self, db, job_id: Optional[UUID], **fields: Any) -> None:
        """
        Record one delivery attempt via ``db.log_webhook_attempt``, best-effort.

        Does nothing unless both ``db`` and ``job_id`` are truthy. Errors from
        the database call are logged and swallowed. Otherwise a failing log
        write after a successful POST would be treated as a delivery failure
        and the webhook re-sent, and a failing write inside an error handler
        would abort the remaining retries.

        Args:
            db: Optional database manager exposing ``log_webhook_attempt``
            job_id: Optional job ID the attempt belongs to
            **fields: Remaining keyword arguments for ``log_webhook_attempt``
        """
        if not (db and job_id):
            return
        try:
            await db.log_webhook_attempt(job_id=job_id, **fields)
        except Exception as e:
            log.warning(
                f"Could not record webhook attempt for job {job_id}: "
                f"{type(e).__name__}: {e}"
            )

    async def send_webhook(
        self,
        webhook_url: str,
        payload: Dict[str, Any],
        secret: Optional[str] = None,
        job_id: Optional[UUID] = None,
        db=None
    ) -> bool:
        """
        Send webhook with retry logic.

        Up to ``max_retries`` POST attempts are made. Any 2xx response counts
        as success. Non-2xx responses, timeouts and other errors are retried
        after ``initial_retry_delay * 2 ** (attempt - 1)`` seconds.

        Args:
            webhook_url: URL to send webhook to
            payload: Webhook payload dictionary (serialized with
                ``json.dumps(..., default=str)``)
            secret: Optional webhook secret for HMAC signature
            job_id: Optional job ID for logging attempts
            db: Optional database manager for logging

        Returns:
            True if delivery succeeded, False otherwise
        """
        payload_json = json.dumps(payload, default=str)

        for attempt in range(1, self.max_retries + 1):
            try:
                # Monotonic clock: wall-clock adjustments cannot skew durations.
                start_time = time.perf_counter()

                # Prepare headers
                headers = {
                    "Content-Type": "application/json",
                    "User-Agent": "GoogleReviewsScraper-Webhook/1.0"
                }

                # Add signature if secret provided
                if secret:
                    signature = self.generate_signature(payload_json, secret)
                    headers["X-Webhook-Signature"] = f"sha256={signature}"
                    # NOTE: only the body is signed; this timestamp is not
                    # covered by the signature.
                    headers["X-Webhook-Timestamp"] = str(int(time.time()))

                # Send webhook
                async with httpx.AsyncClient() as client:
                    response = await client.post(
                        webhook_url,
                        content=payload_json,
                        headers=headers,
                        timeout=self.timeout
                    )

                response_time_ms = (time.perf_counter() - start_time) * 1000

                if self._is_success_status(response.status_code):
                    log.info(
                        f"Webhook delivered successfully to {webhook_url} "
                        f"(attempt {attempt}, {response_time_ms:.0f}ms, status {response.status_code})"
                    )
                    # _log_attempt never raises, so a DB hiccup here cannot
                    # trigger a duplicate delivery.
                    await self._log_attempt(
                        db, job_id,
                        attempt_number=attempt,
                        success=True,
                        status_code=response.status_code,
                        response_time_ms=response_time_ms
                    )
                    return True

                # Non-2xx response
                error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
                log.warning(
                    f"Webhook delivery failed to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    status_code=response.status_code,
                    error_message=error_msg,
                    response_time_ms=response_time_ms
                )

            except httpx.TimeoutException:
                error_msg = f"Timeout after {self.timeout}s"
                log.warning(
                    f"Webhook delivery timeout to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    error_message=error_msg
                )

            except Exception as e:
                error_msg = f"{type(e).__name__}: {str(e)}"
                log.error(
                    f"Webhook delivery error to {webhook_url} "
                    f"(attempt {attempt}/{self.max_retries}): {error_msg}"
                )
                await self._log_attempt(
                    db, job_id,
                    attempt_number=attempt,
                    success=False,
                    error_message=error_msg
                )

            # Retry with exponential backoff
            if attempt < self.max_retries:
                retry_delay = self.initial_retry_delay * (2 ** (attempt - 1))
                log.info(f"Retrying in {retry_delay:.1f}s...")
                await asyncio.sleep(retry_delay)

        # All retries failed
        log.error(
            f"Webhook delivery failed to {webhook_url} after {self.max_retries} attempts"
        )
        return False

    @staticmethod
    def _build_job_payload(
        job_id: UUID,
        status: str,
        reviews_count: Optional[int] = None,
        scrape_time: Optional[float] = None,
        error_message: Optional[str] = None,
        reviews_url: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Build the body of a ``job.<status>`` webhook.

        Every payload carries ``event``, ``job_id``, ``status`` and a UTC
        ``timestamp``. Completed jobs add ``reviews_count``, ``scrape_time``
        and ``reviews_url``. Failed jobs add ``error_message``.
        """
        # Aware UTC time rendered as naive ISO + "Z". This keeps the wire
        # format of datetime.utcnow().isoformat() + "Z" without the API that
        # is deprecated since Python 3.12.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        payload: Dict[str, Any] = {
            "event": f"job.{status}",
            "job_id": str(job_id),
            "status": status,
            "timestamp": now_utc.isoformat() + "Z"
        }

        if status == "completed":
            payload.update({
                "reviews_count": reviews_count,
                "scrape_time": scrape_time,
                "reviews_url": reviews_url
            })
        elif status == "failed":
            payload["error_message"] = error_message

        return payload

    async def send_job_completed_webhook(
        self,
        webhook_url: str,
        job_id: UUID,
        status: str,
        reviews_count: Optional[int] = None,
        scrape_time: Optional[float] = None,
        error_message: Optional[str] = None,
        reviews_url: Optional[str] = None,
        secret: Optional[str] = None,
        db=None
    ) -> bool:
        """
        Send job completion webhook.

        Args:
            webhook_url: URL to send webhook to
            job_id: Job UUID
            status: Job status ('completed' or 'failed')
            reviews_count: Number of reviews scraped
            scrape_time: Time taken in seconds
            error_message: Error message if failed
            reviews_url: URL to retrieve reviews
            secret: Webhook secret
            db: Database manager for logging

        Returns:
            True if delivery succeeded
        """
        payload = self._build_job_payload(
            job_id,
            status,
            reviews_count=reviews_count,
            scrape_time=scrape_time,
            error_message=error_message,
            reviews_url=reviews_url
        )

        return await self.send_webhook(
            webhook_url=webhook_url,
            payload=payload,
            secret=secret,
            job_id=job_id,
            db=db
        )
|
|
|
|
|
|
class WebhookDispatcher:
    """
    Background webhook dispatcher that processes pending webhooks.

    Runs in background and delivers webhooks for completed jobs. Every
    ``interval_seconds`` it fetches up to 100 jobs with pending webhooks from
    the database and sends a job-completion webhook for each one.
    """

    def __init__(self, db, interval_seconds: int = 30):
        """
        Initialize webhook dispatcher.

        Args:
            db: Database manager instance. It must provide
                ``get_pending_jobs_with_webhooks`` and is also passed to the
                webhook manager for attempt logging.
            interval_seconds: How often to check for pending webhooks
        """
        self.db = db
        self.interval = interval_seconds
        self.webhook_manager = WebhookManager()
        self.running = False

    async def start(self):
        """
        Start the background webhook dispatcher.

        Loops until ``stop()`` is called. An error in one polling pass is
        logged with its traceback, and the loop continues. ``stop()`` only
        clears a flag, so the loop exits once the current sleep has finished.
        """
        self.running = True
        log.info("Webhook dispatcher started")

        while self.running:
            try:
                await self.process_pending_webhooks()
            except Exception as e:
                log.exception(f"Error in webhook dispatcher: {e}")

            await asyncio.sleep(self.interval)

    def stop(self):
        """Stop the background webhook dispatcher"""
        self.running = False
        log.info("Webhook dispatcher stopped")

    async def process_pending_webhooks(self):
        """
        Process all pending webhooks.

        Fetches up to 100 jobs with pending webhooks and delivers them one at
        a time. A failure for one job is logged and does not stop the batch.
        """
        # Get jobs with pending webhooks
        jobs = await self.db.get_pending_jobs_with_webhooks(limit=100)

        if not jobs:
            return

        log.info(f"Processing {len(jobs)} pending webhooks...")

        # Resolved once per batch. A trailing slash is stripped so the result
        # is ".../jobs/<id>/reviews", not "...//jobs/<id>/reviews".
        api_base_url = os.getenv('API_BASE_URL', 'http://localhost:8000').rstrip('/')

        for job in jobs:
            try:
                job_id = job['job_id']
                webhook_url = job['webhook_url']
                webhook_secret = job.get('webhook_secret')
                status = job['status']

                reviews_url = f"{api_base_url}/jobs/{job_id}/reviews"

                # NOTE(review): the delivery result is ignored here. Whether a
                # job stops being "pending" after success/failure depends on
                # the db layer (e.g. log_webhook_attempt); confirm it cannot
                # be re-sent on every pass.
                await self.webhook_manager.send_job_completed_webhook(
                    webhook_url=webhook_url,
                    job_id=job_id,
                    status=status,
                    reviews_count=job.get('reviews_count'),
                    scrape_time=job.get('scrape_time'),
                    error_message=job.get('error_message'),
                    reviews_url=reviews_url if status == 'completed' else None,
                    secret=webhook_secret,
                    db=self.db
                )

            except Exception as e:
                # .get(): a job missing 'job_id' must not raise again inside
                # this handler and abort the rest of the batch.
                log.exception(f"Error processing webhook for job {job.get('job_id')}: {e}")

        log.info(f"Processed {len(jobs)} webhooks")
|
|
|
|
|
|
# Webhook verification helper for client implementations
|
|
def verify_webhook_signature(payload: str, signature: Optional[str], secret: str) -> bool:
    """
    Verify webhook signature (for client-side verification).

    Recomputes the HMAC-SHA256 of ``payload`` keyed with ``secret``. It then
    compares the result in constant time with the hex digest carried in the
    ``X-Webhook-Signature`` header.

    Args:
        payload: Raw JSON payload string
        signature: Signature from X-Webhook-Signature header (format: "sha256=...");
            may be None when the header is absent
        secret: Webhook secret

    Returns:
        True if signature is valid. False if it is missing, lacks the
        "sha256=" prefix, or does not match.

    Note:
        Only the payload is signed. The X-Webhook-Timestamp header sent next
        to it is not covered by the signature, so this check alone does not
        reject replayed requests.

    Example:
        @app.post("/webhook")
        async def handle_webhook(request: Request):
            payload = await request.body()
            signature = request.headers.get("X-Webhook-Signature")

            if not verify_webhook_signature(payload.decode(), signature, WEBHOOK_SECRET):
                raise HTTPException(status_code=401, detail="Invalid signature")

            # Process webhook...
    """
    prefix = "sha256="
    if not signature or not signature.startswith(prefix):
        return False

    received_signature = signature[len(prefix):]
    computed_signature = hmac.new(
        secret.encode('utf-8'),
        payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes. hmac.compare_digest raises TypeError for str inputs
    # containing non-ASCII characters, and the header value is
    # attacker-controlled; a forged header must yield False, not an exception.
    return hmac.compare_digest(
        received_signature.encode('utf-8'),
        computed_signature.encode('ascii')
    )
|