""" API Interceptor for Google Maps Reviews. Uses Chrome DevTools Protocol (CDP) to intercept network requests and capture Google's internal API responses for faster, more reliable data extraction. """ import base64 import json import logging import os import re import threading import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Dict, List, Optional from urllib.parse import parse_qs, urlparse log = logging.getLogger("api_interceptor") @dataclass class InterceptedReview: """Data class for a review extracted from API response""" review_id: str = "" author: str = "" rating: float = 0.0 text: str = "" date_text: str = "" timestamp: int = 0 likes: int = 0 photos: List[str] = field(default_factory=list) profile_url: str = "" avatar_url: str = "" owner_response: str = "" owner_response_date: str = "" lang: str = "" class GoogleMapsAPIInterceptor: """ Intercepts Google Maps internal API calls to capture review data directly. Google Maps uses several internal endpoints for reviews: - /maps/preview/review/listentitiesreviews - Main reviews endpoint - /maps/rpc/placereview - Alternative review endpoint - /maps/preview/reviewsdata - Review data endpoint The responses are often in a custom protobuf-like JSON format that needs parsing. """ # Patterns for review-related API endpoints REVIEW_API_PATTERNS = [ r'maps/preview/review', r'maps/rpc/placereview', r'maps/preview/reviewsdata', r'maps/preview/place', r'maps/api/place', r'/locationhistory/preview', r'batchexecute.*review', ] def __init__(self, driver): """Initialize the interceptor with a Selenium driver""" self.driver = driver self.captured_responses: List[Dict[str, Any]] = [] self.captured_reviews: List[InterceptedReview] = [] self.request_map: Dict[str, Dict] = {} # Map request IDs to URLs self._lock = threading.Lock() self._listening = False self._response_callback: Optional[Callable] = None def setup_interception(self): """Enable network interception via CDP""" try: # Enable network domain self.driver.execute_cdp_cmd('Network.enable', {}) # Set up request interception patterns self.driver.execute_cdp_cmd('Network.setRequestInterception', { 'patterns': [ {'urlPattern': '*maps*review*', 'resourceType': 'XHR'}, {'urlPattern': '*maps*review*', 'resourceType': 'Fetch'}, {'urlPattern': '*batchexecute*', 'resourceType': 'XHR'}, {'urlPattern': '*batchexecute*', 'resourceType': 'Fetch'}, ] }) self._listening = True log.info("API interception enabled via CDP") return True except Exception as e: log.warning(f"Could not enable CDP interception: {e}") # Try alternative approach return self._setup_performance_logging() def _setup_performance_logging(self): """Alternative approach using Performance logging""" try: self.driver.execute_cdp_cmd('Network.enable', { 'maxTotalBufferSize': 10000000, 'maxResourceBufferSize': 5000000 }) self._listening = True log.info("API interception enabled via performance logging") return True except Exception as e: log.error(f"Failed to setup performance logging: {e}") return False def capture_network_responses(self, duration: float = 5.0): """ Capture network responses for a specified duration. Call this while scrolling/loading more reviews. """ if not self._listening: log.warning("Interception not set up, call setup_interception() first") return [] captured = [] start_time = time.time() while time.time() - start_time < duration: try: # Get performance logs which contain network events logs = self.driver.get_log('performance') for entry in logs: try: log_data = json.loads(entry['message']) message = log_data.get('message', {}) method = message.get('method', '') params = message.get('params', {}) # Capture response received events if method == 'Network.responseReceived': response = params.get('response', {}) url = response.get('url', '') if self._is_review_api(url): request_id = params.get('requestId') self.request_map[request_id] = { 'url': url, 'status': response.get('status'), 'headers': response.get('headers', {}) } # Capture response body when loading is finished elif method == 'Network.loadingFinished': request_id = params.get('requestId') if request_id in self.request_map: body = self._get_response_body(request_id) if body: captured.append({ 'url': self.request_map[request_id]['url'], 'body': body, 'timestamp': time.time() }) except Exception as parse_error: log.debug(f"Error parsing log entry: {parse_error}") continue except Exception as e: # Performance logs might not be available log.debug(f"Could not get performance logs: {e}") break time.sleep(0.1) with self._lock: self.captured_responses.extend(captured) return captured def get_response_bodies_cdp(self): """Get response bodies using CDP directly (more reliable method)""" responses = [] try: # Use CDP to get all responses result = self.driver.execute_cdp_cmd('Network.getAllCookies', {}) # Execute JavaScript to intercept fetch/XHR responses intercept_script = """ (function() { if (window.__interceptedResponses) { var responses = window.__interceptedResponses; window.__interceptedResponses = []; return responses; } return []; })(); """ captured = self.driver.execute_script(intercept_script) if captured: responses.extend(captured) except Exception as e: log.debug(f"CDP response capture error: {e}") return responses def inject_response_interceptor(self): """ Inject JavaScript to intercept XHR/Fetch responses at the browser level. This is the most reliable method for capturing API responses. """ intercept_script = """ (function() { // Skip if already injected if (window.__reviewInterceptorInjected) { console.log('[API Interceptor] Already injected, skipping'); return; } window.__reviewInterceptorInjected = true; window.__interceptedResponses = []; window.__interceptorStats = { totalFetch: 0, totalXHR: 0, capturedFetch: 0, capturedXHR: 0, lastCapture: null }; console.log('[API Interceptor] Initializing...'); // Store original fetch const originalFetch = window.fetch; // Override fetch window.fetch = async function(...args) { window.__interceptorStats.totalFetch++; const url = args[0].toString(); // Log ALL fetch requests for debugging console.debug('[API Interceptor] FETCH:', url.substring(0, 150)); const response = await originalFetch.apply(this, args); // Check if this is a review-related API call if (url.includes('review') || url.includes('batchexecute') || url.includes('place') || url.includes('maps') || url.includes('listugcposts') || url.includes('getreviews')) { try { const clone = response.clone(); const text = await clone.text(); console.log('[API Interceptor] ✅ CAPTURED FETCH:', url.substring(0, 100), 'Size:', text.length); window.__interceptedResponses.push({ url: url, body: text, timestamp: Date.now(), type: 'fetch', size: text.length }); window.__interceptorStats.capturedFetch++; window.__interceptorStats.lastCapture = new Date().toISOString(); // Keep only last 100 responses to avoid memory issues if (window.__interceptedResponses.length > 100) { window.__interceptedResponses = window.__interceptedResponses.slice(-50); } } catch (e) { console.error('[API Interceptor] Response capture error:', e); } } return response; }; // Store original XMLHttpRequest const originalXHR = window.XMLHttpRequest; // Create intercepting XHR window.XMLHttpRequest = function() { const xhr = new originalXHR(); const originalOpen = xhr.open; const originalSend = xhr.send; let requestUrl = ''; xhr.open = function(method, url, ...rest) { requestUrl = url; window.__interceptorStats.totalXHR++; console.debug('[API Interceptor] XHR:', method, url.substring(0, 150)); return originalOpen.apply(this, [method, url, ...rest]); }; xhr.addEventListener('load', function() { if (requestUrl.includes('review') || requestUrl.includes('batchexecute') || requestUrl.includes('place') || requestUrl.includes('maps') || requestUrl.includes('listugcposts') || requestUrl.includes('getreviews')) { try { console.log('[API Interceptor] ✅ CAPTURED XHR:', requestUrl.substring(0, 100), 'Size:', xhr.responseText.length); window.__interceptedResponses.push({ url: requestUrl, body: xhr.responseText, timestamp: Date.now(), type: 'xhr', status: xhr.status, size: xhr.responseText.length }); window.__interceptorStats.capturedXHR++; window.__interceptorStats.lastCapture = new Date().toISOString(); if (window.__interceptedResponses.length > 100) { window.__interceptedResponses = window.__interceptedResponses.slice(-50); } } catch (e) { console.error('[API Interceptor] XHR capture error:', e); } } }); return xhr; }; // Copy static properties for (let prop of Object.getOwnPropertyNames(originalXHR)) { try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch (e) {} } console.log('[API Interceptor] ✅ Injected successfully! Monitoring network requests...'); // Log stats every 10 seconds setInterval(() => { if (window.__interceptorStats.totalFetch > 0 || window.__interceptorStats.totalXHR > 0) { console.log('[API Interceptor] Stats:', 'Fetch:', window.__interceptorStats.totalFetch, '/', window.__interceptorStats.capturedFetch, 'XHR:', window.__interceptorStats.totalXHR, '/', window.__interceptorStats.capturedXHR, 'Queue:', window.__interceptedResponses.length); } }, 10000); return true; })(); """ try: result = self.driver.execute_script(intercept_script) log.info("JavaScript response interceptor injected with enhanced debugging") # Get initial stats stats = self.get_interceptor_stats() log.debug(f"Interceptor stats: {stats}") return True except Exception as e: log.warning(f"Failed to inject interceptor: {e}") return False def get_intercepted_responses(self): """Retrieve intercepted responses from the browser""" try: script = """ if (window.__interceptedResponses) { var responses = window.__interceptedResponses.slice(); window.__interceptedResponses = []; return responses; } return []; """ responses = self.driver.execute_script(script) if responses: log.debug(f"Retrieved {len(responses)} intercepted responses from browser") for resp in responses[:3]: # Log first 3 for debugging log.debug(f" - {resp.get('type', '?').upper()}: {resp.get('url', '')[:100]} ({resp.get('size', 0)} bytes)") else: log.debug("No intercepted responses available") return responses or [] except Exception as e: log.debug(f"Error getting intercepted responses: {e}") return [] def get_interceptor_stats(self): """Get statistics from the JavaScript interceptor""" try: script = """ if (window.__interceptorStats) { return window.__interceptorStats; } return null; """ stats = self.driver.execute_script(script) return stats except Exception as e: log.debug(f"Error getting interceptor stats: {e}") return None def get_browser_console_logs(self): """Get browser console logs (for debugging)""" try: logs = self.driver.get_log('browser') return logs except Exception as e: log.debug(f"Could not get browser console logs: {e}") return [] def dump_responses_to_file(self, responses: List[Dict], output_dir: str = "debug_api_responses"): """ Dump captured responses to files for debugging. Creates one file per response with metadata and body. """ try: output_path = Path(output_dir) output_path.mkdir(exist_ok=True) for i, response in enumerate(responses): timestamp = response.get('timestamp', int(time.time() * 1000)) url = response.get('url', 'unknown') req_type = response.get('type', 'unknown') # Create filename from timestamp and type filename = f"{timestamp}_{req_type}_{i}.json" filepath = output_path / filename # Write response with metadata with open(filepath, 'w', encoding='utf-8') as f: json.dump({ 'metadata': { 'url': url, 'type': req_type, 'timestamp': timestamp, 'size': response.get('size', len(response.get('body', ''))), 'status': response.get('status') }, 'body': response.get('body', '') }, f, indent=2, ensure_ascii=False) log.info(f"Dumped {len(responses)} responses to {output_path}") return str(output_path) except Exception as e: log.error(f"Error dumping responses to file: {e}") return None def _is_review_api(self, url: str) -> bool: """Check if URL matches review API patterns""" url_lower = url.lower() return any(re.search(pattern, url_lower) for pattern in self.REVIEW_API_PATTERNS) def _get_response_body(self, request_id: str) -> Optional[str]: """Get response body for a request ID using CDP""" try: result = self.driver.execute_cdp_cmd('Network.getResponseBody', { 'requestId': request_id }) body = result.get('body', '') if result.get('base64Encoded'): body = base64.b64decode(body).decode('utf-8', errors='ignore') return body except Exception as e: log.debug(f"Could not get response body for {request_id}: {e}") return None def parse_reviews_from_responses(self, responses: List[Dict]) -> List[InterceptedReview]: """ Parse review data from captured API responses. Google's API responses use a custom nested array format. """ reviews = [] for response in responses: try: body = response.get('body', '') url = response.get('url', '') # Skip non-JSON responses if not body or body.startswith(' List[InterceptedReview]: """Parse a single response body for review data""" reviews = [] # Skip empty or HTML responses if not body or body.startswith(' List[InterceptedReview]: """ Parse Google Maps listugcposts API response. Structure discovered: data[2] = array of review groups data[2][i] = single review group [review_data, metadata, continuation_token] data[2][i][0] = review data (6-item array containing all review info) """ reviews = [] try: if not isinstance(data, list) or len(data) < 3: log.debug("Response doesn't match expected structure (not a list or too short)") return reviews # data[2] contains the review groups review_groups = data[2] if not isinstance(review_groups, list): log.debug("data[2] is not a list") return reviews log.debug(f"Found {len(review_groups)} reviews in data[2]") # Each group IS ONE REVIEW for group_idx, group in enumerate(review_groups): if not isinstance(group, list) or len(group) == 0: continue # group[0] is the review data array (6 items) review_data = group[0] if not isinstance(review_data, list): continue try: review = self._parse_google_review_array(review_data) if review: reviews.append(review) log.debug(f"Parsed review {group_idx}: {review.author} - {review.rating}★") except Exception as e: log.debug(f"Error parsing review at group[{group_idx}]: {e}") except Exception as e: log.debug(f"Error in _parse_listugcposts_response: {e}") return reviews def _parse_google_review_array(self, review_data: List) -> Optional[InterceptedReview]: """ Parse a single review from Google's 6-item array format. Discovered structure (review_data is a 6-item array): review_data[0] = Review ID (string) review_data[1][4][5][0] = Author Name review_data[1][4][5][3] = User ID review_data[1][6] = Date Text review_data[2][0][0] = Rating (1-5) review_data[2][15][0][0] = Review Text (original) review_data[2][15][1][0] = Review Text (translated) """ review = InterceptedReview() try: # Extract review ID from review_data[0] if len(review_data) > 0 and isinstance(review_data[0], str): review.review_id = review_data[0] # Extract author info from review_data[1][4][5] if (len(review_data) > 1 and isinstance(review_data[1], list) and len(review_data[1]) > 4 and isinstance(review_data[1][4], list) and len(review_data[1][4]) > 5 and isinstance(review_data[1][4][5], list)): author_info = review_data[1][4][5] # Author name at [1][4][5][0] if len(author_info) > 0 and isinstance(author_info[0], str): review.author = author_info[0] # Profile picture at [1][4][5][1] (if available) if len(author_info) > 1 and isinstance(author_info[1], str): review.avatar_url = author_info[1] # Extract date from review_data[1][6] if (len(review_data) > 1 and isinstance(review_data[1], list) and len(review_data[1]) > 6 and isinstance(review_data[1][6], str)): review.date_text = review_data[1][6] # Extract rating from review_data[2][0][0] if (len(review_data) > 2 and isinstance(review_data[2], list) and len(review_data[2]) > 0 and isinstance(review_data[2][0], list) and len(review_data[2][0]) > 0): rating_val = review_data[2][0][0] if isinstance(rating_val, (int, float)) and 1 <= rating_val <= 5: review.rating = float(rating_val) # Extract review text from review_data[2][15][0][0] if (len(review_data) > 2 and isinstance(review_data[2], list) and len(review_data[2]) > 15 and isinstance(review_data[2][15], list) and len(review_data[2][15]) > 0 and isinstance(review_data[2][15][0], list) and len(review_data[2][15][0]) > 0): text = review_data[2][15][0][0] if isinstance(text, str): review.text = text # Only return if we have minimum required data if review.rating > 0 and (review.author or review.text): return review except Exception as e: log.debug(f"Error parsing Google review array: {e}") return None def _parse_review_array_v2(self, arr: List) -> Optional[InterceptedReview]: """ Parse review from Google's nested array format. Improved version with better field detection. """ review = InterceptedReview() try: # Extract review ID (usually a long string in first few elements) for i, item in enumerate(arr[:5]): if isinstance(item, str) and len(item) > 30 and not item.startswith('http'): review.review_id = item break # Extract rating (number between 1-5) for item in arr: if isinstance(item, (int, float)) and 1 <= item <= 5: review.rating = float(item) break elif isinstance(item, list): for subitem in item: if isinstance(subitem, (int, float)) and 1 <= subitem <= 5: review.rating = float(subitem) break if review.rating > 0: break # Extract review text (long string, not a URL) for item in arr: if isinstance(item, str) and len(item) > 50 and not item.startswith('http'): if not review.review_id or item != review.review_id: review.text = item break # Extract author name (shorter string, not ID or text) for item in arr: if isinstance(item, str) and 3 <= len(item) <= 100: if item != review.review_id and item != review.text and not item.startswith('http'): review.author = item break elif isinstance(item, list): for subitem in item: if isinstance(subitem, str) and 3 <= len(subitem) <= 100: if subitem != review.text and not subitem.startswith('http'): review.author = subitem break if review.author: break # Extract dates (strings that look like dates) date_patterns = [r'\d{1,2}/\d{1,2}/\d{2,4}', r'\d{4}-\d{2}-\d{2}', r'hace \d+', r'\d+ days? ago'] for item in arr: if isinstance(item, str): for pattern in date_patterns: if re.search(pattern, item, re.IGNORECASE): review.date_text = item break if review.date_text: break # Only return if we have meaningful data if (review.review_id or review.author) and review.rating > 0: return review except Exception as e: log.debug(f"Error in _parse_review_array_v2: {e}") return None def _extract_reviews_recursive(self, data: Any, depth: int = 0) -> List[InterceptedReview]: """Recursively search for review data in nested structures""" reviews = [] if depth > 20: # Prevent infinite recursion return reviews # Skip if data is already an InterceptedReview object if isinstance(data, InterceptedReview): return [data] if isinstance(data, dict): # Check if this looks like a review object review = self._try_parse_review_dict(data) if review: reviews.append(review) # Recurse into dict values for value in data.values(): if not isinstance(value, InterceptedReview): reviews.extend(self._extract_reviews_recursive(value, depth + 1)) elif isinstance(data, list): # Check if this array looks like a review array review = self._try_parse_review_array(data) if review: reviews.append(review) # Recurse into list items for item in data: if not isinstance(item, InterceptedReview): reviews.extend(self._extract_reviews_recursive(item, depth + 1)) return reviews def _try_parse_review_dict(self, data: Dict) -> Optional[InterceptedReview]: """Try to parse a dictionary as a review object""" # Common keys in review objects review_keys = {'reviewId', 'review_id', 'author', 'rating', 'text', 'comment'} if not any(k in data for k in review_keys): return None try: review = InterceptedReview() # Try various key names for each field review.review_id = data.get('reviewId') or data.get('review_id') or data.get('id', '') review.author = data.get('author') or data.get('authorName') or data.get('name', '') review.rating = float(data.get('rating') or data.get('starRating') or 0) review.text = data.get('text') or data.get('comment') or data.get('reviewText', '') review.date_text = data.get('publishTime') or data.get('relativePublishTime') or data.get('date', '') review.likes = int(data.get('thumbsUpCount') or data.get('likes') or 0) # Photos photos = data.get('photos') or data.get('reviewPhotos') or [] if photos: review.photos = [p.get('url') or p for p in photos if p] # Profile author_data = data.get('author') if isinstance(data.get('author'), dict) else {} review.profile_url = author_data.get('profileUrl') or data.get('profileUrl', '') review.avatar_url = author_data.get('profilePhotoUrl') or data.get('avatar', '') # Owner response owner_resp = data.get('ownerResponse') or data.get('ownerReply') or {} if isinstance(owner_resp, dict): review.owner_response = owner_resp.get('text', '') review.owner_response_date = owner_resp.get('publishTime', '') # Only return if we have meaningful data if review.review_id or (review.author and review.text): return review except Exception as e: log.debug(f"Error parsing review dict: {e}") return None def _try_parse_review_array(self, data: List) -> Optional[InterceptedReview]: """ Try to parse a nested array as a review (Google's protobuf-like format). Google often uses positional arrays like: [id, author, [rating], text, ...] """ if not data or len(data) < 3: return None try: # Look for patterns that indicate this is a review array # Pattern 1: [review_id, [author_info], rating_array, text, ...] review = InterceptedReview() # Check if first element looks like a review ID if isinstance(data[0], str) and len(data[0]) > 20: review.review_id = data[0] # Search for rating (usually a small number 1-5) for item in data: if isinstance(item, (int, float)) and 1 <= item <= 5: review.rating = float(item) break elif isinstance(item, list) and len(item) >= 1: if isinstance(item[0], (int, float)) and 1 <= item[0] <= 5: review.rating = float(item[0]) break # Search for text (long string) for item in data: if isinstance(item, str) and len(item) > 30: review.text = item break elif isinstance(item, list): for subitem in item: if isinstance(subitem, str) and len(subitem) > 30: review.text = subitem break # Search for author name (shorter string) for item in data: if isinstance(item, list) and len(item) >= 1: for subitem in item: if isinstance(subitem, str) and 2 <= len(subitem) <= 100 and subitem != review.text: review.author = subitem break if review.author: break # Search for URLs (photos, profile) for item in data: if isinstance(item, str) and item.startswith('http'): if 'googleusercontent' in item or 'ggpht' in item: if not review.avatar_url: review.avatar_url = item else: review.photos.append(item) elif isinstance(item, list): self._extract_urls_from_array(item, review) # Only return if we have meaningful data if review.review_id and review.rating > 0: return review if review.text and review.rating > 0: return review except Exception as e: log.debug(f"Error parsing review array: {e}") return None def _extract_urls_from_array(self, arr: List, review: InterceptedReview, depth: int = 0): """Extract URLs from nested arrays""" if depth > 5: return for item in arr: if isinstance(item, str) and item.startswith('http'): if 'googleusercontent' in item or 'ggpht' in item or 'lh3' in item: if 'w72-h72' in item or 'p-rp-mo' in item: # Profile pic pattern review.avatar_url = item else: review.photos.append(item) elif isinstance(item, list): self._extract_urls_from_array(item, depth + 1, review) def convert_to_raw_review_format(self, intercepted: InterceptedReview) -> Dict[str, Any]: """Convert an InterceptedReview to the format used by RawReview/storage""" return { 'review_id': intercepted.review_id, 'author': intercepted.author, 'rating': intercepted.rating, 'description': {'en': intercepted.text} if intercepted.text else {}, 'likes': intercepted.likes, 'user_images': intercepted.photos, 'author_profile_url': intercepted.profile_url, 'profile_picture': intercepted.avatar_url, 'owner_responses': { 'en': {'text': intercepted.owner_response} } if intercepted.owner_response else {}, 'review_date': intercepted.date_text, '_source': 'api_intercept' } def cleanup(self): """Clean up interception resources""" try: self.driver.execute_cdp_cmd('Network.disable', {}) except: pass self.captured_responses.clear() self.captured_reviews.clear() self.request_map.clear() self._listening = False