#!/usr/bin/env python3 """ PostgreSQL database module for production microservice. Stores job metadata and reviews as JSONB. """ import asyncpg import json from datetime import datetime from typing import Optional, List, Dict, Any from uuid import UUID, uuid4 import logging from core.enums import JobStatus log = logging.getLogger(__name__) class DatabaseManager: """PostgreSQL database manager with connection pooling""" def __init__(self, database_url: str): """ Initialize database manager. Args: database_url: PostgreSQL connection URL Format: postgresql://user:password@host:port/database """ self.database_url = database_url self.pool: Optional[asyncpg.Pool] = None async def connect(self): """Create connection pool""" log.info("Connecting to PostgreSQL database...") self.pool = await asyncpg.create_pool( self.database_url, min_size=5, max_size=20, command_timeout=60 ) log.info("Database connection pool created") async def disconnect(self): """Close connection pool""" if self.pool: await self.pool.close() log.info("Database connection pool closed") async def initialize_schema(self): """Create database schema if it doesn't exist""" async with self.pool.acquire() as conn: # Create jobs table await conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), status VARCHAR(20) NOT NULL DEFAULT 'pending', url TEXT NOT NULL, webhook_url TEXT, webhook_secret TEXT, created_at TIMESTAMP NOT NULL DEFAULT NOW(), started_at TIMESTAMP, completed_at TIMESTAMP, updated_at TIMESTAMP, reviews_count INTEGER, total_reviews INTEGER, reviews_data JSONB, scrape_time REAL, error_message TEXT, metadata JSONB, scrape_logs JSONB, CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial')) ); """) # Add scrape_logs column if it doesn't exist (for existing databases) await conn.execute(""" ALTER TABLE jobs ADD COLUMN IF NOT EXISTS scrape_logs JSONB; """) # Add updated_at column if it doesn't exist (for incremental progress tracking) await conn.execute(""" ALTER TABLE jobs ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP; """) # Add review_topics column if it doesn't exist (extracted topic filters with mention counts) await conn.execute(""" ALTER TABLE jobs ADD COLUMN IF NOT EXISTS review_topics JSONB; """) # Update constraint to include 'partial' status (for existing databases) await conn.execute(""" ALTER TABLE jobs DROP CONSTRAINT IF EXISTS valid_status; """) await conn.execute(""" ALTER TABLE jobs ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial')); """) # Create indexes await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_webhook ON jobs(webhook_url) WHERE webhook_url IS NOT NULL; """) # Create canary results table await conn.execute(""" CREATE TABLE IF NOT EXISTS canary_results ( id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL DEFAULT NOW(), success BOOLEAN NOT NULL, reviews_count INTEGER, scrape_time REAL, error_message TEXT, metadata JSONB ); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_canary_timestamp ON canary_results(timestamp DESC); """) # Create webhook attempts table (for retry tracking) await conn.execute(""" CREATE TABLE IF NOT EXISTS webhook_attempts ( id SERIAL PRIMARY KEY, job_id UUID NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE, attempt_number INTEGER NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT NOW(), success BOOLEAN NOT NULL, status_code INTEGER, error_message TEXT, response_time_ms REAL ); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id); """) # Add session_fingerprint and metrics_history columns to jobs table await conn.execute(""" ALTER TABLE jobs ADD COLUMN IF NOT EXISTS session_fingerprint JSONB; """) await conn.execute(""" ALTER TABLE jobs ADD COLUMN IF NOT EXISTS metrics_history JSONB; """) # Create crash_reports table await conn.execute(""" CREATE TABLE IF NOT EXISTS crash_reports ( crash_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), job_id UUID REFERENCES jobs(job_id) ON DELETE CASCADE, created_at TIMESTAMP NOT NULL DEFAULT NOW(), crash_type VARCHAR(50) NOT NULL, error_message TEXT, state JSONB NOT NULL, metrics_history JSONB, logs_before_crash JSONB, analysis JSONB, screenshot_url TEXT, dom_snapshot_id UUID ); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_crash_reports_job ON crash_reports(job_id); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_crash_reports_type ON crash_reports(crash_type); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_crash_reports_created ON crash_reports(created_at DESC); """) log.info("Database schema initialized") # ==================== Job Operations ==================== async def create_job( self, url: str, webhook_url: Optional[str] = None, webhook_secret: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> UUID: """ Create a new scraping job. Args: url: Google Maps URL to scrape webhook_url: Optional webhook URL for notifications webhook_secret: Optional secret for webhook signature metadata: Optional additional metadata Returns: UUID of created job """ async with self.pool.acquire() as conn: job_id = await conn.fetchval(""" INSERT INTO jobs (url, webhook_url, webhook_secret, metadata) VALUES ($1, $2, $3, $4) RETURNING job_id """, url, webhook_url, webhook_secret, json.dumps(metadata) if metadata else None) log.info(f"Created job {job_id} for URL: {url[:80]}...") return job_id async def get_job(self, job_id: UUID) -> Optional[Dict[str, Any]]: """ Get job by ID. Args: job_id: Job UUID Returns: Job dictionary or None if not found """ async with self.pool.acquire() as conn: row = await conn.fetchrow(""" SELECT job_id, status, url, webhook_url, created_at, started_at, completed_at, updated_at, reviews_count, total_reviews, reviews_data, scrape_time, error_message, metadata, scrape_logs, review_topics FROM jobs WHERE job_id = $1 """, job_id) if not row: return None return dict(row) async def get_job_reviews(self, job_id: UUID, include_partial: bool = True) -> Optional[List[Dict[str, Any]]]: """ Get reviews for a specific job. Args: job_id: Job UUID include_partial: If True, also return reviews for running and partial jobs Returns: List of reviews or None if not found/no reviews """ async with self.pool.acquire() as conn: if include_partial: # Return reviews for completed, running, or partial jobs reviews_data = await conn.fetchval(""" SELECT reviews_data FROM jobs WHERE job_id = $1 AND status IN ('completed', 'running', 'partial') """, job_id) else: # Only return reviews for completed jobs reviews_data = await conn.fetchval(""" SELECT reviews_data FROM jobs WHERE job_id = $1 AND status = 'completed' """, job_id) if not reviews_data: return None # asyncpg returns JSONB as string, need to parse it if isinstance(reviews_data, str): return json.loads(reviews_data) return reviews_data async def update_job_status( self, job_id: UUID, status: JobStatus, **kwargs ): """ Update job status and optional fields. Args: job_id: Job UUID status: New status **kwargs: Additional fields to update (started_at, completed_at, error_message, etc.) """ # Build dynamic UPDATE query set_clauses = ["status = $2"] params = [job_id, status.value] param_idx = 3 if status == JobStatus.RUNNING and 'started_at' not in kwargs: kwargs['started_at'] = datetime.now() elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED] and 'completed_at' not in kwargs: kwargs['completed_at'] = datetime.now() for key, value in kwargs.items(): # Handle JSONB fields specially if key == 'scrape_logs' and value is not None: set_clauses.append(f"{key} = ${param_idx}::jsonb") params.append(json.dumps(value) if not isinstance(value, str) else value) else: set_clauses.append(f"{key} = ${param_idx}") params.append(value) param_idx += 1 query = f""" UPDATE jobs SET {', '.join(set_clauses)} WHERE job_id = $1 """ async with self.pool.acquire() as conn: await conn.execute(query, *params) async def save_job_result( self, job_id: UUID, reviews: List[Dict[str, Any]], scrape_time: float, total_reviews: Optional[int] = None, scrape_logs: Optional[List[Dict[str, Any]]] = None, review_topics: Optional[List[Dict[str, Any]]] = None ): """ Save scraping results to database. Args: job_id: Job UUID reviews: List of review dictionaries scrape_time: Time taken to scrape in seconds total_reviews: Total reviews available (from page counter) scrape_logs: List of log entries from the scraper review_topics: List of topic filter dictionaries with topic and count """ async with self.pool.acquire() as conn: # If reviews list is empty, check if job already has reviews from incremental saves # This happens when flush_callback was used during scraping if not reviews: existing = await conn.fetchval( "SELECT reviews_count FROM jobs WHERE job_id = $1", job_id ) if existing and existing > 0: # Job has reviews from incremental saves, don't overwrite reviews_data await conn.execute(""" UPDATE jobs SET status = 'completed', completed_at = NOW(), total_reviews = COALESCE($2, total_reviews), scrape_time = $3, scrape_logs = $4::jsonb, review_topics = $5::jsonb WHERE job_id = $1 """, job_id, total_reviews, scrape_time, json.dumps(scrape_logs) if scrape_logs else None, json.dumps(review_topics) if review_topics else None) log.info(f"Completed job {job_id} with {existing} reviews (from incremental saves)") return await conn.execute(""" UPDATE jobs SET status = 'completed', completed_at = NOW(), reviews_count = $2, total_reviews = $3, reviews_data = $4::jsonb, scrape_time = $5, scrape_logs = $6::jsonb, review_topics = $7::jsonb WHERE job_id = $1 """, job_id, len(reviews), total_reviews, json.dumps(reviews), scrape_time, json.dumps(scrape_logs) if scrape_logs else None, json.dumps(review_topics) if review_topics else None) log.info(f"Saved {len(reviews)} reviews for job {job_id}") async def save_reviews_incremental( self, job_id: UUID, reviews: List[Dict[str, Any]], total_reviews: Optional[int] = None ): """ Save reviews incrementally during scraping. Called on each flush to preserve progress in case of crash. Args: job_id: Job UUID reviews: ALL reviews collected so far (not just new ones) total_reviews: Total reviews available (from page counter) """ async with self.pool.acquire() as conn: await conn.execute(""" UPDATE jobs SET reviews_count = $2, total_reviews = COALESCE($3, total_reviews), reviews_data = $4::jsonb, updated_at = NOW() WHERE job_id = $1 AND status = 'running' """, job_id, len(reviews), total_reviews, json.dumps(reviews)) log.debug(f"Incremental save: {len(reviews)} reviews for job {job_id}") async def update_session_fingerprint( self, job_id: UUID, session_fingerprint: Dict[str, Any] ): """ Update the session fingerprint for a job. This should be called early in the scraping process after the browser fingerprint is captured, to record browser characteristics for bot detection analysis. Args: job_id: Job UUID session_fingerprint: Dictionary containing browser fingerprint data: - user_agent: Browser user agent string - platform: OS platform - language: Primary language - languages: List of accepted languages - timezone: Timezone string - screen: {width, height, colorDepth} - viewport: {width, height} - webgl_vendor: WebGL vendor string - webgl_renderer: WebGL renderer string - canvas_fingerprint: Canvas fingerprint hash - hardware_concurrency: Number of CPU cores - device_memory: Device memory in GB - bot_detection_tests: {webdriver_hidden, chrome_runtime, permissions_query} - captured_at: ISO timestamp when fingerprint was captured """ async with self.pool.acquire() as conn: await conn.execute(""" UPDATE jobs SET session_fingerprint = $2::jsonb, updated_at = NOW() WHERE job_id = $1 """, job_id, json.dumps(session_fingerprint)) log.debug(f"Updated session fingerprint for job {job_id}") async def mark_job_partial( self, job_id: UUID, error_message: str, scrape_logs: Optional[List[Dict[str, Any]]] = None ): """ Mark a job as partial (crashed but has some reviews saved). Args: job_id: Job UUID error_message: Error that caused the crash scrape_logs: Log entries from the scraper """ async with self.pool.acquire() as conn: await conn.execute(""" UPDATE jobs SET status = 'partial', completed_at = NOW(), error_message = $2, scrape_logs = $3::jsonb WHERE job_id = $1 """, job_id, error_message, json.dumps(scrape_logs) if scrape_logs else None) log.info(f"Marked job {job_id} as partial due to: {error_message}") async def list_jobs( self, status: Optional[JobStatus] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """ List jobs with optional filtering. Args: status: Optional status filter limit: Maximum number of jobs to return offset: Number of jobs to skip Returns: List of job dictionaries """ async with self.pool.acquire() as conn: if status: rows = await conn.fetch(""" SELECT job_id, status, url, created_at, completed_at, reviews_count, total_reviews, scrape_time, error_message, metadata, review_topics FROM jobs WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 """, status.value, limit, offset) else: rows = await conn.fetch(""" SELECT job_id, status, url, created_at, completed_at, reviews_count, total_reviews, scrape_time, error_message, metadata, review_topics FROM jobs ORDER BY created_at DESC LIMIT $1 OFFSET $2 """, limit, offset) return [dict(row) for row in rows] async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]: """ Get completed jobs that have webhooks pending delivery. Args: limit: Maximum number of jobs to return Returns: List of job dictionaries with webhook info """ async with self.pool.acquire() as conn: rows = await conn.fetch(""" SELECT job_id, status, url, webhook_url, webhook_secret, reviews_count, scrape_time, error_message, completed_at FROM jobs WHERE webhook_url IS NOT NULL AND status IN ('completed', 'failed') AND job_id NOT IN ( SELECT job_id FROM webhook_attempts WHERE success = true ) ORDER BY completed_at ASC LIMIT $1 """, limit) return [dict(row) for row in rows] async def delete_job(self, job_id: UUID) -> bool: """ Delete a job from the database. Args: job_id: Job UUID Returns: True if deleted, False if not found """ async with self.pool.acquire() as conn: result = await conn.execute(""" DELETE FROM jobs WHERE job_id = $1 """, job_id) deleted = result.split()[-1] == "1" if deleted: log.info(f"Deleted job {job_id}") return deleted async def cleanup_old_jobs(self, max_age_days: int = 30): """ Delete old completed/failed jobs. Args: max_age_days: Maximum age in days before deletion """ async with self.pool.acquire() as conn: result = await conn.execute(""" DELETE FROM jobs WHERE status IN ('completed', 'failed', 'cancelled') AND completed_at < NOW() - INTERVAL '%s days' """, max_age_days) deleted_count = int(result.split()[-1]) if deleted_count > 0: log.info(f"Cleaned up {deleted_count} old jobs") # ==================== Statistics ==================== async def get_stats(self) -> Dict[str, Any]: """ Get job statistics. Returns: Statistics dictionary """ async with self.pool.acquire() as conn: stats = await conn.fetchrow(""" SELECT COUNT(*) as total_jobs, COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running, COUNT(*) FILTER (WHERE status = 'completed') as completed, COUNT(*) FILTER (WHERE status = 'failed') as failed, COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled, AVG(scrape_time) FILTER (WHERE status = 'completed') as avg_scrape_time, SUM(reviews_count) FILTER (WHERE status = 'completed') as total_reviews FROM jobs """) return dict(stats) # ==================== Canary Operations ==================== async def save_canary_result( self, success: bool, reviews_count: Optional[int] = None, scrape_time: Optional[float] = None, error_message: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ): """ Save canary test result. Args: success: Whether canary test succeeded reviews_count: Number of reviews scraped scrape_time: Time taken in seconds error_message: Error message if failed metadata: Additional metadata """ async with self.pool.acquire() as conn: await conn.execute(""" INSERT INTO canary_results (success, reviews_count, scrape_time, error_message, metadata) VALUES ($1, $2, $3, $4, $5) """, success, reviews_count, scrape_time, error_message, json.dumps(metadata) if metadata else None) async def get_canary_history(self, limit: int = 100) -> List[Dict[str, Any]]: """ Get canary test history. Args: limit: Maximum number of results to return Returns: List of canary result dictionaries """ async with self.pool.acquire() as conn: rows = await conn.fetch(""" SELECT timestamp, success, reviews_count, scrape_time, error_message FROM canary_results ORDER BY timestamp DESC LIMIT $1 """, limit) return [dict(row) for row in rows] # ==================== Webhook Attempts ==================== async def log_webhook_attempt( self, job_id: UUID, attempt_number: int, success: bool, status_code: Optional[int] = None, error_message: Optional[str] = None, response_time_ms: Optional[float] = None ): """ Log a webhook delivery attempt. Args: job_id: Job UUID attempt_number: Attempt number (1, 2, 3...) success: Whether delivery succeeded status_code: HTTP status code error_message: Error message if failed response_time_ms: Response time in milliseconds """ async with self.pool.acquire() as conn: await conn.execute(""" INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms) VALUES ($1, $2, $3, $4, $5, $6) """, job_id, attempt_number, success, status_code, error_message, response_time_ms) # ==================== Crash Reports ==================== async def save_crash_report(self, job_id: str, crash_data: dict) -> str: """ Save a crash report and return the crash_id. Args: job_id: Job UUID as string crash_data: Dictionary containing crash report data: - crash_type: Type of crash (required) - error_message: Error message (optional) - state: Current state at crash time (required) - metrics_history: Historical metrics (optional) - logs_before_crash: Log entries before crash (optional) - analysis: Crash analysis data (optional) - screenshot_url: URL to screenshot (optional) - dom_snapshot_id: UUID of DOM snapshot (optional) Returns: UUID of created crash report as string """ async with self.pool.acquire() as conn: # Convert job_id string to UUID job_uuid = UUID(job_id) if isinstance(job_id, str) else job_id crash_id = await conn.fetchval(""" INSERT INTO crash_reports ( job_id, crash_type, error_message, state, metrics_history, logs_before_crash, analysis, screenshot_url, dom_snapshot_id ) VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8, $9) RETURNING crash_id """, job_uuid, crash_data.get('crash_type'), crash_data.get('error_message'), json.dumps(crash_data.get('state', {})), json.dumps(crash_data.get('metrics_history')) if crash_data.get('metrics_history') else None, json.dumps(crash_data.get('logs_before_crash')) if crash_data.get('logs_before_crash') else None, json.dumps(crash_data.get('analysis')) if crash_data.get('analysis') else None, crash_data.get('screenshot_url'), UUID(crash_data['dom_snapshot_id']) if crash_data.get('dom_snapshot_id') else None ) log.info(f"Saved crash report {crash_id} for job {job_id}, type: {crash_data.get('crash_type')}") return str(crash_id) async def get_crash_report(self, job_id: str) -> Optional[dict]: """ Get crash report for a job, if any. Args: job_id: Job UUID as string Returns: Crash report dictionary or None if not found """ async with self.pool.acquire() as conn: job_uuid = UUID(job_id) if isinstance(job_id, str) else job_id row = await conn.fetchrow(""" SELECT crash_id, job_id, created_at, crash_type, error_message, state, metrics_history, logs_before_crash, analysis, screenshot_url, dom_snapshot_id FROM crash_reports WHERE job_id = $1 ORDER BY created_at DESC LIMIT 1 """, job_uuid) if not row: return None result = dict(row) # Convert UUIDs to strings for JSON serialization result['crash_id'] = str(result['crash_id']) result['job_id'] = str(result['job_id']) if result.get('dom_snapshot_id'): result['dom_snapshot_id'] = str(result['dom_snapshot_id']) return result async def get_crash_stats(self, days: int = 7) -> dict: """ Get crash statistics for the last N days. Args: days: Number of days to look back (default: 7) Returns: Dictionary with: - total: Total number of crashes - by_type: Dict mapping crash type to count - by_day: List of dicts with date and count """ async with self.pool.acquire() as conn: # Get total count total = await conn.fetchval(""" SELECT COUNT(*) FROM crash_reports WHERE created_at >= NOW() - INTERVAL '%s days' """, days) # Get counts by type type_rows = await conn.fetch(""" SELECT crash_type, COUNT(*) as count FROM crash_reports WHERE created_at >= NOW() - INTERVAL '%s days' GROUP BY crash_type ORDER BY count DESC """, days) by_type = {row['crash_type']: row['count'] for row in type_rows} # Get counts by day day_rows = await conn.fetch(""" SELECT DATE(created_at) as date, COUNT(*) as count FROM crash_reports WHERE created_at >= NOW() - INTERVAL '%s days' GROUP BY DATE(created_at) ORDER BY date DESC """, days) by_day = [{'date': str(row['date']), 'count': row['count']} for row in day_rows] return { 'total': total or 0, 'by_type': by_type, 'by_day': by_day }