Files
2026-02-02 18:19:00 +00:00

301 lines
10 KiB
Python

"""
Session Routes for Google Reviews Scraper API
Provides session handoff endpoints for efficient validation → scraping workflow.
Uses scraper v1.2.0 with session support.
Endpoints:
POST /sessions/validate - Validate URL, keep browser alive, return session_id
POST /sessions/scrape - Scrape using existing session (skips navigation)
GET /sessions - List active sessions
GET /sessions/{id} - Get session status
DELETE /sessions/{id} - Release session manually
Usage:
1. POST /sessions/validate with URL → returns session_id
2. Frontend shows business info to user for confirmation
3. POST /sessions/scrape with session_id → scrapes using existing browser
"""
import asyncio
import logging
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, HttpUrl, Field
# Import v1.2.0 scraper with session support
from scrapers.google_reviews.v1_2_0 import (
validate_with_session,
scrape_with_session,
LogCapture
)
from scrapers.google_reviews.session_manager import get_session_manager
# Module-level logger. It uses the fixed name "api_sessions" (not __name__),
# so that is the logger name that shows up in log records.
log = logging.getLogger("api_sessions")
# Create router
# Every endpoint declared on this router is served under the /sessions prefix
# and grouped under the "sessions" tag.
router = APIRouter(prefix="/sessions", tags=["sessions"])
# ============================================================================
# Request/Response Models
# ============================================================================
# Latitude/longitude pair. Used both as a top-level request field and nested
# inside BrowserFingerprint; the validate endpoint copies it into the
# fingerprint dict as {"lat": ..., "lng": ...}.
class GeoLocation(BaseModel):
    lat: float  # latitude (presumably decimal degrees -- TODO confirm with the scraper)
    lng: float  # longitude (same assumption as lat)
# Browser viewport size. The validate endpoint forwards it to the scraper as
# {"width": ..., "height": ...} inside the fingerprint dict.
class Viewport(BaseModel):
    width: int  # presumably pixels -- TODO confirm
    height: int  # presumably pixels -- TODO confirm
# Optional client browser characteristics supplied with a validation request.
# Every field may be omitted. The camelCase attribute names are also the keys
# used in the fingerprint dict that the validate endpoint hands to the scraper.
class BrowserFingerprint(BaseModel):
    userAgent: Optional[str] = None
    timezone: Optional[str] = None
    language: Optional[str] = None
    platform: Optional[str] = None
    # Nested models; only copied into the fingerprint dict when present.
    viewport: Optional[Viewport] = None
    geolocation: Optional[GeoLocation] = None
class ValidateRequest(BaseModel):
    """Request body for session validation."""

    # Required; pydantic rejects values that are not well-formed URLs.
    url: HttpUrl = Field(..., description="Google Maps URL to validate")

    # Optional client fingerprint to forward to the scraper.
    browser_fingerprint: Optional[BrowserFingerprint] = None

    # Stand-alone geolocation for callers that do not send a full fingerprint.
    geolocation: Optional[GeoLocation] = None

    # Lifetime of the kept-alive browser session, bounded to 60..900 seconds.
    session_ttl: int = Field(
        default=300,
        ge=60,
        le=900,
        description="Session TTL in seconds (default: 5 min)",
    )
class ValidateResponse(BaseModel):
    """Response from session validation."""

    # Handle to pass to POST /sessions/scrape; absent when validation failed.
    session_id: Optional[str] = Field(
        default=None,
        description="Session ID for scraping (None if validation failed)",
    )

    # Free-form business details as reported by the scraper; empty by default.
    business_info: Dict[str, Any] = Field(default_factory=dict)
    total_reviews: Optional[int] = None

    # The only required field: callers must check it rather than the HTTP status.
    success: bool
    error: Optional[str] = None

    expires_in: Optional[int] = Field(
        default=None,
        description="Seconds until session expires",
    )
class ScrapeWithSessionRequest(BaseModel):
    """Request body for scraping with an existing session."""

    # Required; the value returned by POST /sessions/validate.
    session_id: str = Field(..., description="Session ID from validation")

    # Upper bound on collected reviews; when given it must lie in 1..50000.
    max_reviews: Optional[int] = Field(
        default=None,
        ge=1,
        le=50000,
        description="Max reviews to collect (None = unlimited)",
    )

    # Passed through to the scraper unchanged; not validated against the listed names here.
    sort_strategy: str = Field(
        default="auto",
        description="Sort strategy: auto, multi, newest, lowest, highest, relevant",
    )

    initial_sort: Optional[str] = Field(
        default=None,
        description="Initial sort order for first pass",
    )
