Files
Alejandro Gutiérrez 39c80fc8be Phases 5-7: Dashboard UI, Admin API, and Auth middleware
Phase 5 - Main Dashboard:
- Dashboard overview page with system health stats
- Jobs by status breakdown, success rates, top clients
- Dashboard API (/api/dashboard/overview, by-client, problems, by-version)

Phase 6 - Admin/Scraper Management:
- Scrapers management page with traffic allocation UI
- Admin API for scraper CRUD operations
- Traffic percentage updates for A/B testing
- Promote/deprecate scraper versions

Phase 7 - Authentication:
- API key authentication middleware
- SHA-256 key hashing (keys never stored in plain text)
- Scope-based authorization (jobs:read, jobs:write, admin)
- Rate limiting per API key

Also:
- Updated api_server_production.py to include new routers
- Extended core/database.py with dashboard query methods
- Added dashboard link to sidebar navigation
- Updated CONTEXT-KEEPER.md to mark all phases complete

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:43:00 +00:00

624 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Dashboard API for ReviewIQ Phase 5.
Provides system-wide analytics and monitoring endpoints:
- Overview statistics (jobs by status, success rates, durations)
- Client-level aggregations
- Problem detection (failures, slow jobs, callback issues)
- Scraper version performance analysis
"""
import json
import logging
import re
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List, Dict, Any

from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, Field

from core.database import DatabaseManager
log = logging.getLogger(__name__)

# Every endpoint below is mounted under /api/dashboard and grouped under the
# "dashboard" tag in the generated OpenAPI docs.
router = APIRouter(tags=["dashboard"], prefix="/api/dashboard")
# ==================== Enums ====================
class TimePeriod(str, Enum):
    """Time period for filtering dashboard data"""
    # Accepted values of the `period` query parameter on every dashboard
    # endpoint; get_period_delta() maps each member to a timedelta.
    # Mixing in `str` makes members compare equal to their raw values
    # (e.g. TimePeriod.HOUR_24 == "24h").
    HOUR_1 = "1h"
    HOUR_6 = "6h"
    HOUR_24 = "24h"
    DAY_7 = "7d"
    DAY_30 = "30d"
# ==================== Pydantic Response Models ====================
class JobsByStatus(BaseModel):
    """Job counts grouped by status"""
    # One counter per job status value. get_overview_stats() fills these
    # from COUNT(*) FILTER (WHERE status = ...) aggregates; every field
    # defaults to 0.
    pending: int = 0
    running: int = 0
    completed: int = 0
    failed: int = 0
    cancelled: int = 0
    partial: int = 0
class OverviewResponse(BaseModel):
    """System-wide dashboard overview statistics"""
    # Response body of GET /api/dashboard/overview. Fields declared with
    # Field(...) are required; the others fall back to the defaults shown.
    period: str = Field(..., description="Time period for the statistics (e.g., '24h')")
    total_jobs: int = Field(..., description="Total number of jobs in the period")
    completed_jobs: int = Field(..., description="Number of successfully completed jobs")
    # As populated by get_overview_stats(), this counts only status 'failed';
    # 'partial' jobs appear separately in jobs_by_status.
    failed_jobs: int = Field(..., description="Number of failed jobs")
    running_jobs: int = Field(..., description="Number of currently running jobs")
    success_rate: float = Field(..., description="Percentage of successful jobs (0-100)")
    # None when no average duration is available for the period.
    avg_duration_seconds: Optional[float] = Field(None, description="Average job duration in seconds")
    jobs_by_status: JobsByStatus = Field(..., description="Job counts grouped by status")
    total_reviews_scraped: int = Field(0, description="Total reviews scraped in the period")
class ClientStats(BaseModel):
    """Job statistics for a single client"""
    # One row of GET /api/dashboard/by-client. As built by
    # get_stats_by_client(), client_id is 'unknown' for jobs with no
    # requester_client_id, and `failed` also counts 'partial' jobs.
    client_id: str = Field(..., description="Client identifier")
    source: Optional[str] = Field(None, description="Source of the requests (e.g., 'veritasreview.com')")
    total_jobs: int = Field(..., description="Total jobs submitted by this client")
    completed: int = Field(..., description="Number of completed jobs")
    failed: int = Field(..., description="Number of failed jobs")
    success_rate: float = Field(..., description="Success rate percentage (0-100)")
    total_reviews: int = Field(0, description="Total reviews scraped for this client")
class FailedJob(BaseModel):
    """Details of a failed job"""
    # Entry of ProblemsResponse.failed_jobs; covers jobs with status
    # 'failed' or 'partial'.
    job_id: str = Field(..., description="Job UUID")
    url: str = Field(..., description="URL that was being scraped")
    # Bucket name produced by categorize_error() from error_message.
    error_type: Optional[str] = Field(None, description="Categorized error type")
    error_message: Optional[str] = Field(None, description="Error message")
    # Taken from the job's completed_at. get_problems() substitutes the
    # current time when that column is NULL, so this may not be the real
    # failure time.
    failed_at: str = Field(..., description="ISO timestamp when the job failed")
    client_id: Optional[str] = Field(None, description="Client who submitted the job")
class SlowJob(BaseModel):
    """Details of a slow job (taking > 2x average duration)"""
    # Entry of ProblemsResponse.slow_jobs. The average is taken over
    # completed jobs created in the same period.
    job_id: str = Field(..., description="Job UUID")
    url: str = Field(..., description="URL that was being scraped")
    duration_seconds: float = Field(..., description="Actual job duration in seconds")
    avg_duration_seconds: float = Field(..., description="Average duration for comparison")
    # duration / average, computed from unrounded values, then rounded to
    # one decimal place.
    ratio: float = Field(..., description="How many times slower than average")
    completed_at: str = Field(..., description="ISO timestamp when the job completed")
class CallbackFailure(BaseModel):
    """Details of a failed webhook callback"""
    # Entry of ProblemsResponse.callback_failures, built from jobs whose
    # callback_status is 'failed'.
    job_id: str = Field(..., description="Job UUID")
    callback_url: str = Field(..., description="Webhook URL that failed")
    status: str = Field(..., description="Callback status")
    attempts: int = Field(..., description="Number of delivery attempts")
    # Currently always None: get_problems() does not query per-attempt
    # webhook data.
    last_error: Optional[str] = Field(None, description="Last error message")
class ProblemsResponse(BaseModel):
    """Recent failures and issues"""
    # Response body of GET /api/dashboard/problems. Each list is capped by
    # the request's `limit`.
    failed_jobs: List[FailedJob] = Field(default_factory=list, description="Recent job failures")
    slow_jobs: List[SlowJob] = Field(default_factory=list, description="Jobs taking > 2x average duration")
    callback_failures: List[CallbackFailure] = Field(default_factory=list, description="Failed webhook deliveries")
    # Sum of the three (capped) list lengths, not a global problem count.
    total_problems: int = Field(..., description="Total number of problems detected")
class VersionStats(BaseModel):
    """Performance statistics for a scraper version"""
    # One row of GET /api/dashboard/by-version. As built by
    # get_stats_by_version(), version is 'unknown' when scraper_version is
    # NULL, and success_rate counts 'partial' jobs as failures.
    version: str = Field(..., description="Scraper version string (e.g., '1.0.0')")
    variant: Optional[str] = Field(None, description="Scraper variant (e.g., 'stable', 'stealth')")
    total_jobs: int = Field(..., description="Total jobs run with this version")
    success_rate: float = Field(..., description="Success rate percentage (0-100)")
    avg_duration: Optional[float] = Field(None, description="Average job duration in seconds")
    total_reviews: int = Field(0, description="Total reviews scraped with this version")
# ==================== Helper Functions ====================
def get_period_delta(period: TimePeriod) -> timedelta:
    """
    Translate a dashboard TimePeriod into the timedelta it spans.

    Values that are not in the table fall back to a 24-hour window
    instead of raising.
    """
    # Express every window in hours so the table is a flat int lookup.
    window_hours = {
        TimePeriod.HOUR_1: 1,
        TimePeriod.HOUR_6: 6,
        TimePeriod.HOUR_24: 24,
        TimePeriod.DAY_7: 7 * 24,
        TimePeriod.DAY_30: 30 * 24,
    }
    return timedelta(hours=window_hours.get(period, 24))
# Matches "bot", "bots", "robot", "anti-bot", "bot_check" and similar, but not
# ordinary words such as "both", "bottom" or "sabotage" that merely contain
# the letters "bot". Applied to lower-cased text.
_BOT_PATTERN = re.compile(r"bots?(?![a-z])")


def categorize_error(error_message: Optional[str]) -> str:
    """
    Categorize a free-form error message into a coarse error type.

    Checks run in a fixed priority order and the first match wins, so, for
    example, a "navigation timeout" is reported as ``"timeout"``.

    Args:
        error_message: Raw error text from the job, or None.

    Returns:
        One of ``"unknown"`` (empty/None input), ``"rate_limited"``,
        ``"timeout"``, ``"captcha_blocked"``, ``"bot_detected"``,
        ``"network_error"``, ``"selector_failed"``, ``"navigation_error"``,
        ``"browser_error"`` or ``"other"``.
    """
    if not error_message:
        return "unknown"

    error_lower = error_message.lower()

    if "rate" in error_lower and "limit" in error_lower:
        return "rate_limited"
    if "timeout" in error_lower:
        return "timeout"
    # "recaptcha" contains "captcha", so one substring test covers both.
    if "captcha" in error_lower:
        return "captcha_blocked"
    # A plain substring test for "bot" also matched "both"/"bottom"; use a
    # pattern that rejects a following letter instead.
    if _BOT_PATTERN.search(error_lower) or "detected" in error_lower:
        return "bot_detected"
    if "network" in error_lower or "connection" in error_lower:
        return "network_error"
    if "element" in error_lower or "selector" in error_lower or "not found" in error_lower:
        return "selector_failed"
    if "navigation" in error_lower or "page" in error_lower:
        return "navigation_error"
    if "browser" in error_lower or "playwright" in error_lower:
        return "browser_error"
    return "other"
# ==================== Database Query Functions ====================
async def get_overview_stats(
    db: DatabaseManager,
    period: TimePeriod
) -> Dict[str, Any]:
    """
    Get system-wide job statistics for the specified period.

    Only jobs whose ``created_at`` falls inside the window are counted.

    Args:
        db: Database manager; a connection is borrowed from ``db.pool``.
        period: Look-back window, measured back from ``datetime.now()``.

    Returns:
        A dict whose keys mirror the ``OverviewResponse`` fields, with
        ``jobs_by_status`` as a nested dict of per-status counts.
        ``success_rate`` is ``completed / (completed + failed + partial)``
        as a percentage rounded to one decimal (0.0 when nothing finished).
        ``avg_duration_seconds`` is None only when no average is available.
    """
    # NOTE(review): datetime.now() is naive local time; this assumes it is
    # comparable with how created_at is stored — confirm timezone handling.
    cutoff = datetime.now() - get_period_delta(period)

    async with db.pool.acquire() as conn:
        # A single aggregate query; it always returns exactly one row.
        stats = await conn.fetchrow("""
            SELECT
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'pending') as pending,
                COUNT(*) FILTER (WHERE status = 'running') as running,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed') as failed,
                COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
                COUNT(*) FILTER (WHERE status = 'partial') as partial,
                AVG(scrape_time) FILTER (WHERE status = 'completed' AND scrape_time IS NOT NULL) as avg_duration,
                COALESCE(SUM(reviews_count) FILTER (WHERE status = 'completed'), 0) as total_reviews
            FROM jobs
            WHERE created_at >= $1
        """, cutoff)

    total = stats['total_jobs'] or 0
    completed = stats['completed'] or 0
    failed = stats['failed'] or 0
    partial = stats['partial'] or 0
    running = stats['running'] or 0

    # Success rate is measured over finished jobs only; partial runs count
    # as unsuccessful.
    finished = completed + failed + partial
    success_rate = (completed / finished * 100) if finished > 0 else 0.0

    # Compare with None explicitly: an average of exactly 0 is real data and
    # must not be reported as "no average".
    avg_duration = stats['avg_duration']
    avg_duration_seconds = round(avg_duration, 1) if avg_duration is not None else None

    return {
        'period': period.value,
        'total_jobs': total,
        'completed_jobs': completed,
        'failed_jobs': failed,
        'running_jobs': running,
        'success_rate': round(success_rate, 1),
        'avg_duration_seconds': avg_duration_seconds,
        'total_reviews_scraped': stats['total_reviews'] or 0,
        'jobs_by_status': {
            'pending': stats['pending'] or 0,
            'running': running,
            'completed': completed,
            'failed': failed,
            'cancelled': stats['cancelled'] or 0,
            'partial': partial,
        }
    }
async def get_stats_by_client(
    db: DatabaseManager,
    period: TimePeriod,
    limit: int = 50
) -> List[Dict[str, Any]]:
    """
    Aggregate job counts per (client, source) pair over the given period.

    Args:
        db: Database manager whose ``pool`` provides the connection.
        period: Look-back window, measured back from ``datetime.now()``.
        limit: Maximum number of rows to return, busiest clients first.

    Returns:
        One dict per row with keys ``client_id``, ``source``, ``total_jobs``,
        ``completed``, ``failed``, ``success_rate`` and ``total_reviews``.
        ``failed`` includes 'partial' jobs, and ``success_rate`` is
        ``completed / (completed + failed)`` as a percentage rounded to one
        decimal (0.0 when nothing finished).
    """
    since = datetime.now() - get_period_delta(period)

    async with db.pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT
                COALESCE(requester_client_id, 'unknown') as client_id,
                requester_source as source,
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed,
                COALESCE(SUM(reviews_count) FILTER (WHERE status = 'completed'), 0) as total_reviews
            FROM jobs
            WHERE created_at >= $1
            GROUP BY requester_client_id, requester_source
            ORDER BY total_jobs DESC
            LIMIT $2
        """, since, limit)

    def _summarise(row) -> Dict[str, Any]:
        # Turn one aggregate row into the response dict shape.
        ok = row['completed'] or 0
        bad = row['failed'] or 0
        done = ok + bad
        rate = (ok / done * 100) if done > 0 else 0.0
        return {
            'client_id': row['client_id'],
            'source': row['source'],
            'total_jobs': row['total_jobs'],
            'completed': ok,
            'failed': bad,
            'success_rate': round(rate, 1),
            'total_reviews': row['total_reviews'] or 0,
        }

    return [_summarise(row) for row in rows]
