#!/usr/bin/env python3
"""
Smart health check system with canary testing.

Verifies that scraping actually works, not just that services are up.
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional
import os

log = logging.getLogger(__name__)


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    Used instead of the deprecated ``datetime.utcnow()`` and the naive local
    ``datetime.now()`` so every timestamp in this module is mutually
    comparable and serializes with an explicit ``+00:00`` offset.
    """
    return datetime.now(timezone.utc)


class CanaryMonitor:
    """
    Background canary test monitor.

    Runs actual scraping tests periodically to verify the scraper works.
    This catches issues like:
    - Google Maps page structure changes
    - Broken CSS selectors
    - GDPR consent handling issues
    - Network/proxy problems
    - Chrome/browser issues
    """

    def __init__(
        self,
        db,
        interval_hours: float = 4,
        test_url: Optional[str] = None,
        *,
        alert_threshold: int = 3,
        timeout_seconds: float = 60,
        stale_after_hours: Optional[float] = None,
    ):
        """
        Initialize canary monitor.

        Args:
            db: Database manager instance (must provide an async
                ``save_canary_result(**fields)``)
            interval_hours: How often to run canary tests
            test_url: Optional test URL (defaults to the ``CANARY_TEST_URL``
                env var, else Soho Factory in Vilnius)
            alert_threshold: Consecutive failures before an alert is sent
            timeout_seconds: Maximum wall time allowed for one canary scrape
            stale_after_hours: Age of the last success after which status is
                "stale". Defaults to ``max(6, 1.5 * interval_hours)``.
        """
        self.db = db
        self.interval = timedelta(hours=interval_hours)
        self.test_url = test_url or os.getenv(
            'CANARY_TEST_URL',
            'https://www.google.com/maps/place/Soho+Factory/@54.6738155,25.2595844,17z/'
        )
        self.alert_threshold = alert_threshold
        self.timeout_seconds = timeout_seconds

        # Keeps the original 6h window for intervals up to 4h, but widens it
        # for longer intervals so a healthy monitor is not reported "stale"
        # merely because the next scheduled run has not happened yet.
        if stale_after_hours is None:
            stale_after_hours = max(6.0, interval_hours * 1.5)
        self.stale_after = timedelta(hours=stale_after_hours)

        self.running = False
        self.last_run: Optional[datetime] = None
        self.last_success: Optional[datetime] = None
        self.consecutive_failures = 0
        self.last_result: Optional[Dict[str, Any]] = None

    async def start(self):
        """Run a canary test every ``self.interval`` until :meth:`stop` is called."""
        self.running = True
        log.info(
            f"Canary monitor started (interval: {self.interval.total_seconds()/3600:.1f}h)"
        )

        while self.running:
            try:
                await self.run_canary_test()
            except Exception as e:
                # run_canary_test has already counted this failure and sent
                # any alert; counting again here would double the tally.
                log.exception(f"Canary test failed with exception: {e}")

            if not self.running:
                break
            # Sleep until next run
            await asyncio.sleep(self.interval.total_seconds())

    def stop(self):
        """Ask the monitoring loop to exit; it will not start another run."""
        self.running = False
        log.info("Canary monitor stopped")

    async def run_canary_test(self):
        """
        Run a single canary test.

        Performs an actual scrape on ``self.test_url`` and validates:
        - Scraping succeeds
        - Reviews are extracted
        - Review count is reasonable (10-500)
        - Scrape time is reasonable (< 30s)
        - Data structure is valid

        Every completed run updates ``last_result`` and the consecutive
        failure counter, is persisted via ``db.save_canary_result`` (best
        effort), and sends an alert once ``alert_threshold`` consecutive
        failures have accumulated.

        Raises:
            Exception: Any unexpected error from the scrape (including a
                failure to import the scraper) is re-raised after it has been
                recorded.
        """
        log.info(f"Running canary scrape test on {self.test_url[:60]}...")
        self.last_run = _utcnow()
        timeout = self.timeout_seconds

        try:
            # Imported lazily, and inside the try so an import failure is
            # recorded like any other canary error.
            from modules.fast_scraper import fast_scrape_reviews

            # NOTE: wait_for cannot stop the worker thread; on timeout the
            # scrape keeps running in the background until it returns.
            result = await asyncio.wait_for(
                asyncio.to_thread(
                    fast_scrape_reviews,
                    url=self.test_url,
                    headless=True,
                    max_scrolls=10,  # Limited for canary
                ),
                timeout=timeout,
            )

            # A failed scrape may omit count/time; treat missing values as a
            # validation failure rather than crashing with KeyError.
            count = result.get('count') or 0
            scrape_time = result.get('time')
            checks = {
                "scrape_succeeded": bool(result.get('success')),
                "got_reviews": count > 0,
                "reasonable_count": 10 <= count <= 500,
                "reasonable_time": scrape_time is not None and scrape_time < 30,
                "data_structure_valid": self._validate_review_structure(
                    result.get('reviews', [])
                ),
            }
        except asyncio.TimeoutError:
            log.error(f"❌ Canary test TIMEOUT (>{timeout:g}s)")
            self._mark_failure({
                "status": "timeout",
                "error": f"Scrape took longer than {timeout:g} seconds",
            })
            await self._save_result(
                success=False,
                error_message=f"Timeout after {timeout:g} seconds",
            )
            if self.consecutive_failures >= self.alert_threshold:
                await self.send_alert(
                    f"🚨 CRITICAL: Canary timeout {self.consecutive_failures} times!"
                )
            return
        except Exception as e:
            log.error(f"❌ Canary test ERROR: {e}")
            self._mark_failure({"status": "error", "error": str(e)})
            await self._save_result(success=False, error_message=str(e))
            if self.consecutive_failures >= self.alert_threshold:
                await self.send_alert(
                    f"🚨 CRITICAL: Scraper canary failed {self.consecutive_failures} times in a row! "
                    f"Last error: {str(e)[:200]}"
                )
            raise  # Surface the original error to the caller

        if all(checks.values()):
            log.info(f"✅ Canary test PASSED: {count} reviews in {scrape_time:.1f}s")
            self.consecutive_failures = 0
            self.last_success = _utcnow()
            self.last_result = {
                "status": "pass",
                "reviews_count": count,
                "scrape_time": scrape_time,
                "checks": checks,
            }
            await self._save_result(
                success=True,
                reviews_count=count,
                scrape_time=scrape_time,
                metadata={"checks": checks},
            )
            return

        failed_checks = [name for name, ok in checks.items() if not ok]
        log.error(f"❌ Canary test FAILED: validation failed on {failed_checks}")
        self._mark_failure({
            "status": "fail",
            "reviews_count": count,
            "scrape_time": scrape_time,
            "checks": checks,
            "failed_checks": failed_checks,
        })
        await self._save_result(
            success=False,
            reviews_count=count,
            scrape_time=scrape_time,
            error_message=f"Validation failed: {failed_checks}",
            metadata={"checks": checks},
        )
        if self.consecutive_failures >= self.alert_threshold:
            await self.send_alert(
                f"🚨 CRITICAL: Canary validation failed {self.consecutive_failures} times! "
                f"Failed checks: {failed_checks}"
            )

    def _mark_failure(self, last_result: Dict[str, Any]) -> None:
        """Count one more consecutive failure and remember its details."""
        self.consecutive_failures += 1
        self.last_result = last_result

    async def _save_result(self, **fields: Any) -> None:
        """Persist a canary outcome via ``db.save_canary_result``, best effort.

        A database outage is logged but must not be mistaken for a scraper
        failure (or mask the scraper's own error), so it is not re-raised.
        """
        try:
            await self.db.save_canary_result(**fields)
        except Exception:
            log.exception("Failed to save canary result")

    def _validate_review_structure(self, reviews) -> bool:
        """
        Validate that reviews have expected structure.

        Only the first review is inspected.

        Args:
            reviews: List of review dictionaries

        Returns:
            True if the list is non-empty and its first review contains
            'author', 'rating' and 'date_text'
        """
        if not reviews:
            return False

        first_review = reviews[0]
        required_fields = ('author', 'rating', 'date_text')
        return all(field in first_review for field in required_fields)

    async def send_alert(self, message: str):
        """
        Send alert via configured channels.

        Always logs at CRITICAL level; additionally posts to Slack when
        ``SLACK_WEBHOOK_URL`` is set. Delivery failures are logged, never
        raised, so alerting cannot break the canary loop.

        Args:
            message: Alert message to send
        """
        log.critical(message)

        # TODO: Integrate further alerting channels (e.g. email, PagerDuty).
        slack_webhook = os.getenv('SLACK_WEBHOOK_URL')
        if not slack_webhook:
            return

        try:
            import httpx  # only needed when Slack alerting is configured
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    slack_webhook,
                    json={"text": message},
                    timeout=5.0
                )
                # A non-2xx reply means Slack rejected the alert.
                response.raise_for_status()
            log.info("Alert sent to Slack")
        except Exception as e:
            log.error(f"Failed to send Slack alert: {e}")

    def get_status(self) -> Dict[str, Any]:
        """
        Get current canary status.

        Returns:
            Status dictionary whose "status" is one of:
            - "unknown": no canary run has completed yet
            - "failing": runs have completed but none has succeeded
            - "stale": the last success is older than ``self.stale_after``
            - "healthy": a recent run succeeded
        """
        if self.last_success is None:
            if self.last_result is None:
                return {
                    "status": "unknown",
                    "message": "No canary tests run yet",
                    "last_run": self.last_run.isoformat() if self.last_run else None
                }
            # Runs have completed without any success; this must not look
            # like the benign "not started yet" state.
            return {
                "status": "failing",
                "message": "No successful canary test yet",
                "last_run": self.last_run.isoformat() if self.last_run else None,
                "consecutive_failures": self.consecutive_failures,
                "last_result": self.last_result
            }

        age = _utcnow() - self.last_success

        if age > self.stale_after:
            return {
                "status": "stale",
                "last_success": self.last_success.isoformat(),
                "age_hours": age.total_seconds() / 3600,
                "consecutive_failures": self.consecutive_failures,
                "message": f"Last successful canary was {age.total_seconds()/3600:.1f} hours ago"
            }

        return {
            "status": "healthy",
            "last_success": self.last_success.isoformat(),
            "last_run": self.last_run.isoformat() if self.last_run else None,
            "age_minutes": age.total_seconds() / 60,
            "consecutive_failures": self.consecutive_failures,
            "last_result": self.last_result
        }


