#!/usr/bin/env python3
"""
Admin API routes for scraper management.

Phase 6 - ReviewIQ Platform

Provides endpoints for:
- Listing registered scrapers with stats
- Registering new scraper versions
- Updating traffic allocation for A/B testing
- Deprecating scrapers (soft delete)
- Promoting scrapers to stable/default
"""

import json
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from uuid import UUID

from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, Field, validator

from core.database import DatabaseManager
from scrapers.registry import ScraperRegistry

log = logging.getLogger(__name__)

# Create router
router = APIRouter(prefix="/api/admin", tags=["admin"])


# ==================== Pydantic Models ====================

class ScraperStatsModel(BaseModel):
    """Statistics for a scraper over the last 24 hours."""
    total_jobs: int = Field(default=0, description="Total jobs processed")
    success_rate: float = Field(default=0.0, description="Success rate percentage")
    avg_duration: float = Field(default=0.0, description="Average scrape duration in seconds")


class ScraperInfoResponse(BaseModel):
    """Response model for scraper information."""
    id: str = Field(..., description="Unique scraper registry ID")
    job_type: str = Field(..., description="Type of job this scraper handles")
    version: str = Field(..., description="Semantic version string")
    variant: str = Field(..., description="Release variant (stable, beta, canary)")
    is_default: bool = Field(..., description="Whether this is the default scraper")
    traffic_pct: int = Field(..., description="Traffic percentage for A/B testing (0-100)")
    module_path: str = Field(..., description="Python module path")
    function_name: Optional[str] = Field(None, description="Entry function name")
    deprecated_at: Optional[str] = Field(None, description="Deprecation timestamp (ISO format)")
    stats: ScraperStatsModel = Field(default_factory=ScraperStatsModel, description="Last 24h stats")


class RegisterScraperRequest(BaseModel):
    """Request model for registering a new scraper."""
    job_type: str = Field(..., description="Type of job (e.g., 'google_reviews')")
    version: str = Field(..., description="Semantic version string (e.g., '1.1.0')")
    variant: str = Field(..., description="Release variant: stable, beta, or canary")
    module_path: str = Field(..., description="Python module path")
    function_name: str = Field(default="scrape", description="Entry function name")
    traffic_pct: int = Field(default=0, description="Initial traffic percentage (0-100)", ge=0, le=100)
    min_priority: int = Field(default=0, description="Minimum job priority required")
    config: Optional[Dict[str, Any]] = Field(default=None, description="Optional configuration")

    @validator('variant')
    def validate_variant(cls, v):
        """Restrict the variant to the supported release channels."""
        if v not in ('stable', 'beta', 'canary'):
            raise ValueError("variant must be 'stable', 'beta', or 'canary'")
        return v

    @validator('version')
    def validate_version(cls, v):
        """Basic semver sanity check: at least MAJOR.MINOR with no empty components."""
        parts = v.split('.')
        # Reject "1" (too few parts) as well as "1." / "1..0" (empty components).
        if len(parts) < 2 or not all(parts):
            raise ValueError("version must be semantic version format (e.g., '1.0.0')")
        return v


class RegisterScraperResponse(BaseModel):
    """Response model for scraper registration."""
    id: str = Field(..., description="Created scraper registry ID")
    job_type: str = Field(..., description="Job type")
    version: str = Field(..., description="Version string")
    variant: str = Field(..., description="Release variant")
    message: str = Field(..., description="Status message")


class UpdateTrafficRequest(BaseModel):
    """Request model for updating traffic percentage."""
    traffic_pct: int = Field(..., description="New traffic percentage (0-100)", ge=0, le=100)


class UpdateTrafficResponse(BaseModel):
    """Response model for traffic update."""
    id: str = Field(..., description="Scraper registry ID")
    traffic_pct: int = Field(..., description="Updated traffic percentage")
    message: str = Field(..., description="Status message")


class DeprecateResponse(BaseModel):
    """Response model for deprecation."""
    id: str = Field(..., description="Scraper registry ID")
    deprecated_at: str = Field(..., description="Deprecation timestamp")
    message: str = Field(..., description="Status message")