async def _fetch_failed_jobs(conn: Any, cutoff: datetime, limit: int) -> List[Dict[str, Any]]:
    """Return up to ``limit`` 'failed'/'partial' jobs created since ``cutoff``."""
    # PostgreSQL sorts NULLs first under DESC. NULLS LAST keeps rows without
    # a completion time from crowding out the most recent failures.
    rows = await conn.fetch("""
        SELECT
            job_id,
            url,
            error_message,
            completed_at,
            requester_client_id
        FROM jobs
        WHERE status IN ('failed', 'partial')
          AND created_at >= $1
        ORDER BY completed_at DESC NULLS LAST
        LIMIT $2
    """, cutoff, limit)
    return [
        {
            'job_id': str(row['job_id']),
            'url': row['url'],
            'error_type': categorize_error(row['error_message']),
            'error_message': row['error_message'],
            # FailedJob.failed_at is required; fall back to "now" when the
            # job never recorded a completion time.
            'failed_at': row['completed_at'].isoformat() if row['completed_at'] else datetime.now().isoformat(),
            'client_id': row['requester_client_id'],
        }
        for row in rows
    ]


async def _fetch_slow_jobs(conn: Any, cutoff: datetime, limit: int) -> List[Dict[str, Any]]:
    """Return up to ``limit`` completed jobs slower than 2x the period's average."""
    avg_duration = await conn.fetchval("""
        SELECT AVG(scrape_time)
        FROM jobs
        WHERE status = 'completed'
          AND scrape_time IS NOT NULL
          AND created_at >= $1
    """, cutoff)
    # No usable baseline (no data, zero, or non-positive) -> nothing is "slow".
    if not (avg_duration and avg_duration > 0):
        return []

    # Normalise to float so Decimal (numeric column) and float arithmetic
    # never mix below.
    avg = float(avg_duration)
    # Compute the threshold here and cast the parameter explicitly. With
    # "$1 * 2", PostgreSQL infers $1 as integer from the literal 2, which
    # truncates (or rejects) a fractional average.
    rows = await conn.fetch("""
        SELECT
            job_id,
            url,
            scrape_time,
            completed_at
        FROM jobs
        WHERE status = 'completed'
          AND scrape_time IS NOT NULL
          AND scrape_time > $1::double precision
          AND created_at >= $2
        ORDER BY scrape_time DESC
        LIMIT $3
    """, avg * 2, cutoff, limit)
    return [
        {
            'job_id': str(row['job_id']),
            'url': row['url'],
            'duration_seconds': round(float(row['scrape_time']), 1),
            'avg_duration_seconds': round(avg, 1),
            'ratio': round(float(row['scrape_time']) / avg, 1),
            'completed_at': row['completed_at'].isoformat() if row['completed_at'] else datetime.now().isoformat(),
        }
        for row in rows
    ]


