Files
whyrating-engine-legacy/core/database.py
Alejandro Gutiérrez 03ed7029e2 feat: Add decoupled pipeline schema with separate PostgreSQL namespace
- Create consolidated migration (005_create_pipeline_schema.sql) with
  'pipeline' schema for all classification tables
- Update pipeline repositories to use schema prefix (pipeline.*)
- Add run_migrations() method to DatabaseManager
- Add CLI tool for running versioned migrations

Tables created in pipeline schema:
- reviews_raw, reviews_enriched (Stage 1)
- review_spans (Stage 2)
- issues, issue_spans, issue_events (Stage 3)
- fact_timeseries (Stage 4)
- urt_domains, urt_categories (taxonomy lookup)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 18:17:20 +00:00

1596 lines
56 KiB
Python

#!/usr/bin/env python3
"""
PostgreSQL database module for production microservice.
Stores job metadata and reviews as JSONB.
"""
import asyncpg
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID, uuid4
import logging
from core.enums import JobStatus
log = logging.getLogger(__name__)
class DatabaseManager:
"""PostgreSQL database manager with connection pooling"""
def __init__(self, database_url: str):
"""
Initialize database manager.
Args:
database_url: PostgreSQL connection URL
Format: postgresql://user:password@host:port/database
"""
self.database_url = database_url
self.pool: Optional[asyncpg.Pool] = None
async def connect(
    self,
    min_size: int = 5,
    max_size: int = 20,
    command_timeout: float = 60,
):
    """
    Create the connection pool and store it on ``self.pool``.

    The defaults reproduce the previously hard-coded pool settings, so
    existing ``connect()`` callers are unaffected.

    Args:
        min_size: Value passed to ``asyncpg.create_pool`` as ``min_size``
        max_size: Value passed to ``asyncpg.create_pool`` as ``max_size``
        command_timeout: Value passed to ``asyncpg.create_pool`` as
            ``command_timeout``
    """
    log.info("Connecting to PostgreSQL database...")
    self.pool = await asyncpg.create_pool(
        self.database_url,
        min_size=min_size,
        max_size=max_size,
        command_timeout=command_timeout,
    )
    log.info("Database connection pool created")
async def disconnect(self):
"""Close connection pool"""
if self.pool:
await self.pool.close()
log.info("Database connection pool closed")
async def initialize_schema(self):
    """Create database schema if it doesn't exist"""
    # The statements run one at a time, in exactly this order: indexes and
    # foreign keys rely on the tables created by earlier entries.
    ddl_statements = (
        # Create jobs table
        """
            CREATE TABLE IF NOT EXISTS jobs (
                job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                status VARCHAR(20) NOT NULL DEFAULT 'pending',
                url TEXT NOT NULL,
                webhook_url TEXT,
                webhook_secret TEXT,
                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                updated_at TIMESTAMP,
                reviews_count INTEGER,
                total_reviews INTEGER,
                reviews_data JSONB,
                scrape_time REAL,
                error_message TEXT,
                metadata JSONB,
                scrape_logs JSONB,
                CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial'))
            );
        """,
        # Add scrape_logs column if it doesn't exist (for existing databases)
        """
            ALTER TABLE jobs ADD COLUMN IF NOT EXISTS scrape_logs JSONB;
        """,
        # Add updated_at column if it doesn't exist (for incremental progress tracking)
        """
            ALTER TABLE jobs ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP;
        """,
        # Add review_topics column if it doesn't exist (extracted topic filters with mention counts)
        """
            ALTER TABLE jobs ADD COLUMN IF NOT EXISTS review_topics JSONB;
        """,
        # Update constraint to include 'partial' status (for existing databases):
        # drop the old definition, then re-create it with the full status list.
        """
            ALTER TABLE jobs DROP CONSTRAINT IF EXISTS valid_status;
        """,
        """
            ALTER TABLE jobs ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial'));
        """,
        # Create indexes
        """
            CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_jobs_webhook ON jobs(webhook_url) WHERE webhook_url IS NOT NULL;
        """,
        # Create canary results table
        """
            CREATE TABLE IF NOT EXISTS canary_results (
                id SERIAL PRIMARY KEY,
                timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
                success BOOLEAN NOT NULL,
                reviews_count INTEGER,
                scrape_time REAL,
                error_message TEXT,
                metadata JSONB
            );
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_canary_timestamp ON canary_results(timestamp DESC);
        """,
        # Create webhook attempts table (for retry tracking)
        """
            CREATE TABLE IF NOT EXISTS webhook_attempts (
                id SERIAL PRIMARY KEY,
                job_id UUID NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE,
                attempt_number INTEGER NOT NULL,
                timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
                success BOOLEAN NOT NULL,
                status_code INTEGER,
                error_message TEXT,
                response_time_ms REAL
            );
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id);
        """,
        # Add session_fingerprint and metrics_history columns to jobs table
        """
            ALTER TABLE jobs ADD COLUMN IF NOT EXISTS session_fingerprint JSONB;
        """,
        """
            ALTER TABLE jobs ADD COLUMN IF NOT EXISTS metrics_history JSONB;
        """,
        # Create crash_reports table
        """
            CREATE TABLE IF NOT EXISTS crash_reports (
                crash_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                job_id UUID REFERENCES jobs(job_id) ON DELETE CASCADE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                crash_type VARCHAR(50) NOT NULL,
                error_message TEXT,
                state JSONB NOT NULL,
                metrics_history JSONB,
                logs_before_crash JSONB,
                analysis JSONB,
                screenshot_url TEXT,
                dom_snapshot_id UUID
            );
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_crash_reports_job ON crash_reports(job_id);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_crash_reports_type ON crash_reports(crash_type);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_crash_reports_created ON crash_reports(created_at DESC);
        """,
    )

    async with self.pool.acquire() as conn:
        for ddl in ddl_statements:
            await conn.execute(ddl)
        log.info("Database schema initialized")