class PromoteResponse(BaseModel):
    """Response model for promotion."""
    id: str = Field(..., description="Scraper registry ID")
    variant: str = Field(..., description="New variant (stable)")
    is_default: bool = Field(..., description="Whether now default")
    traffic_pct: int = Field(..., description="New traffic percentage")
    message: str = Field(..., description="Status message")


# ==================== Formatting / Validation Helpers ====================

def _version_sort_key(version: str) -> tuple:
    """
    Build a numeric sort key from a dotted version string.

    Each dot-separated component contributes its leading ASCII digits as an
    int (components without leading digits count as -1), so "1.10.0" sorts
    after "1.9.0" instead of before it as a plain string comparison would.
    The raw string is appended as a tie-breaker (e.g. "1.0.0" vs "1.0.0-beta").
    """
    numeric = []
    for part in version.split('.'):
        digits = ''
        for ch in part:
            if '0' <= ch <= '9':
                digits += ch
            else:
                break
        numeric.append(int(digits) if digits else -1)
    return (tuple(numeric), version)


def _format_timestamp(value: Any) -> Optional[str]:
    """Render a timestamp as an ISO 8601 string; None (or falsy) becomes None."""
    if not value:
        return None
    if hasattr(value, 'isoformat'):
        return value.isoformat()
    return str(value)


def _validate_scraper_id(scraper_id: str) -> None:
    """Raise HTTP 400 if ``scraper_id`` is not a well-formed UUID string."""
    try:
        UUID(scraper_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid scraper ID format") from None


# ==================== Database Helper Functions ====================

async def get_scraper_stats(
    db: DatabaseManager,
    scraper_id: str,
    hours: int = 24
) -> ScraperStatsModel:
    """
    Get statistics for a specific scraper over the given time period.

    A job is attributed to the scraper when its ``metadata->>'scraper_id'``
    equals the scraper ID, or when its ``scraper_version`` (and
    ``scraper_variant``, if set) match this registry entry.

    Failures are logged and swallowed: stats are best-effort decoration, so
    any error yields an all-zero ScraperStatsModel instead of an exception.

    Args:
        db: Database manager instance
        scraper_id: UUID of the scraper registry entry (str or UUID)
        hours: Number of hours to look back (default: 24)

    Returns:
        ScraperStatsModel with job counts, success rate, and avg duration
    """
    try:
        scraper_id_text = str(scraper_id)
        scraper_uuid = UUID(scraper_id_text)
        async with db.pool.acquire() as conn:
            # asyncpg only understands positional $n placeholders; a '%s'
            # inside a quoted INTERVAL literal is never substituted, so the
            # look-back window is built server-side from an integer parameter.
            stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_jobs,
                    COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
                    COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed_jobs,
                    AVG(scrape_time) FILTER (WHERE status = 'completed' AND scrape_time IS NOT NULL) as avg_duration
                FROM jobs
                WHERE created_at >= NOW() - make_interval(hours => $1::int)
                AND (
                    metadata->>'scraper_id' = $2
                    OR (scraper_version IS NOT NULL AND EXISTS (
                        SELECT 1 FROM scraper_registry sr
                        WHERE sr.id = $3::uuid
                        AND sr.version = jobs.scraper_version
                        AND sr.variant = COALESCE(jobs.scraper_variant, sr.variant)
                    ))
                )
            """, int(hours), scraper_id_text, scraper_uuid)

        if not stats or stats['total_jobs'] == 0:
            return ScraperStatsModel()

        total = stats['total_jobs']
        completed = stats['completed_jobs'] or 0
        # total > 0 is guaranteed by the early return above.
        success_rate = completed / total * 100
        avg_duration = float(stats['avg_duration']) if stats['avg_duration'] else 0.0

        return ScraperStatsModel(
            total_jobs=total,
            success_rate=round(success_rate, 2),
            avg_duration=round(avg_duration, 2)
        )
    except Exception as e:
        log.warning(f"Error getting scraper stats for {scraper_id}: {e}")
        return ScraperStatsModel()


async def get_scraper_by_id_from_db(
    db: DatabaseManager,
    scraper_id: str
) -> Optional[Dict[str, Any]]:
    """
    Get scraper by ID directly from database.

    Args:
        db: Database manager instance
        scraper_id: UUID of the scraper registry entry (str or UUID)

    Returns:
        Scraper dictionary or None if not found

    Raises:
        ValueError: if ``scraper_id`` is not a valid UUID
    """
    async with db.pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT id, job_type, version, variant, module_path, function_name,
                   is_default, traffic_pct, min_priority, config, deprecated_at
            FROM scraper_registry
            WHERE id = $1
        """, UUID(str(scraper_id)))

    if not row:
        return None
    return dict(row)