async def _fetch_callback_failures(conn: Any, cutoff: datetime, limit: int) -> List[Dict[str, Any]]:
    """Return up to ``limit`` jobs created since ``cutoff`` whose webhook callback failed."""
    rows = await conn.fetch("""
        SELECT
            job_id,
            callback_url,
            callback_status,
            callback_attempts
        FROM jobs
        WHERE callback_url IS NOT NULL
          AND callback_status = 'failed'
          AND created_at >= $1
        ORDER BY completed_at DESC NULLS LAST
        LIMIT $2
    """, cutoff, limit)
    return [
        {
            'job_id': str(row['job_id']),
            'callback_url': row['callback_url'],
            'status': row['callback_status'] or 'failed',
            'attempts': row['callback_attempts'] or 0,
            'last_error': None,  # Would need to query webhook_attempts table
        }
        for row in rows
    ]


async def get_problems(
    db: DatabaseManager,
    period: TimePeriod,
    limit: int = 20
) -> Dict[str, Any]:
    """
    Get recent failures and issues.

    Args:
        db: Database manager; one pooled connection serves all three queries.
        period: Look-back window on ``created_at``, measured from ``datetime.now()``.
        limit: Maximum number of items returned in each category.

    Returns:
        A dict with ``failed_jobs``, ``slow_jobs`` and ``callback_failures``
        (lists of dicts shaped like FailedJob / SlowJob / CallbackFailure),
        plus ``total_problems``, the sum of the three list lengths.
    """
    cutoff = datetime.now() - get_period_delta(period)

    async with db.pool.acquire() as conn:
        failed_jobs = await _fetch_failed_jobs(conn, cutoff, limit)
        slow_jobs = await _fetch_slow_jobs(conn, cutoff, limit)
        callback_failures = await _fetch_callback_failures(conn, cutoff, limit)

    return {
        'failed_jobs': failed_jobs,
        'slow_jobs': slow_jobs,
        'callback_failures': callback_failures,
        'total_problems': len(failed_jobs) + len(slow_jobs) + len(callback_failures),
    }
