Files
whyrating-engine-legacy/scrapers/registry.py
Alejandro Gutiérrez 788ef84756 Phases 2-4: Requester support, batches, webhooks, scraper registry
Phase 2 - Requester & Batch Support:
- core/database.py: Added create_job params (requester_*, batch_*, priority, callback_*)
- core/database.py: Added batch methods (create_batch, get_batch, update_batch_progress, get_batches)
- core/database.py: Added update_job_callback for tracking webhook delivery
- api/routes/batches.py: New endpoints:
  - POST /api/scrape/google-reviews/batch (submit batch)
  - GET /api/batches (list batches)
  - GET /api/batches/{id} (batch detail)
  - DELETE /api/batches/{id} (cancel batch)
- api_server_production.py: Updated /api/scrape with requester, priority, callback fields
- api_server_production.py: New primary endpoint POST /api/scrape/google-reviews

Phase 3 - Webhooks:
- services/job_callback_service.py: New service with:
  - JobCallbackService: send_job_callback, send_batch_callback, retry_failed_callbacks
  - JobCallbackDispatcher: Background worker for callback monitoring
  - Payload formats per spec (job.completed, job.failed, batch.completed)
  - Exponential backoff for retries
  - Error classification for failure payloads

Phase 4 - Scraper Registry:
- scrapers/registry.py: Database-backed version routing:
  - get_scraper(): Version/variant/A/B routing
  - _get_weighted_scraper(): Traffic-weighted random selection
  - 60-second TTL cache for performance
  - register_scraper, deprecate_scraper, update_traffic_allocation
  - LegacyScraperRegistry preserved for backwards compatibility

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:35:58 +00:00

633 lines
21 KiB
Python

"""
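Scraper Registry

This module provides a database-backed registry for managing and routing
scraper requests. It supports:
- Version-based routing (an exact version, or the latest version of a variant)
- A/B testing via traffic_pct-weighted random selection
- Minimum-priority (min_priority) filtering during A/B routing
- In-memory caching with a TTL for performance

LegacyScraperRegistry, the older in-process class-based registry, is kept
for backward compatibility.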
"""
import asyncio
import json
import logging
import random
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type

from scrapers.base import BaseScraper
# Module-level logger; ScraperRegistry reports cache loads, routing decisions
# and registry writes through it.
log = logging.getLogger(__name__)
@dataclass
class ScraperInfo:
    """
    In-memory copy of one ``scraper_registry`` row.

    ScraperRegistry builds these while loading its cache and converts them
    to plain dictionaries before handing them to callers.
    """

    id: str  # registry row id, stored as text
    job_type: str  # kind of scraping job, e.g. "google_reviews"
    version: str  # version string, e.g. "1.0.0"
    variant: str  # release channel, e.g. "stable", "beta", "canary"
    module_path: str  # Python module path of the scraper implementation
    function_name: str  # entry-point function inside module_path
    is_default: bool  # used as fallback when A/B routing finds no candidate
    traffic_pct: int  # A/B routing weight (0-100)
    min_priority: int  # minimum job priority for A/B eligibility
    config: Optional[Dict[str, Any]]  # optional scraper configuration
    deprecated_at: Optional[str]  # deprecation timestamp as text; None while active
class ScraperRegistry:
    """
    Routes scraping requests to appropriate scraper versions.

    Supports A/B testing via ``traffic_pct`` and variant selection. The
    registry is backed by the ``scraper_registry`` database table. All rows
    are cached in memory, grouped by job_type and ordered newest version
    first (numeric, semver-style comparison rather than plain string order).
    The cache is reloaded once it is older than the TTL, or after a write
    made through this class.

    Usage:
        registry = ScraperRegistry(db)
        scraper_info = await registry.get_scraper("google_reviews")
        # scraper_info contains module_path, function_name, config, etc.
    """

    def __init__(self, db: "DatabaseManager", cache_ttl: int = 60):  # noqa: F821 - forward reference
        """
        Initialize the scraper registry.

        Args:
            db: DatabaseManager instance; this class only uses ``db.pool.acquire()``.
            cache_ttl: Seconds a loaded cache stays fresh before it is reloaded
                (defaults to 60, the previous hard-coded value).
        """
        self.db = db
        # job_type -> ScraperInfo entries, newest version first.
        self._cache: Dict[str, List[ScraperInfo]] = {}
        # Wall-clock time of the last successful load. 0 means "never loaded"
        # or "invalidated"; both always read as stale.
        self._cache_timestamp: float = 0
        self._cache_ttl: int = cache_ttl
        self._cache_lock = asyncio.Lock()

    async def get_scraper(
        self,
        job_type: str,
        version: Optional[str] = None,
        variant: Optional[str] = None,
        priority: int = 0,
    ) -> Optional[Dict[str, Any]]:
        """
        Get scraper info for a job.

        Selection order:
            1. If version is given, return the active scraper with exactly that version.
            2. Else if variant is given, return the newest active scraper of that variant.
            3. Otherwise, pick via A/B routing weighted by traffic_pct
               (only this path applies min_priority filtering).

        Args:
            job_type: Type of scraping job (e.g., "google_reviews")
            version: Optional specific version to use (e.g., "1.0.0")
            variant: Optional variant filter ("stable", "beta", "canary")
            priority: Job priority level for min_priority filtering

        Returns:
            Dictionary as produced by ``_scraper_to_dict``, e.g.:
            {
                "id": "...",
                "job_type": "google_reviews",
                "version": "1.0.0",
                "variant": "stable",
                "module_path": "scrapers.google_reviews.v1_0_0",
                "function_name": "fast_scrape_reviews",
                "config": {...},
                ...
            }
            Returns None if no matching scraper is found.
        """
        await self._ensure_cache_fresh()

        scrapers = self._cache.get(job_type, [])
        if not scrapers:
            log.warning(f"No scrapers registered for job_type: {job_type}")
            return None

        active_scrapers = [s for s in scrapers if s.deprecated_at is None]
        if not active_scrapers:
            log.warning(f"All scrapers for job_type {job_type} are deprecated")
            return None

        if version:
            selected = self._find_by_version(active_scrapers, version)
            if selected is None:
                log.warning(f"Requested version {version} not found for {job_type}")
                return None
            log.debug(f"Selected scraper by exact version: {version}")
        elif variant:
            selected = self._find_latest_for_variant(active_scrapers, variant)
            if selected is None:
                log.warning(f"No active scrapers found for variant {variant} in {job_type}")
                return None
            log.debug(f"Selected latest scraper for variant {variant}: {selected.version}")
        else:
            selected = await self._get_weighted_scraper(job_type, priority)
            if selected is None:
                return None
            log.debug(f"Selected scraper via A/B routing: {selected.version} ({selected.variant})")

        return self._scraper_to_dict(selected)

    async def _get_weighted_scraper(
        self,
        job_type: str,
        priority: int,
    ) -> Optional[ScraperInfo]:
        """
        Select a scraper at random, weighted by traffic_pct.

        Eligible scrapers are active, have traffic_pct > 0 and have
        min_priority <= priority. If none are eligible, fall back to the
        active default scraper when its min_priority allows it.

        Args:
            job_type: Type of scraping job
            priority: Job priority level

        Returns:
            Selected ScraperInfo, or None if nothing is eligible.
        """
        scrapers = self._cache.get(job_type, [])

        eligible = [
            s for s in scrapers
            if s.deprecated_at is None
            and s.traffic_pct > 0
            and s.min_priority <= priority
        ]

        if not eligible:
            default = self._find_default(scrapers)
            if default is not None and default.min_priority <= priority:
                log.debug(f"No eligible A/B scrapers, using default: {default.version}")
                return default
            log.warning(f"No eligible scrapers for job_type {job_type} with priority {priority}")
            return None

        # Every eligible entry has a positive weight, so random.choices is
        # well-defined. It draws a single random() value and picks the first
        # entry whose cumulative weight exceeds it, which is the same
        # distribution as a manual cumulative walk.
        weights = [s.traffic_pct for s in eligible]
        return random.choices(eligible, weights=weights, k=1)[0]

    async def refresh_cache(self) -> None:
        """
        Reload the registry from the database, ignoring the TTL.

        Serialized with other loads via the cache's asyncio lock.
        """
        async with self._cache_lock:
            await self._load_cache()

    def _cache_is_stale(self) -> bool:
        """Return True when the cache must be (re)loaded before use."""
        age = time.time() - self._cache_timestamp
        # A negative age means the wall clock moved backwards. Reload instead
        # of trusting the cache until the clock catches up.
        return age < 0 or age > self._cache_ttl

    async def _ensure_cache_fresh(self) -> None:
        """
        Load the cache if it was never loaded, was invalidated, or has expired.

        Staleness depends only on the load timestamp. An empty registry that
        loaded successfully is therefore cached like any other, instead of
        being re-queried on every call.
        """
        if not self._cache_is_stale():
            return
        async with self._cache_lock:
            # Re-check with a fresh clock reading: another coroutine may have
            # reloaded the cache while this one waited for the lock.
            if self._cache_is_stale():
                await self._load_cache()

    async def _load_cache(self) -> None:
        """
        Load all scraper registry entries from the database into the cache.

        The new cache is built in a local dict and swapped in only after every
        row has been parsed, so a failure leaves the previous cache untouched.

        Raises:
            Exception: Any database or row-conversion error is logged and re-raised.
        """
        try:
            async with self.db.pool.acquire() as conn:
                rows = await conn.fetch("""
                    SELECT
                        id,
                        job_type,
                        version,
                        variant,
                        module_path,
                        function_name,
                        is_default,
                        traffic_pct,
                        min_priority,
                        config,
                        deprecated_at
                    FROM scraper_registry
                    ORDER BY job_type, version DESC
                """)

            new_cache: Dict[str, List[ScraperInfo]] = {}
            for row in rows:
                info = self._row_to_info(row)
                new_cache.setdefault(info.job_type, []).append(info)

            # SQL "version DESC" is a string sort ("1.9.0" > "1.10.0"), so
            # re-order each group with a numeric, semver-style key. The sort
            # is stable, so identical versions keep the database order.
            for entries in new_cache.values():
                entries.sort(key=lambda s: self._version_sort_key(s.version), reverse=True)

            self._cache = new_cache
            self._cache_timestamp = time.time()
            log.info(f"Scraper registry cache loaded: {sum(len(v) for v in new_cache.values())} entries")
        except Exception as e:
            log.error(f"Failed to load scraper registry cache: {e}")
            raise

    @staticmethod
    def _row_to_info(row: Any) -> ScraperInfo:
        """Convert one scraper_registry row (mapping-style access) into a ScraperInfo."""
        config = row['config']
        # register_scraper writes config as json.dumps(...)::jsonb. asyncpg
        # (which this code appears to use) returns jsonb as text unless a JSON
        # codec is installed, so decode text to honour Optional[Dict].
        if isinstance(config, str):
            config = json.loads(config)
        deprecated_at = row['deprecated_at']
        return ScraperInfo(
            id=str(row['id']),
            job_type=row['job_type'],
            version=row['version'],
            variant=row['variant'],
            module_path=row['module_path'],
            function_name=row['function_name'],
            is_default=row['is_default'],
            traffic_pct=row['traffic_pct'],
            min_priority=row['min_priority'],
            config=config,
            deprecated_at=str(deprecated_at) if deprecated_at else None,
        )

    @staticmethod
    def _version_sort_key(version: str) -> tuple:
        """
        Build a sort key that orders version strings newest-last.

        Dot-separated numeric parts compare as integers ("1.10.0" > "1.9.0").
        Build metadata after "+" is ignored. A release outranks its own
        pre-releases ("1.1.0" > "1.1.0-beta"), and pre-release identifiers
        compare numerically where possible.
        """
        def piece(part: str) -> tuple:
            # Tag parts so ints and strings never get compared directly;
            # numeric parts rank below alphanumeric ones.
            return (0, int(part), "") if part.isdecimal() else (1, 0, part)

        core, _, prerelease = version.split("+", 1)[0].partition("-")
        core_key = tuple(piece(p) for p in core.split("."))
        if prerelease:
            release_rank = (0, tuple(piece(p) for p in prerelease.split(".")))
        else:
            release_rank = (1, ())
        return core_key, release_rank

    @staticmethod
    def _affected_rows(status: str) -> int:
        """Return the row count from a command status such as "UPDATE 3" (0 if absent)."""
        tail = status.rsplit(" ", 1)[-1] if status else ""
        return int(tail) if tail.isdecimal() else 0

    async def list_scrapers(
        self,
        job_type: Optional[str] = None,
        include_deprecated: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        List registered scrapers, optionally filtered by job_type.

        Args:
            job_type: Optional job type filter
            include_deprecated: Whether to include deprecated scrapers

        Returns:
            List of scraper info dictionaries (newest version first within each job_type)
        """
        await self._ensure_cache_fresh()

        if job_type:
            scrapers = self._cache.get(job_type, [])
        else:
            scrapers = [s for scrapers_list in self._cache.values() for s in scrapers_list]

        return [
            self._scraper_to_dict(scraper)
            for scraper in scrapers
            if include_deprecated or not scraper.deprecated_at
        ]

    async def get_scraper_by_id(self, scraper_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a specific scraper by its database ID.

        Args:
            scraper_id: Registry entry id, compared against the stringified row id

        Returns:
            Scraper info dictionary or None if not found
        """
        await self._ensure_cache_fresh()

        match = next(
            (s for scrapers_list in self._cache.values() for s in scrapers_list if s.id == scraper_id),
            None,
        )
        return self._scraper_to_dict(match) if match is not None else None

    async def register_scraper(
        self,
        job_type: str,
        version: str,
        variant: str,
        module_path: str,
        function_name: str,
        is_default: bool = False,
        traffic_pct: int = 0,
        min_priority: int = 0,
        config: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Register a new scraper version in the database and invalidate the cache.

        Args:
            job_type: Type of scraping job
            version: Version string (ordered semver-style by this registry)
            variant: Release channel ("stable", "beta", "canary")
            module_path: Python module path
            function_name: Entry function name
            is_default: Whether this is the default fallback
            traffic_pct: Traffic percentage for A/B testing (0-100)
            min_priority: Minimum job priority required
            config: Optional configuration dictionary (stored as jsonb; None -> NULL)

        Returns:
            The created registry entry's id, as a string

        Raises:
            ValueError: If variant is invalid or traffic_pct out of range
        """
        if variant not in ('stable', 'beta', 'canary'):
            raise ValueError(f"Invalid variant: {variant}. Must be 'stable', 'beta', or 'canary'")

        if not 0 <= traffic_pct <= 100:
            raise ValueError(f"traffic_pct must be between 0 and 100, got: {traffic_pct}")

        # Keep an explicit empty config ({}) instead of collapsing it to NULL.
        config_json = json.dumps(config) if config is not None else None

        async with self.db.pool.acquire() as conn:
            scraper_id = await conn.fetchval("""
                INSERT INTO scraper_registry (
                    job_type, version, variant, module_path, function_name,
                    is_default, traffic_pct, min_priority, config
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::jsonb)
                RETURNING id
            """, job_type, version, variant, module_path, function_name,
                is_default, traffic_pct, min_priority, config_json)

        self._cache_timestamp = 0  # invalidate cache
        log.info(f"Registered scraper: {job_type} v{version} ({variant})")
        return str(scraper_id)

    async def deprecate_scraper(
        self,
        job_type: str,
        version: str,
    ) -> bool:
        """
        Deprecate a scraper version (soft delete).

        Args:
            job_type: Type of scraping job
            version: Version to deprecate

        Returns:
            True if at least one active row was deprecated, False if none matched
        """
        async with self.db.pool.acquire() as conn:
            status = await conn.execute("""
                UPDATE scraper_registry
                SET deprecated_at = NOW()
                WHERE job_type = $1 AND version = $2 AND deprecated_at IS NULL
            """, job_type, version)

        # Any positive count counts as success. If duplicate rows exist,
        # comparing against exactly "1" would report failure and skip the
        # cache invalidation.
        if self._affected_rows(status) == 0:
            return False

        self._cache_timestamp = 0  # invalidate cache
        log.info(f"Deprecated scraper: {job_type} v{version}")
        return True

    async def update_traffic_allocation(
        self,
        job_type: str,
        allocations: Dict[str, int],
    ) -> None:
        """
        Update traffic allocations for multiple scrapers in one transaction.

        Args:
            job_type: Type of scraping job
            allocations: Dict mapping version to traffic_pct,
                e.g., {"1.0.0": 90, "1.1.0-beta": 10}

        Raises:
            ValueError: If any value is outside 0-100, or the submitted total exceeds 100
        """
        # Validate each value first so a negative entry is reported as such
        # rather than hidden inside the total.
        for version, pct in allocations.items():
            if not 0 <= pct <= 100:
                raise ValueError(f"Invalid traffic_pct for {version}: {pct}")

        total = sum(allocations.values())
        if total > 100:
            raise ValueError(f"Total traffic allocation cannot exceed 100, got: {total}")

        unmatched: List[str] = []
        async with self.db.pool.acquire() as conn:
            async with conn.transaction():
                for version, traffic_pct in allocations.items():
                    status = await conn.execute("""
                        UPDATE scraper_registry
                        SET traffic_pct = $3
                        WHERE job_type = $1 AND version = $2 AND deprecated_at IS NULL
                    """, job_type, version, traffic_pct)
                    if self._affected_rows(status) == 0:
                        unmatched.append(version)

        self._cache_timestamp = 0  # invalidate cache
        if unmatched:
            log.warning(f"No active scraper matched for {job_type}: {unmatched}")
        log.info(f"Updated traffic allocations for {job_type}: {allocations}")

    # ==================== Helper Methods ====================

    def _find_by_version(
        self,
        scrapers: List[ScraperInfo],
        version: str,
    ) -> Optional[ScraperInfo]:
        """Return the first scraper whose version equals ``version`` exactly."""
        return next((s for s in scrapers if s.version == version), None)

    def _find_latest_for_variant(
        self,
        scrapers: List[ScraperInfo],
        variant: str,
    ) -> Optional[ScraperInfo]:
        """Return the newest scraper of ``variant`` (lists are kept newest-first)."""
        return next((s for s in scrapers if s.variant == variant), None)

    def _find_default(
        self,
        scrapers: List[ScraperInfo],
    ) -> Optional[ScraperInfo]:
        """Return the first active scraper flagged as default, for fallback."""
        return next((s for s in scrapers if s.is_default and s.deprecated_at is None), None)

    def _scraper_to_dict(self, scraper: ScraperInfo) -> Dict[str, Any]:
        """Convert ScraperInfo to a dictionary for API responses."""
        return {
            "id": scraper.id,
            # Included so results from unfiltered list_scrapers are unambiguous.
            "job_type": scraper.job_type,
            "version": scraper.version,
            "variant": scraper.variant,
            "module_path": scraper.module_path,
            "function_name": scraper.function_name,
            "is_default": scraper.is_default,
            "traffic_pct": scraper.traffic_pct,
            "min_priority": scraper.min_priority,
            "config": scraper.config,
            "deprecated": scraper.deprecated_at is not None,
        }
# ==================== Legacy Singleton Registry ====================
# Kept for backward compatibility with existing code that uses
# the old class-based scraper registration pattern.
class LegacyScraperRegistry:
    """
    Legacy registry for managing scraper implementations.

    This class provides backward compatibility with the old scraper
    registration pattern using class-based scrapers. New code should
    use the database-backed ScraperRegistry instead.

    The registry allows:
    - Registering scrapers by name and version
    - Looking up scrapers by domain or name
    - Listing all available scrapers

    "Latest" lookups compare versions numerically ("1.10.0" > "1.9.0")
    rather than as plain strings.

    Usage:
        registry = LegacyScraperRegistry()
        registry.register(GoogleReviewsScraper)
        scraper = registry.get_scraper_for_url("https://google.com/maps/place/...")
    """

    _instance: Optional["LegacyScraperRegistry"] = None
    _scrapers: Dict[str, Type[BaseScraper]]  # "name:version" -> scraper class
    _domain_map: Dict[str, List[str]]  # domain -> "name:version" keys

    def __new__(cls) -> "LegacyScraperRegistry":
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._scrapers = {}
            cls._instance._domain_map = {}
        return cls._instance

    @staticmethod
    def _version_key(version: str) -> tuple:
        """
        Sort key that orders version strings numerically.

        "1.10.0" > "1.9.0"; build metadata after "+" is ignored; a release
        outranks its own pre-releases ("1.1.0" > "1.1.0-beta").
        """
        def piece(part: str) -> tuple:
            # Tag parts so ints and strings are never compared directly.
            return (0, int(part), "") if part.isdecimal() else (1, 0, part)

        core, _, prerelease = version.split("+", 1)[0].partition("-")
        core_key = tuple(piece(p) for p in core.split("."))
        if prerelease:
            release_rank = (0, tuple(piece(p) for p in prerelease.split(".")))
        else:
            release_rank = (1, ())
        return core_key, release_rank

    @classmethod
    def _registry_key_order(cls, key: str) -> tuple:
        """Sort key for "name:version" keys: by name, then by numeric version."""
        name, _, version = key.rpartition(":")
        return name, cls._version_key(version)

    def register(self, scraper_class: Type[BaseScraper], name: Optional[str] = None) -> None:
        """
        Register a scraper class with the registry.

        Re-registering the same name/version replaces the class and does not
        duplicate its domain mappings.

        Args:
            scraper_class: The scraper class to register (must inherit from BaseScraper)
            name: Optional name override, defaults to scraper_class.name property
        """
        # Create an uninitialized instance (no __init__) to read its properties.
        # Note: In production, we might want scraper_class to have class-level properties
        instance = scraper_class.__new__(scraper_class)
        scraper_name = name or instance.name
        key = f"{scraper_name}:{instance.version}"

        self._scrapers[key] = scraper_class

        for domain in instance.supported_domains:
            keys = self._domain_map.setdefault(domain, [])
            if key not in keys:
                keys.append(key)

    def get_scraper(self, name: str, version: Optional[str] = None) -> Optional[Type[BaseScraper]]:
        """
        Get a scraper class by name and optional version.

        Args:
            name: The scraper name
            version: Optional version string. If not provided, returns the latest.

        Returns:
            The scraper class, or None if not found
        """
        if version:
            return self._scrapers.get(f"{name}:{version}")

        prefix = f"{name}:"
        matching = [k for k in self._scrapers if k.startswith(prefix)]
        if not matching:
            return None

        latest = max(matching, key=self._registry_key_order)
        return self._scrapers.get(latest)

    def get_scraper_for_url(self, url: str) -> Optional[Type[BaseScraper]]:
        """
        Find a suitable scraper for the given URL.

        The URL's host (lower-cased, without port or credentials, and with a
        leading "www." removed) is looked up in the domain map. When several
        keys match, the highest by (name, numeric version) wins.

        Args:
            url: The URL to find a scraper for

        Returns:
            The scraper class that can handle this URL, or None if no match
        """
        from urllib.parse import urlparse

        # .hostname, unlike .netloc, drops "user@" and ":port" and is lower-cased.
        domain = urlparse(url).hostname or ""
        if domain.startswith("www."):
            domain = domain[4:]

        scraper_keys = self._domain_map.get(domain)
        if not scraper_keys:
            return None

        # max() instead of an in-place sort so the registry's stored list is not mutated.
        return self._scrapers.get(max(scraper_keys, key=self._registry_key_order))

    def list_scrapers(self) -> List[Dict[str, Any]]:
        """
        List all registered scrapers.

        Returns:
            List of dictionaries with scraper info: "name" and "version" read
            from the class's properties, and "domains" from supported_domains.
        """
        result = []
        for scraper_class in self._scrapers.values():
            instance = scraper_class.__new__(scraper_class)
            result.append({
                "name": instance.name,
                "version": instance.version,
                "domains": instance.supported_domains,
            })
        return result

    def clear(self) -> None:
        """Clear all registered scrapers. Useful for testing."""
        self._scrapers.clear()
        self._domain_map.clear()
# Global legacy registry instance (for backward compatibility).
# LegacyScraperRegistry.__new__ returns a singleton, so any later
# LegacyScraperRegistry() call yields this same object.
registry = LegacyScraperRegistry()