diff --git a/api_server_production.py b/api_server_production.py index 03d3ced..f7b0733 100644 --- a/api_server_production.py +++ b/api_server_production.py @@ -11,6 +11,7 @@ import logging import os import time from contextlib import asynccontextmanager +from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from uuid import UUID @@ -23,6 +24,7 @@ from modules.database import DatabaseManager, JobStatus from modules.webhooks import WebhookDispatcher, WebhookManager from modules.health_checks import HealthCheckSystem from modules.scraper_clean import fast_scrape_reviews, LogCapture, get_business_card_info # Clean scraper +from modules.crash_analyzer import analyze_crash, summarize_crash_patterns, apply_auto_fix from modules.structured_logger import StructuredLogger, LogEntry from modules.chrome_pool import ( start_worker_pools, @@ -207,6 +209,51 @@ class StatsResponse(BaseModel): total_reviews: Optional[int] = None +class CrashAnalysisModel(BaseModel): + """Crash analysis details""" + pattern: str = Field(..., description="Identified crash pattern type") + confidence: float = Field(..., description="Confidence score 0.0 to 1.0") + description: str = Field(..., description="Description of the crash cause") + suggested_fix: str = Field(..., description="Recommended fix action") + auto_fix_params: Optional[Dict[str, Any]] = Field(None, description="Parameters for auto-fix") + + +class CrashReportResponse(BaseModel): + """Response model for crash report""" + crash_id: str + job_id: str + crash_type: str + error_message: Optional[str] = None + analysis: Optional[CrashAnalysisModel] = None + metrics_history: Optional[List[Dict[str, Any]]] = None + logs_before_crash: Optional[List[Dict[str, Any]]] = None + screenshot_url: Optional[str] = None + created_at: str + + +class RetryJobResponse(BaseModel): + """Response model for retry job""" + job_id: str + status: str + message: str + applied_fixes: Optional[Dict[str, Any]] = None + + +class CrashPatternStats(BaseModel): + """Statistics for a single crash pattern""" + count: int + percentage: float + avg_confidence: float + + +class CrashStatsResponse(BaseModel): + """Response model for aggregate crash statistics""" + total_crashes: int + patterns: Dict[str, CrashPatternStats] + most_common: Optional[str] = None + recommendations: List[Dict[str, Any]] + + # ==================== API Endpoints ==================== @app.get("/", summary="API Health Check") @@ -946,6 +993,329 @@ async def pool_stats(): return await asyncio.to_thread(get_pool_stats) +# ==================== Crash Report Endpoints ==================== + +@app.get("/jobs/{job_id}/crash-report", response_model=CrashReportResponse, summary="Get Crash Report") +async def get_crash_report(job_id: UUID): + """ + Get the crash report for a failed or partial job. + + Returns detailed crash analysis including: + - Crash pattern identification (memory_exhaustion, rate_limited, etc.) + - Confidence score for the pattern match + - Suggested fixes and auto-fix parameters + - Metrics history and logs before the crash + """ + if not db: + raise HTTPException(status_code=500, detail="Database not initialized") + + # Verify job exists + job = await db.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + # Only failed or partial jobs have crash reports + if job['status'] not in ['failed', 'partial']: + raise HTTPException( + status_code=400, + detail=f"Job status is '{job['status']}' - crash reports only available for failed or partial jobs" + ) + + # Get crash report from database + crash_report = await db.get_crash_report(str(job_id)) + + if not crash_report: + # No stored crash report - generate one from job data + # Build crash report from job data for analysis + scrape_logs = job.get('scrape_logs') + if isinstance(scrape_logs, str): + try: + scrape_logs = json.loads(scrape_logs) + except: + scrape_logs = [] + + # Get metrics_history if available + metrics_history = job.get('metrics_history') + if isinstance(metrics_history, str): + try: + metrics_history = json.loads(metrics_history) + except: + metrics_history = [] + + crash_data = { + 'error_message': job.get('error_message', 'Unknown error'), + 'metrics_history': metrics_history or [], + 'logs_before_crash': scrape_logs or [], + 'state': { + 'reviews_extracted': job.get('reviews_count', 0), + 'total_reviews': job.get('total_reviews') + } + } + + # Analyze the crash + analysis = analyze_crash(crash_data) + + # Build response from job data and analysis + return CrashReportResponse( + crash_id=str(job_id), # Use job_id as crash_id when no stored report + job_id=str(job_id), + crash_type=analysis.pattern, + error_message=job.get('error_message'), + analysis=CrashAnalysisModel( + pattern=analysis.pattern, + confidence=analysis.confidence, + description=analysis.description, + suggested_fix=analysis.suggested_fix, + auto_fix_params=analysis.auto_fix_params + ), + metrics_history=metrics_history, + logs_before_crash=scrape_logs, + screenshot_url=None, + created_at=job['completed_at'].isoformat() if job.get('completed_at') else job['created_at'].isoformat() + ) + + # Parse JSONB fields if needed + metrics_history = crash_report.get('metrics_history') + if isinstance(metrics_history, str): + try: + metrics_history = json.loads(metrics_history) + except: + metrics_history = [] + + logs_before_crash = crash_report.get('logs_before_crash') + if isinstance(logs_before_crash, str): + try: + logs_before_crash = json.loads(logs_before_crash) + except: + logs_before_crash = [] + + stored_analysis = crash_report.get('analysis') + if isinstance(stored_analysis, str): + try: + stored_analysis = json.loads(stored_analysis) + except: + stored_analysis = None + + # If no analysis stored, generate one + if not stored_analysis: + crash_data = { + 'error_message': crash_report.get('error_message', ''), + 'metrics_history': metrics_history or [], + 'logs_before_crash': logs_before_crash or [], + 'crash_type': crash_report.get('crash_type'), + 'state': crash_report.get('state', {}) + } + analysis = analyze_crash(crash_data) + stored_analysis = { + 'pattern': analysis.pattern, + 'confidence': analysis.confidence, + 'description': analysis.description, + 'suggested_fix': analysis.suggested_fix, + 'auto_fix_params': analysis.auto_fix_params + } + + return CrashReportResponse( + crash_id=crash_report['crash_id'], + job_id=crash_report['job_id'], + crash_type=crash_report['crash_type'], + error_message=crash_report.get('error_message'), + analysis=CrashAnalysisModel(**stored_analysis) if stored_analysis else None, + metrics_history=metrics_history, + logs_before_crash=logs_before_crash, + screenshot_url=crash_report.get('screenshot_url'), + created_at=crash_report['created_at'].isoformat() + ) + + +@app.post("/jobs/{job_id}/retry", response_model=RetryJobResponse, summary="Retry Failed Job") +async def retry_job( + job_id: UUID, + apply_fix: bool = Query(False, description="Apply auto-fix parameters based on crash analysis") +): + """ + Retry a failed or partial job, optionally applying auto-fix parameters. + + When apply_fix=true: + - Analyzes the crash pattern from the original job + - Applies recommended parameter adjustments (e.g., reduced batch size for memory issues) + - Creates a new job with the adjusted parameters + + Returns the new job ID for tracking. + """ + if not db: + raise HTTPException(status_code=500, detail="Database not initialized") + + # Get original job + original_job = await db.get_job(job_id) + if not original_job: + raise HTTPException(status_code=404, detail="Job not found") + + # Can only retry failed or partial jobs + if original_job['status'] not in ['failed', 'partial']: + raise HTTPException( + status_code=400, + detail=f"Cannot retry job with status '{original_job['status']}' - only failed or partial jobs can be retried" + ) + + # Parse original metadata + original_metadata = original_job.get('metadata') + if isinstance(original_metadata, str): + try: + original_metadata = json.loads(original_metadata) + except: + original_metadata = {} + original_metadata = original_metadata or {} + + applied_fixes = None + + if apply_fix: + # Get crash analysis to determine fixes + scrape_logs = original_job.get('scrape_logs') + if isinstance(scrape_logs, str): + try: + scrape_logs = json.loads(scrape_logs) + except: + scrape_logs = [] + + metrics_history = original_job.get('metrics_history') + if isinstance(metrics_history, str): + try: + metrics_history = json.loads(metrics_history) + except: + metrics_history = [] + + crash_data = { + 'error_message': original_job.get('error_message', 'Unknown error'), + 'metrics_history': metrics_history or [], + 'logs_before_crash': scrape_logs or [], + 'state': { + 'reviews_extracted': original_job.get('reviews_count', 0), + 'total_reviews': original_job.get('total_reviews') + } + } + + analysis = analyze_crash(crash_data) + + if analysis.auto_fix_params: + # Get current scraper params from metadata or use defaults + current_params = original_metadata.get('scraper_params', {}) + + # Apply the auto-fix parameters + fixed_params = apply_auto_fix(analysis.pattern, current_params) + + # Store applied fixes in metadata + original_metadata['scraper_params'] = fixed_params + original_metadata['retry_info'] = { + 'original_job_id': str(job_id), + 'crash_pattern': analysis.pattern, + 'applied_fixes': analysis.auto_fix_params + } + + applied_fixes = analysis.auto_fix_params + log.info(f"Applying auto-fix for pattern '{analysis.pattern}': {applied_fixes}") + + # Create new job with same URL and (possibly modified) metadata + new_job_id = await db.create_job( + url=original_job['url'], + webhook_url=original_job.get('webhook_url'), + webhook_secret=original_job.get('webhook_secret'), + metadata=original_metadata + ) + + # Start the new scraping job + asyncio.create_task(run_scraping_job(new_job_id)) + + log.info(f"Created retry job {new_job_id} for original job {job_id}") + + return RetryJobResponse( + job_id=str(new_job_id), + status="started", + message=f"Retry job created from original job {job_id}", + applied_fixes=applied_fixes + ) + + +@app.get("/crashes/stats", response_model=CrashStatsResponse, summary="Get Crash Statistics") +async def get_crash_stats( + days: int = Query(7, description="Number of days to look back", ge=1, le=90) +): + """ + Get aggregate crash statistics and pattern analysis. + + Analyzes all crash reports from the specified time period to identify: + - Most common crash patterns + - Confidence scores for pattern detection + - Recommended fixes based on recurring patterns + + Use this to identify systemic issues and optimize scraper configuration. + """ + if not db: + raise HTTPException(status_code=500, detail="Database not initialized") + + # Get basic crash stats from database + basic_stats = await db.get_crash_stats(days=days) + + # Get all failed/partial jobs for deeper analysis + failed_jobs = await db.list_jobs(status=JobStatus.FAILED, limit=500) + partial_jobs = await db.list_jobs(status=JobStatus.PARTIAL, limit=500) + + all_crash_jobs = failed_jobs + partial_jobs + + # Filter by time if needed (list_jobs doesn't have date filter) + cutoff = datetime.now() - timedelta(days=days) + recent_crash_jobs = [ + job for job in all_crash_jobs + if job.get('completed_at') and job['completed_at'] > cutoff + ] + + if not recent_crash_jobs: + return CrashStatsResponse( + total_crashes=0, + patterns={}, + most_common=None, + recommendations=[] + ) + + # Build crash reports for analysis + crash_reports = [] + for job in recent_crash_jobs: + scrape_logs = job.get('scrape_logs') + if isinstance(scrape_logs, str): + try: + scrape_logs = json.loads(scrape_logs) + except: + scrape_logs = [] + + crash_reports.append({ + 'error_message': job.get('error_message', ''), + 'metrics_history': [], # Not stored in job list query + 'logs_before_crash': scrape_logs or [], + 'state': { + 'reviews_extracted': job.get('reviews_count', 0), + 'total_reviews': job.get('total_reviews') + } + }) + + # Use summarize_crash_patterns for deep analysis + summary = summarize_crash_patterns(crash_reports) + + # Convert patterns to response model format + patterns_response = {} + for pattern_name, stats in summary.get('patterns', {}).items(): + patterns_response[pattern_name] = CrashPatternStats( + count=stats['count'], + percentage=stats['percentage'], + avg_confidence=stats['avg_confidence'] + ) + + return CrashStatsResponse( + total_crashes=summary.get('total_crashes', 0), + patterns=patterns_response, + most_common=summary.get('most_common'), + recommendations=summary.get('recommendations', []) + ) + + # ==================== Health Check Endpoints ==================== @app.get("/health/live", summary="Liveness Probe") diff --git a/web/components/JobDevTools/MetricsDashboard.tsx b/web/components/JobDevTools/MetricsDashboard.tsx new file mode 100644 index 0000000..d4b4e81 --- /dev/null +++ b/web/components/JobDevTools/MetricsDashboard.tsx @@ -0,0 +1,386 @@ +'use client'; + +import { useMemo } from 'react'; +import { + LineChart, + Line, + AreaChart, + Area, + XAxis, + YAxis, + CartesianGrid, + Tooltip, + ResponsiveContainer, + ReferenceLine, +} from 'recharts'; +import { Activity, TrendingUp, HardDrive, Scroll } from 'lucide-react'; + +/** + * Represents a single metrics sample collected during job execution + */ +export interface MetricsSample { + timestamp_ms: number; + reviews_extracted: number; + scroll_count: number; + memory_mb: number; + extraction_rate: number; // reviews per second +} + +interface MetricsDashboardProps { + metricsHistory: MetricsSample[]; + currentMetrics?: MetricsSample; + isStreaming: boolean; +} + +/** + * Formats a timestamp (in ms) to a relative time string + * e.g., "0s", "30s", "1m", "1m 30s", etc. + */ +function formatRelativeTime(timestampMs: number, startMs: number): string { + const elapsedMs = timestampMs - startMs; + const totalSeconds = Math.floor(elapsedMs / 1000); + + if (totalSeconds < 60) { + return `${totalSeconds}s`; + } + + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + + if (seconds === 0) { + return `${minutes}m`; + } + + return `${minutes}m ${seconds}s`; +} + +/** + * MetricsDashboard displays real-time metrics during job execution + * with charts for extraction rate, cumulative reviews, and memory usage. + */ +export default function MetricsDashboard({ + metricsHistory, + currentMetrics, + isStreaming, +}: MetricsDashboardProps) { + // Determine the starting timestamp for relative time calculations + const startTimestamp = useMemo(() => { + if (metricsHistory.length > 0) { + return metricsHistory[0].timestamp_ms; + } + return currentMetrics?.timestamp_ms ?? Date.now(); + }, [metricsHistory, currentMetrics]); + + // Transform metrics history for charts with relative time labels + const chartData = useMemo(() => { + return metricsHistory.map((sample) => ({ + ...sample, + time: formatRelativeTime(sample.timestamp_ms, startTimestamp), + timeMs: sample.timestamp_ms - startTimestamp, + })); + }, [metricsHistory, startTimestamp]); + + // Get the latest metrics (either current or last from history) + const latestMetrics = currentMetrics ?? metricsHistory[metricsHistory.length - 1]; + + // Memory warning threshold + const MEMORY_WARNING_MB = 1500; + + // Check if memory is above warning threshold + const isMemoryWarning = latestMetrics && latestMetrics.memory_mb >= MEMORY_WARNING_MB; + + // Custom tooltip style + const tooltipStyle = { + backgroundColor: '#1f2937', + border: '1px solid #374151', + borderRadius: '8px', + padding: '8px 12px', + }; + + return ( +
+ Green checkmark = Test passed (bot detection evaded) +
++ Red X = Test failed (may have been detected as a bot) +
++ Yellow warning = Test result unknown +
+No logs to display
++ {logs.length > 0 + ? 'Try adjusting your filters' + : 'Logs will appear here during job execution'} +
+