""" Session Routes for Google Reviews Scraper API Provides session handoff endpoints for efficient validation → scraping workflow. Uses scraper v1.2.0 with session support. Endpoints: POST /sessions/validate - Validate URL, keep browser alive, return session_id POST /sessions/scrape - Scrape using existing session (skips navigation) GET /sessions - List active sessions GET /sessions/{id} - Get session status DELETE /sessions/{id} - Release session manually Usage: 1. POST /sessions/validate with URL → returns session_id 2. Frontend shows business info to user for confirmation 3. POST /sessions/scrape with session_id → scrapes using existing browser """ import asyncio import logging from typing import Optional, Dict, Any from fastapi import APIRouter, HTTPException from pydantic import BaseModel, HttpUrl, Field # Import v1.2.0 scraper with session support from scrapers.google_reviews.v1_2_0 import ( validate_with_session, scrape_with_session, LogCapture ) from scrapers.google_reviews.session_manager import get_session_manager log = logging.getLogger("api_sessions") # Create router router = APIRouter(prefix="/sessions", tags=["sessions"]) # ============================================================================ # Request/Response Models # ============================================================================ class GeoLocation(BaseModel): lat: float lng: float class Viewport(BaseModel): width: int height: int class BrowserFingerprint(BaseModel): userAgent: Optional[str] = None timezone: Optional[str] = None language: Optional[str] = None platform: Optional[str] = None viewport: Optional[Viewport] = None geolocation: Optional[GeoLocation] = None class ValidateRequest(BaseModel): """Request body for session validation.""" url: HttpUrl = Field(..., description="Google Maps URL to validate") browser_fingerprint: Optional[BrowserFingerprint] = None geolocation: Optional[GeoLocation] = None session_ttl: int = Field(300, description="Session TTL in seconds (default: 5 min)", ge=60, le=900) class ValidateResponse(BaseModel): """Response from session validation.""" session_id: Optional[str] = Field(None, description="Session ID for scraping (None if validation failed)") business_info: Dict[str, Any] = Field(default_factory=dict) total_reviews: Optional[int] = None success: bool error: Optional[str] = None expires_in: Optional[int] = Field(None, description="Seconds until session expires") class ScrapeWithSessionRequest(BaseModel): """Request body for scraping with an existing session.""" session_id: str = Field(..., description="Session ID from validation") max_reviews: Optional[int] = Field(None, description="Max reviews to collect (None = unlimited)", ge=1, le=50000) sort_strategy: str = Field("auto", description="Sort strategy: auto, multi, newest, lowest, highest, relevant") initial_sort: Optional[str] = Field(None, description="Initial sort order for first pass") class ScrapeWithSessionResponse(BaseModel): """Response from session-based scraping.""" reviews: list = Field(default_factory=list) count: int = 0 total_reviews: int = 0 success: bool error: Optional[str] = None time: float = 0 session_reused: bool = Field(True, description="Indicates session was reused from validation") business_info: Dict[str, Any] = Field(default_factory=dict) class SessionInfo(BaseModel): """Information about an active session.""" session_id: str business: str state: str total_reviews: int age_seconds: int ttl_remaining: int class SessionListResponse(BaseModel): """Response listing all active sessions.""" total_sessions: int sessions: list # ============================================================================ # Endpoints # ============================================================================ @router.post("/validate", response_model=ValidateResponse, summary="Validate URL and Create Session") async def validate_and_create_session(request: ValidateRequest): """ Validate a Google Maps URL and keep the browser session alive for scraping. This endpoint: 1. Creates a Chrome browser 2. Navigates to the Google Maps URL 3. Extracts business information 4. Keeps the browser ALIVE and returns a session_id The session can then be used with /sessions/scrape to continue scraping without re-navigating (saves 4-16 seconds per job). Session expires after TTL (default: 5 minutes). """ try: url = str(request.url) log.info(f"Validating URL with session: {url[:80]}...") # Build fingerprint dict fingerprint = None if request.browser_fingerprint: fp = request.browser_fingerprint fingerprint = { "userAgent": fp.userAgent, "timezone": fp.timezone, "language": fp.language, "platform": fp.platform, } if fp.viewport: fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height} if fp.geolocation: fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng} elif request.geolocation: fingerprint = {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}} # Run validation in thread (blocks Chrome operations) result = await asyncio.to_thread( validate_with_session, url=url, headless=False, # Headed Chrome with Xvfb browser_fingerprint=fingerprint, session_ttl=request.session_ttl ) return ValidateResponse( session_id=result.get("session_id"), business_info=result.get("business_info", {}), total_reviews=result.get("total_reviews"), success=result.get("success", False), error=result.get("error"), expires_in=result.get("expires_in") ) except Exception as e: log.error(f"Session validation error: {e}") return ValidateResponse( session_id=None, success=False, error=str(e) ) @router.post("/scrape", response_model=ScrapeWithSessionResponse, summary="Scrape Using Existing Session") async def scrape_using_session(request: ScrapeWithSessionRequest): """ Scrape reviews using an existing validated session. This endpoint: 1. Retrieves the browser from the session (already on Google Maps page) 2. Skips navigation and consent handling (already done) 3. Clicks Reviews tab and starts scraping 4. Releases the session when done Saves 4-16 seconds compared to starting fresh. """ try: log.info(f"Scraping with session {request.session_id}...") # Run scraping in thread result = await asyncio.to_thread( scrape_with_session, session_id=request.session_id, max_reviews=request.max_reviews, sort_strategy=request.sort_strategy, initial_sort=request.initial_sort ) return ScrapeWithSessionResponse( reviews=result.get("reviews", []), count=result.get("count", 0), total_reviews=result.get("total_reviews", 0), success=result.get("success", False), error=result.get("error"), time=result.get("time", 0), session_reused=result.get("session_reused", True), business_info=result.get("business_info", {}) ) except Exception as e: log.error(f"Session scraping error: {e}") return ScrapeWithSessionResponse( success=False, error=str(e) ) @router.get("", response_model=SessionListResponse, summary="List Active Sessions") async def list_sessions(): """ List all active browser sessions. Returns information about each session including: - Business name - State (validated, scraping) - Time until expiration """ session_manager = get_session_manager() stats = session_manager.get_stats() return SessionListResponse( total_sessions=stats.get("total_sessions", 0), sessions=stats.get("sessions", []) ) @router.get("/{session_id}", summary="Get Session Status") async def get_session_status(session_id: str): """ Get the status of a specific session. """ session_manager = get_session_manager() session = session_manager.get_session(session_id) if not session: raise HTTPException(status_code=404, detail=f"Session {session_id} not found or expired") import time now = time.time() return { "session_id": session.session_id, "business": session.business_info.get("name", "unknown"), "state": session.state, "total_reviews": session.total_reviews, "url": session.url, "age_seconds": int(now - session.created_at), "ttl_remaining": int(session.expires_at - now) } @router.delete("/{session_id}", summary="Release Session") async def release_session(session_id: str): """ Manually release a session and close its browser. Use this if the user cancels before scraping. """ session_manager = get_session_manager() session = session_manager.get_session(session_id) if not session: raise HTTPException(status_code=404, detail=f"Session {session_id} not found or expired") session_manager.release_session(session_id, reason="manual_release") return { "success": True, "message": f"Session {session_id} released" } # ============================================================================ # Helper to register router with main app # ============================================================================ def register_session_routes(app): """Register session routes with the FastAPI app.""" app.include_router(router) log.info("Session routes registered at /sessions")