async def run_migrations(self, migrations_dir: str = "migrations/versions"):
    """
    Run versioned migrations from SQL files.

    Applied filenames are recorded in the ``_migrations`` table; files whose
    name is already recorded are skipped. Each pending file is executed and
    recorded inside one transaction, so a failing migration leaves no
    partial changes and no tracking row.

    Args:
        migrations_dir: Path to directory containing .sql migration files.
            Files are run in sorted order.

    Returns:
        Number of migrations applied (0 when the directory is missing).

    Raises:
        Exception: Whatever the failing migration raised; it is logged and
            re-raised, and later migrations are not attempted.
    """
    from pathlib import Path

    migrations_path = Path(migrations_dir)
    # is_dir() also covers the case where the path exists but is a plain file.
    if not migrations_path.is_dir():
        log.warning(f"Migrations directory not found: {migrations_dir}")
        return 0

    async with self.pool.acquire() as conn:
        # Create migrations tracking table
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS _migrations (
                id SERIAL PRIMARY KEY,
                filename VARCHAR(255) UNIQUE NOT NULL,
                applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            )
        """)

        # Get already applied migrations
        applied = await conn.fetch("SELECT filename FROM _migrations")
        applied_set = {r["filename"] for r in applied}

        # Find and run pending migrations
        migrations_run = 0
        for migration_file in sorted(migrations_path.glob("*.sql")):
            filename = migration_file.name
            if filename in applied_set:
                continue

            log.info(f"Running migration: {filename}")
            async with conn.transaction():
                try:
                    # Read as UTF-8 explicitly: the platform default encoding
                    # can mis-decode non-ASCII characters in SQL files.
                    sql = migration_file.read_text(encoding="utf-8")
                    await conn.execute(sql)
                    await conn.execute(
                        "INSERT INTO _migrations (filename) VALUES ($1)",
                        filename,
                    )
                    migrations_run += 1
                    log.info(f"Migration {filename} applied successfully")
                except Exception as e:
                    log.error(f"Migration {filename} failed: {e}")
                    # Re-raising makes the transaction block roll back.
                    raise

        log.info(f"Ran {migrations_run} migrations")
        return migrations_run
# ==================== Job Operations ====================
async def create_job(
    self,
    url: str,
    webhook_url: Optional[str] = None,
    webhook_secret: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    # Phase 2: New fields for requester tracking
    requester_client_id: Optional[str] = None,
    requester_source: Optional[str] = None,
    scrape_purpose: Optional[str] = None,
    requester_metadata: Optional[Dict[str, Any]] = None,
    # Phase 2: Batch support
    batch_id: Optional[UUID] = None,
    batch_index: Optional[int] = None,
    # Phase 2: Job configuration
    job_type: str = 'google_reviews',
    priority: int = 0,
    callback_url: Optional[str] = None,
    # Phase 2: Scraper versioning
    scraper_version: Optional[str] = None,
    scraper_variant: Optional[str] = None
) -> UUID:
    """
    Insert a new scraping job row and return its generated ID.

    Args:
        url: Google Maps URL to scrape
        webhook_url: Optional webhook URL for notifications
        webhook_secret: Optional secret for webhook signature
        metadata: Optional additional metadata
        requester_client_id: Client ID of the requester (for tracking)
        requester_source: Source of the request (e.g., 'api', 'web', 'batch')
        scrape_purpose: Purpose of the scrape (e.g., 'competitor_analysis')
        requester_metadata: Additional requester-specific metadata
        batch_id: ID of the batch this job belongs to
        batch_index: Index of this job within the batch
        job_type: Type of job (default: 'google_reviews')
        priority: Job priority (higher = more urgent, default: 0)
        callback_url: URL to call when job completes
        scraper_version: Version of the scraper to use
        scraper_variant: Variant of the scraper (e.g., 'stealth', 'fast')

    Returns:
        UUID of created job
    """
    # Dict payloads are sent as JSON text; a missing or empty dict is stored as NULL.
    metadata_json = json.dumps(metadata) if metadata else None
    requester_metadata_json = json.dumps(requester_metadata) if requester_metadata else None

    insert_sql = """
        INSERT INTO jobs (
            url, webhook_url, webhook_secret, metadata,
            requester_client_id, requester_source, scrape_purpose, requester_metadata,
            batch_id, batch_index,
            job_type, priority, callback_url,
            scraper_version, scraper_variant
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
        RETURNING job_id
    """
    # Positional values, in the same order as the column list above.
    values = (
        url, webhook_url, webhook_secret, metadata_json,
        requester_client_id, requester_source, scrape_purpose, requester_metadata_json,
        batch_id, batch_index,
        job_type, priority, callback_url,
        scraper_version, scraper_variant,
    )

    async with self.pool.acquire() as conn:
        job_id = await conn.fetchval(insert_sql, *values)
        log.info(f"Created job {job_id} for URL: {url[:80]}...")
        return job_id
async def get_job(self, job_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Fetch a single job row by its ID.

    Args:
        job_id: Job UUID

    Returns:
        Job dictionary or None if not found
    """
    select_sql = """
        SELECT
            job_id,
            status,
            url,
            webhook_url,
            created_at,
            started_at,
            completed_at,
            updated_at,
            reviews_count,
            total_reviews,
            reviews_data,
            scrape_time,
            error_message,
            metadata,
            scrape_logs,
            review_topics,
            requester_client_id,
            requester_source,
            scrape_purpose,
            requester_metadata,
            batch_id,
            batch_index,
            job_type,
            priority,
            callback_url,
            callback_status,
            callback_attempts,
            scraper_version,
            scraper_variant
        FROM jobs
        WHERE job_id = $1
    """
    async with self.pool.acquire() as conn:
        record = await conn.fetchrow(select_sql, job_id)
        return dict(record) if record else None
async def get_job_reviews(self, job_id: UUID, include_partial: bool = True) -> Optional[List[Dict[str, Any]]]:
    """
    Load the stored reviews of one job.

    Args:
        job_id: Job UUID
        include_partial: If True, also return reviews for running and partial jobs

    Returns:
        List of reviews or None if not found/no reviews
    """
    if include_partial:
        # Return reviews for completed, running, or partial jobs
        select_sql = """
            SELECT reviews_data
            FROM jobs
            WHERE job_id = $1 AND status IN ('completed', 'running', 'partial')
        """
    else:
        # Only return reviews for completed jobs
        select_sql = """
            SELECT reviews_data
            FROM jobs
            WHERE job_id = $1 AND status = 'completed'
        """

    async with self.pool.acquire() as conn:
        payload = await conn.fetchval(select_sql, job_id)

    # Covers "no matching row", NULL reviews_data and an empty payload alike.
    if not payload:
        return None

    # asyncpg returns JSONB as string, need to parse it
    return json.loads(payload) if isinstance(payload, str) else payload
async def update_job_status(
    self,
    job_id: UUID,
    status: JobStatus,
    **kwargs
):
    """
    Update job status and optional fields.

    ``started_at`` is filled in automatically when moving to RUNNING, and
    ``completed_at`` when moving to COMPLETED, FAILED or CANCELLED, unless
    the caller supplies them.

    Args:
        job_id: Job UUID
        status: New status
        **kwargs: Additional fields to update (started_at, completed_at, error_message, etc.).
            Each key is used as a column name of the jobs table.

    Raises:
        ValueError: If a keyword is not a plain identifier and therefore
            cannot be a safe column name.
    """
    # Column names cannot be sent as bind parameters, so they end up in the
    # SQL text itself. Reject anything that is not a bare identifier so a
    # stray key can never smuggle SQL into the statement.
    for key in kwargs:
        if not key.isidentifier():
            raise ValueError(f"Invalid column name for jobs update: {key!r}")

    # JSONB columns of the jobs table: values must be JSON text and need an
    # explicit cast. Previously only scrape_logs was handled this way.
    jsonb_columns = {
        'scrape_logs',
        'metadata',
        'reviews_data',
        'review_topics',
        'session_fingerprint',
        'metrics_history',
    }

    # NOTE(review): datetime.now() is the application's naive local time,
    # while other methods use the database's NOW() — confirm both clocks
    # share a timezone.
    if status == JobStatus.RUNNING and 'started_at' not in kwargs:
        kwargs['started_at'] = datetime.now()
    elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED] and 'completed_at' not in kwargs:
        kwargs['completed_at'] = datetime.now()

    # Build dynamic UPDATE query; $1 is the job id and $2 the status, so
    # extra columns start at $3.
    set_clauses = ["status = $2"]
    params = [job_id, status.value]
    for param_idx, (key, value) in enumerate(kwargs.items(), start=3):
        if key in jsonb_columns and value is not None:
            set_clauses.append(f"{key} = ${param_idx}::jsonb")
            # Strings are assumed to be JSON already and are passed through.
            params.append(value if isinstance(value, str) else json.dumps(value))
        else:
            set_clauses.append(f"{key} = ${param_idx}")
            params.append(value)

    query = f"""
        UPDATE jobs
        SET {', '.join(set_clauses)}
        WHERE job_id = $1
    """
    async with self.pool.acquire() as conn:
        await conn.execute(query, *params)
