- Task #3: Update SSE stream to emit structured log events (type: "log" for entries, type: "metrics" every 5s, ?format=legacy for backward compat) - Task #10: Create crash pattern analyzer module (6 patterns: memory_exhaustion, dom_bloat, rate_limited, consent_loop, scroll_timeout, element_stale) (confidence scoring, auto-fix params, summarize_crash_patterns for recurring issues) - Task #13: Capture session fingerprint in backend (user_agent, platform, timezone, webgl, canvas, bot_detection_tests) (saved on success and failure for debugging) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
883 lines
31 KiB
Python
883 lines
31 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PostgreSQL database module for production microservice.
|
|
Stores job metadata and reviews as JSONB.
|
|
"""
|
|
import asyncpg
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from uuid import UUID, uuid4
|
|
from enum import Enum
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class JobStatus(str, Enum):
    """States a scraping job can be in.

    The class mixes in ``str``, so every member compares equal to its raw
    string value.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    # The job crashed, but the reviews saved before the crash were kept.
    PARTIAL = "partial"
|
|
|
|
|
|
class DatabaseManager:
    """Manage PostgreSQL access through a pooled asyncpg connection."""

    def __init__(self, database_url: str):
        """
        Remember the connection URL; no connection is opened here.

        Args:
            database_url: PostgreSQL connection URL
                Format: postgresql://user:password@host:port/database
        """
        # The pool starts out unset; connect() is what assigns it.
        self.pool: Optional[asyncpg.Pool] = None
        self.database_url = database_url
|
|
|
|
async def connect(self):
    """Open the asyncpg connection pool and store it on ``self.pool``."""
    pool_options = {
        "min_size": 5,
        "max_size": 20,
        "command_timeout": 60,
    }
    log.info("Connecting to PostgreSQL database...")
    self.pool = await asyncpg.create_pool(self.database_url, **pool_options)
    log.info("Database connection pool created")
|
|
|
|
async def disconnect(self):
    """Close the connection pool if one is set; otherwise do nothing."""
    if not self.pool:
        # No pool is set, so there is nothing to close.
        return
    await self.pool.close()
    log.info("Database connection pool closed")
|
|
|
|
async def initialize_schema(self):
    """
    Create the tables, columns, constraint and indexes used by this module.

    The statements below are executed one at a time, in the listed order, on
    a single pooled connection. They use IF NOT EXISTS / IF EXISTS guards so
    the method can be run against a database that already has the schema.
    """
    ddl_statements = (
        # Jobs table
        """
        CREATE TABLE IF NOT EXISTS jobs (
            job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            status VARCHAR(20) NOT NULL DEFAULT 'pending',
            url TEXT NOT NULL,
            webhook_url TEXT,
            webhook_secret TEXT,

            created_at TIMESTAMP NOT NULL DEFAULT NOW(),
            started_at TIMESTAMP,
            completed_at TIMESTAMP,
            updated_at TIMESTAMP,

            reviews_count INTEGER,
            total_reviews INTEGER,
            reviews_data JSONB,
            scrape_time REAL,

            error_message TEXT,
            metadata JSONB,
            scrape_logs JSONB,

            CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial'))
        );
        """,
        # Columns added after the first release (for existing databases)
        """
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS scrape_logs JSONB;
        """,
        # Used for incremental progress tracking
        """
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS updated_at TIMESTAMP;
        """,
        # Extracted topic filters with mention counts
        """
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS review_topics JSONB;
        """,
        # Re-create the status constraint so existing databases accept 'partial'
        """
        ALTER TABLE jobs DROP CONSTRAINT IF EXISTS valid_status;
        """,
        """
        ALTER TABLE jobs ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled', 'partial'));
        """,
        # Job indexes
        """
        CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at DESC);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_jobs_webhook ON jobs(webhook_url) WHERE webhook_url IS NOT NULL;
        """,
        # Canary results table
        """
        CREATE TABLE IF NOT EXISTS canary_results (
            id SERIAL PRIMARY KEY,
            timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
            success BOOLEAN NOT NULL,
            reviews_count INTEGER,
            scrape_time REAL,
            error_message TEXT,
            metadata JSONB
        );
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_canary_timestamp ON canary_results(timestamp DESC);
        """,
        # Webhook attempts table (for retry tracking)
        """
        CREATE TABLE IF NOT EXISTS webhook_attempts (
            id SERIAL PRIMARY KEY,
            job_id UUID NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE,
            attempt_number INTEGER NOT NULL,
            timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
            success BOOLEAN NOT NULL,
            status_code INTEGER,
            error_message TEXT,
            response_time_ms REAL
        );
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_webhook_job_id ON webhook_attempts(job_id);
        """,
        # session_fingerprint and metrics_history columns on the jobs table
        """
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS session_fingerprint JSONB;
        """,
        """
        ALTER TABLE jobs ADD COLUMN IF NOT EXISTS metrics_history JSONB;
        """,
        # Crash reports table
        """
        CREATE TABLE IF NOT EXISTS crash_reports (
            crash_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            job_id UUID REFERENCES jobs(job_id) ON DELETE CASCADE,
            created_at TIMESTAMP NOT NULL DEFAULT NOW(),
            crash_type VARCHAR(50) NOT NULL,
            error_message TEXT,
            state JSONB NOT NULL,
            metrics_history JSONB,
            logs_before_crash JSONB,
            analysis JSONB,
            screenshot_url TEXT,
            dom_snapshot_id UUID
        );
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_crash_reports_job ON crash_reports(job_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_crash_reports_type ON crash_reports(crash_type);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_crash_reports_created ON crash_reports(created_at DESC);
        """,
    )

    async with self.pool.acquire() as conn:
        for ddl in ddl_statements:
            await conn.execute(ddl)

        log.info("Database schema initialized")
