Files
Alejandro Gutiérrez 39c80fc8be Phases 5-7: Dashboard UI, Admin API, and Auth middleware
Phase 5 - Main Dashboard:
- Dashboard overview page with system health stats
- Jobs by status breakdown, success rates, top clients
- Dashboard API (/api/dashboard/overview, by-client, problems, by-version)

Phase 6 - Admin/Scraper Management:
- Scrapers management page with traffic allocation UI
- Admin API for scraper CRUD operations
- Traffic percentage updates for A/B testing
- Promote/deprecate scraper versions

Phase 7 - Authentication:
- API key authentication middleware
- SHA-256 key hashing (keys never stored in plain text)
- Scope-based authorization (jobs:read, jobs:write, admin)
- Rate limiting per API key

Also:
- Updated api_server_production.py to include new routers
- Extended core/database.py with dashboard query methods
- Added dashboard link to sidebar navigation
- Updated CONTEXT-KEEPER.md to mark all phases complete

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:43:00 +00:00

327 lines
10 KiB
Python

#!/usr/bin/env python3
"""
API Key Authentication Middleware for ReviewIQ Phase 7.
Security Model:
- API keys are never stored in plain text
- Only SHA-256 hashes are stored in the database
- First 8 characters (prefix) are stored for identification in logs/UI
- Keys follow format: "riq_" + 32 random lowercase alphanumeric characters (a-z, 0-9)
Authentication Flow:
1. Client sends API key in X-API-Key header
2. Server hashes the received key with SHA-256
3. Server looks up the hash in api_keys table
4. If found, active, and not expired, request is authenticated
5. Scopes are checked for protected endpoints
"""
import hashlib
import logging
import secrets
import string
from datetime import datetime, timezone
from functools import wraps
from typing import Optional, List, Callable
from uuid import UUID

from fastapi import Request, HTTPException, Depends
from fastapi.security import APIKeyHeader
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Security scheme that reads the API key from the X-API-Key request header.
# auto_error=False is paired with the explicit missing-key check in
# APIKeyAuth.verify_api_key, which raises its own 401 with a custom message.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Key format constants
# Literal prefix every key must start with (checked in APIKeyAuth.verify_api_key).
API_KEY_PREFIX = "riq_"
# Number of random characters generate_api_key() appends after the prefix.
API_KEY_RANDOM_LENGTH = 32
API_KEY_PREFIX_STORE_LENGTH = 8 # Leading chars of the full key returned by APIKeyAuth.get_key_prefix ("riq_" + 4 random chars)
def generate_api_key() -> str:
    """
    Create a new plain-text API key.

    The key is API_KEY_PREFIX followed by API_KEY_RANDOM_LENGTH characters,
    each picked independently from lowercase ASCII letters and digits.

    Example (with the module's current constants):
        "riq_a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6"

    Returns:
        The newly generated key string.
    """
    # `secrets` (not `random`) so the suffix comes from a CSPRNG.
    charset = string.ascii_lowercase + string.digits
    suffix_chars = []
    for _ in range(API_KEY_RANDOM_LENGTH):
        suffix_chars.append(secrets.choice(charset))
    return API_KEY_PREFIX + ''.join(suffix_chars)