async def update_scraper_traffic(
    db: DatabaseManager,
    scraper_id: str,
    traffic_pct: int
) -> bool:
    """
    Update traffic percentage for a non-deprecated scraper.

    Args:
        db: Database manager instance
        scraper_id: UUID of the scraper registry entry (str or UUID)
        traffic_pct: New traffic percentage (0-100)

    Returns:
        True if exactly one row was updated, False if not found or deprecated
    """
    async with db.pool.acquire() as conn:
        result = await conn.execute("""
            UPDATE scraper_registry
            SET traffic_pct = $2
            WHERE id = $1 AND deprecated_at IS NULL
        """, UUID(str(scraper_id)), traffic_pct)

    # asyncpg returns the command status tag, e.g. "UPDATE 1".
    return result.split()[-1] == "1"


async def deprecate_scraper_by_id(
    db: DatabaseManager,
    scraper_id: str
) -> Optional[str]:
    """
    Deprecate a scraper by ID (soft delete): stamp deprecated_at and zero its traffic.

    Args:
        db: Database manager instance
        scraper_id: UUID of the scraper registry entry (str or UUID)

    Returns:
        Deprecation timestamp as ISO string, or None if not found/already deprecated
    """
    async with db.pool.acquire() as conn:
        result = await conn.fetchval("""
            UPDATE scraper_registry
            SET deprecated_at = NOW(), traffic_pct = 0
            WHERE id = $1 AND deprecated_at IS NULL
            RETURNING deprecated_at
        """, UUID(str(scraper_id)))

    if result:
        return result.isoformat()
    return None


async def promote_scraper_by_id(
    db: DatabaseManager,
    scraper_id: str,
    default_traffic_pct: int = 80
) -> Optional[Dict[str, Any]]:
    """
    Promote a scraper to stable variant, set as default, and give it majority traffic.

    All steps run in one transaction:
    1. Lock and load the target scraper (must not be deprecated)
    2. Unset is_default on other scrapers of the same job_type
    3. Rescale traffic_pct of the other active scrapers (traffic_pct > 0) so
       they share the remaining ``100 - default_traffic_pct`` proportionally
       (values are truncated to int)
    4. Set the target's variant to 'stable', is_default to True and
       traffic_pct to ``default_traffic_pct``

    Args:
        db: Database manager instance
        scraper_id: UUID of the scraper to promote (str or UUID)
        default_traffic_pct: Traffic percentage to assign (default: 80)

    Returns:
        Updated scraper dict or None if not found / deprecated
    """
    scraper_uuid = UUID(str(scraper_id))
    async with db.pool.acquire() as conn:
        async with conn.transaction():
            # FOR UPDATE: keep a concurrent deprecation from slipping in between
            # this check and the final promotion UPDATE below.
            scraper = await conn.fetchrow("""
                SELECT id, job_type, version, variant
                FROM scraper_registry
                WHERE id = $1 AND deprecated_at IS NULL
                FOR UPDATE
            """, scraper_uuid)

            if not scraper:
                return None

            job_type = scraper['job_type']

            # Unset is_default on other scrapers of same job_type
            await conn.execute("""
                UPDATE scraper_registry
                SET is_default = FALSE
                WHERE job_type = $1 AND id != $2
            """, job_type, scraper_uuid)

            # Traffic left over for the other active scrapers
            remaining_traffic = 100 - default_traffic_pct

            other_scrapers = await conn.fetch("""
                SELECT id, traffic_pct
                FROM scraper_registry
                WHERE job_type = $1 AND id != $2 AND deprecated_at IS NULL AND traffic_pct > 0
            """, job_type, scraper_uuid)

            if other_scrapers:
                total_other_traffic = sum(s['traffic_pct'] for s in other_scrapers)
                if total_other_traffic > 0:
                    for s in other_scrapers:
                        new_pct = int((s['traffic_pct'] / total_other_traffic) * remaining_traffic)
                        await conn.execute("""
                            UPDATE scraper_registry
                            SET traffic_pct = $2
                            WHERE id = $1
                        """, s['id'], new_pct)

            # Promote the target scraper
            updated = await conn.fetchrow("""
                UPDATE scraper_registry
                SET variant = 'stable', is_default = TRUE, traffic_pct = $2
                WHERE id = $1
                RETURNING id, job_type, version, variant, is_default, traffic_pct
            """, scraper_uuid, default_traffic_pct)

            if updated:
                return dict(updated)
            return None