|
|
|
|
# ==================== Job Operations ====================
|
|
|
|
async def create_job(
    self,
    url: str,
    webhook_url: Optional[str] = None,
    webhook_secret: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> UUID:
    """
    Insert a new row into the jobs table and return its generated id.

    Args:
        url: URL to scrape
        webhook_url: Optional webhook URL for notifications
        webhook_secret: Optional secret for webhook signature
        metadata: Optional additional metadata, stored as JSON

    Returns:
        The job_id returned by the INSERT
    """
    # Falsy metadata (None or an empty dict) is stored as SQL NULL.
    metadata_json = json.dumps(metadata) if metadata else None
    insert_sql = """
        INSERT INTO jobs (url, webhook_url, webhook_secret, metadata)
        VALUES ($1, $2, $3, $4)
        RETURNING job_id
    """

    async with self.pool.acquire() as conn:
        job_id = await conn.fetchval(
            insert_sql, url, webhook_url, webhook_secret, metadata_json
        )

    # Only the first 80 characters of the URL are logged.
    log.info(f"Created job {job_id} for URL: {url[:80]}...")
    return job_id
|
|
|
|
async def get_job(self, job_id: UUID) -> Optional[Dict[str, Any]]:
    """
    Fetch a single job row by its id.

    Args:
        job_id: Job UUID

    Returns:
        The selected columns as a dictionary, or None when no row matches
    """
    select_sql = """
        SELECT
            job_id,
            status,
            url,
            webhook_url,
            created_at,
            started_at,
            completed_at,
            updated_at,
            reviews_count,
            total_reviews,
            reviews_data,
            scrape_time,
            error_message,
            metadata,
            scrape_logs,
            review_topics
        FROM jobs
        WHERE job_id = $1
    """

    async with self.pool.acquire() as conn:
        record = await conn.fetchrow(select_sql, job_id)

    return dict(record) if record else None
|
|
|
|
async def get_job_reviews(self, job_id: UUID, include_partial: bool = True) -> Optional[List[Dict[str, Any]]]:
    """
    Fetch the stored reviews of one job.

    Args:
        job_id: Job UUID
        include_partial: If True, reviews are also returned for running and
            partial jobs; if False, only for completed jobs

    Returns:
        The reviews, or None when no row matches or the stored value is empty
    """
    if include_partial:
        # Completed, running and partial jobs all qualify.
        select_sql = """
            SELECT reviews_data
            FROM jobs
            WHERE job_id = $1 AND status IN ('completed', 'running', 'partial')
        """
    else:
        # Completed jobs only.
        select_sql = """
            SELECT reviews_data
            FROM jobs
            WHERE job_id = $1 AND status = 'completed'
        """

    async with self.pool.acquire() as conn:
        stored = await conn.fetchval(select_sql, job_id)

    if not stored:
        return None

    # The JSONB value can come back as its text form; decode it in that case.
    return json.loads(stored) if isinstance(stored, str) else stored
|
|
|
|
async def update_job_status(
    self,
    job_id: UUID,
    status: JobStatus,
    **kwargs
):
    """
    Update a job's status and, optionally, other columns of the same row.

    When the new status is RUNNING and no started_at is given, started_at is
    set to the current local time. When the new status is COMPLETED, FAILED
    or CANCELLED and no completed_at is given, completed_at is set likewise.

    Args:
        job_id: Job UUID
        status: New status
        **kwargs: Additional columns to update (started_at, completed_at,
            error_message, etc.). Values for JSONB columns may be passed as
            Python objects or as already-encoded JSON strings.

    Raises:
        ValueError: If a keyword name is not a plain identifier. Column
            names are interpolated into the SQL text, so anything else is
            rejected rather than sent to the database.
    """
    # Columns declared as JSONB on the jobs table; non-string values for
    # these must be serialized before being bound as parameters.
    jsonb_columns = {
        "reviews_data",
        "metadata",
        "scrape_logs",
        "review_topics",
        "session_fingerprint",
        "metrics_history",
    }

    fields = dict(kwargs)
    if status == JobStatus.RUNNING:
        fields.setdefault('started_at', datetime.now())
    elif status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED):
        fields.setdefault('completed_at', datetime.now())

    # $1 is the job id and $2 the status; extra columns start at $3.
    set_clauses = ["status = $2"]
    params = [job_id, status.value]

    for param_idx, (column, value) in enumerate(fields.items(), start=3):
        if not column.isidentifier():
            raise ValueError(f"Invalid column name: {column!r}")

        if column in jsonb_columns and value is not None:
            set_clauses.append(f"{column} = ${param_idx}::jsonb")
            params.append(value if isinstance(value, str) else json.dumps(value))
        else:
            # Plain columns, and JSONB columns being set to NULL.
            set_clauses.append(f"{column} = ${param_idx}")
            params.append(value)

    query = f"""
        UPDATE jobs
        SET {', '.join(set_clauses)}
        WHERE job_id = $1
    """

    async with self.pool.acquire() as conn:
        await conn.execute(query, *params)
