# Files
# 2026-02-02 18:19:00 +00:00
#
# 267 lines
# 8.6 KiB
# Python

"""
Session Manager for Google Reviews Scraper

Manages browser sessions between validation and scraping phases.
Allows reusing the same browser instance to avoid duplicate navigation.

Usage:
    session_manager = get_session_manager()

    # During validation
    session_id = session_manager.create_session(driver, url, business_info, total_reviews)
    return {"session_id": session_id, "business_info": business_info}

    # During scraping (with session_id from validation)
    # (claim_session() can be used instead of get_session() to mark the
    # session as in-use and lengthen its TTL while scraping)
    session = session_manager.get_session(session_id)
    if session:
        driver = session.driver
        # Continue from where validation left off
"""
import uuid
import time
import threading
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class BrowserSession:
    """A browser instance that passed validation and is held for scraping.

    Instances are plain records: the dataclass only stores what it is given.
    The positional field order below is part of the constructor signature.
    """

    # --- identity and validated page data -------------------------------
    session_id: str
    driver: Any  # WebDriver instance
    url: str
    business_info: Dict[str, Any]
    total_reviews: int

    # --- lifetime bounds --------------------------------------------------
    # Numeric timestamps; presumably time.time() epoch seconds — confirm
    # against whichever code constructs the session.
    created_at: float
    expires_at: float

    # --- optional extras --------------------------------------------------
    browser_fingerprint: Optional[Dict[str, Any]] = field(default=None)
    log_capture: Any = field(default=None)  # LogCapture instance

    # Lifecycle marker: validated -> scraping -> completed/expired
    state: str = field(default="validated")