class APIKeyAuth:
    """
    API key authentication and scope checks, exposed as FastAPI dependencies.

    Usage:
        from api.middleware.auth import APIKeyAuth

        # Initialize with database
        auth = APIKeyAuth(db)

        # Use as dependency
        @app.get("/protected")
        async def protected_endpoint(client: dict = Depends(auth.verify_api_key)):
            return {"client_id": client["client_id"]}

        # Require specific scope
        @app.post("/admin-only")
        async def admin_endpoint(client: dict = Depends(auth.require_scope("admin"))):
            return {"message": "Admin access granted"}
    """

    def __init__(self, db):
        """
        Initialize API key authentication.

        Args:
            db: Object providing the awaitable methods
                get_api_key_by_hash(key_hash) and
                update_api_key_last_used(key_id) used by this class.
        """
        self.db = db

    async def verify_api_key(
        self,
        request: Request,
        api_key: Optional[str] = Depends(api_key_header)
    ) -> dict:
        """
        Verify the X-API-Key header and return information about the client.

        The presented key is hashed with SHA-256 and looked up by hash; the
        stored record must be active and not past its expiry time.

        Args:
            request: FastAPI request object (used only for logging).
            api_key: API key from the X-API-Key header, or None when absent.

        Returns:
            dict: Client information:
                {
                    "client_id": "veritas_123",
                    "key_id": "uuid-of-key",
                    "key_prefix": "riq_a1b2",
                    "name": "Production Key",
                    "scopes": ["jobs:read", "jobs:write"],
                    "rate_limit_rpm": 60
                }
            "scopes" is always a list-like value; a missing or null value in
            the stored record is returned as an empty list.

        Raises:
            HTTPException 401: If the API key is missing, malformed or unknown.
            HTTPException 403: If the API key is inactive or expired.
        """
        if not api_key:
            log.warning(
                "Missing API key for request: %s %s",
                request.method, request.url.path,
            )
            raise HTTPException(
                status_code=401,
                detail="Missing API key. Include X-API-Key header.",
                headers={"WWW-Authenticate": "ApiKey"}
            )

        # Only the short identifying prefix of a presented key is ever logged,
        # the same length that is considered safe to store and display.
        presented_prefix = self.get_key_prefix(api_key)

        # Validate key format before touching the database.
        if not api_key.startswith(API_KEY_PREFIX):
            log.warning("Invalid API key format (wrong prefix): %s...", presented_prefix)
            raise HTTPException(
                status_code=401,
                detail="Invalid API key format.",
                headers={"WWW-Authenticate": "ApiKey"}
            )

        # Keys are stored hashed, so the lookup is by hash.
        key_hash = self.hash_api_key(api_key)
        key_data = await self.db.get_api_key_by_hash(key_hash)
        if not key_data:
            log.warning("Unknown API key attempted: %s...", presented_prefix)
            raise HTTPException(
                status_code=401,
                detail="Invalid API key.",
                headers={"WWW-Authenticate": "ApiKey"}
            )

        # A record without an explicit is_active flag is treated as revoked.
        if not key_data.get('is_active', False):
            log.warning(
                "Inactive API key used: %s (client: %s)",
                key_data['key_prefix'], key_data['client_id'],
            )
            raise HTTPException(
                status_code=403,
                detail="API key has been revoked."
            )

        if self._is_expired(key_data.get('expires_at')):
            log.warning(
                "Expired API key used: %s (client: %s)",
                key_data['key_prefix'], key_data['client_id'],
            )
            raise HTTPException(
                status_code=403,
                detail="API key has expired."
            )

        # Best-effort bookkeeping: the update is awaited, but a failure here
        # must not turn a successfully authenticated request into an error.
        try:
            await self.db.update_api_key_last_used(key_data['id'])
        except Exception as e:
            log.error(
                "Failed to update last_used_at for key %s: %s",
                key_data['key_prefix'], e,
            )

        # Debug level to avoid one log line per authenticated request.
        log.debug(
            "Authenticated: client=%s key=%s",
            key_data['client_id'], key_data['key_prefix'],
        )

        return {
            "client_id": key_data['client_id'],
            "key_id": str(key_data['id']),
            "key_prefix": key_data['key_prefix'],
            "name": key_data['name'],
            # A null scopes value would otherwise break the `in` checks
            # performed by the scope dependencies.
            "scopes": key_data.get('scopes') or [],
            "rate_limit_rpm": key_data.get('rate_limit_rpm', 60)
        }

    def require_scope(self, scope: str) -> Callable:
        """
        Create a dependency that requires a specific scope.

        Usage:
            @app.post("/jobs")
            async def create_job(client: dict = Depends(auth.require_scope("jobs:write"))):
                # Only accessible with jobs:write scope
                pass

        Args:
            scope: Required scope string (e.g., "jobs:read", "jobs:write", "admin")

        Returns:
            FastAPI dependency function that verifies the API key, then raises
            HTTPException 403 unless the client holds `scope` or "admin".
        """
        return self._build_scope_dependency(
            [scope],
            f"required={scope}",
            f"Insufficient permissions. Required scope: {scope}",
        )

    def require_any_scope(self, scopes: List[str]) -> Callable:
        """
        Create a dependency that requires any one of the specified scopes.

        Usage:
            @app.get("/jobs/{job_id}")
            async def get_job(client: dict = Depends(auth.require_any_scope(["jobs:read", "jobs:write"]))):
                pass

        Args:
            scopes: List of acceptable scopes (client needs at least one).
                A single scope passed as a bare string is treated as a
                one-element list rather than being iterated per character.

        Returns:
            FastAPI dependency function that verifies the API key, then raises
            HTTPException 403 unless the client holds one of `scopes` or "admin".
        """
        if isinstance(scopes, str):
            scopes = [scopes]
        return self._build_scope_dependency(
            scopes,
            f"required_any={scopes}",
            f"Insufficient permissions. Required one of: {', '.join(scopes)}",
        )

    def _build_scope_dependency(
        self,
        accepted: List[str],
        log_required: str,
        denial_detail: str,
    ) -> Callable:
        """
        Build the dependency shared by require_scope and require_any_scope.

        Args:
            accepted: Scopes of which the client must hold at least one.
            log_required: Pre-rendered "required=..." fragment for the denial log line.
            denial_detail: `detail` text of the 403 raised on denial.

        Returns:
            Async dependency taking the request and the X-API-Key header value.
        """
        async def scope_dependency(
            request: Request,
            api_key: Optional[str] = Depends(api_key_header)
        ) -> dict:
            # Authentication first; this raises 401/403 on its own.
            client = await self.verify_api_key(request, api_key)
            granted = client.get('scopes') or []

            # Admin scope grants all permissions.
            if 'admin' in granted or any(s in granted for s in accepted):
                return client

            log.warning(
                "Scope denied: client=%s %s has=%s",
                client['client_id'], log_required, granted,
            )
            raise HTTPException(status_code=403, detail=denial_detail)

        return scope_dependency

    @staticmethod
    def _is_expired(expires_at) -> bool:
        """
        Return True when `expires_at` lies in the past.

        Args:
            expires_at: Expiry datetime from the key record, or a falsy value
                (e.g. None) for keys that never expire.

        Returns:
            False for a falsy `expires_at`; otherwise whether it is earlier
            than the current UTC time.
        """
        if not expires_at:
            return False
        # Comparing naive and aware datetimes raises TypeError, so a naive
        # value is interpreted as UTC (matching the previous utcnow()-based
        # comparison) and everything is compared as aware datetimes.
        if expires_at.utcoffset() is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        return expires_at < datetime.now(timezone.utc)

    @staticmethod
    def hash_api_key(api_key: str) -> str:
        """
        Hash API key for storage/lookup using SHA-256.

        This is a one-way hash - the original key cannot be recovered from it.

        Args:
            api_key: Plain text API key

        Returns:
            64-character hexadecimal hash string
        """
        return hashlib.sha256(api_key.encode('utf-8')).hexdigest()

    @staticmethod
    def get_key_prefix(api_key: str) -> str:
        """
        Extract the identifying prefix from an API key.

        Only the first API_KEY_PREFIX_STORE_LENGTH characters are returned,
        which is the portion intended for storage, logs and display.

        Args:
            api_key: Plain text API key

        Returns:
            First 8 characters of the key (e.g., "riq_a1b2"); shorter if the
            input itself is shorter.
        """
        return api_key[:API_KEY_PREFIX_STORE_LENGTH]
# Convenience function for creating auth instance
def create_auth(db) -> APIKeyAuth:
    """
    Build an APIKeyAuth bound to the given database object.

    Args:
        db: DatabaseManager instance handed straight to APIKeyAuth.

    Returns:
        The new APIKeyAuth instance.
    """
    auth = APIKeyAuth(db)
    return auth
# Available scopes documentation: maps each scope name to a human-readable
# description. "admin" is the scope the APIKeyAuth scope dependencies treat
# as granting every permission.
# NOTE(review): nothing in the visible part of this module reads this dict or
# validates scopes against it - confirm how callers use it.
AVAILABLE_SCOPES = {
"jobs:read": "Read job status and results",
"jobs:write": "Create and cancel jobs",
"batches:read": "Read batch status and results",
"batches:write": "Create and manage batches",
"webhooks:manage": "Configure webhook endpoints",
"admin": "Full administrative access (includes all other scopes)"
}