|
|
|
|
async def save_job_result(
    self,
    job_id: UUID,
    reviews: List[Dict[str, Any]],
    scrape_time: float,
    total_reviews: Optional[int] = None,
    scrape_logs: Optional[List[Dict[str, Any]]] = None,
    review_topics: Optional[List[Dict[str, Any]]] = None
):
    """
    Mark a job as completed and store its scraping results.

    If ``reviews`` is empty but the row already has a positive
    reviews_count (written by earlier incremental saves), the stored
    reviews_data and reviews_count are left as they are and only the
    completion fields are updated.

    Args:
        job_id: Job UUID
        reviews: List of review dictionaries
        scrape_time: Time taken to scrape in seconds
        total_reviews: Total reviews available (from page counter)
        scrape_logs: List of log entries from the scraper
        review_topics: List of topic filter dictionaries with topic and count
    """
    # Falsy logs/topics (None or empty) are written as SQL NULL.
    logs_json = json.dumps(scrape_logs) if scrape_logs else None
    topics_json = json.dumps(review_topics) if review_topics else None

    async with self.pool.acquire() as conn:
        if not reviews:
            saved_count = await conn.fetchval(
                "SELECT reviews_count FROM jobs WHERE job_id = $1", job_id
            )
            if saved_count and saved_count > 0:
                # Keep what the incremental saves already stored.
                await conn.execute(
                    """
                    UPDATE jobs
                    SET
                        status = 'completed',
                        completed_at = NOW(),
                        total_reviews = COALESCE($2, total_reviews),
                        scrape_time = $3,
                        scrape_logs = $4::jsonb,
                        review_topics = $5::jsonb
                    WHERE job_id = $1
                    """,
                    job_id, total_reviews, scrape_time, logs_json, topics_json,
                )
                log.info(f"Completed job {job_id} with {saved_count} reviews (from incremental saves)")
                return

        await conn.execute(
            """
            UPDATE jobs
            SET
                status = 'completed',
                completed_at = NOW(),
                reviews_count = $2,
                total_reviews = $3,
                reviews_data = $4::jsonb,
                scrape_time = $5,
                scrape_logs = $6::jsonb,
                review_topics = $7::jsonb
            WHERE job_id = $1
            """,
            job_id, len(reviews), total_reviews, json.dumps(reviews), scrape_time,
            logs_json, topics_json,
        )

    log.info(f"Saved {len(reviews)} reviews for job {job_id}")