class HealthCheckSystem:
    """
    Complete health check system for production.

    Provides multiple levels of health checks:
    - Liveness: Is the server alive?
    - Readiness: Can it handle traffic?
    - Canary: Does scraping actually work?
    """

    # Upper bound on the readiness DB ping so a hung pool cannot hang the probe.
    DB_PING_TIMEOUT_SECONDS = 5.0

    def __init__(self, db, canary_interval_hours: float = 4):
        """
        Initialize health check system.

        Args:
            db: Database manager instance
            canary_interval_hours: How often the canary scrape runs
        """
        self.db = db
        self.canary = CanaryMonitor(db, interval_hours=canary_interval_hours)
        self._canary_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background health monitoring (no-op if already running)."""
        if self._canary_task is not None and not self._canary_task.done():
            return
        # Keep a reference: the event loop holds only weak references to
        # tasks, so an unreferenced task can be garbage-collected mid-run.
        self._canary_task = asyncio.create_task(self.canary.start())

    def stop(self):
        """Stop background health monitoring."""
        self.canary.stop()
        if self._canary_task is not None:
            # Cancel so shutdown does not wait out a multi-hour sleep.
            self._canary_task.cancel()
            self._canary_task = None

    async def check_liveness(self) -> Dict[str, Any]:
        """
        Liveness check: Is the server alive?

        This is a simple check that always succeeds if the server is running.
        Used by Kubernetes liveness probe - restart container if fails.

        Returns:
            Liveness status
        """
        return {
            "status": "alive",
            "timestamp": _utcnow().isoformat()
        }

    async def check_readiness(self) -> Dict[str, Any]:
        """
        Readiness check: Can the server handle traffic?

        Checks if dependencies are available.
        Used by Kubernetes readiness probe - remove from load balancer if fails.

        Returns:
            Readiness status
        """
        checks = {}

        # Check database
        try:
            await asyncio.wait_for(
                self.db.pool.fetchval("SELECT 1"),
                timeout=self.DB_PING_TIMEOUT_SECONDS
            )
            checks["database"] = {"healthy": True}
        except Exception as e:
            # str(TimeoutError()) is empty, so fall back to the type name.
            checks["database"] = {"healthy": False, "error": str(e) or type(e).__name__}

        # Overall readiness
        all_healthy = all(c.get("healthy", False) for c in checks.values())

        return {
            "status": "ready" if all_healthy else "not_ready",
            "checks": checks,
            "timestamp": _utcnow().isoformat()
        }

    async def check_canary(self) -> Dict[str, Any]:
        """
        Canary check: Does scraping actually work?

        Returns the latest canary test result.
        Used by external monitoring (PagerDuty, DataDog) for alerts.

        Returns:
            Canary status
        """
        return self.canary.get_status()

    async def get_detailed_health(self) -> Dict[str, Any]:
        """
        Get detailed health status of all components.

        Returns:
            Complete health status; "healthy" only when the server is alive,
            ready, and the canary is "healthy" or "unknown" (not yet run)
        """
        liveness = await self.check_liveness()
        readiness = await self.check_readiness()
        canary = await self.check_canary()

        overall_healthy = (
            liveness["status"] == "alive" and
            readiness["status"] == "ready" and
            canary["status"] in ["healthy", "unknown"]  # Unknown is OK (first run)
        )

        return {
            "status": "healthy" if overall_healthy else "degraded",
            "components": {
                "liveness": liveness,
                "readiness": readiness,
                "canary": canary
            },
            "timestamp": _utcnow().isoformat()
        }