Wave 3: SSE structured logs, crash analyzer, session fingerprint
- Task #3: Update SSE stream to emit structured log events
  (type: "log" for entries, type: "metrics" every 5s; ?format=legacy for backward compatibility)
- Task #10: Create crash pattern analyzer module
  (6 patterns: memory_exhaustion, dom_bloat, rate_limited, consent_loop, scroll_timeout, element_stale;
  confidence scoring, auto-fix params, summarize_crash_patterns for recurring issues)
- Task #13: Capture session fingerprint in backend
  (user_agent, platform, timezone, webgl, canvas, bot_detection_tests;
  saved on success and failure for debugging)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional, List, Dict, Any
|
||||
from uuid import UUID
|
||||
@@ -22,6 +23,7 @@ from modules.database import DatabaseManager, JobStatus
|
||||
from modules.webhooks import WebhookDispatcher, WebhookManager
|
||||
from modules.health_checks import HealthCheckSystem
|
||||
from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper
|
||||
from modules.structured_logger import StructuredLogger, LogEntry
|
||||
from modules.chrome_pool import (
|
||||
start_worker_pools,
|
||||
stop_worker_pools,
|
||||
@@ -361,6 +363,51 @@ async def get_job_logs(job_id: UUID):
|
||||
|
||||
# ==================== SSE Streaming Endpoints ====================
|
||||
|
||||
def format_sse_event(event_type: str, data: dict, use_structured_format: bool = True) -> str:
    """
    Format an SSE event message.

    Args:
        event_type: The SSE event type (e.g., 'log', 'metrics', 'status').
            CR/LF characters are removed, because a line break inside the
            ``event:`` field would end the field early and corrupt the frame.
        data: The data payload. Values the ``json`` module cannot encode
            natively are serialized with ``str()`` instead of raising.
        use_structured_format: If True, wrap in {"type": ..., "data": ...} structure.
            If False, use legacy format (the bare payload) for backward compatibility.

    Returns:
        Formatted SSE message string: one ``event:`` line, one ``data:`` line
        holding the JSON payload, and a terminating blank line.
    """
    # An SSE field ends at the first line break, so the event name must not
    # contain one; otherwise the remainder would be parsed as a new field.
    safe_event_type = event_type.replace("\r", "").replace("\n", "")

    if use_structured_format:
        payload = {"type": safe_event_type, "data": data}
    else:
        payload = data

    # default=str is deliberate: one non-JSON-native value in a payload should
    # degrade to its string form rather than raise and abort the stream.
    # json.dumps without ``indent`` never emits a raw newline, so the payload
    # always fits on a single ``data:`` line.
    body = json.dumps(payload, default=str)
    return f"event: {safe_event_type}\ndata: {body}\n\n"
|
||||
|
||||
|
||||
def format_structured_log_event(log_entry: dict) -> str:
    """
    Render one structured log entry as an SSE message of type "log".

    The log_entry is expected to already be a dict, as produced by
    StructuredLogger.get_logs().

    Returns:
        The SSE message string built by format_sse_event, with the entry
        wrapped in the structured {"type": "log", "data": ...} envelope.
    """
    sse_event_name = "log"
    return format_sse_event(sse_event_name, log_entry, use_structured_format=True)
|
||||
|
||||
|
||||
def format_metrics_event(metrics: dict) -> str:
    """
    Render a metrics snapshot as an SSE message of type "metrics".

    Args:
        metrics: Dict containing reviews_extracted, scroll_count, memory_mb, extraction_rate

    Returns:
        The SSE message string built by format_sse_event, with the metrics
        wrapped in the structured {"type": "metrics", "data": ...} envelope.
    """
    sse_event_name = "metrics"
    return format_sse_event(sse_event_name, metrics, use_structured_format=True)
|
||||
|
||||
|
||||
async def broadcast_job_update(job_id: str, event_type: str, data: dict):
|
||||
"""Broadcast an update to all subscribers of a job stream and the all-jobs stream."""
|
||||
message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
|
||||
@@ -381,22 +428,59 @@ async def broadcast_job_update(job_id: str, event_type: str, data: dict):
|
||||
pass
|
||||
|
||||
|
||||
async def broadcast_structured_log(job_id: str, log_entry: dict):
    """
    Broadcast a structured log entry to job subscribers.

    Args:
        job_id: Key into ``job_update_queues`` identifying the job's subscribers.
        log_entry: Log entry dict; formatted once as an SSE "log" message and
            put on every subscriber queue registered for ``job_id``.

    Delivery is best-effort: a failure on one queue is logged and does not
    stop delivery to the remaining subscribers. Task cancellation is not
    swallowed and propagates to the caller.
    """
    message = format_structured_log_event(log_entry)

    if job_id in job_update_queues:
        # Iterate over a snapshot: ``await`` yields control, and the subscriber
        # collection may be modified elsewhere while this coroutine is suspended.
        for queue in list(job_update_queues[job_id]):
            try:
                await queue.put(message)
            except Exception as exc:
                # ``except Exception`` (not bare ``except``) lets
                # asyncio.CancelledError through so cancellation still works.
                log.warning("Failed to deliver structured log for job %s: %s", job_id, exc)
|
||||
|
||||
|
||||
async def broadcast_metrics(job_id: str, metrics: dict):
    """
    Broadcast metrics update to job subscribers.

    Args:
        job_id: Key into ``job_update_queues`` identifying the job's subscribers.
        metrics: Metrics dict; formatted once as an SSE "metrics" message and
            put on every subscriber queue registered for ``job_id``.

    Delivery is best-effort: a failure on one queue is logged and does not
    stop delivery to the remaining subscribers. Task cancellation is not
    swallowed and propagates to the caller.
    """
    message = format_metrics_event(metrics)

    if job_id in job_update_queues:
        # Iterate over a snapshot: ``await`` yields control, and the subscriber
        # collection may be modified elsewhere while this coroutine is suspended.
        for queue in list(job_update_queues[job_id]):
            try:
                await queue.put(message)
            except Exception as exc:
                # ``except Exception`` (not bare ``except``) lets
                # asyncio.CancelledError through so cancellation still works.
                log.warning("Failed to deliver metrics for job %s: %s", job_id, exc)
|
||||
|
||||
|
||||
@app.get("/jobs/{job_id}/stream", summary="Stream Job Updates (SSE)")
|
||||
async def stream_job_updates(job_id: UUID):
|
||||
async def stream_job_updates(
|
||||
job_id: UUID,
|
||||
format: str = Query("structured", description="Event format: 'structured' (new) or 'legacy' (backward compatible)")
|
||||
):
|
||||
"""
|
||||
Server-Sent Events stream for real-time job updates.
|
||||
|
||||
Streams:
|
||||
Event types (structured format):
|
||||
- init: Initial job state with all logs
|
||||
- log: Individual structured log entry {"type": "log", "data": {"timestamp": "...", "level": "INFO", ...}}
|
||||
- metrics: Periodic metrics {"type": "metrics", "data": {"reviews_extracted": 150, "scroll_count": 45, ...}}
|
||||
- status: Job status changes
|
||||
- progress: Review count and progress updates
|
||||
- logs: New log entries
|
||||
- complete: Job finished (completed/failed)
|
||||
|
||||
Query parameters:
|
||||
- format: 'structured' (default) for new format, 'legacy' for backward compatibility
|
||||
|
||||
Connect with EventSource in the browser:
|
||||
```javascript
|
||||
const es = new EventSource('/jobs/{job_id}/stream');
|
||||
es.onmessage = (e) => console.log(JSON.parse(e.data));
|
||||
es.addEventListener('logs', (e) => console.log('Logs:', JSON.parse(e.data)));
|
||||
es.addEventListener('log', (e) => {
|
||||
const event = JSON.parse(e.data);
|
||||
console.log('Log:', event.data); // Structured log entry
|
||||
});
|
||||
es.addEventListener('metrics', (e) => {
|
||||
const event = JSON.parse(e.data);
|
||||
console.log('Metrics:', event.data); // {reviews_extracted, scroll_count, memory_mb, extraction_rate}
|
||||
});
|
||||
```
|
||||
"""
|
||||
if not db:
|
||||
@@ -408,6 +492,7 @@ async def stream_job_updates(job_id: UUID):
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
|
||||
job_id_str = str(job_id)
|
||||
use_structured = format.lower() != "legacy"
|
||||
|
||||
# Create queue for this client
|
||||
queue: asyncio.Queue = asyncio.Queue()
|
||||
@@ -421,6 +506,7 @@ async def stream_job_updates(job_id: UUID):
|
||||
try:
|
||||
# Send initial state
|
||||
job_data = await db.get_job(job_id)
|
||||
scrape_logs = []
|
||||
if job_data:
|
||||
scrape_logs = job_data.get('scrape_logs')
|
||||
if isinstance(scrape_logs, str):
|
||||
@@ -448,6 +534,8 @@ async def stream_job_updates(job_id: UUID):
|
||||
# Keep connection alive and send updates
|
||||
last_log_count = len(scrape_logs) if scrape_logs else 0
|
||||
last_reviews_count = job_data.get('reviews_count') if job_data else 0
|
||||
last_metrics_time = time.time()
|
||||
metrics_interval = 5.0 # Emit metrics every 5 seconds
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -459,31 +547,11 @@ async def stream_job_updates(job_id: UUID):
|
||||
# Send keepalive comment
|
||||
yield ": keepalive\n\n"
|
||||
|
||||
# Also poll database for updates (backup in case broadcast missed)
|
||||
job_data = await db.get_job(job_id)
|
||||
if job_data:
|
||||
# Check for status change
|
||||
if job_data['status'] in ['completed', 'failed', 'cancelled']:
|
||||
scrape_logs = job_data.get('scrape_logs')
|
||||
if isinstance(scrape_logs, str):
|
||||
try:
|
||||
scrape_logs = json.loads(scrape_logs)
|
||||
except:
|
||||
scrape_logs = []
|
||||
|
||||
final = {
|
||||
"job_id": job_id_str,
|
||||
"status": job_data['status'],
|
||||
"reviews_count": job_data.get('reviews_count'),
|
||||
"total_reviews": job_data.get('total_reviews'),
|
||||
"scrape_time": job_data.get('scrape_time'),
|
||||
"error_message": job_data.get('error_message'),
|
||||
"logs": scrape_logs or []
|
||||
}
|
||||
yield f"event: complete\ndata: {json.dumps(final)}\n\n"
|
||||
return
|
||||
|
||||
# Check for new logs or progress
|
||||
# Also poll database for updates (backup in case broadcast missed)
|
||||
job_data = await db.get_job(job_id)
|
||||
if job_data:
|
||||
# Check for status change
|
||||
if job_data['status'] in ['completed', 'failed', 'cancelled']:
|
||||
scrape_logs = job_data.get('scrape_logs')
|
||||
if isinstance(scrape_logs, str):
|
||||
try:
|
||||
@@ -491,10 +559,72 @@ async def stream_job_updates(job_id: UUID):
|
||||
except:
|
||||
scrape_logs = []
|
||||
|
||||
current_log_count = len(scrape_logs) if scrape_logs else 0
|
||||
current_reviews = job_data.get('reviews_count') or 0
|
||||
final = {
|
||||
"job_id": job_id_str,
|
||||
"status": job_data['status'],
|
||||
"reviews_count": job_data.get('reviews_count'),
|
||||
"total_reviews": job_data.get('total_reviews'),
|
||||
"scrape_time": job_data.get('scrape_time'),
|
||||
"error_message": job_data.get('error_message'),
|
||||
"logs": scrape_logs or []
|
||||
}
|
||||
yield f"event: complete\ndata: {json.dumps(final)}\n\n"
|
||||
return
|
||||
|
||||
if current_log_count > last_log_count or current_reviews != last_reviews_count:
|
||||
# Check for new logs or progress
|
||||
scrape_logs = job_data.get('scrape_logs')
|
||||
if isinstance(scrape_logs, str):
|
||||
try:
|
||||
scrape_logs = json.loads(scrape_logs)
|
||||
except:
|
||||
scrape_logs = []
|
||||
|
||||
current_log_count = len(scrape_logs) if scrape_logs else 0
|
||||
current_reviews = job_data.get('reviews_count') or 0
|
||||
current_time = time.time()
|
||||
|
||||
# Emit individual structured log events for new logs
|
||||
if use_structured and current_log_count > last_log_count:
|
||||
new_logs = scrape_logs[last_log_count:] if scrape_logs else []
|
||||
for log_entry in new_logs:
|
||||
yield format_structured_log_event(log_entry)
|
||||
|
||||
# Emit metrics event every 5 seconds during job execution
|
||||
if use_structured and job_data['status'] == 'running':
|
||||
if current_time - last_metrics_time >= metrics_interval:
|
||||
# Calculate extraction rate (reviews per second)
|
||||
elapsed = job_data.get('scrape_time') or 0
|
||||
extraction_rate = (current_reviews / elapsed) if elapsed > 0 else 0
|
||||
|
||||
# Count scroll events from logs
|
||||
scroll_count = 0
|
||||
if scrape_logs:
|
||||
for entry in scrape_logs:
|
||||
msg = entry.get('message', '') if isinstance(entry, dict) else str(entry)
|
||||
if 'scroll' in msg.lower():
|
||||
scroll_count += 1
|
||||
|
||||
# Get memory from latest log entry with metrics
|
||||
memory_mb = 0
|
||||
if scrape_logs:
|
||||
for entry in reversed(scrape_logs):
|
||||
if isinstance(entry, dict) and entry.get('metrics'):
|
||||
memory_mb = entry['metrics'].get('memory_mb', 0)
|
||||
break
|
||||
|
||||
metrics_data = {
|
||||
"reviews_extracted": current_reviews,
|
||||
"scroll_count": scroll_count,
|
||||
"memory_mb": memory_mb,
|
||||
"extraction_rate": round(extraction_rate, 2)
|
||||
}
|
||||
yield format_metrics_event(metrics_data)
|
||||
last_metrics_time = current_time
|
||||
|
||||
# Send legacy update event if reviews or logs changed
|
||||
if current_log_count > last_log_count or current_reviews != last_reviews_count:
|
||||
if not use_structured:
|
||||
# Legacy format: send all logs in update event
|
||||
update = {
|
||||
"job_id": job_id_str,
|
||||
"status": job_data['status'],
|
||||
@@ -503,8 +633,8 @@ async def stream_job_updates(job_id: UUID):
|
||||
"logs": scrape_logs or []
|
||||
}
|
||||
yield f"event: update\ndata: {json.dumps(update)}\n\n"
|
||||
last_log_count = current_log_count
|
||||
last_reviews_count = current_reviews
|
||||
last_log_count = current_log_count
|
||||
last_reviews_count = current_reviews
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error in SSE stream for job {job_id}: {e}")
|
||||
@@ -930,6 +1060,11 @@ async def run_scraping_job(job_id: UUID):
|
||||
total_reviews_seen = [None]
|
||||
# Accumulate all reviews for incremental saves (flush_callback receives batches)
|
||||
all_reviews_collected = []
|
||||
# Track last broadcasted log count for streaming new logs
|
||||
last_broadcasted_log_count = [0]
|
||||
# Track last metrics broadcast time
|
||||
last_metrics_broadcast_time = [time.time()]
|
||||
metrics_broadcast_interval = 5.0 # Emit metrics every 5 seconds
|
||||
|
||||
# Progress callback to update job status with current/total counts AND logs
|
||||
def progress_callback(current_count: int, total_count: int):
|
||||
@@ -948,7 +1083,45 @@ async def run_scraping_job(job_id: UUID):
|
||||
scrape_logs=current_logs
|
||||
)
|
||||
|
||||
# Broadcast progress via SSE
|
||||
# Broadcast individual structured log events for new logs
|
||||
current_log_count = len(current_logs)
|
||||
if current_log_count > last_broadcasted_log_count[0]:
|
||||
new_logs = current_logs[last_broadcasted_log_count[0]:]
|
||||
for log_entry in new_logs:
|
||||
await broadcast_structured_log(job_id_str, log_entry)
|
||||
last_broadcasted_log_count[0] = current_log_count
|
||||
|
||||
# Broadcast metrics event every 5 seconds
|
||||
current_time = time.time()
|
||||
if current_time - last_metrics_broadcast_time[0] >= metrics_broadcast_interval:
|
||||
# Calculate extraction rate
|
||||
elapsed = current_time - last_metrics_broadcast_time[0]
|
||||
extraction_rate = (current_count / elapsed) if elapsed > 0 else 0
|
||||
|
||||
# Count scroll events from logs
|
||||
scroll_count = 0
|
||||
for entry in current_logs:
|
||||
msg = entry.get('message', '') if isinstance(entry, dict) else str(entry)
|
||||
if 'scroll' in msg.lower():
|
||||
scroll_count += 1
|
||||
|
||||
# Get memory from latest log entry with metrics
|
||||
memory_mb = 0
|
||||
for entry in reversed(current_logs):
|
||||
if isinstance(entry, dict) and entry.get('metrics'):
|
||||
memory_mb = entry['metrics'].get('memory_mb', 0)
|
||||
break
|
||||
|
||||
metrics_data = {
|
||||
"reviews_extracted": current_count,
|
||||
"scroll_count": scroll_count,
|
||||
"memory_mb": memory_mb,
|
||||
"extraction_rate": round(extraction_rate, 2)
|
||||
}
|
||||
await broadcast_metrics(job_id_str, metrics_data)
|
||||
last_metrics_broadcast_time[0] = current_time
|
||||
|
||||
# Broadcast progress via SSE (legacy format for backward compatibility)
|
||||
await broadcast_job_update(job_id_str, "job_progress", {
|
||||
"job_id": job_id_str,
|
||||
"status": "running",
|
||||
@@ -989,6 +1162,11 @@ async def run_scraping_job(job_id: UUID):
|
||||
)
|
||||
|
||||
if result['success']:
|
||||
# Save session fingerprint if captured
|
||||
if result.get('session_fingerprint'):
|
||||
await db.update_session_fingerprint(job_id, result['session_fingerprint'])
|
||||
log.info(f"Saved session fingerprint for job {job_id}")
|
||||
|
||||
# Save results to database (including scraper logs and review topics)
|
||||
await db.save_job_result(
|
||||
job_id=job_id,
|
||||
@@ -1030,6 +1208,11 @@ async def run_scraping_job(job_id: UUID):
|
||||
)
|
||||
|
||||
else:
|
||||
# Save session fingerprint even on failure (useful for debugging bot detection)
|
||||
if result.get('session_fingerprint'):
|
||||
await db.update_session_fingerprint(job_id, result['session_fingerprint'])
|
||||
log.info(f"Saved session fingerprint for failed job {job_id}")
|
||||
|
||||
# Job failed - check if we have partial reviews saved
|
||||
current_job = await db.get_job(job_id)
|
||||
partial_count = current_job.get('reviews_count', 0) if current_job else 0
|
||||
|
||||
Reference in New Issue
Block a user