async def save_job_result(
    self,
    job_id: UUID,
    reviews: List[Dict[str, Any]],
    scrape_time: float,
    total_reviews: Optional[int] = None,
    scrape_logs: Optional[List[Dict[str, Any]]] = None,
    review_topics: Optional[List[Dict[str, Any]]] = None
):
    """
    Mark a job as completed and store its scraping results.

    Args:
        job_id: Job UUID
        reviews: List of review dictionaries
        scrape_time: Time taken to scrape in seconds
        total_reviews: Total reviews available (from page counter)
        scrape_logs: List of log entries from the scraper
        review_topics: List of topic filter dictionaries with topic and count
    """
    # Both optional payloads are stored as NULL when missing or empty.
    logs_json = json.dumps(scrape_logs) if scrape_logs else None
    topics_json = json.dumps(review_topics) if review_topics else None

    async with self.pool.acquire() as conn:
        # If reviews list is empty, check if job already has reviews from incremental saves
        # This happens when flush_callback was used during scraping
        if not reviews:
            already_saved = await conn.fetchval(
                "SELECT reviews_count FROM jobs WHERE job_id = $1", job_id
            )
            if already_saved and already_saved > 0:
                # Job has reviews from incremental saves, don't overwrite reviews_data
                await conn.execute(
                    """
                        UPDATE jobs
                        SET
                            status = 'completed',
                            completed_at = NOW(),
                            total_reviews = COALESCE($2, total_reviews),
                            scrape_time = $3,
                            scrape_logs = $4::jsonb,
                            review_topics = $5::jsonb
                        WHERE job_id = $1
                    """,
                    job_id, total_reviews, scrape_time, logs_json, topics_json,
                )
                log.info(f"Completed job {job_id} with {already_saved} reviews (from incremental saves)")
                return

        # Regular path: write the full review payload together with its count.
        review_count = len(reviews)
        await conn.execute(
            """
                UPDATE jobs
                SET
                    status = 'completed',
                    completed_at = NOW(),
                    reviews_count = $2,
                    total_reviews = $3,
                    reviews_data = $4::jsonb,
                    scrape_time = $5,
                    scrape_logs = $6::jsonb,
                    review_topics = $7::jsonb
                WHERE job_id = $1
            """,
            job_id, review_count, total_reviews, json.dumps(reviews), scrape_time,
            logs_json, topics_json,
        )
        log.info(f"Saved {review_count} reviews for job {job_id}")
async def save_reviews_incremental(
    self,
    job_id: UUID,
    reviews: List[Dict[str, Any]],
    total_reviews: Optional[int] = None
):
    """
    Persist the reviews gathered so far while a scrape is still running.
    Called on each flush to preserve progress in case of crash.

    Args:
        job_id: Job UUID
        reviews: ALL reviews collected so far (not just new ones)
        total_reviews: Total reviews available (from page counter)
    """
    collected = len(reviews)
    # The status guard means only jobs currently marked 'running' are updated.
    update_sql = """
        UPDATE jobs
        SET
            reviews_count = $2,
            total_reviews = COALESCE($3, total_reviews),
            reviews_data = $4::jsonb,
            updated_at = NOW()
        WHERE job_id = $1 AND status = 'running'
    """
    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, job_id, collected, total_reviews, json.dumps(reviews))
        log.debug(f"Incremental save: {collected} reviews for job {job_id}")
async def update_session_fingerprint(
    self,
    job_id: UUID,
    session_fingerprint: Dict[str, Any]
):
    """
    Store the browser session fingerprint on a job row.

    This should be called early in the scraping process after the browser
    fingerprint is captured, to record browser characteristics for
    bot detection analysis.

    Args:
        job_id: Job UUID
        session_fingerprint: Dictionary containing browser fingerprint data:
            - user_agent: Browser user agent string
            - platform: OS platform
            - language: Primary language
            - languages: List of accepted languages
            - timezone: Timezone string
            - screen: {width, height, colorDepth}
            - viewport: {width, height}
            - webgl_vendor: WebGL vendor string
            - webgl_renderer: WebGL renderer string
            - canvas_fingerprint: Canvas fingerprint hash
            - hardware_concurrency: Number of CPU cores
            - device_memory: Device memory in GB
            - bot_detection_tests: {webdriver_hidden, chrome_runtime, permissions_query}
            - captured_at: ISO timestamp when fingerprint was captured
    """
    fingerprint_json = json.dumps(session_fingerprint)
    update_sql = """
        UPDATE jobs
        SET
            session_fingerprint = $2::jsonb,
            updated_at = NOW()
        WHERE job_id = $1
    """
    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, job_id, fingerprint_json)
        log.debug(f"Updated session fingerprint for job {job_id}")