class SessionManager:
    """
    Manages browser sessions between validation and scraping.

    Sessions have a TTL (default 5 minutes) after which they're automatically
    cleaned up and the browser is closed.

    The session table is guarded by a re-entrant lock. A session is always
    removed from the table while the lock is held, but its ``driver.quit()``
    is only called after the lock has been dropped, so a slow browser
    shutdown cannot stall other callers of the manager.
    """

    DEFAULT_TTL_SECONDS = 300  # 5 minutes
    CLEANUP_INTERVAL_SECONDS = 30  # Check for expired sessions every 30s
    SCRAPING_TTL_SECONDS = 3600  # TTL applied when a session is claimed (1 hour)

    def __init__(self, ttl_seconds: Optional[int] = None):
        """
        Create an empty manager; the cleanup thread is not started here.

        Args:
            ttl_seconds: Default TTL for new sessions. ``None`` (or 0) falls
                back to ``DEFAULT_TTL_SECONDS``.
        """
        self.ttl_seconds = ttl_seconds or self.DEFAULT_TTL_SECONDS
        self._sessions: Dict[str, "BrowserSession"] = {}
        self._lock = threading.RLock()
        self._cleanup_thread: Optional[threading.Thread] = None
        self._running = False
        # Signals the current cleanup thread to exit. Replaced on every
        # start() so a thread from an earlier start/stop cycle can never be
        # revived by a later start().
        self._stop_event = threading.Event()

    def start(self):
        """Start the background cleanup thread (no-op if already running)."""
        with self._lock:
            if self._running:
                return
            self._running = True
            stop_event = threading.Event()
            self._stop_event = stop_event
            self._cleanup_thread = threading.Thread(
                target=self._cleanup_loop, args=(stop_event,), daemon=True
            )
            self._cleanup_thread.start()

    def stop(self):
        """
        Stop the background cleanup thread.

        Sessions that are still registered are left untouched; only the
        periodic expiry check is halted.
        """
        with self._lock:
            self._running = False
            # Wakes the loop immediately instead of waiting out its sleep.
            self._stop_event.set()
            thread = self._cleanup_thread
        # Join outside the lock: the cleanup thread may need the lock to
        # finish its current pass.
        if thread:
            thread.join(timeout=5)

    def _cleanup_loop(self, stop_event: Optional[threading.Event] = None):
        """
        Background loop to clean up expired sessions.

        Args:
            stop_event: Event that ends the loop when set. Defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            try:
                self._cleanup_expired()
            except Exception as e:
                # Best effort: one failed pass must not kill the loop.
                print(f"[SessionManager] Cleanup error: {e}")
            # Interruptible sleep; returns early once stop() sets the event.
            stop_event.wait(self.CLEANUP_INTERVAL_SECONDS)

    def _cleanup_expired(self):
        """Remove expired sessions and close their browsers."""
        now = time.time()
        with self._lock:
            expired = [
                (session_id, session)
                for session_id, session in self._sessions.items()
                if now > session.expires_at
            ]
            # Remove in the same critical section as the expiry check so a
            # concurrent extend/claim cannot revive a session that is about
            # to be closed.
            for session_id, _ in expired:
                del self._sessions[session_id]
        for session_id, session in expired:
            self._quit_driver(session_id, session, "expired")

    def _quit_driver(self, session_id: str, session: Any, reason: str):
        """Close the browser of a session already removed from the table."""
        print(f"[SessionManager] Releasing session {session_id} ({reason})")
        try:
            if session.driver:
                session.driver.quit()
        except Exception as e:
            # Closing is best effort; the session is gone either way.
            print(f"[SessionManager] Error closing driver for {session_id}: {e}")

    def _lookup_locked(self, session_id: str) -> tuple:
        """
        Look up a session; the caller must hold ``self._lock``.

        Returns:
            ``(session, expired)``: ``session`` is the live session or None.
            ``expired`` is a session that was found past its expiry and has
            been removed from the table; the caller must close it after
            dropping the lock. At most one of the two is not None.
        """
        session = self._sessions.get(session_id)
        if not session:
            print(f"[SessionManager] Session {session_id} not found")
            return None, None
        if time.time() > session.expires_at:
            print(f"[SessionManager] Session {session_id} expired")
            del self._sessions[session_id]
            return None, session
        return session, None

    def create_session(
        self,
        driver: Any,
        url: str,
        business_info: Dict[str, Any],
        total_reviews: int,
        browser_fingerprint: Optional[Dict[str, Any]] = None,
        log_capture: Any = None,
        ttl_seconds: Optional[int] = None
    ) -> str:
        """
        Create a new browser session after validation.

        Args:
            driver: WebDriver instance (positioned on Google Maps page)
            url: The validated Google Maps URL
            business_info: Extracted business information
            total_reviews: Total review count from page
            browser_fingerprint: Browser fingerprint settings used
            log_capture: LogCapture instance for logging
            ttl_seconds: Custom TTL for this session (default: the manager's
                TTL; 0 is treated the same as None)

        Returns:
            session_id: Unique identifier for this session
        """
        ttl = ttl_seconds or self.ttl_seconds
        now = time.time()
        with self._lock:
            # Short ID for easier use. Only 8 hex characters are kept, so
            # regenerate on the rare collision rather than overwrite (and
            # thereby leak the browser of) a live session.
            session_id = str(uuid.uuid4())[:8]
            while session_id in self._sessions:
                session_id = str(uuid.uuid4())[:8]
            self._sessions[session_id] = BrowserSession(
                session_id=session_id,
                driver=driver,
                url=url,
                business_info=business_info,
                total_reviews=total_reviews,
                created_at=now,
                expires_at=now + ttl,
                browser_fingerprint=browser_fingerprint,
                log_capture=log_capture,
                state="validated"
            )
        print(f"[SessionManager] Created session {session_id} for {business_info.get('name', 'unknown')} (TTL: {ttl}s)")
        return session_id

    def get_session(self, session_id: str) -> Optional["BrowserSession"]:
        """
        Retrieve a session by ID.

        A session found past its expiry is removed and its browser closed.

        Args:
            session_id: The session identifier

        Returns:
            BrowserSession if found and not expired, None otherwise
        """
        with self._lock:
            session, expired = self._lookup_locked(session_id)
        if expired is not None:
            self._quit_driver(session_id, expired, "expired")
        return session

    def claim_session(self, session_id: str) -> Optional["BrowserSession"]:
        """
        Claim a session for scraping (marks it as in-use).

        Only a session in state "validated" can be claimed. Claiming sets the
        state to "scraping" and resets the expiry to
        ``SCRAPING_TTL_SECONDS`` from now.

        Args:
            session_id: The session identifier

        Returns:
            BrowserSession if successfully claimed, None otherwise
        """
        claimed = None
        with self._lock:
            session, expired = self._lookup_locked(session_id)
            if session is not None:
                if session.state != "validated":
                    print(f"[SessionManager] Session {session_id} already in state: {session.state}")
                else:
                    session.state = "scraping"
                    # Extend TTL during scraping (1 hour max)
                    session.expires_at = time.time() + self.SCRAPING_TTL_SECONDS
                    print(f"[SessionManager] Claimed session {session_id} for scraping")
                    claimed = session
        if expired is not None:
            self._quit_driver(session_id, expired, "expired")
        return claimed

    def release_session(self, session_id: str, reason: str = "completed"):
        """
        Release a session and close the browser.

        Unknown session IDs are ignored.

        Args:
            session_id: The session identifier
            reason: Why the session is being released
        """
        with self._lock:
            session = self._sessions.pop(session_id, None)
        if session is not None:
            self._quit_driver(session_id, session, reason)

    def extend_session(self, session_id: str, additional_seconds: int = 300) -> bool:
        """
        Extend a session's TTL.

        After the call the session is valid for at least
        ``additional_seconds`` from now. A session whose expiry is already
        further away keeps it, so calling this never shortens a lifetime.

        Args:
            session_id: The session identifier
            additional_seconds: Minimum remaining lifetime to guarantee

        Returns:
            True if extended, False if session not found
        """
        with self._lock:
            session = self._sessions.get(session_id)
            if not session:
                return False
            session.expires_at = max(
                session.expires_at, time.time() + additional_seconds
            )
            return True

    def get_stats(self) -> Dict[str, Any]:
        """
        Get session manager statistics.

        Returns:
            Dict with "total_sessions" and a "sessions" list holding, per
            session, its id, business name, state, age and remaining TTL
            (both in whole seconds).
        """
        with self._lock:
            now = time.time()
            sessions = [
                {
                    "session_id": sid,
                    "business": s.business_info.get("name", "unknown"),
                    "state": s.state,
                    "age_seconds": int(now - s.created_at),
                    "ttl_remaining": int(s.expires_at - now)
                }
                for sid, s in self._sessions.items()
            ]
            return {
                "total_sessions": len(self._sessions),
                "sessions": sessions
            }

    def list_sessions(self) -> list:
        """List the IDs of all registered sessions."""
        with self._lock:
            return list(self._sessions)
# Global singleton instance, created lazily by get_session_manager().
_session_manager: Optional[SessionManager] = None
# Serialises first-time construction so that concurrent first callers cannot
# each build their own manager (and their own cleanup thread).
_session_manager_lock = threading.Lock()


def get_session_manager() -> SessionManager:
    """
    Get or create the global session manager instance.

    The manager is constructed and started on the first call; later calls
    return the same object. Safe to call from several threads at once.

    Returns:
        The process-wide SessionManager, with its cleanup thread started.
    """
    global _session_manager
    if _session_manager is None:
        with _session_manager_lock:
            # Re-check under the lock: another thread may have finished
            # construction while this one was waiting.
            if _session_manager is None:
                manager = SessionManager()
                manager.start()
                # Publish only after start() so no caller can observe a
                # manager whose cleanup thread is not running yet.
                _session_manager = manager
    return _session_manager