|
|
|
|
async def save_reviews_incremental(
    self,
    job_id: UUID,
    reviews: List[Dict[str, Any]],
    total_reviews: Optional[int] = None
):
    """
    Overwrite a running job's stored reviews with the reviews collected so far.

    The UPDATE only matches rows whose status is 'running'; it also refreshes
    updated_at and keeps the existing total_reviews when none is passed.

    Args:
        job_id: Job UUID
        reviews: ALL reviews collected so far (not just new ones)
        total_reviews: Total reviews available (from page counter)
    """
    collected = len(reviews)
    update_sql = """
        UPDATE jobs
        SET
            reviews_count = $2,
            total_reviews = COALESCE($3, total_reviews),
            reviews_data = $4::jsonb,
            updated_at = NOW()
        WHERE job_id = $1 AND status = 'running'
    """

    async with self.pool.acquire() as conn:
        await conn.execute(
            update_sql, job_id, collected, total_reviews, json.dumps(reviews)
        )

    log.debug(f"Incremental save: {collected} reviews for job {job_id}")
|
|
|
|
async def update_session_fingerprint(
    self,
    job_id: UUID,
    session_fingerprint: Dict[str, Any]
):
    """
    Store the browser session fingerprint on a job row.

    The dictionary is serialized to JSON and written to the
    session_fingerprint column; updated_at is refreshed in the same
    statement.

    Args:
        job_id: Job UUID
        session_fingerprint: Browser fingerprint data. The structure is not
            checked here. The previous documentation listed these keys:
            user_agent, platform, language, languages, timezone,
            screen {width, height, colorDepth}, viewport {width, height},
            webgl_vendor, webgl_renderer, canvas_fingerprint,
            hardware_concurrency, device_memory,
            bot_detection_tests {webdriver_hidden, chrome_runtime,
            permissions_query}, captured_at (ISO timestamp).
    """
    fingerprint_json = json.dumps(session_fingerprint)
    update_sql = """
        UPDATE jobs
        SET
            session_fingerprint = $2::jsonb,
            updated_at = NOW()
        WHERE job_id = $1
    """

    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, job_id, fingerprint_json)

    log.debug(f"Updated session fingerprint for job {job_id}")
|
|
|
|
async def mark_job_partial(
    self,
    job_id: UUID,
    error_message: str,
    scrape_logs: Optional[List[Dict[str, Any]]] = None
):
    """
    Set a job's status to 'partial' and record why it stopped.

    completed_at is set to the database's current time, and the stored
    scrape_logs are replaced (with NULL when no logs are passed).

    Args:
        job_id: Job UUID
        error_message: Error that caused the crash
        scrape_logs: Log entries from the scraper
    """
    logs_json = json.dumps(scrape_logs) if scrape_logs else None
    update_sql = """
        UPDATE jobs
        SET
            status = 'partial',
            completed_at = NOW(),
            error_message = $2,
            scrape_logs = $3::jsonb
        WHERE job_id = $1
    """

    async with self.pool.acquire() as conn:
        await conn.execute(update_sql, job_id, error_message, logs_json)

    log.info(f"Marked job {job_id} as partial due to: {error_message}")