class ScrapeWithSessionResponse(BaseModel):
    """Response from session-based scraping."""

    # Scraped review records (untyped list); empty unless the scraper returns some.
    reviews: list = Field(default_factory=list)
    count: int = 0
    total_reviews: int = 0

    # The only required field; failures are reported here, with details in `error`.
    success: bool
    error: Optional[str] = None

    # Value of the scraper's "time" result key (presumably elapsed seconds -- TODO confirm).
    time: float = 0

    session_reused: bool = Field(
        default=True,
        description="Indicates session was reused from validation",
    )
    business_info: Dict[str, Any] = Field(default_factory=dict)
# NOTE(review): no endpoint in the visible code uses this model as a
# response_model. GET /sessions/{id} returns a plain dict with these same keys
# plus "url" -- confirm whether this model is still needed or should be wired in.
class SessionInfo(BaseModel):
    """Information about an active session."""
    session_id: str
    business: str
    state: str
    total_reviews: int
    age_seconds: int
    ttl_remaining: int
# Filled by GET /sessions from the session manager's get_stats() result
# (keys "total_sessions" and "sessions").
class SessionListResponse(BaseModel):
    """Response listing all active sessions."""
    total_sessions: int
    # Untyped list: entries are passed through from get_stats() without validation.
    sessions: list
# ============================================================================
# Endpoints
# ============================================================================
def _build_fingerprint(request: "ValidateRequest") -> Optional[Dict[str, Any]]:
    """Convert the request's fingerprint/geolocation models into the plain dict the scraper takes.

    Args:
        request: The validated request body.

    Returns:
        A dict with the keys ``userAgent``, ``timezone``, ``language`` and
        ``platform`` (values may be None), plus ``viewport`` and
        ``geolocation`` when available; a dict holding only ``geolocation``
        when no fingerprint was sent; or None when the request carries neither
        a fingerprint nor a geolocation.
    """
    top_level_geo = request.geolocation
    fp = request.browser_fingerprint

    if fp is None:
        if top_level_geo is None:
            return None
        return {"geolocation": {"lat": top_level_geo.lat, "lng": top_level_geo.lng}}

    fingerprint: Dict[str, Any] = {
        "userAgent": fp.userAgent,
        "timezone": fp.timezone,
        "language": fp.language,
        "platform": fp.platform,
    }
    if fp.viewport is not None:
        fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height}

    # The fingerprint's own geolocation wins. Otherwise fall back to the
    # top-level one so it is not silently discarded when a fingerprint
    # without geolocation is sent alongside it.
    geo = fp.geolocation if fp.geolocation is not None else top_level_geo
    if geo is not None:
        fingerprint["geolocation"] = {"lat": geo.lat, "lng": geo.lng}
    return fingerprint


# Failures (including unexpected exceptions) are reported in the response body
# with success=False and an error string; the endpoint itself does not raise.
@router.post("/validate", response_model=ValidateResponse, summary="Validate URL and Create Session")
async def validate_and_create_session(request: ValidateRequest):
    """
    Validate a Google Maps URL and keep the browser session alive for scraping.
    This endpoint:
    1. Creates a Chrome browser
    2. Navigates to the Google Maps URL
    3. Extracts business information
    4. Keeps the browser ALIVE and returns a session_id
    The session can then be used with /sessions/scrape to continue scraping
    without re-navigating (saves 4-16 seconds per job).
    Session expires after TTL (default: 5 minutes).
    """
    try:
        url = str(request.url)
        log.info("Validating URL with session: %s...", url[:80])

        fingerprint = _build_fingerprint(request)

        # validate_with_session is a blocking call (it drives Chrome), so run
        # it in a worker thread to keep the event loop responsive.
        result = await asyncio.to_thread(
            validate_with_session,
            url=url,
            headless=False,  # Headed Chrome with Xvfb
            browser_fingerprint=fingerprint,
            session_ttl=request.session_ttl,
        )

        return ValidateResponse(
            session_id=result.get("session_id"),
            business_info=result.get("business_info", {}),
            total_reviews=result.get("total_reviews"),
            success=result.get("success", False),
            error=result.get("error"),
            expires_in=result.get("expires_in"),
        )
    except Exception as e:
        # Top-level boundary: keep the traceback in the logs, but still answer
        # with a structured failure instead of an HTTP 500.
        log.exception("Session validation error: %s", e)
        return ValidateResponse(
            session_id=None,
            success=False,
            error=str(e),
        )