async def get_stats_by_version(
    db: DatabaseManager,
    period: TimePeriod,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    Get performance statistics grouped by scraper version.

    Args:
        db: Database manager whose ``pool`` provides the connection.
        period: Look-back window on ``created_at``, measured from ``datetime.now()``.
        limit: Maximum number of (version, variant) rows, most-used first.

    Returns:
        One dict per (version, variant) with keys ``version``, ``variant``,
        ``total_jobs``, ``success_rate``, ``avg_duration`` and
        ``total_reviews``. ``success_rate`` counts 'partial' jobs as
        failures; ``avg_duration`` is None only when no completed job had a
        scrape_time.
    """
    cutoff = datetime.now() - get_period_delta(period)

    async with db.pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT
                COALESCE(scraper_version, 'unknown') as version,
                scraper_variant as variant,
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status IN ('failed', 'partial')) as failed,
                AVG(scrape_time) FILTER (WHERE status = 'completed' AND scrape_time IS NOT NULL) as avg_duration,
                COALESCE(SUM(reviews_count) FILTER (WHERE status = 'completed'), 0) as total_reviews
            FROM jobs
            WHERE created_at >= $1
            GROUP BY scraper_version, scraper_variant
            ORDER BY total_jobs DESC
            LIMIT $2
        """, cutoff, limit)

    results = []
    for row in rows:
        completed = row['completed'] or 0
        failed = row['failed'] or 0
        finished = completed + failed
        success_rate = (completed / finished * 100) if finished > 0 else 0.0
        # Compare with None explicitly so a genuine 0.0 average is kept.
        avg_duration = row['avg_duration']
        results.append({
            'version': row['version'],
            'variant': row['variant'],
            'total_jobs': row['total_jobs'],
            'success_rate': round(success_rate, 1),
            'avg_duration': round(avg_duration, 1) if avg_duration is not None else None,
            'total_reviews': row['total_reviews'] or 0,
        })
    return results