|
|
|
|
async def list_jobs(
    self,
    status: Optional[JobStatus] = None,
    limit: int = 100,
    offset: int = 0
) -> List[Dict[str, Any]]:
    """
    Return a page of jobs, newest first, optionally restricted to one status.

    Args:
        status: Optional status filter
        limit: Maximum number of jobs to return
        offset: Number of jobs to skip

    Returns:
        List of job dictionaries (summary columns only; reviews_data and
        scrape_logs are not selected)
    """
    # Both variants select the same columns; only the filter differs.
    select_head = """
        SELECT
            job_id,
            status,
            url,
            created_at,
            completed_at,
            reviews_count,
            total_reviews,
            scrape_time,
            error_message,
            metadata,
            review_topics
        FROM jobs
    """

    if status:
        query = select_head + """
        WHERE status = $1
        ORDER BY created_at DESC
        LIMIT $2 OFFSET $3
        """
        args = (status.value, limit, offset)
    else:
        query = select_head + """
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
        """
        args = (limit, offset)

    async with self.pool.acquire() as conn:
        records = await conn.fetch(query, *args)

    return list(map(dict, records))
|
|
|
|
async def get_pending_jobs_with_webhooks(self, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Find finished jobs whose webhook has not been delivered successfully yet.

    A job qualifies when it has a webhook_url, its status is 'completed' or
    'failed', and no row in webhook_attempts records a success for it.
    Results are ordered by completed_at, oldest first.

    Args:
        limit: Maximum number of jobs to return

    Returns:
        List of job dictionaries with webhook info
    """
    select_sql = """
        SELECT
            job_id,
            status,
            url,
            webhook_url,
            webhook_secret,
            reviews_count,
            scrape_time,
            error_message,
            completed_at
        FROM jobs
        WHERE webhook_url IS NOT NULL
          AND status IN ('completed', 'failed')
          AND job_id NOT IN (
              SELECT job_id
              FROM webhook_attempts
              WHERE success = true
          )
        ORDER BY completed_at ASC
        LIMIT $1
    """

    async with self.pool.acquire() as conn:
        records = await conn.fetch(select_sql, limit)

    return list(map(dict, records))
|
|
|
|
async def delete_job(self, job_id: UUID) -> bool:
    """
    Delete one job row.

    Args:
        job_id: Job UUID

    Returns:
        True if deleted, False if not found
    """
    async with self.pool.acquire() as conn:
        status_tag = await conn.execute("""
            DELETE FROM jobs WHERE job_id = $1
        """, job_id)

    # The last token of the returned status string is compared with "1".
    affected = status_tag.split()[-1]
    if affected != "1":
        return False

    log.info(f"Deleted job {job_id}")
    return True
|
|
|
|
async def cleanup_old_jobs(self, max_age_days: int = 30):
    """
    Delete completed, failed and cancelled jobs older than a cutoff.

    A job is removed when its completed_at is more than ``max_age_days``
    days before the database's current time.

    Args:
        max_age_days: Maximum age in days before deletion
    """
    async with self.pool.acquire() as conn:
        # The age is bound as $1 through make_interval(); a '%s' inside a
        # quoted interval literal is never substituted by asyncpg.
        status_tag = await conn.execute("""
            DELETE FROM jobs
            WHERE status IN ('completed', 'failed', 'cancelled')
              AND completed_at < NOW() - make_interval(days => $1)
        """, max_age_days)

    # The last token of the returned status string is the affected row count.
    deleted_count = int(status_tag.split()[-1])
    if deleted_count > 0:
        log.info(f"Cleaned up {deleted_count} old jobs")
|
|
|
|
# ==================== Statistics ====================
|
|
|
|
async def get_stats(self) -> Dict[str, Any]:
    """
    Aggregate job counts and scrape totals over the whole jobs table.

    Returns:
        Dictionary with the keys total_jobs, pending, running, completed,
        failed, cancelled, partial, avg_scrape_time and total_reviews.
        avg_scrape_time and total_reviews only consider completed jobs.
    """
    async with self.pool.acquire() as conn:
        stats = await conn.fetchrow("""
            SELECT
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'pending') as pending,
                COUNT(*) FILTER (WHERE status = 'running') as running,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed') as failed,
                COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
                COUNT(*) FILTER (WHERE status = 'partial') as partial,
                AVG(scrape_time) FILTER (WHERE status = 'completed') as avg_scrape_time,
                SUM(reviews_count) FILTER (WHERE status = 'completed') as total_reviews
            FROM jobs
        """)

    return dict(stats)
|
|
|
|
# ==================== Canary Operations ====================
|
|
|
|
async def save_canary_result(
    self,
    success: bool,
    reviews_count: Optional[int] = None,
    scrape_time: Optional[float] = None,
    error_message: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):
    """
    Insert one row into canary_results.

    Args:
        success: Whether canary test succeeded
        reviews_count: Number of reviews scraped
        scrape_time: Time taken in seconds
        error_message: Error message if failed
        metadata: Additional metadata, stored as JSON (NULL when falsy)
    """
    metadata_json = json.dumps(metadata) if metadata else None
    insert_sql = """
        INSERT INTO canary_results (success, reviews_count, scrape_time, error_message, metadata)
        VALUES ($1, $2, $3, $4, $5)
    """

    async with self.pool.acquire() as conn:
        await conn.execute(
            insert_sql, success, reviews_count, scrape_time, error_message, metadata_json
        )
|
|
|
|
async def get_canary_history(self, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return the most recent canary results, newest first.

    Args:
        limit: Maximum number of results to return

    Returns:
        List of canary result dictionaries (the metadata column is not
        selected)
    """
    select_sql = """
        SELECT
            timestamp,
            success,
            reviews_count,
            scrape_time,
            error_message
        FROM canary_results
        ORDER BY timestamp DESC
        LIMIT $1
    """

    async with self.pool.acquire() as conn:
        records = await conn.fetch(select_sql, limit)

    return list(map(dict, records))