async def mark_job_partial(
    self,
    job_id: UUID,
    error_message: str,
    scrape_logs: Optional[List[Dict[str, Any]]] = None
):
    """
    Flag a job as partial (crashed but has some reviews saved).

    Args:
        job_id: Job UUID
        error_message: Error that caused the crash
        scrape_logs: Log entries from the scraper
    """
    # Missing or empty logs are written as NULL.
    logs_json = json.dumps(scrape_logs) if scrape_logs else None
    update_sql = """
        UPDATE jobs
        SET
            status = 'partial',
            completed_at = NOW(),
            error_message = $2,
            scrape_logs = $3::jsonb
        WHERE job_id = $1
    """
    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, job_id, error_message, logs_json)
        log.info(f"Marked job {job_id} as partial due to: {error_message}")
async def list_jobs(
    self,
    status: Optional[JobStatus] = None,
    limit: int = 100,
    offset: int = 0,
    requester_client_id: Optional[str] = None,
    batch_id: Optional[UUID] = None
) -> List[Dict[str, Any]]:
    """
    Return jobs, newest first, optionally narrowed by filters.

    Args:
        status: Optional status filter
        limit: Maximum number of jobs to return
        offset: Number of jobs to skip
        requester_client_id: Optional filter by requester client ID
        batch_id: Optional filter by batch ID

    Returns:
        List of job dictionaries
    """
    # Collect the active filters as (column, value) pairs; falsy filter
    # arguments are ignored.
    filters = []
    if status:
        filters.append(("status", status.value))
    if requester_client_id:
        filters.append(("requester_client_id", requester_client_id))
    if batch_id:
        filters.append(("batch_id", batch_id))

    # Placeholders are numbered in filter order, starting at $1.
    conditions = [
        f"{column} = ${position}"
        for position, (column, _) in enumerate(filters, start=1)
    ]
    where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    # Limit and offset take the two placeholders right after the filters.
    limit_idx = len(filters) + 1
    params = [value for _, value in filters]
    params += [limit, offset]

    query = f"""
        SELECT
            job_id,
            status,
            url,
            created_at,
            completed_at,
            reviews_count,
            total_reviews,
            scrape_time,
            error_message,
            metadata,
            review_topics,
            requester_client_id,
            requester_source,
            scrape_purpose,
            requester_metadata,
            batch_id,
            batch_index,
            job_type,
            priority,
            callback_url,
            callback_status,
            callback_attempts,
            scraper_version,
            scraper_variant
        FROM jobs
        {where_clause}
        ORDER BY created_at DESC
        LIMIT ${limit_idx} OFFSET ${limit_idx + 1}
    """
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]
async def update_job_callback(
    self,
    job_id: UUID,
    status: str,
    attempts: Optional[int] = None
):
    """
    Record the callback delivery state of a job.

    Args:
        job_id: Job UUID
        status: Callback status ('pending', 'success', 'failed', 'skipped')
        attempts: Number of callback attempts (if not provided, increments by 1)
    """
    if attempts is None:
        # No explicit count: bump the stored counter, treating NULL as 0.
        update_sql = """
            UPDATE jobs
            SET callback_status = $2,
                callback_attempts = COALESCE(callback_attempts, 0) + 1,
                updated_at = NOW()
            WHERE job_id = $1
        """
        args = (job_id, status)
    else:
        # Explicit count: overwrite the stored counter.
        update_sql = """
            UPDATE jobs
            SET callback_status = $2, callback_attempts = $3, updated_at = NOW()
            WHERE job_id = $1
        """
        args = (job_id, status, attempts)

    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, *args)
        log.debug(f"Updated callback status for job {job_id}: {status}")