# Failures (including unexpected exceptions) are reported in the response body
# with success=False and an error string; the endpoint itself does not raise.
@router.post("/scrape", response_model=ScrapeWithSessionResponse, summary="Scrape Using Existing Session")
async def scrape_using_session(request: ScrapeWithSessionRequest):
    """
    Scrape reviews using an existing validated session.
    This endpoint:
    1. Retrieves the browser from the session (already on Google Maps page)
    2. Skips navigation and consent handling (already done)
    3. Clicks Reviews tab and starts scraping
    4. Releases the session when done
    Saves 4-16 seconds compared to starting fresh.
    """
    try:
        log.info("Scraping with session %s...", request.session_id)

        # scrape_with_session is a blocking call, so run it in a worker thread
        # to keep the event loop free while the scrape is in progress.
        result = await asyncio.to_thread(
            scrape_with_session,
            session_id=request.session_id,
            max_reviews=request.max_reviews,
            sort_strategy=request.sort_strategy,
            initial_sort=request.initial_sort,
        )

        # Missing keys fall back to the same defaults the response model declares.
        return ScrapeWithSessionResponse(
            reviews=result.get("reviews", []),
            count=result.get("count", 0),
            total_reviews=result.get("total_reviews", 0),
            success=result.get("success", False),
            error=result.get("error"),
            time=result.get("time", 0),
            session_reused=result.get("session_reused", True),
            business_info=result.get("business_info", {}),
        )
    except Exception as e:
        # Top-level boundary: keep the traceback in the logs, but still answer
        # with a structured failure instead of an HTTP 500.
        log.exception("Session scraping error: %s", e)
        return ScrapeWithSessionResponse(
            success=False,
            error=str(e),
        )
@router.get("", response_model=SessionListResponse, summary="List Active Sessions")
async def list_sessions():
    """
    List all active browser sessions.
    Returns information about each session including:
    - Business name
    - State (validated, scraping)
    - Time until expiration
    """
    # Snapshot of the manager's bookkeeping; missing keys degrade to "no sessions".
    snapshot = get_session_manager().get_stats()
    session_count = snapshot.get("total_sessions", 0)
    active = snapshot.get("sessions", [])
    return SessionListResponse(total_sessions=session_count, sessions=active)
# Responds 404 when the session manager has no entry for session_id.
@router.get("/{session_id}", summary="Get Session Status")
async def get_session_status(session_id: str):
    """
    Get the status of a specific session.
    """
    found = get_session_manager().get_session(session_id)
    if not found:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found or expired")

    # Function-scope import: the module does not import `time` at the top.
    import time

    current = time.time()
    status = {
        "session_id": found.session_id,
        "business": found.business_info.get("name", "unknown"),
        "state": found.state,
        "total_reviews": found.total_reviews,
        "url": found.url,
    }
    # Both durations are truncated to whole seconds.
    status["age_seconds"] = int(current - found.created_at)
    status["ttl_remaining"] = int(found.expires_at - current)
    return status
# Responds 404 when the session manager has no entry for session_id.
@router.delete("/{session_id}", summary="Release Session")
async def release_session(session_id: str):
    """
    Manually release a session and close its browser.
    Use this if the user cancels before scraping.
    """
    manager = get_session_manager()

    # Look the session up first so an unknown id yields a 404 instead of a
    # release call on a missing session.
    if not manager.get_session(session_id):
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found or expired")

    manager.release_session(session_id, reason="manual_release")
    confirmation = f"Session {session_id} released"
    return {"success": True, "message": confirmation}
# ============================================================================
# Helper to register router with main app
# ============================================================================
def register_session_routes(app):
    """Mount this module's session router on ``app`` and log that it was done.

    Args:
        app: The FastAPI application (anything exposing ``include_router``).
    """
    app.include_router(router)
    log.info("Session routes registered at /sessions")