# ==================== Dependency Injection ====================
# Module-level DatabaseManager shared by all dashboard endpoints; stays None
# until set_database() is called.
_db: Optional[DatabaseManager] = None


def set_database(db: DatabaseManager):
    """
    Register the DatabaseManager that this router's endpoints will use.

    Calling it again replaces the previously registered instance.
    """
    global _db
    _db = db
def get_db() -> DatabaseManager:
    """
    FastAPI dependency returning the registered DatabaseManager.

    Raises:
        HTTPException: status 500 if set_database() has not been called yet.
    """
    if _db is not None:
        return _db
    raise HTTPException(status_code=500, detail="Database not initialized")
# ==================== API Endpoints ====================
@router.get(
    "/overview",
    response_model=OverviewResponse,
    summary="Get Dashboard Overview",
    description="Get system-wide job statistics and success rates"
)
async def get_overview(
    period: TimePeriod = Query(
        TimePeriod.HOUR_24,
        description="Time period for statistics (1h, 6h, 24h, 7d, 30d)"
    ),
    db: DatabaseManager = Depends(get_db)
) -> OverviewResponse:
    """
    Get system-wide dashboard statistics.
    Returns aggregate job counts, success rates, and average durations
    for the specified time period.
    - **period**: Time window to analyze (default: 24h)
    - 1h: Last hour
    - 6h: Last 6 hours
    - 24h: Last 24 hours
    - 7d: Last 7 days
    - 30d: Last 30 days
    """
    try:
        stats = await get_overview_stats(db, period)
        return OverviewResponse(
            period=stats['period'],
            total_jobs=stats['total_jobs'],
            completed_jobs=stats['completed_jobs'],
            failed_jobs=stats['failed_jobs'],
            running_jobs=stats['running_jobs'],
            success_rate=stats['success_rate'],
            avg_duration_seconds=stats['avg_duration_seconds'],
            jobs_by_status=JobsByStatus(**stats['jobs_by_status']),
            total_reviews_scraped=stats['total_reviews_scraped'],
        )
    except Exception as e:
        # log.exception records the traceback that log.error discarded.
        log.exception("Error getting dashboard overview: %s", e)
        # NOTE(review): the detail echoes the internal error text to the
        # client; kept for compatibility — consider a generic message.
        raise HTTPException(status_code=500, detail=f"Failed to get overview: {str(e)}") from e
