Performance improvements: - Validation speed: 59.71s → 10.96s (5.5x improvement) - Removed 50+ console.log statements from JavaScript extraction - Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting - Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls) Scraping improvements: - Increased idle detection from 6 to 12 consecutive idle scrolls for completeness - Added real-time progress updates every 5 scrolls with percentage calculation - Added crash recovery to extract partial reviews if Chrome crashes - Removed artificial 200-review limit to scrape ALL reviews Timestamp tracking: - Added updated_at field separate from started_at for progress tracking - Frontend now shows both "Started" (fixed) and "Last Update" (dynamic) Robustness improvements: - Added 5 fallback CSS selectors to handle different Google Maps page structures - Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc. - Automatic selector detection logs which selector works for debugging Test results: - Successfully scraped 550 reviews in 150.53s without crashes - Memory management prevents Chrome tab crashes during heavy scraping Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
522 lines
17 KiB
Python
522 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PostgreSQL database module for production microservice.
|
|
Stores job metadata and reviews as JSONB.
|
|
"""
|
|
import asyncpg
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from uuid import UUID, uuid4
|
|
from enum import Enum
|
|
import logging
|
|
|
|
# Module-level logger, named after this module; used by DatabaseManager below.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class JobStatus(str, Enum):
    """Lifecycle states of a scraping job.

    The enum mixes in ``str``, so each member compares equal to its plain
    string value. The values are the same five strings accepted by the
    ``valid_status`` CHECK constraint on the ``jobs`` table.
    """

    # Not yet started.
    PENDING = "pending"
    RUNNING = "running"

    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
class DatabaseManager:
    """PostgreSQL database manager with connection pooling.

    Wraps an ``asyncpg`` connection pool and exposes the queries used for
    scraping jobs (``jobs`` table), canary checks (``canary_results``) and
    webhook delivery tracking (``webhook_attempts``).

    ``connect()`` must be awaited before any query method is used: every
    query method acquires its connection from ``self.pool``, which is
    ``None`` until then.
    """

    def __init__(self, database_url: str):
        """
        Initialize database manager.

        No connection is opened here; call ``connect()`` to create the pool.

        Args:
            database_url: PostgreSQL connection URL
                Format: postgresql://user:password@host:port/database
        """
        self.database_url = database_url
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create the connection pool (5-20 connections, 60 s command timeout)."""
        log.info("Connecting to PostgreSQL database...")
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60,
        )
        log.info("Database connection pool created")

    async def disconnect(self):
        """Close the connection pool; does nothing if no pool was created."""
        if self.pool:
            await self.pool.close()
            log.info("Database connection pool closed")

    async def initialize_schema(self):
        """
        Create database schema if it doesn't exist.

        Every statement uses ``IF NOT EXISTS``, so running this against an
        already-initialized database leaves existing tables and indexes alone.
        """
        ddl_statements = (
            # Jobs table.
            # NOTE: gen_random_uuid() is built in from PostgreSQL 13; older
            # servers need the pgcrypto extension for it.
            """
            CREATE TABLE IF NOT EXISTS jobs (
                job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                status VARCHAR(20) NOT NULL DEFAULT 'pending',
                url TEXT NOT NULL,
                webhook_url TEXT,
                webhook_secret TEXT,

                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                started_at TIMESTAMP,
                completed_at TIMESTAMP,

                reviews_count INTEGER,
                total_reviews INTEGER,
                reviews_data JSONB,
                scrape_time REAL,

                error_message TEXT,
                metadata JSONB,

                CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled'))
            );
            """,
            # Job indexes
            """
            CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_jobs_webhook ON jobs(webhook_url) WHERE webhook_url IS NOT NULL;
            """,
            # Canary results table
            """
            CREATE TABLE IF NOT EXISTS canary_results (
                id SERIAL PRIMARY KEY,
                timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
                success BOOLEAN NOT NULL,
                reviews_count INTEGER,
                scrape_time REAL,
                error_message TEXT,
                metadata JSONB
            );
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_canary_timestamp ON canary_results(timestamp DESC);
            """,
            # Webhook attempts table (for retry tracking); rows are removed
            # together with their job via ON DELETE CASCADE.
            """
            CREATE TABLE IF NOT EXISTS webhook_attempts (
                id SERIAL PRIMARY KEY,
                job_id UUID NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE,
                attempt_number INTEGER NOT NULL,
                timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
                success BOOLEAN NOT NULL,
                status_code INTEGER,
                error_message TEXT,
                response_time_ms REAL
            );
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id);
            """,
        )

        async with self.pool.acquire() as conn:
            for statement in ddl_statements:
                await conn.execute(statement)

        log.info("Database schema initialized")

    # ==================== Job Operations ====================

    async def create_job(
        self,
        url: str,
        webhook_url: Optional[str] = None,
        webhook_secret: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> UUID:
        """
        Create a new scraping job.

        The row starts with the table defaults (status ``'pending'``,
        ``created_at = NOW()``).

        Args:
            url: Google Maps URL to scrape
            webhook_url: Optional webhook URL for notifications
            webhook_secret: Optional secret for webhook signature
            metadata: Optional additional metadata; stored as JSONB. A missing
                or empty dict is stored as NULL.

        Returns:
            UUID of created job
        """
        # JSONB parameters are sent as JSON text.
        metadata_json = json.dumps(metadata) if metadata else None

        async with self.pool.acquire() as conn:
            job_id = await conn.fetchval(
                """
                INSERT INTO jobs (url, webhook_url, webhook_secret, metadata)
                VALUES ($1, $2, $3, $4)
                RETURNING job_id
                """,
                url, webhook_url, webhook_secret, metadata_json,
            )

        log.info("Created job %s for URL: %s...", job_id, url[:80])
        return job_id

    async def get_job(self, job_id: UUID) -> Optional[Dict[str, Any]]:
        """
        Get job by ID.

        The result does not include ``webhook_secret`` or ``total_reviews``.
        JSONB columns (``reviews_data``, ``metadata``) are returned as the
        driver delivers them, i.e. as JSON text unless a codec is registered;
        they are not parsed here.

        Args:
            job_id: Job UUID

        Returns:
            Job dictionary or None if not found
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT
                    job_id,
                    status,
                    url,
                    webhook_url,
                    created_at,
                    started_at,
                    completed_at,
                    reviews_count,
                    reviews_data,
                    scrape_time,
                    error_message,
                    metadata
                FROM jobs
                WHERE job_id = $1
                """,
                job_id,
            )

        if not row:
            return None

        return dict(row)

    async def get_job_reviews(self, job_id: UUID) -> Optional[List[Dict[str, Any]]]:
        """
        Get reviews for a specific job.

        Args:
            job_id: Job UUID

        Returns:
            List of reviews or None if not found/not completed
        """
        async with self.pool.acquire() as conn:
            reviews_data = await conn.fetchval(
                """
                SELECT reviews_data
                FROM jobs
                WHERE job_id = $1 AND status = 'completed'
                """,
                job_id,
            )

        # None means "no such completed job" or "no stored result"; an empty
        # review list is still a valid result and must not be mistaken for it.
        if reviews_data is None:
            return None

        # asyncpg returns JSONB as string, need to parse it
        if isinstance(reviews_data, str):
            return json.loads(reviews_data)

        return reviews_data

    async def update_job_status(
        self,
        job_id: UUID,
        status: "JobStatus",
        **kwargs
    ):
        """
        Update job status and optional fields.

        When the caller does not supply them, ``started_at`` is stamped on a
        transition to RUNNING and ``completed_at`` on a transition to
        COMPLETED, FAILED or CANCELLED. Both stamps use the database clock
        (``NOW()``), the same clock that fills ``created_at`` and that
        ``save_job_result`` uses for ``completed_at``.

        Args:
            job_id: Job UUID
            status: New status
            **kwargs: Additional columns to update, keyed by column name
                (started_at, completed_at, error_message, etc.). dict/list
                values are serialized to JSON text for JSONB columns.

        Raises:
            ValueError: If a keyword is not a plain identifier. Column names
                are spliced into the SQL text, so anything else is refused.
        """
        set_clauses = ["status = $2"]
        params: List[Any] = [job_id, status.value]

        if status == JobStatus.RUNNING:
            auto_stamp = "started_at"
        elif status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED):
            auto_stamp = "completed_at"
        else:
            auto_stamp = None

        if auto_stamp is not None and auto_stamp not in kwargs:
            set_clauses.append(f"{auto_stamp} = NOW()")

        for column, value in kwargs.items():
            # Values travel as bind parameters, but column names cannot, so
            # they must be vetted before being interpolated into the query.
            if not column.isidentifier():
                raise ValueError(f"Invalid column name for job update: {column!r}")

            # Same JSONB convention as create_job/save_job_result.
            if isinstance(value, (dict, list)):
                value = json.dumps(value)

            params.append(value)
            set_clauses.append(f"{column} = ${len(params)}")

        query = f"UPDATE jobs SET {', '.join(set_clauses)} WHERE job_id = $1"

        async with self.pool.acquire() as conn:
            await conn.execute(query, *params)

    async def save_job_result(
        self,
        job_id: UUID,
        reviews: List[Dict[str, Any]],
        scrape_time: float,
        total_reviews: Optional[int] = None
    ):
        """
        Save scraping results to database.

        Marks the job ``'completed'`` and sets ``completed_at`` to the
        database's ``NOW()`` in the same statement.

        Args:
            job_id: Job UUID
            reviews: List of review dictionaries
            scrape_time: Time taken to scrape in seconds
            total_reviews: Total reviews available (from page counter)
        """
        review_count = len(reviews)

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE jobs
                SET
                    status = 'completed',
                    completed_at = NOW(),
                    reviews_count = $2,
                    total_reviews = $3,
                    reviews_data = $4::jsonb,
                    scrape_time = $5
                WHERE job_id = $1
                """,
                job_id, review_count, total_reviews, json.dumps(reviews), scrape_time,
            )

        log.info("Saved %s reviews for job %s", review_count, job_id)

    async def list_jobs(
        self,
        status: Optional["JobStatus"] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        List jobs with optional filtering, newest first.

        Only summary columns are returned; ``reviews_data`` is not included.

        Args:
            status: Optional status filter
            limit: Maximum number of jobs to return
            offset: Number of jobs to skip

        Returns:
            List of job dictionaries
        """
        columns = (
            "job_id, status, url, created_at, completed_at, "
            "reviews_count, scrape_time, error_message"
        )

        # Both variants share the column list; only the filter and the
        # placeholder numbering differ.
        if status is not None:
            query = (
                f"SELECT {columns} FROM jobs WHERE status = $1 "
                "ORDER BY created_at DESC LIMIT $2 OFFSET $3"
            )
            args = (status.value, limit, offset)
        else:
            query = (
                f"SELECT {columns} FROM jobs "
                "ORDER BY created_at DESC LIMIT $1 OFFSET $2"
            )
            args = (limit, offset)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *args)

        return [dict(row) for row in rows]

    async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get finished jobs whose webhook has not been delivered yet.

        A job qualifies when it has a ``webhook_url``, its status is
        ``'completed'`` or ``'failed'``, and no ``webhook_attempts`` row for
        it has ``success = true``. Oldest ``completed_at`` first.

        Args:
            limit: Maximum number of jobs to return

        Returns:
            List of job dictionaries with webhook info (including
            ``webhook_secret``)
        """
        async with self.pool.acquire() as conn:
            # NOT EXISTS selects the same rows as NOT IN here because job_id
            # is NOT NULL on both tables.
            rows = await conn.fetch(
                """
                SELECT
                    job_id,
                    status,
                    url,
                    webhook_url,
                    webhook_secret,
                    reviews_count,
                    scrape_time,
                    error_message,
                    completed_at
                FROM jobs
                WHERE webhook_url IS NOT NULL
                  AND status IN ('completed', 'failed')
                  AND NOT EXISTS (
                      SELECT 1
                      FROM webhook_attempts
                      WHERE webhook_attempts.job_id = jobs.job_id
                        AND webhook_attempts.success = true
                  )
                ORDER BY completed_at ASC
                LIMIT $1
                """,
                limit,
            )

        return [dict(row) for row in rows]

    async def delete_job(self, job_id: UUID) -> bool:
        """
        Delete a job from the database.

        Its ``webhook_attempts`` rows go with it (ON DELETE CASCADE).

        Args:
            job_id: Job UUID

        Returns:
            True if deleted, False if not found
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                """
                DELETE FROM jobs WHERE job_id = $1
                """,
                job_id,
            )

        # The command status is "DELETE <row count>"; job_id is the primary
        # key, so the count is 0 or 1.
        deleted = result.split()[-1] == "1"
        if deleted:
            log.info("Deleted job %s", job_id)
        return deleted

    async def cleanup_old_jobs(self, max_age_days: int = 30) -> int:
        """
        Delete old completed/failed/cancelled jobs.

        Args:
            max_age_days: Maximum age in days (measured from ``completed_at``)
                before deletion

        Returns:
            Number of jobs deleted
        """
        async with self.pool.acquire() as conn:
            # The age must be a real bind parameter ($1): asyncpg does not do
            # Python-style "%s" interpolation, and a placeholder inside a
            # quoted literal is never substituted.
            result = await conn.execute(
                """
                DELETE FROM jobs
                WHERE status IN ('completed', 'failed', 'cancelled')
                  AND completed_at < NOW() - ($1::int * INTERVAL '1 day')
                """,
                max_age_days,
            )

        # Command status is "DELETE <row count>".
        deleted_count = int(result.split()[-1])
        if deleted_count > 0:
            log.info("Cleaned up %s old jobs", deleted_count)
        return deleted_count

    # ==================== Statistics ====================

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get job statistics.

        Returns:
            Statistics dictionary with keys ``total_jobs``, ``pending``,
            ``running``, ``completed``, ``failed``, ``cancelled``,
            ``avg_scrape_time`` and ``total_reviews``. The last two are
            aggregates over completed jobs and are None when there are none.
        """
        async with self.pool.acquire() as conn:
            stats = await conn.fetchrow(
                """
                SELECT
                    COUNT(*) as total_jobs,
                    COUNT(*) FILTER (WHERE status = 'pending') as pending,
                    COUNT(*) FILTER (WHERE status = 'running') as running,
                    COUNT(*) FILTER (WHERE status = 'completed') as completed,
                    COUNT(*) FILTER (WHERE status = 'failed') as failed,
                    COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
                    AVG(scrape_time) FILTER (WHERE status = 'completed') as avg_scrape_time,
                    SUM(reviews_count) FILTER (WHERE status = 'completed') as total_reviews
                FROM jobs
                """
            )

        return dict(stats)

    # ==================== Canary Operations ====================

    async def save_canary_result(
        self,
        success: bool,
        reviews_count: Optional[int] = None,
        scrape_time: Optional[float] = None,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Save canary test result.

        Args:
            success: Whether canary test succeeded
            reviews_count: Number of reviews scraped
            scrape_time: Time taken in seconds
            error_message: Error message if failed
            metadata: Additional metadata; stored as JSONB. A missing or empty
                dict is stored as NULL.
        """
        metadata_json = json.dumps(metadata) if metadata else None

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO canary_results (success, reviews_count, scrape_time, error_message, metadata)
                VALUES ($1, $2, $3, $4, $5)
                """,
                success, reviews_count, scrape_time, error_message, metadata_json,
            )

    async def get_canary_history(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get canary test history, newest first.

        Args:
            limit: Maximum number of results to return

        Returns:
            List of canary result dictionaries (the ``metadata`` column is
            not included)
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT
                    timestamp,
                    success,
                    reviews_count,
                    scrape_time,
                    error_message
                FROM canary_results
                ORDER BY timestamp DESC
                LIMIT $1
                """,
                limit,
            )

        return [dict(row) for row in rows]

    # ==================== Webhook Attempts ====================

    async def log_webhook_attempt(
        self,
        job_id: UUID,
        attempt_number: int,
        success: bool,
        status_code: Optional[int] = None,
        error_message: Optional[str] = None,
        response_time_ms: Optional[float] = None
    ):
        """
        Log a webhook delivery attempt.

        A row with ``success = true`` is what takes the job out of
        ``get_pending_jobs_with_webhooks``.

        Args:
            job_id: Job UUID
            attempt_number: Attempt number (1, 2, 3...)
            success: Whether delivery succeeded
            status_code: HTTP status code
            error_message: Error message if failed
            response_time_ms: Response time in milliseconds
        """
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                job_id, attempt_number, success, status_code, error_message, response_time_ms,
            )
|