# ==================== Batch Operations ====================
async def create_batch(
    self,
    name: str,
    requester_client_id: Optional[str] = None,
    requester_source: Optional[str] = None,
    scrape_purpose: Optional[str] = None,
    total_jobs: int = 0,
    callback_url: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> UUID:
    """
    Insert a new batch row used for grouping jobs.

    Args:
        name: Batch name/description
        requester_client_id: Client ID of the requester
        requester_source: Source of the batch request
        scrape_purpose: Purpose of the scrape
        total_jobs: Expected total number of jobs in batch
        callback_url: URL to call when batch completes
        metadata: Additional batch metadata

    Returns:
        UUID of created batch
    """
    # A missing or empty metadata dict is stored as NULL.
    metadata_json = json.dumps(metadata) if metadata else None
    insert_sql = """
        INSERT INTO batches (
            name, requester_client_id, requester_source, scrape_purpose,
            total_jobs, callback_url, metadata
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING batch_id
    """
    values = (
        name, requester_client_id, requester_source, scrape_purpose,
        total_jobs, callback_url, metadata_json,
    )
    async with self.pool.acquire() as conn:
        batch_id = await conn.fetchval(insert_sql, *values)
        log.info(f"Created batch {batch_id}: {name} ({total_jobs} jobs)")
        return batch_id
async def get_batch(self, batch_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Fetch one batch together with live job counts from the jobs table.

    Args:
        batch_id: Batch UUID

    Returns:
        Batch dictionary with job counts or None if not found
    """
    # LEFT JOIN keeps the batch row even when it has no jobs yet; the
    # counts are then 0.
    select_sql = """
        SELECT
            b.batch_id,
            b.name,
            b.status,
            b.requester_client_id,
            b.requester_source,
            b.scrape_purpose,
            b.total_jobs,
            b.completed_jobs,
            b.failed_jobs,
            b.callback_url,
            b.callback_status,
            b.metadata,
            b.created_at,
            b.updated_at,
            b.completed_at,
            COUNT(j.job_id) FILTER (WHERE j.status = 'pending') as pending_jobs,
            COUNT(j.job_id) FILTER (WHERE j.status = 'running') as running_jobs,
            COUNT(j.job_id) as actual_total_jobs
        FROM batches b
        LEFT JOIN jobs j ON j.batch_id = b.batch_id
        WHERE b.batch_id = $1
        GROUP BY b.batch_id
    """
    async with self.pool.acquire() as conn:
        record = await conn.fetchrow(select_sql, batch_id)
        return dict(record) if record else None
async def update_batch_progress(self, batch_id: UUID):
    """
    Recalculate and update batch progress from jobs table.

    The batch status is derived from its jobs:
        - 'pending': no jobs, or none started yet
        - 'running': at least one job is running or finished, but not all
        - 'completed': every job is in a finished state (completed,
          failed, partial or cancelled)

    Args:
        batch_id: Batch UUID
    """
    async with self.pool.acquire() as conn:
        # Calculate job counts
        counts = await conn.fetchrow("""
            SELECT
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed') as failed,
                COUNT(*) FILTER (WHERE status = 'partial') as partial,
                COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
                COUNT(*) FILTER (WHERE status = 'running') as running,
                COUNT(*) as total
            FROM jobs
            WHERE batch_id = $1
        """, batch_id)

        completed = counts['completed'] or 0
        failed = counts['failed'] or 0
        partial = counts['partial'] or 0
        cancelled = counts['cancelled'] or 0
        running = counts['running'] or 0
        total = counts['total'] or 0

        # Cancelled jobs will never progress, so they count as finished;
        # otherwise a batch containing one could never reach 'completed'.
        finished = completed + failed + partial + cancelled

        # Determine batch status
        if total == 0:
            status = 'pending'
        elif finished >= total:
            status = 'completed'
        elif finished > 0 or running > 0:
            # Jobs that are still running also mean the batch has started.
            status = 'running'
        else:
            status = 'pending'

        # Update batch. $4 appears twice, so it is cast explicitly both
        # times: without the casts PostgreSQL can deduce two different
        # types for it (text from the comparison, the column's type from
        # the assignment) and reject the statement.
        await conn.execute("""
            UPDATE batches
            SET
                completed_jobs = $2,
                failed_jobs = $3,
                status = $4::text,
                updated_at = NOW(),
                completed_at = CASE WHEN $4::text = 'completed' THEN NOW() ELSE completed_at END
            WHERE batch_id = $1
        """, batch_id, completed, failed, status)

        log.debug(f"Updated batch {batch_id} progress: {completed}/{total} completed, {failed} failed")
async def get_batches(
    self,
    requester_client_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 50
) -> List[Dict[str, Any]]:
    """
    Return batches, newest first, optionally narrowed by filters.

    Args:
        requester_client_id: Optional filter by requester client ID
        status: Optional filter by batch status
        limit: Maximum number of batches to return

    Returns:
        List of batch dictionaries
    """
    # Active filters as (column, value) pairs; falsy arguments are ignored.
    filters = []
    if requester_client_id:
        filters.append(("requester_client_id", requester_client_id))
    if status:
        filters.append(("status", status))

    # Placeholders are numbered in filter order, starting at $1.
    conditions = [
        f"{column} = ${position}"
        for position, (column, _) in enumerate(filters, start=1)
    ]
    where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    # The limit takes the placeholder right after the filters.
    limit_idx = len(filters) + 1
    params = [value for _, value in filters]
    params.append(limit)

    query = f"""
        SELECT
            batch_id,
            name,
            status,
            requester_client_id,
            requester_source,
            scrape_purpose,
            total_jobs,
            completed_jobs,
            failed_jobs,
            callback_url,
            callback_status,
            metadata,
            created_at,
            updated_at,
            completed_at
        FROM batches
        {where_clause}
        ORDER BY created_at DESC
        LIMIT ${limit_idx}
    """
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]
async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]:
    """
    List completed or failed jobs whose webhook has no successful delivery yet.

    Args:
        limit: Maximum number of jobs to return

    Returns:
        List of job dictionaries with webhook info
    """
    # A job qualifies while no webhook_attempts row with success = true
    # exists for it; the oldest completion comes first.
    select_sql = """
        SELECT
            job_id,
            status,
            url,
            webhook_url,
            webhook_secret,
            reviews_count,
            scrape_time,
            error_message,
            completed_at
        FROM jobs
        WHERE webhook_url IS NOT NULL
          AND status IN ('completed', 'failed')
          AND job_id NOT IN (
              SELECT job_id
              FROM webhook_attempts
              WHERE success = true
          )
        ORDER BY completed_at ASC
        LIMIT $1
    """
    async with self.pool.acquire() as conn:
        records = await conn.fetch(select_sql, limit)
        return [dict(record) for record in records]
async def delete_job(self, job_id: UUID) -> bool:
    """
    Remove a job row by its ID.

    Args:
        job_id: Job UUID

    Returns:
        True if deleted, False if not found
    """
    async with self.pool.acquire() as conn:
        command_tag = await conn.execute("""
            DELETE FROM jobs WHERE job_id = $1
        """, job_id)

        # The last word of the returned status string is the affected row count.
        *_, affected = command_tag.split()
        deleted = affected == "1"
        if deleted:
            log.info(f"Deleted job {job_id}")
        return deleted
async def cleanup_old_jobs(self, max_age_days: int = 30):
    """
    Delete old completed/failed/cancelled jobs.

    Args:
        max_age_days: Maximum age in days before deletion
    """
    async with self.pool.acquire() as conn:
        # asyncpg only understands $n placeholders; the former
        # INTERVAL '%s days' literal had no placeholder at all, so the
        # query was sent with an argument it could not bind. The interval
        # is now built from a real bind parameter.
        result = await conn.execute("""
            DELETE FROM jobs
            WHERE status IN ('completed', 'failed', 'cancelled')
              AND completed_at < NOW() - make_interval(days => $1::int)
        """, max_age_days)

        # The last word of the returned status string is the deleted row count.
        deleted_count = int(result.split()[-1])
        if deleted_count > 0:
            log.info(f"Cleaned up {deleted_count} old jobs")
# ==================== Statistics ====================
async def get_stats(self) -> Dict[str, Any]:
    """
    Aggregate job counts and scrape figures over the whole jobs table.

    Returns:
        Statistics dictionary
    """
    # One pass over jobs; average scrape time and the review total only
    # consider completed jobs.
    stats_sql = """
        SELECT
            COUNT(*) as total_jobs,
            COUNT(*) FILTER (WHERE status = 'pending') as pending,
            COUNT(*) FILTER (WHERE status = 'running') as running,
            COUNT(*) FILTER (WHERE status = 'completed') as completed,
            COUNT(*) FILTER (WHERE status = 'failed') as failed,
            COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
            AVG(scrape_time) FILTER (WHERE status = 'completed') as avg_scrape_time,
            SUM(reviews_count) FILTER (WHERE status = 'completed') as total_reviews
        FROM jobs
    """
    async with self.pool.acquire() as conn:
        record = await conn.fetchrow(stats_sql)
        return dict(record)
# ==================== Canary Operations ====================
async def save_canary_result(
    self,
    success: bool,
    reviews_count: Optional[int] = None,
    scrape_time: Optional[float] = None,
    error_message: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Insert one canary test result row.

    Args:
        success: Whether canary test succeeded
        reviews_count: Number of reviews scraped
        scrape_time: Time taken in seconds
        error_message: Error message if failed
        metadata: Additional metadata
    """
    # A missing or empty metadata dict is stored as NULL.
    metadata_json = json.dumps(metadata) if metadata else None
    insert_sql = """
        INSERT INTO canary_results (success, reviews_count, scrape_time, error_message, metadata)
        VALUES ($1, $2, $3, $4, $5)
    """
    async with self.pool.acquire() as conn:
        await conn.execute(
            insert_sql,
            success, reviews_count, scrape_time, error_message, metadata_json,
        )
async def get_canary_history(self, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return the most recent canary test results, newest first.

    Args:
        limit: Maximum number of results to return

    Returns:
        List of canary result dictionaries
    """
    select_sql = """
        SELECT
            timestamp,
            success,
            reviews_count,
            scrape_time,
            error_message
        FROM canary_results
        ORDER BY timestamp DESC
        LIMIT $1
    """
    async with self.pool.acquire() as conn:
        records = await conn.fetch(select_sql, limit)
        return [dict(record) for record in records]
# ==================== Webhook Attempts ====================
async def log_webhook_attempt(
    self,
    job_id: UUID,
    attempt_number: int,
    success: bool,
    status_code: Optional[int] = None,
    error_message: Optional[str] = None,
    response_time_ms: Optional[float] = None
):
    """
    Insert one row describing a webhook delivery attempt.

    Args:
        job_id: Job UUID
        attempt_number: Attempt number (1, 2, 3...)
        success: Whether delivery succeeded
        status_code: HTTP status code
        error_message: Error message if failed
        response_time_ms: Response time in milliseconds
    """
    insert_sql = """
        INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms)
        VALUES ($1, $2, $3, $4, $5, $6)
    """
    values = (job_id, attempt_number, success, status_code, error_message, response_time_ms)
    async with self.pool.acquire() as conn:
        await conn.execute(insert_sql, *values)
# ==================== Crash Reports ====================
async def save_crash_report(self, job_id: str, crash_data: dict) -> str:
    """
    Save a crash report and return the crash_id.

    Args:
        job_id: Job UUID as string (a UUID instance is accepted as well)
        crash_data: Dictionary containing crash report data:
            - crash_type: Type of crash (required)
            - error_message: Error message (optional)
            - state: Current state at crash time (required)
            - metrics_history: Historical metrics (optional)
            - logs_before_crash: Log entries before crash (optional)
            - analysis: Crash analysis data (optional)
            - screenshot_url: URL to screenshot (optional)
            - dom_snapshot_id: UUID of DOM snapshot, as string or UUID (optional)

    Returns:
        UUID of created crash report as string
    """
    def _optional_json(key):
        # Optional JSONB fields: missing or empty values are stored as NULL.
        value = crash_data.get(key)
        return json.dumps(value) if value else None

    # Convert job_id string to UUID
    job_uuid = UUID(job_id) if isinstance(job_id, str) else job_id

    # Same leniency as for job_id: UUID(<UUID instance>) raises, so an
    # already-parsed value is passed through unchanged.
    raw_snapshot_id = crash_data.get('dom_snapshot_id')
    if not raw_snapshot_id:
        dom_snapshot_uuid = None
    elif isinstance(raw_snapshot_id, UUID):
        dom_snapshot_uuid = raw_snapshot_id
    else:
        dom_snapshot_uuid = UUID(raw_snapshot_id)

    async with self.pool.acquire() as conn:
        crash_id = await conn.fetchval("""
            INSERT INTO crash_reports (
                job_id,
                crash_type,
                error_message,
                state,
                metrics_history,
                logs_before_crash,
                analysis,
                screenshot_url,
                dom_snapshot_id
            )
            VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8, $9)
            RETURNING crash_id
        """,
            job_uuid,
            crash_data.get('crash_type'),
            crash_data.get('error_message'),
            # A missing state falls back to an empty JSON object.
            json.dumps(crash_data.get('state', {})),
            _optional_json('metrics_history'),
            _optional_json('logs_before_crash'),
            _optional_json('analysis'),
            crash_data.get('screenshot_url'),
            dom_snapshot_uuid
        )

        log.info(f"Saved crash report {crash_id} for job {job_id}, type: {crash_data.get('crash_type')}")
        return str(crash_id)
async def get_crash_report(self, job_id: str) -> Optional[dict]:
    """
    Fetch the most recent crash report of a job, if any.

    Args:
        job_id: Job UUID as string

    Returns:
        Crash report dictionary or None if not found
    """
    select_sql = """
        SELECT
            crash_id,
            job_id,
            created_at,
            crash_type,
            error_message,
            state,
            metrics_history,
            logs_before_crash,
            analysis,
            screenshot_url,
            dom_snapshot_id
        FROM crash_reports
        WHERE job_id = $1
        ORDER BY created_at DESC
        LIMIT 1
    """
    async with self.pool.acquire() as conn:
        lookup_id = UUID(job_id) if isinstance(job_id, str) else job_id
        record = await conn.fetchrow(select_sql, lookup_id)
        if not record:
            return None

        report = dict(record)
        # Convert UUIDs to strings for JSON serialization
        for key in ('crash_id', 'job_id'):
            report[key] = str(report[key])
        # dom_snapshot_id is optional and stays untouched when empty.
        if report.get('dom_snapshot_id'):
            report['dom_snapshot_id'] = str(report['dom_snapshot_id'])
        return report
async def get_crash_stats(self, days: int = 7) -> dict:
    """
    Get crash statistics for the last N days.

    Args:
        days: Number of days to look back (default: 7)

    Returns:
        Dictionary with:
            - total: Total number of crashes
            - by_type: Dict mapping crash type to count
            - by_day: List of dicts with date and count
    """
    # asyncpg only understands $n placeholders; the former
    # INTERVAL '%s days' literal contained none, so each query was sent
    # with an argument it could not bind. All three queries now build the
    # look-back window from a real bind parameter.
    async with self.pool.acquire() as conn:
        # Get total count
        total = await conn.fetchval("""
            SELECT COUNT(*)
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1::int)
        """, days)

        # Get counts by type
        type_rows = await conn.fetch("""
            SELECT crash_type, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1::int)
            GROUP BY crash_type
            ORDER BY count DESC
        """, days)
        by_type = {row['crash_type']: row['count'] for row in type_rows}

        # Get counts by day
        day_rows = await conn.fetch("""
            SELECT DATE(created_at) as date, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1::int)
            GROUP BY DATE(created_at)
            ORDER BY date DESC
        """, days)
        # Dates are stringified so the result is JSON-friendly.
        by_day = [{'date': str(row['date']), 'count': row['count']} for row in day_rows]

        return {
            'total': total or 0,
            'by_type': by_type,
            'by_day': by_day
        }
# ==================== API Key Operations ====================
async def initialize_api_keys_schema(self):
    """Create api_keys table if it doesn't exist."""
    # Executed one by one in this order: the table first, then its indexes.
    ddl_statements = (
        """
            CREATE TABLE IF NOT EXISTS api_keys (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                key_hash VARCHAR(64) NOT NULL UNIQUE,
                key_prefix VARCHAR(8) NOT NULL,
                name VARCHAR(255) NOT NULL,
                client_id VARCHAR(255) NOT NULL,
                scopes TEXT[] DEFAULT '{}',
                rate_limit_rpm INTEGER DEFAULT 60,
                is_active BOOLEAN DEFAULT true,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_used_at TIMESTAMP,
                expires_at TIMESTAMP,
                metadata JSONB
            );
        """,
        # Create indexes
        """
            CREATE INDEX IF NOT EXISTS idx_api_keys_key_hash ON api_keys (key_hash);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_api_keys_client_id ON api_keys (client_id);
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_api_keys_active ON api_keys (is_active) WHERE is_active = true;
        """,
    )

    async with self.pool.acquire() as conn:
        for ddl in ddl_statements:
            await conn.execute(ddl)
        log.info("API keys schema initialized")