@router.get(
    "/by-client",
    response_model=List[ClientStats],
    summary="Get Stats by Client",
    description="Get job statistics grouped by client"
)
async def get_by_client(
    period: TimePeriod = Query(
        TimePeriod.HOUR_24,
        description="Time period for statistics (1h, 6h, 24h, 7d, 30d)"
    ),
    limit: int = Query(50, description="Maximum number of clients to return", ge=1, le=200),
    db: DatabaseManager = Depends(get_db)
) -> List[ClientStats]:
    """
    Get job statistics grouped by client.
    Returns aggregated statistics for each client including job counts,
    success rates, and total reviews scraped. Results are ordered by
    total job count descending.
    - **period**: Time window to analyze (default: 24h)
    - **limit**: Maximum number of clients to return (default: 50)
    """
    try:
        stats = await get_stats_by_client(db, period, limit)
        return [
            ClientStats(
                client_id=s['client_id'],
                source=s['source'],
                total_jobs=s['total_jobs'],
                completed=s['completed'],
                failed=s['failed'],
                success_rate=s['success_rate'],
                total_reviews=s['total_reviews'],
            )
            for s in stats
        ]
    except Exception as e:
        # log.exception records the traceback that log.error discarded.
        log.exception("Error getting client stats: %s", e)
        # NOTE(review): the detail echoes the internal error text to the
        # client; kept for compatibility — consider a generic message.
        raise HTTPException(status_code=500, detail=f"Failed to get client stats: {str(e)}") from e