# ==================== Dependency Injection ====================

_db: Optional[DatabaseManager] = None
_registry: Optional[ScraperRegistry] = None


def set_database(db: DatabaseManager):
    """Set the database instance for the router (also builds the registry)."""
    global _db, _registry
    _db = db
    _registry = ScraperRegistry(db)


def get_db() -> DatabaseManager:
    """Dependency to get database instance."""
    if _db is None:
        raise HTTPException(status_code=500, detail="Database not initialized")
    return _db


def get_registry() -> ScraperRegistry:
    """Dependency to get scraper registry instance."""
    if _registry is None:
        raise HTTPException(status_code=500, detail="Scraper registry not initialized")
    return _registry


# ==================== API Endpoints ====================

@router.get(
    "/scrapers",
    response_model=List[ScraperInfoResponse],
    summary="List All Scrapers",
    description="Get a list of all registered scrapers with their stats"
)
async def list_scrapers(
    job_type: Optional[str] = Query(None, description="Filter by job type"),
    include_deprecated: bool = Query(False, description="Include deprecated scrapers"),
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    List all registered scrapers with their configuration and stats.

    Returns scraper information including:
    - Version and variant information
    - Traffic allocation percentage
    - Whether it's the default scraper
    - Last 24h performance stats (total jobs, success rate, avg duration)

    Results are ordered by job_type ascending, then by version descending
    (compared numerically, so 1.10.0 comes before 1.9.0).

    Use `job_type` filter to get scrapers for a specific job type.
    Set `include_deprecated=true` to include deprecated scrapers.
    """
    try:
        # Refresh cache to get latest data
        await registry.refresh_cache()

        scrapers = await registry.list_scrapers(
            job_type=job_type,
            include_deprecated=include_deprecated
        )

        result = []
        for scraper in scrapers:
            # Normalise to str in case the registry hands back UUID objects;
            # the DB helpers and the response model both work with strings.
            scraper_id = str(scraper['id'])
            stats = await get_scraper_stats(db, scraper_id)

            # Full row from the DB supplies job_type and deprecated_at
            full_info = await get_scraper_by_id_from_db(db, scraper_id)

            result.append(ScraperInfoResponse(
                id=scraper_id,
                job_type=full_info['job_type'] if full_info else 'unknown',
                version=scraper['version'],
                variant=scraper['variant'],
                is_default=scraper['is_default'],
                traffic_pct=scraper['traffic_pct'],
                module_path=scraper['module_path'],
                function_name=scraper.get('function_name'),
                deprecated_at=_format_timestamp(full_info.get('deprecated_at')) if full_info else None,
                stats=stats
            ))

        # job_type ascending, version descending: two stable sorts,
        # secondary key first, then the primary key.
        result.sort(key=lambda x: _version_sort_key(x.version), reverse=True)
        result.sort(key=lambda x: x.job_type)

        return result

    except Exception as e:
        log.exception(f"Error listing scrapers: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to list scrapers: {str(e)}")


@router.post(
    "/scrapers",
    response_model=RegisterScraperResponse,
    summary="Register New Scraper",
    description="Register a new scraper version"
)
async def register_scraper(
    request: RegisterScraperRequest,
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    Register a new scraper version in the registry.

    This allows adding new scraper implementations that can be used for:
    - A/B testing (set traffic_pct to allocate traffic)
    - Canary releases (set variant to 'canary' with low traffic_pct)
    - Beta testing (set variant to 'beta')

    The scraper won't receive any traffic until traffic_pct > 0.
    It is never registered as the default; use the promote endpoint for that.

    **Parameters:**
    - `job_type`: Type of scraping job (e.g., 'google_reviews')
    - `version`: Semantic version (e.g., '1.1.0')
    - `variant`: Release channel ('stable', 'beta', 'canary')
    - `module_path`: Python module path (e.g., 'scrapers.google_reviews.v1_1_0')
    - `function_name`: Entry function name (default: 'scrape')
    - `traffic_pct`: Initial traffic allocation (0-100, default: 0)
    - `config`: Optional configuration dict passed to the scraper

    Returns 409 if the version already exists for the job_type (including
    deprecated entries) and 400 if the registry rejects the input.
    """
    try:
        # Check if version already exists for this job_type
        existing = await registry.list_scrapers(job_type=request.job_type, include_deprecated=True)
        if any(scraper['version'] == request.version for scraper in existing):
            raise HTTPException(
                status_code=409,
                detail=f"Scraper version {request.version} already exists for job_type {request.job_type}"
            )

        scraper_id = await registry.register_scraper(
            job_type=request.job_type,
            version=request.version,
            variant=request.variant,
            module_path=request.module_path,
            function_name=request.function_name,
            is_default=False,  # Never auto-set as default
            traffic_pct=request.traffic_pct,
            min_priority=request.min_priority,
            config=request.config
        )

        log.info(f"Registered new scraper: {request.job_type} v{request.version} ({request.variant})")

        return RegisterScraperResponse(
            id=str(scraper_id),
            job_type=request.job_type,
            version=request.version,
            variant=request.variant,
            message=f"Successfully registered scraper {request.job_type} v{request.version} ({request.variant})"
        )

    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        log.exception(f"Error registering scraper: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to register scraper: {str(e)}")


@router.put(
    "/scrapers/{scraper_id}/traffic",
    response_model=UpdateTrafficResponse,
    summary="Update Traffic Percentage",
    description="Update the traffic allocation for a scraper"
)
async def update_traffic(
    scraper_id: str,
    request: UpdateTrafficRequest,
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    Update the traffic percentage for a specific scraper.

    Traffic percentage determines what portion of requests are routed to
    this scraper version. Used for:
    - Gradual rollouts (start at 10%, increase to 50%, then 100%)
    - A/B testing (set two versions to 50% each)
    - Canary releases (set new version to 5-10%)

    **Note:** Total traffic across all active scrapers of the same job_type
    should not exceed 100%. The system uses weighted random selection,
    so percentages are relative weights, not exact guarantees.
    """
    try:
        _validate_scraper_id(scraper_id)

        scraper = await get_scraper_by_id_from_db(db, scraper_id)
        if not scraper:
            raise HTTPException(status_code=404, detail="Scraper not found")

        if scraper.get('deprecated_at'):
            raise HTTPException(status_code=400, detail="Cannot update traffic for deprecated scraper")

        success = await update_scraper_traffic(db, scraper_id, request.traffic_pct)
        if not success:
            raise HTTPException(status_code=500, detail="Failed to update traffic allocation")

        # Invalidate registry cache
        await registry.refresh_cache()

        log.info(f"Updated traffic for scraper {scraper_id} to {request.traffic_pct}%")

        return UpdateTrafficResponse(
            id=scraper_id,
            traffic_pct=request.traffic_pct,
            message=f"Traffic updated to {request.traffic_pct}%"
        )

    except HTTPException:
        raise
    except Exception as e:
        log.exception(f"Error updating traffic for scraper {scraper_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to update traffic: {str(e)}")


@router.post(
    "/scrapers/{scraper_id}/deprecate",
    response_model=DeprecateResponse,
    summary="Deprecate Scraper",
    description="Mark a scraper as deprecated (soft delete)"
)
async def deprecate_scraper(
    scraper_id: str,
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    Deprecate a scraper version (soft delete).

    This will:
    - Set deprecated_at timestamp
    - Set traffic_pct to 0 (no new requests)
    - Keep the scraper in the registry for historical reference

    Deprecated scrapers are excluded from normal routing but can still be
    explicitly requested by version for debugging.

    To permanently remove a scraper, use database admin tools.
    """
    try:
        _validate_scraper_id(scraper_id)

        scraper = await get_scraper_by_id_from_db(db, scraper_id)
        if not scraper:
            raise HTTPException(status_code=404, detail="Scraper not found")

        if scraper.get('deprecated_at'):
            raise HTTPException(status_code=400, detail="Scraper is already deprecated")

        deprecated_at = await deprecate_scraper_by_id(db, scraper_id)
        if not deprecated_at:
            raise HTTPException(status_code=500, detail="Failed to deprecate scraper")

        # Invalidate registry cache
        await registry.refresh_cache()

        log.info(f"Deprecated scraper {scraper_id}")

        return DeprecateResponse(
            id=scraper_id,
            deprecated_at=deprecated_at,
            message="Scraper deprecated. Traffic allocation set to 0%."
        )

    except HTTPException:
        raise
    except Exception as e:
        log.exception(f"Error deprecating scraper {scraper_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to deprecate scraper: {str(e)}")


@router.post(
    "/scrapers/{scraper_id}/promote",
    response_model=PromoteResponse,
    summary="Promote Scraper",
    description="Promote scraper to stable variant and set as default"
)
async def promote_scraper(
    scraper_id: str,
    traffic_pct: int = Query(80, description="Traffic percentage to assign (0-100)", ge=0, le=100),
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    Promote a scraper to stable variant, set as default, and give it majority traffic.

    This operation will:
    1. Set the scraper's variant to 'stable'
    2. Set is_default to True
    3. Set traffic_pct to the specified value (default: 80%)
    4. Unset is_default on other scrapers of the same job_type
    5. Redistribute remaining traffic among other active scrapers

    **Use cases:**
    - Graduating a beta version to production
    - Making a canary release the new stable version
    - Switching to a new scraper implementation

    **Parameters:**
    - `traffic_pct`: Traffic percentage to assign (default: 80%)
    """
    try:
        _validate_scraper_id(scraper_id)

        scraper = await get_scraper_by_id_from_db(db, scraper_id)
        if not scraper:
            raise HTTPException(status_code=404, detail="Scraper not found")

        if scraper.get('deprecated_at'):
            raise HTTPException(status_code=400, detail="Cannot promote a deprecated scraper")

        result = await promote_scraper_by_id(db, scraper_id, traffic_pct)
        if not result:
            raise HTTPException(status_code=500, detail="Failed to promote scraper")

        # Invalidate registry cache
        await registry.refresh_cache()

        log.info(f"Promoted scraper {scraper_id} to stable with {traffic_pct}% traffic")

        return PromoteResponse(
            id=scraper_id,
            variant='stable',
            is_default=True,
            traffic_pct=traffic_pct,
            message=f"Scraper promoted to stable. Now default with {traffic_pct}% traffic."
        )

    except HTTPException:
        raise
    except Exception as e:
        log.exception(f"Error promoting scraper {scraper_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to promote scraper: {str(e)}")


@router.get(
    "/scrapers/{scraper_id}",
    response_model=ScraperInfoResponse,
    summary="Get Scraper Details",
    description="Get detailed information about a specific scraper"
)
async def get_scraper_details(
    scraper_id: str,
    db: DatabaseManager = Depends(get_db),
    registry: ScraperRegistry = Depends(get_registry)
):
    """
    Get detailed information about a specific scraper including last-24h stats.

    Returns 400 for a malformed ID and 404 if the scraper does not exist.
    """
    try:
        _validate_scraper_id(scraper_id)

        scraper = await get_scraper_by_id_from_db(db, scraper_id)
        if not scraper:
            raise HTTPException(status_code=404, detail="Scraper not found")

        stats = await get_scraper_stats(db, scraper_id)

        return ScraperInfoResponse(
            id=str(scraper['id']),
            job_type=scraper['job_type'],
            version=scraper['version'],
            variant=scraper['variant'],
            is_default=scraper['is_default'],
            traffic_pct=scraper['traffic_pct'],
            module_path=scraper['module_path'],
            function_name=scraper.get('function_name'),
            deprecated_at=_format_timestamp(scraper.get('deprecated_at')),
            stats=stats
        )

    except HTTPException:
        raise
    except Exception as e:
        log.exception(f"Error getting scraper {scraper_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get scraper: {str(e)}")