|
|
|
|
# ==================== Webhook Attempts ====================
|
|
|
|
async def log_webhook_attempt(
    self,
    job_id: UUID,
    attempt_number: int,
    success: bool,
    status_code: Optional[int] = None,
    error_message: Optional[str] = None,
    response_time_ms: Optional[float] = None
):
    """
    Insert one row into webhook_attempts describing a delivery attempt.

    Args:
        job_id: Job UUID
        attempt_number: Attempt number (1, 2, 3...)
        success: Whether delivery succeeded
        status_code: HTTP status code
        error_message: Error message if failed
        response_time_ms: Response time in milliseconds
    """
    insert_sql = """
        INSERT INTO webhook_attempts (job_id, attempt_number, success, status_code, error_message, response_time_ms)
        VALUES ($1, $2, $3, $4, $5, $6)
    """
    row_values = (
        job_id,
        attempt_number,
        success,
        status_code,
        error_message,
        response_time_ms,
    )

    async with self.pool.acquire() as conn:
        await conn.execute(insert_sql, *row_values)
|
|
|
|
# ==================== Crash Reports ====================
|
|
|
|
async def save_crash_report(self, job_id: str, crash_data: dict) -> str:
    """
    Insert a crash report row and return its generated crash_id.

    Args:
        job_id: Job UUID, as a string or a UUID object
        crash_data: Dictionary containing crash report data:
            - crash_type: Type of crash (required)
            - error_message: Error message (optional)
            - state: Current state at crash time (an empty object is stored
              when the key is missing)
            - metrics_history: Historical metrics (optional)
            - logs_before_crash: Log entries before crash (optional)
            - analysis: Crash analysis data (optional)
            - screenshot_url: URL to screenshot (optional)
            - dom_snapshot_id: DOM snapshot UUID, as a string or a UUID
              object (optional)

    Returns:
        UUID of created crash report as string
    """
    def to_json_or_null(key):
        # Missing or falsy optional sections are stored as SQL NULL.
        value = crash_data.get(key)
        return json.dumps(value) if value else None

    job_uuid = UUID(job_id) if isinstance(job_id, str) else job_id

    # Accept the snapshot id in the same two forms as job_id; calling
    # UUID() on a value that is already a UUID raises.
    snapshot_uuid = crash_data.get('dom_snapshot_id') or None
    if isinstance(snapshot_uuid, str):
        snapshot_uuid = UUID(snapshot_uuid)

    async with self.pool.acquire() as conn:
        crash_id = await conn.fetchval(
            """
            INSERT INTO crash_reports (
                job_id,
                crash_type,
                error_message,
                state,
                metrics_history,
                logs_before_crash,
                analysis,
                screenshot_url,
                dom_snapshot_id
            )
            VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8, $9)
            RETURNING crash_id
            """,
            job_uuid,
            crash_data.get('crash_type'),
            crash_data.get('error_message'),
            json.dumps(crash_data.get('state', {})),
            to_json_or_null('metrics_history'),
            to_json_or_null('logs_before_crash'),
            to_json_or_null('analysis'),
            crash_data.get('screenshot_url'),
            snapshot_uuid,
        )

    log.info(f"Saved crash report {crash_id} for job {job_id}, type: {crash_data.get('crash_type')}")
    return str(crash_id)