async def get_api_key_by_hash(self, key_hash: str) -> Optional[Dict[str, Any]]:
    """
    Look up API key by its SHA-256 hash.

    This is the primary authentication lookup method. The hash is computed
    from the API key provided in the request header.

    The lookup matches on key_hash only: revoked (is_active = false) and
    expired keys are returned too, so the caller is responsible for
    checking is_active and expires_at. The key_hash column itself is not
    part of the returned record.

    Args:
        key_hash: SHA-256 hash of the API key (64 hex characters)

    Returns:
        API key record dictionary or None if not found:
        {
            "id": UUID,
            "key_prefix": "riq_a1b2",
            "name": "Production Key",
            "client_id": "veritas_123",
            "scopes": ["jobs:read", "jobs:write"],
            "rate_limit_rpm": 60,
            "is_active": True,
            "created_at": datetime,
            "last_used_at": datetime or None,
            "expires_at": datetime or None,
            "metadata": dict or None
        }
    """
    async with self.pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT
                id,
                key_prefix,
                name,
                client_id,
                scopes,
                rate_limit_rpm,
                is_active,
                created_at,
                last_used_at,
                expires_at,
                metadata
            FROM api_keys
            WHERE key_hash = $1
        """, key_hash)

        if row is None:
            return None

        result = dict(row)
        # Convert scopes from PostgreSQL array to Python list (NULL -> [])
        result['scopes'] = list(result['scopes']) if result['scopes'] else []
        # asyncpg returns JSONB as text unless a JSON codec is registered on
        # the connection; decode it so callers get the documented dict. The
        # isinstance guard keeps this a no-op if a codec already decoded it.
        if isinstance(result.get('metadata'), str):
            result['metadata'] = json.loads(result['metadata'])
        return result
async def create_api_key(
    self,
    client_id: str,
    name: str,
    scopes: List[str],
    rate_limit_rpm: int = 60,
    expires_at: Optional[datetime] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> tuple:
    """
    Generate a fresh API key for a client and persist its hash.

    IMPORTANT: the plain text key is handed back exactly once, as part of
    the return value. Only the value produced by APIKeyAuth.hash_api_key
    and the short prefix from APIKeyAuth.get_key_prefix are written to the
    api_keys table, so the key cannot be recovered afterwards. Show it to
    the user or transmit it securely right away.

    Args:
        client_id: External client identifier (e.g., "veritas_client_123")
        name: Human-readable name for the key (e.g., "Production API Key")
        scopes: List of permission scopes (e.g., ["jobs:read", "jobs:write"])
        rate_limit_rpm: Maximum requests per minute (default: 60)
        expires_at: Optional expiration datetime (None = never expires)
        metadata: Optional additional metadata dict; a falsy value
            (None or an empty dict) is stored as NULL

    Returns:
        Tuple of (plain_api_key, key_id):
        - plain_api_key: The full API key to give to the user (str)
        - key_id: id of the created key record, as returned by the INSERT

    Security Note:
        Never log or persist the plain key. The log line written here
        contains only the key prefix.
    """
    # Deferred import: pulling this in at module load would be circular
    from api.middleware.auth import generate_api_key, APIKeyAuth

    plain_api_key = generate_api_key()
    key_hash = APIKeyAuth.hash_api_key(plain_api_key)      # what gets stored
    key_prefix = APIKeyAuth.get_key_prefix(plain_api_key)  # for identification

    insert_sql = """
        INSERT INTO api_keys (
            key_hash,
            key_prefix,
            name,
            client_id,
            scopes,
            rate_limit_rpm,
            expires_at,
            metadata
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        RETURNING id
    """

    async with self.pool.acquire() as conn:
        # The JSONB column is fed serialized JSON text (or NULL)
        if metadata:
            metadata_json = json.dumps(metadata)
        else:
            metadata_json = None

        key_id = await conn.fetchval(
            insert_sql,
            key_hash,
            key_prefix,
            name,
            client_id,
            scopes,
            rate_limit_rpm,
            expires_at,
            metadata_json,
        )

        # Log creation with only the prefix (never log full key)
        log.info(
            f"Created API key {key_prefix}... for client {client_id} "
            f"with scopes {scopes}"
        )
        return (plain_api_key, key_id)
async def update_api_key_last_used(self, key_id: UUID):
    """
    Update the last_used_at timestamp for an API key.

    Called after each successful authentication to track key usage.
    This is best-effort bookkeeping: a failure is logged and swallowed so
    that a usage-tracking problem never turns an otherwise successful
    authentication into an error. The coroutine still waits for the UPDATE
    to finish; callers that must not wait should schedule it as a
    background task.

    Args:
        key_id: UUID of the API key record
    """
    try:
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE api_keys
                SET last_used_at = NOW()
                WHERE id = $1
            """, key_id)
    except Exception:
        # Deliberately broad: pool, network and server errors are all
        # equally non-fatal here. Task cancellation is a BaseException
        # and still propagates.
        log.warning(
            "Failed to update last_used_at for API key %s",
            key_id,
            exc_info=True,
        )