@router.get(
    "/problems",
    response_model=ProblemsResponse,
    summary="Get Recent Problems",
    description="Get recent failures, slow jobs, and callback issues"
)
async def get_problems_endpoint(
    period: TimePeriod = Query(
        TimePeriod.HOUR_24,
        description="Time period for problems (1h, 6h, 24h, 7d, 30d)"
    ),
    limit: int = Query(20, description="Maximum number of items per category", ge=1, le=100),
    db: DatabaseManager = Depends(get_db)
) -> ProblemsResponse:
    """
    Get recent failures and issues.
    Returns three categories of problems:
    - **failed_jobs**: Jobs that failed with errors
    - **slow_jobs**: Jobs that took more than 2x the average duration
    - **callback_failures**: Webhook deliveries that failed
    Each category includes relevant details for debugging and resolution.
    - **period**: Time window to analyze (default: 24h)
    - **limit**: Maximum items per category (default: 20)
    """
    try:
        problems = await get_problems(db, period, limit)
        return ProblemsResponse(
            failed_jobs=[FailedJob(**fj) for fj in problems['failed_jobs']],
            slow_jobs=[SlowJob(**sj) for sj in problems['slow_jobs']],
            callback_failures=[CallbackFailure(**cf) for cf in problems['callback_failures']],
            total_problems=problems['total_problems'],
        )
    except Exception as e:
        # log.exception records the traceback that log.error discarded.
        log.exception("Error getting problems: %s", e)
        # NOTE(review): the detail echoes the internal error text to the
        # client; kept for compatibility — consider a generic message.
        raise HTTPException(status_code=500, detail=f"Failed to get problems: {str(e)}") from e
@router.get(
    "/by-version",
    response_model=List[VersionStats],
    summary="Get Stats by Scraper Version",
    description="Get performance statistics grouped by scraper version"
)
async def get_by_version(
    period: TimePeriod = Query(
        TimePeriod.HOUR_24,
        description="Time period for statistics (1h, 6h, 24h, 7d, 30d)"
    ),
    limit: int = Query(20, description="Maximum number of versions to return", ge=1, le=100),
    db: DatabaseManager = Depends(get_db)
) -> List[VersionStats]:
    """
    Get performance statistics grouped by scraper version.
    Useful for comparing the performance of different scraper versions
    and variants (e.g., 'stable' vs 'stealth'). Results are ordered by
    total job count descending.
    - **period**: Time window to analyze (default: 24h)
    - **limit**: Maximum number of versions to return (default: 20)
    """
    try:
        stats = await get_stats_by_version(db, period, limit)
        return [
            VersionStats(
                version=s['version'],
                variant=s['variant'],
                total_jobs=s['total_jobs'],
                success_rate=s['success_rate'],
                avg_duration=s['avg_duration'],
                total_reviews=s['total_reviews'],
            )
            for s in stats
        ]
    except Exception as e:
        # log.exception records the traceback that log.error discarded.
        log.exception("Error getting version stats: %s", e)
        # NOTE(review): the detail echoes the internal error text to the
        # client; kept for compatibility — consider a generic message.
        raise HTTPException(status_code=500, detail=f"Failed to get version stats: {str(e)}") from e