|
|
|
|
async def get_crash_report(self, job_id: str) -> Optional[dict]:
    """
    Fetch the most recent crash report stored for a job.

    Args:
        job_id: Job UUID, as a string or a UUID object

    Returns:
        Crash report dictionary or None if not found. crash_id, job_id and
        (when set) dom_snapshot_id are returned as strings.
    """
    job_uuid = job_id if not isinstance(job_id, str) else UUID(job_id)
    select_sql = """
        SELECT
            crash_id,
            job_id,
            created_at,
            crash_type,
            error_message,
            state,
            metrics_history,
            logs_before_crash,
            analysis,
            screenshot_url,
            dom_snapshot_id
        FROM crash_reports
        WHERE job_id = $1
        ORDER BY created_at DESC
        LIMIT 1
    """

    async with self.pool.acquire() as conn:
        record = await conn.fetchrow(select_sql, job_uuid)

    if not record:
        return None

    report = dict(record)
    # UUID values are turned into strings so the result can be JSON-encoded.
    for key in ('crash_id', 'job_id'):
        report[key] = str(report[key])
    if report.get('dom_snapshot_id'):
        report['dom_snapshot_id'] = str(report['dom_snapshot_id'])

    return report
|
|
|
|
async def get_crash_stats(self, days: int = 7) -> dict:
    """
    Summarize crash reports created within the last N days.

    Args:
        days: Number of days to look back (default: 7)

    Returns:
        Dictionary with:
            - total: Total number of crashes
            - by_type: Dict mapping crash type to count
            - by_day: List of dicts with date (as a string) and count,
              newest day first
    """
    # In all three queries the look-back window is bound as $1 through
    # make_interval(); a '%s' inside a quoted interval literal is never
    # substituted by asyncpg.
    async with self.pool.acquire() as conn:
        total = await conn.fetchval("""
            SELECT COUNT(*)
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1)
        """, days)

        type_rows = await conn.fetch("""
            SELECT crash_type, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1)
            GROUP BY crash_type
            ORDER BY count DESC
        """, days)

        day_rows = await conn.fetch("""
            SELECT DATE(created_at) as date, COUNT(*) as count
            FROM crash_reports
            WHERE created_at >= NOW() - make_interval(days => $1)
            GROUP BY DATE(created_at)
            ORDER BY date DESC
        """, days)

    by_type = {row['crash_type']: row['count'] for row in type_rows}
    by_day = [{'date': str(row['date']), 'count': row['count']} for row in day_rows]

    return {
        'total': total or 0,
        'by_type': by_type,
        'by_day': by_day
    }
|