"""
API Interceptor for Google Maps Reviews.

Uses Chrome DevTools Protocol (CDP) to intercept network requests and capture
Google's internal API responses for faster, more reliable data extraction.
"""

import base64
import json
import logging
import re
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import parse_qs, urlparse

log = logging.getLogger("api_interceptor")


@dataclass
class InterceptedReview:
    """Data class for a review extracted from an API response."""

    review_id: str = ""
    author: str = ""
    rating: float = 0.0
    text: str = ""
    date_text: str = ""
    timestamp: int = 0
    likes: int = 0
    photos: List[str] = field(default_factory=list)
    profile_url: str = ""
    avatar_url: str = ""
    owner_response: str = ""
    owner_response_date: str = ""
    lang: str = ""


class GoogleMapsAPIInterceptor:
    """
    Intercepts Google Maps internal API calls to capture review data directly.

    Google Maps uses several internal endpoints for reviews:
    - /maps/preview/review/listentitiesreviews - Main reviews endpoint
    - /maps/rpc/placereview - Alternative review endpoint
    - /maps/preview/reviewsdata - Review data endpoint

    The responses are often in a custom protobuf-like JSON format that needs
    parsing. Parsing here is heuristic: it walks the decoded JSON and keeps
    anything that looks like a review.
    """

    # Patterns for review-related API endpoints (matched with re.search
    # against the lower-cased URL).
    REVIEW_API_PATTERNS = [
        r'maps/preview/review',
        r'maps/rpc/placereview',
        r'maps/preview/reviewsdata',
        r'maps/preview/place',
        r'maps/api/place',
        r'/locationhistory/preview',
        r'batchexecute.*review',
    ]

    def __init__(self, driver):
        """Initialize the interceptor with a Selenium driver.

        Args:
            driver: Object exposing ``execute_cdp_cmd``, ``execute_script``
                and ``get_log`` (a Chromium-based Selenium WebDriver).
        """
        self.driver = driver
        self.captured_responses: List[Dict[str, Any]] = []
        self.captured_reviews: List[InterceptedReview] = []
        self.request_map: Dict[str, Dict] = {}  # Map request IDs to URLs
        self._lock = threading.Lock()
        self._listening = False
        self._response_callback: Optional[Callable] = None

    def setup_interception(self):
        """Enable network event reporting via CDP.

        Only ``Network.enable`` is issued. The previous implementation also
        called ``Network.setRequestInterception``; that command pauses every
        matching request until it is explicitly continued, and nothing in
        this class continues requests, so the review requests themselves
        would stall.

        Returns:
            bool: True when network capture is active, False otherwise.
        """
        try:
            self.driver.execute_cdp_cmd('Network.enable', {})
            self._listening = True
            log.info("API interception enabled via CDP")
            return True
        except Exception as e:
            log.warning("Could not enable CDP interception: %s", e)
            # Try alternative approach
            return self._setup_performance_logging()

    def _setup_performance_logging(self):
        """Alternative approach: enable the Network domain with larger buffers.

        Returns:
            bool: True on success, False if the CDP command failed.
        """
        try:
            self.driver.execute_cdp_cmd('Network.enable', {
                'maxTotalBufferSize': 10000000,
                'maxResourceBufferSize': 5000000,
            })
            self._listening = True
            log.info("API interception enabled via performance logging")
            return True
        except Exception as e:
            log.error("Failed to setup performance logging: %s", e)
            return False

    def capture_network_responses(self, duration: float = 5.0):
        """
        Capture network responses for a specified duration.

        Call this while scrolling/loading more reviews. Polls the driver's
        ``performance`` log roughly every 0.1 s, remembers review-API
        requests seen in ``Network.responseReceived`` and fetches their
        bodies on ``Network.loadingFinished``.

        Args:
            duration: How long to keep polling, in seconds.

        Returns:
            list: Dicts with ``url``, ``body`` and ``timestamp`` keys for the
            responses captured during this call (also appended to
            ``self.captured_responses``). Empty if interception is not set up.
        """
        if not self._listening:
            log.warning("Interception not set up, call setup_interception() first")
            return []

        captured = []
        # monotonic() so wall-clock adjustments cannot stretch/shrink the window
        deadline = time.monotonic() + duration

        while time.monotonic() < deadline:
            try:
                # Get performance logs which contain network events
                logs = self.driver.get_log('performance')
            except Exception as e:
                # Performance logs might not be available
                log.debug("Could not get performance logs: %s", e)
                break

            for entry in logs:
                try:
                    self._handle_log_entry(entry, captured)
                except Exception as parse_error:
                    log.debug("Error parsing log entry: %s", parse_error)
                    continue

            time.sleep(0.1)

        with self._lock:
            self.captured_responses.extend(captured)

        return captured

    def _handle_log_entry(self, entry: Dict[str, Any], captured: List[Dict[str, Any]]) -> None:
        """Process one performance-log entry, appending any finished body to *captured*."""
        log_data = json.loads(entry['message'])
        message = log_data.get('message', {})
        method = message.get('method', '')
        params = message.get('params', {})

        # Capture response received events
        if method == 'Network.responseReceived':
            response = params.get('response', {})
            url = response.get('url', '')
            if self._is_review_api(url):
                request_id = params.get('requestId')
                self.request_map[request_id] = {
                    'url': url,
                    'status': response.get('status'),
                    'headers': response.get('headers', {}),
                }

        # Capture response body when loading is finished
        elif method == 'Network.loadingFinished':
            request_id = params.get('requestId')
            if request_id in self.request_map:
                body = self._get_response_body(request_id)
                if body:
                    captured.append({
                        'url': self.request_map[request_id]['url'],
                        'body': body,
                        'timestamp': time.time(),
                    })

    def get_response_bodies_cdp(self):
        """Return (and drain) the responses buffered by the injected JS hook.

        The earlier version ran an IIFE through ``execute_script`` without a
        top-level ``return``, so the driver always handed back ``None``; it
        also issued an unrelated ``Network.getAllCookies`` call whose result
        was unused. Both paths now share ``get_intercepted_responses``.

        Returns:
            list: Captured response dicts, or an empty list on any error.
        """
        return list(self.get_intercepted_responses())

    def inject_response_interceptor(self):
        """
        Inject JavaScript to intercept XHR/Fetch responses at the browser level.

        The script wraps ``window.fetch`` and ``window.XMLHttpRequest`` and
        pushes matching responses onto ``window.__interceptedResponses``
        (trimmed to the newest 50 once it exceeds 100 entries).

        Returns:
            bool: True if the script executed, False if the driver raised.
        """
        intercept_script = """
        (function() {
            // Skip if already injected
            if (window.__reviewInterceptorInjected) return;
            window.__reviewInterceptorInjected = true;
            window.__interceptedResponses = [];

            // Store original fetch
            const originalFetch = window.fetch;

            // Override fetch
            window.fetch = async function(...args) {
                const response = await originalFetch.apply(this, args);
                // fetch() accepts a string, a URL or a Request; only Request has .url
                const target = args[0];
                const url = (target && typeof target === 'object' && 'url' in target)
                    ? String(target.url)
                    : String(target);

                // Check if this is a review-related API call
                if (url.includes('review') || url.includes('batchexecute') ||
                    url.includes('place') || url.includes('maps')) {
                    try {
                        const clone = response.clone();
                        const text = await clone.text();
                        window.__interceptedResponses.push({
                            url: url,
                            body: text,
                            timestamp: Date.now(),
                            type: 'fetch'
                        });
                        // Keep only last 100 responses to avoid memory issues
                        if (window.__interceptedResponses.length > 100) {
                            window.__interceptedResponses = window.__interceptedResponses.slice(-50);
                        }
                    } catch (e) {
                        console.debug('Response capture error:', e);
                    }
                }
                return response;
            };

            // Store original XMLHttpRequest
            const originalXHR = window.XMLHttpRequest;

            // Create intercepting XHR
            window.XMLHttpRequest = function() {
                const xhr = new originalXHR();
                const originalOpen = xhr.open;
                let requestUrl = '';

                xhr.open = function(method, url, ...rest) {
                    // open() also accepts URL objects; normalise to a string
                    requestUrl = String(url);
                    return originalOpen.apply(this, [method, url, ...rest]);
                };

                xhr.addEventListener('load', function() {
                    if (requestUrl.includes('review') || requestUrl.includes('batchexecute') ||
                        requestUrl.includes('place') || requestUrl.includes('maps')) {
                        try {
                            window.__interceptedResponses.push({
                                url: requestUrl,
                                body: xhr.responseText,
                                timestamp: Date.now(),
                                type: 'xhr'
                            });
                            if (window.__interceptedResponses.length > 100) {
                                window.__interceptedResponses = window.__interceptedResponses.slice(-50);
                            }
                        } catch (e) {
                            console.debug('XHR capture error:', e);
                        }
                    }
                });

                return xhr;
            };

            // Copy static properties
            for (let prop of Object.getOwnPropertyNames(originalXHR)) {
                try {
                    window.XMLHttpRequest[prop] = originalXHR[prop];
                } catch (e) {}
            }

            console.log('Review API interceptor injected');
            return true;
        })();
        """

        try:
            self.driver.execute_script(intercept_script)
            log.info("JavaScript response interceptor injected")
            return True
        except Exception as e:
            log.warning("Failed to inject interceptor: %s", e)
            return False

    def get_intercepted_responses(self):
        """Retrieve intercepted responses from the browser and clear the buffer.

        Returns:
            list: Response dicts pushed by the injected script, or an empty
            list if nothing was captured or the driver raised.
        """
        script = """
        if (window.__interceptedResponses) {
            var responses = window.__interceptedResponses.slice();
            window.__interceptedResponses = [];
            return responses;
        }
        return [];
        """
        try:
            responses = self.driver.execute_script(script)
            return responses or []
        except Exception as e:
            log.debug("Error getting intercepted responses: %s", e)
            return []

    def _is_review_api(self, url: str) -> bool:
        """Check if URL matches any of ``REVIEW_API_PATTERNS`` (case-insensitive)."""
        if not url:
            return False
        url_lower = url.lower()
        return any(re.search(pattern, url_lower) for pattern in self.REVIEW_API_PATTERNS)

    def _get_response_body(self, request_id: str) -> Optional[str]:
        """Get response body for a request ID using CDP.

        Returns:
            The body as text (base64 payloads are decoded as UTF-8, ignoring
            undecodable bytes), or None if the CDP call failed.
        """
        try:
            result = self.driver.execute_cdp_cmd('Network.getResponseBody', {
                'requestId': request_id,
            })
            body = result.get('body', '')
            if result.get('base64Encoded'):
                body = base64.b64decode(body).decode('utf-8', errors='ignore')
            return body
        except Exception as e:
            log.debug("Could not get response body for %s: %s", request_id, e)
            return None

    def parse_reviews_from_responses(self, responses: List[Dict]) -> List[InterceptedReview]:
        """
        Parse review data from captured API responses.

        Google's API responses use a custom nested array format.

        Args:
            responses: Dicts carrying at least a ``body`` key (and optionally
                ``url``), as produced by the capture methods of this class.

        Returns:
            All reviews recognised across the given responses.
        """
        # NOTE(review): the middle of this method was unreadable in the
        # reviewed source (text lost after "body.startswith('"). The loop
        # tail below is a minimal reconstruction - confirm against the
        # original, e.g. whether it also de-duplicated or stored results.
        reviews: List[InterceptedReview] = []

        for response in responses or []:
            try:
                body = response.get('body', '')
                url = response.get('url', '')

                # Skip non-JSON responses
                if not body or body.startswith('<'):
                    continue

                reviews.extend(self._parse_response_body(body, url))
            except Exception as e:
                log.debug("Error parsing response: %s", e)

        return reviews

    def _parse_response_body(self, body: str, url: str = "") -> List[InterceptedReview]:
        """Parse a single response body for review data.

        NOTE(review): this signature was partly unreadable in the reviewed
        source; ``url`` is kept because the caller extracts it, but it is not
        used by the parsing itself - confirm the original parameter list.

        Returns:
            Reviews found in the body; empty if the body is not decodable JSON.
        """
        # Handle batch execute format (starts with )]}' prefix)
        if body.startswith(")]}'"):
            body = body[4:].strip()

        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            # Try to extract JSON from the response
            json_match = re.search(r'\[.*\]', body, re.DOTALL)
            if not json_match:
                return []
            try:
                data = json.loads(json_match.group())
            except json.JSONDecodeError:
                return []

        # Extract reviews from nested structure
        return self._extract_reviews_recursive(data)

    def _extract_reviews_recursive(self, data: Any, depth: int = 0) -> List[InterceptedReview]:
        """Recursively search for review data in nested structures.

        Every dict/list on the way down is tested as a potential review and
        then descended into, so nested matches are reported as well.
        """
        reviews: List[InterceptedReview] = []

        if depth > 20:  # Prevent runaway recursion on very deep payloads
            return reviews

        if isinstance(data, dict):
            # Check if this looks like a review object
            review = self._try_parse_review_dict(data)
            if review:
                reviews.append(review)
            # Recurse into dict values
            for value in data.values():
                reviews.extend(self._extract_reviews_recursive(value, depth + 1))

        elif isinstance(data, list):
            # Check if this array looks like a review array
            review = self._try_parse_review_array(data)
            if review:
                reviews.append(review)
            # Recurse into list items
            for item in data:
                reviews.extend(self._extract_reviews_recursive(item, depth + 1))

        return reviews

    def _try_parse_review_dict(self, data: Dict) -> Optional[InterceptedReview]:
        """Try to parse a dictionary as a review object.

        Returns:
            A populated review when the dict has a review id, or both an
            author and text; otherwise None (also on any parsing error).
        """
        # Common keys in review objects
        review_keys = {'reviewId', 'review_id', 'author', 'rating', 'text', 'comment'}

        if not any(k in data for k in review_keys):
            return None

        try:
            review = InterceptedReview()

            author_field = data.get('author')
            author_data = author_field if isinstance(author_field, dict) else {}

            # Try various key names for each field
            review.review_id = data.get('reviewId') or data.get('review_id') or data.get('id', '')
            if author_data:
                # A nested author object must not end up in the str field.
                # assumes the display name lives under 'name'/'displayName' - TODO confirm
                review.author = (
                    author_data.get('name')
                    or author_data.get('displayName')
                    or data.get('authorName')
                    or data.get('name', '')
                )
            else:
                review.author = author_field or data.get('authorName') or data.get('name', '')
            review.rating = float(data.get('rating') or data.get('starRating') or 0)
            review.text = data.get('text') or data.get('comment') or data.get('reviewText', '')
            review.date_text = (
                data.get('publishTime') or data.get('relativePublishTime') or data.get('date', '')
            )
            review.likes = int(data.get('thumbsUpCount') or data.get('likes') or 0)

            # Photos: entries may be plain URL strings or dicts with a 'url' key
            photos = data.get('photos') or data.get('reviewPhotos') or []
            if isinstance(photos, list):
                for photo in photos:
                    photo_url = photo.get('url') if isinstance(photo, dict) else photo
                    if isinstance(photo_url, str) and photo_url:
                        review.photos.append(photo_url)

            # Profile
            review.profile_url = author_data.get('profileUrl') or data.get('profileUrl', '')
            review.avatar_url = author_data.get('profilePhotoUrl') or data.get('avatar', '')

            # Owner response
            owner_resp = data.get('ownerResponse') or data.get('ownerReply') or {}
            if isinstance(owner_resp, dict):
                review.owner_response = owner_resp.get('text', '')
                review.owner_response_date = owner_resp.get('publishTime', '')

            # Only return if we have meaningful data
            if review.review_id or (review.author and review.text):
                return review

        except Exception as e:
            log.debug("Error parsing review dict: %s", e)

        return None

    @staticmethod
    def _looks_like_rating(value: Any) -> bool:
        """Return True for a real number in 1..5 (JSON booleans are excluded)."""
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and 1 <= value <= 5
        )

    def _try_parse_review_array(self, data: List) -> Optional[InterceptedReview]:
        """
        Try to parse a nested array as a review (Google's protobuf-like format).

        Google often uses positional arrays like: [id, author, [rating], text, ...]

        Returns:
            A review when a rating plus either a review id or text was found;
            otherwise None (also on any parsing error).
        """
        if not data or len(data) < 3:
            return None

        try:
            # Look for patterns that indicate this is a review array
            # Pattern 1: [review_id, [author_info], rating_array, text, ...]
            review = InterceptedReview()

            # Check if first element looks like a review ID
            if isinstance(data[0], str) and len(data[0]) > 20:
                review.review_id = data[0]

            # Search for rating (usually a small number 1-5)
            for item in data:
                if self._looks_like_rating(item):
                    review.rating = float(item)
                    break
                if isinstance(item, list) and item and self._looks_like_rating(item[0]):
                    review.rating = float(item[0])
                    break

            # Search for text (long string). Stop at the first match at either
            # nesting level, and never reuse the review id as the text.
            for item in data:
                candidates = item if isinstance(item, list) else [item]
                for candidate in candidates:
                    if (
                        isinstance(candidate, str)
                        and len(candidate) > 30
                        and candidate != review.review_id
                    ):
                        review.text = candidate
                        break
                if review.text:
                    break

            # Search for author name (shorter string)
            for item in data:
                if isinstance(item, list) and len(item) >= 1:
                    for subitem in item:
                        if (
                            isinstance(subitem, str)
                            and 2 <= len(subitem) <= 100
                            and subitem != review.text
                        ):
                            review.author = subitem
                            break
                if review.author:
                    break

            # Search for URLs (photos, profile)
            for item in data:
                if isinstance(item, str) and item.startswith('http'):
                    if 'googleusercontent' in item or 'ggpht' in item:
                        if not review.avatar_url:
                            review.avatar_url = item
                        else:
                            review.photos.append(item)
                elif isinstance(item, list):
                    self._extract_urls_from_array(item, review)

            # Only return if we have meaningful data
            if review.review_id and review.rating > 0:
                return review
            if review.text and review.rating > 0:
                return review

        except Exception as e:
            log.debug("Error parsing review array: %s", e)

        return None

    def _extract_urls_from_array(self, arr: List, review: InterceptedReview, depth: int = 0):
        """Extract image URLs from nested arrays into *review* (mutated in place).

        Args:
            arr: List to scan; nested lists are followed up to 5 levels deep.
            review: Review whose ``avatar_url`` / ``photos`` are filled.
            depth: Current nesting level (internal).
        """
        if depth > 5:
            return

        for item in arr:
            if isinstance(item, str) and item.startswith('http'):
                if 'googleusercontent' in item or 'ggpht' in item or 'lh3' in item:
                    if 'w72-h72' in item or 'p-rp-mo' in item:  # Profile pic pattern
                        review.avatar_url = item
                    else:
                        review.photos.append(item)
            elif isinstance(item, list):
                # Argument order matters: (arr, review, depth). Passing them
                # swapped raised TypeError on the depth comparison.
                self._extract_urls_from_array(item, review, depth + 1)

    def convert_to_raw_review_format(self, intercepted: InterceptedReview) -> Dict[str, Any]:
        """Convert an InterceptedReview to the format used by RawReview/storage.

        Text and owner response are filed under the ``'en'`` key regardless of
        the review's ``lang`` field.
        """
        return {
            'review_id': intercepted.review_id,
            'author': intercepted.author,
            'rating': intercepted.rating,
            'description': {'en': intercepted.text} if intercepted.text else {},
            'likes': intercepted.likes,
            'user_images': intercepted.photos,
            'author_profile_url': intercepted.profile_url,
            'profile_picture': intercepted.avatar_url,
            'owner_responses': {
                'en': {'text': intercepted.owner_response}
            } if intercepted.owner_response else {},
            'review_date': intercepted.date_text,
            '_source': 'api_intercept',
        }

    def cleanup(self):
        """Clean up interception resources.

        Disabling the Network domain is best-effort (the browser may already
        be gone); local buffers are always cleared.
        """
        try:
            self.driver.execute_cdp_cmd('Network.disable', {})
        except Exception as e:
            log.debug("Could not disable CDP network domain: %s", e)

        with self._lock:
            self.captured_responses.clear()
        self.captured_reviews.clear()
        self.request_map.clear()
        self._listening = False