async def revoke_api_key(self, key_id: UUID) -> bool:
    """
    Soft-disable an API key by flipping is_active to false.

    The row is kept, which preserves audit history (use delete_api_key
    for permanent removal). Only keys that are currently active are
    touched, so revoking an already-revoked key reports False.

    Args:
        key_id: UUID of the API key to revoke

    Returns:
        True if exactly one active key was revoked, False if no active
        key with that id exists
    """
    revoke_sql = """
        UPDATE api_keys
        SET is_active = false
        WHERE id = $1 AND is_active = true
    """
    async with self.pool.acquire() as conn:
        command_tag = await conn.execute(revoke_sql, key_id)
        # The command tag looks like "UPDATE <n>"; its last token is the
        # number of rows affected.
        rows_affected = command_tag.split()[-1]
        revoked = rows_affected == "1"
        if not revoked:
            return revoked
        log.info(f"Revoked API key {key_id}")
        return revoked
async def delete_api_key(self, key_id: UUID) -> bool:
    """
    Remove an API key row for good.

    Deletion cannot be undone and leaves no audit trail; prefer
    revoke_api_key when the history should be kept.

    Args:
        key_id: UUID of the API key to delete

    Returns:
        True if a row was deleted, False if no key with that id exists
    """
    async with self.pool.acquire() as conn:
        command_tag = await conn.execute(
            """
            DELETE FROM api_keys WHERE id = $1
            """,
            key_id,
        )
        # The command tag looks like "DELETE <n>"; its last token is the
        # number of rows removed.
        rows_removed = command_tag.split()[-1]
        deleted = rows_removed == "1"
        if not deleted:
            return deleted
        log.info(f"Deleted API key {key_id}")
        return deleted
