#!/usr/bin/env python3 """ PostgreSQL database module for production microservice. Stores job metadata and reviews as JSONB. """ import asyncpg import json from datetime import datetime from typing import Optional, List, Dict, Any from uuid import UUID, uuid4 from enum import Enum import logging log = logging.getLogger(__name__) class JobStatus(str, Enum): """Job status enumeration""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class DatabaseManager: """PostgreSQL database manager with connection pooling""" def __init__(self, database_url: str): """ Initialize database manager. Args: database_url: PostgreSQL connection URL Format: postgresql://user:password@host:port/database """ self.database_url = database_url self.pool: Optional[asyncpg.Pool] = None async def connect(self): """Create connection pool""" log.info("Connecting to PostgreSQL database...") self.pool = await asyncpg.create_pool( self.database_url, min_size=5, max_size=20, command_timeout=60 ) log.info("Database connection pool created") async def disconnect(self): """Close connection pool""" if self.pool: await self.pool.close() log.info("Database connection pool closed") async def initialize_schema(self): """Create database schema if it doesn't exist""" async with self.pool.acquire() as conn: # Create jobs table await conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(), status VARCHAR(20) NOT NULL DEFAULT 'pending', url TEXT NOT NULL, webhook_url TEXT, webhook_secret TEXT, created_at TIMESTAMP NOT NULL DEFAULT NOW(), started_at TIMESTAMP, completed_at TIMESTAMP, reviews_count INTEGER, total_reviews INTEGER, reviews_data JSONB, scrape_time REAL, error_message TEXT, metadata JSONB, CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) ); """) # Create indexes await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_webhook ON jobs(webhook_url) WHERE webhook_url IS NOT NULL; """) # Create canary results table await conn.execute(""" CREATE TABLE IF NOT EXISTS canary_results ( id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL DEFAULT NOW(), success BOOLEAN NOT NULL, reviews_count INTEGER, scrape_time REAL, error_message TEXT, metadata JSONB ); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_canary_timestamp ON canary_results(timestamp DESC); """) # Create webhook attempts table (for retry tracking) await conn.execute(""" CREATE TABLE IF NOT EXISTS webhook_attempts ( id SERIAL PRIMARY KEY, job_id UUID NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE, attempt_number INTEGER NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT NOW(), success BOOLEAN NOT NULL, status_code INTEGER, error_message TEXT, response_time_ms REAL ); """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id); """) log.info("Database schema initialized") # ==================== Job Operations ==================== async def create_job( self, url: str, webhook_url: Optional[str] = None, webhook_secret: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> UUID: """ Create a new scraping job. Args: url: Google Maps URL to scrape webhook_url: Optional webhook URL for notifications webhook_secret: Optional secret for webhook signature metadata: Optional additional metadata Returns: UUID of created job """ async with self.pool.acquire() as conn: job_id = await conn.fetchval(""" INSERT INTO jobs (url, webhook_url, webhook_secret, metadata) VALUES ($1, $2, $3, $4) RETURNING job_id """, url, webhook_url, webhook_secret, json.dumps(metadata) if metadata else None) log.info(f"Created job {job_id} for URL: {url[:80]}...") return job_id async def get_job(self, job_id: UUID) -> Optional[Dict[str, Any]]: """ Get job by ID. Args: job_id: Job UUID Returns: Job dictionary or None if not found """ async with self.pool.acquire() as conn: row = await conn.fetchrow(""" SELECT job_id, status, url, webhook_url, created_at, started_at, completed_at, reviews_count, reviews_data, scrape_time, error_message, metadata FROM jobs WHERE job_id = $1 """, job_id) if not row: return None return dict(row) async def get_job_reviews(self, job_id: UUID) -> Optional[List[Dict[str, Any]]]: """ Get reviews for a specific job. Args: job_id: Job UUID Returns: List of reviews or None if not found/not completed """ async with self.pool.acquire() as conn: reviews_data = await conn.fetchval(""" SELECT reviews_data FROM jobs WHERE job_id = $1 AND status = 'completed' """, job_id) if not reviews_data: return None # asyncpg returns JSONB as string, need to parse it if isinstance(reviews_data, str): return json.loads(reviews_data) return reviews_data async def update_job_status( self, job_id: UUID, status: JobStatus, **kwargs ): """ Update job status and optional fields. Args: job_id: Job UUID status: New status **kwargs: Additional fields to update (started_at, completed_at, error_message, etc.) """ # Build dynamic UPDATE query set_clauses = ["status = $2"] params = [job_id, status.value] param_idx = 3 if status == JobStatus.RUNNING and 'started_at' not in kwargs: kwargs['started_at'] = datetime.now() elif status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED] and 'completed_at' not in kwargs: kwargs['completed_at'] = datetime.now() for key, value in kwargs.items(): set_clauses.append(f"{key} = ${param_idx}") params.append(value) param_idx += 1 query = f""" UPDATE jobs SET {', '.join(set_clauses)} WHERE job_id = $1 """ async with self.pool.acquire() as conn: await conn.execute(query, *params) async def save_job_result( self, job_id: UUID, reviews: List[Dict[str, Any]], scrape_time: float, total_reviews: Optional[int] = None ): """ Save scraping results to database. Args: job_id: Job UUID reviews: List of review dictionaries scrape_time: Time taken to scrape in seconds total_reviews: Total reviews available (from page counter) """ async with self.pool.acquire() as conn: await conn.execute(""" UPDATE jobs SET status = 'completed', completed_at = NOW(), reviews_count = $2, total_reviews = $3, reviews_data = $4::jsonb, scrape_time = $5 WHERE job_id = $1 """, job_id, len(reviews), total_reviews, json.dumps(reviews), scrape_time) log.info(f"Saved {len(reviews)} reviews for job {job_id}") async def list_jobs( self, status: Optional[JobStatus] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """ List jobs with optional filtering. Args: status: Optional status filter limit: Maximum number of jobs to return offset: Number of jobs to skip Returns: List of job dictionaries """ async with self.pool.acquire() as conn: if status: rows = await conn.fetch(""" SELECT job_id, status, url, created_at, completed_at, reviews_count, scrape_time, error_message FROM jobs WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3 """, status.value, limit, offset) else: rows = await conn.fetch(""" SELECT job_id, status, url, created_at, completed_at, reviews_count, scrape_time, error_message FROM jobs ORDER BY created_at DESC LIMIT $1 OFFSET $2 """, limit, offset) return [dict(row) for row in rows] async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]: """ Get completed jobs that have webhooks pending delivery. Args: limit: Maximum number of jobs to return Returns: List of job dictionaries with webhook info """ async with self.pool.acquire() as conn: rows = await conn.fetch(""" SELECT job_id, status, url, webhook_url, webhook_secret, reviews_count, scrape_time, error_message, completed_at FROM jobs WHERE webhook_url IS NOT NULL AND status IN ('completed', 'failed') AND job_id NOT IN ( SELECT job_id FROM webhook_attempts WHERE success = true ) ORDER BY completed_at ASC LIMIT $1 """, limit) return [dict(row) for row in rows] async def delete_job(self, job_id: UUID) -> bool: """ Delete a job from the database. Args: job_id: Job UUID Returns: True if deleted, False if not found """ async with self.pool.acquire() as conn: result = await conn.execute(""" DELETE FROM jobs WHERE job_id = $1 """, job_id) deleted = result.split()[-1] == "1" if deleted: log.info(f"Deleted job {job_id}") return deleted async def cleanup_old_jobs(self, max_age_days: int = 30): """ Delete old completed/failed jobs. Args: max_age_days: Maximum age in days before deletion """ async with self.pool.acquire() as conn: result = await conn.execute(""" DELETE FROM jobs WHERE status IN ('completed', 'failed', 'cancelled') AND completed_at < NOW() - INTERVAL '%s days' """, max_age_days) deleted_count = int(result.split()[-1]) if deleted_count > 0: log.info(f"Cleaned up {deleted_count} old jobs") # ==================== Statistics ==================== async def get_stats(self) -> Dict[str, Any]: """ Get job statistics. Returns: Statistics dictionary """ async with self.pool.acquire() as conn: stats = await conn.fetchrow(""" SELECT COUNT(*) as total_jobs, COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running, COUNT(*) FILTER (WHERE status = 'completed') as completed, COUNT(*) FILTER (WHERE status = 'failed') as failed, COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled, AVG(scrape_time) FILTER (WHERE status = 'completed') as avg_scrape_time, SUM(reviews_count) FILTER (WHERE status = 'completed') as total_reviews FROM jobs """) return dict(stats) # ==================== Canary Operations ==================== async def save_canary_result( self, success: bool, reviews_count: Optional[int] = None, scrape_time: Optional[float] = None, error_message: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ): """ Save canary test result. Args: success: Whether canary test succeeded reviews_count: Number of reviews scraped scrape_time: Time taken in seconds error_message: Error message if failed metadata: Additional metadata """ async with self.pool.acquire() as conn: await conn.execute(""" INSERT INTO canary_results (success, reviews_count, scrape_time, error_message, metadata) VALUES ($1, $2, $3, $4, $5) """, success, reviews_count, scrape_time, error_message, json.dumps(metadata) if metadata else None) async def get_canary_history(self, limit: int = 100) -> List[Dict[str, Any]]: """ Get canary test history. Args: limit: Maximum number of results to return Returns: List of canary result dictionaries """ async with self.pool.acquire() as conn: rows = await conn.fetch(""" SELECT timestamp, success, reviews_count, scrape_time, error_message FROM canary_results ORDER BY timestamp DESC LIMIT $1 """, limit) return [dict(row) for row in rows] # ==================== Webhook Attempts ==================== async def log_webhook_attempt( self, job_id: UUID, attempt_number: int, success: bool, status_code: Optional[int] = None, error_message: Optional[str] = None, response_time_ms: Optional[float] = None ): """ Log a webhook delivery attempt. Args: job_id: Job UUID attempt_number: Attempt number (1, 2, 3...) success: Whether delivery succeeded status_code: HTTP status code error_message: Error message if failed response_time_ms: Response time in milliseconds """ async with self.pool.acquire() as conn: await conn.execute(""" INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms) VALUES ($1, $2, $3, $4, $5, $6) """, job_id, attempt_number, success, status_code, error_message, response_time_ms)