async def list_api_keys_for_client(
    self,
    client_id: str,
    include_inactive: bool = False
) -> List[Dict[str, Any]]:
    """
    Return the API key records that belong to one client, newest first.

    Only descriptive columns are selected: neither key_hash nor metadata
    is part of the result, and the plain keys are never stored at all.

    Args:
        client_id: Client identifier to filter by
        include_inactive: Whether to include revoked keys (default: False)

    Returns:
        List of API key records ordered by created_at descending; each
        record's 'scopes' is a Python list (empty when the column is
        NULL or empty)
    """
    all_keys_sql = """
        SELECT
            id,
            key_prefix,
            name,
            client_id,
            scopes,
            rate_limit_rpm,
            is_active,
            created_at,
            last_used_at,
            expires_at
        FROM api_keys
        WHERE client_id = $1
        ORDER BY created_at DESC
    """
    active_keys_sql = """
        SELECT
            id,
            key_prefix,
            name,
            client_id,
            scopes,
            rate_limit_rpm,
            is_active,
            created_at,
            last_used_at,
            expires_at
        FROM api_keys
        WHERE client_id = $1 AND is_active = true
        ORDER BY created_at DESC
    """
    query = all_keys_sql if include_inactive else active_keys_sql

    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query, client_id)

        def _as_record(row):
            # Plain dict with scopes normalised to a list
            record = dict(row)
            record['scopes'] = list(record['scopes']) if record['scopes'] else []
            return record

        return [_as_record(row) for row in rows]
async def get_api_key_by_id(self, key_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Get API key metadata by its ID.

    Note: This returns key metadata only, never the actual key
    (since we only store hashes); key_hash is not selected either.
    Inactive keys are returned as well - the lookup filters on id only.

    Args:
        key_id: UUID of the API key

    Returns:
        API key record dictionary or None if not found. 'scopes' is a
        Python list (empty when the column is NULL or empty) and
        'metadata' is the decoded JSON value or None.
    """
    async with self.pool.acquire() as conn:
        row = await conn.fetchrow("""
            SELECT
                id,
                key_prefix,
                name,
                client_id,
                scopes,
                rate_limit_rpm,
                is_active,
                created_at,
                last_used_at,
                expires_at,
                metadata
            FROM api_keys
            WHERE id = $1
        """, key_id)

        if row is None:
            return None

        result = dict(row)
        result['scopes'] = list(result['scopes']) if result['scopes'] else []
        # asyncpg returns JSONB as text unless a JSON codec is registered on
        # the connection; decode it so callers get a dict. The isinstance
        # guard keeps this a no-op if a codec already decoded it.
        if isinstance(result.get('metadata'), str):
            result['metadata'] = json.loads(result['metadata'])
        return result
async def update_api_key_scopes(
    self,
    key_id: UUID,
    scopes: List[str]
) -> bool:
    """
    Replace the full scope list of an API key.

    The new list overwrites the stored one; nothing is merged. The update
    matches on id only, so it also applies to inactive keys.

    Args:
        key_id: UUID of the API key
        scopes: New list of permission scopes

    Returns:
        True if updated, False if key not found
    """
    update_sql = """
        UPDATE api_keys
        SET scopes = $2
        WHERE id = $1
    """
    async with self.pool.acquire() as conn:
        command_tag = await conn.execute(update_sql, key_id, scopes)
        # The command tag looks like "UPDATE <n>"; its last token is the
        # number of rows affected.
        rows_affected = command_tag.split()[-1]
        updated = rows_affected == "1"
        if not updated:
            return updated
        log.info(f"Updated scopes for API key {key_id}: {scopes}")
        return updated