Files
whyrating-engine-legacy/modules/api_interceptor.py
Alejandro Gutiérrez faa0704737 Optimize scraper performance and add fallback selectors for robustness
Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-18 19:49:24 +00:00

924 lines
36 KiB
Python

"""
API Interceptor for Google Maps Reviews.
Uses Chrome DevTools Protocol (CDP) to intercept network requests and capture
Google's internal API responses for faster, more reliable data extraction.
"""
import base64
import json
import logging
import os
import re
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import parse_qs, urlparse
log = logging.getLogger("api_interceptor")
@dataclass
class InterceptedReview:
    """One review recovered from an intercepted API response.

    Every field has a default, so parsers can create an empty instance and
    fill in only the attributes they manage to extract.
    """

    # Identity and authorship
    review_id: str = ""
    author: str = ""

    # Review content; parsers in this module treat rating 0.0 as "not found"
    rating: float = 0.0
    text: str = ""
    date_text: str = ""
    timestamp: int = 0
    likes: int = 0

    # Media and profile links (fresh list per instance)
    photos: List[str] = field(default_factory=list)
    profile_url: str = ""
    avatar_url: str = ""

    # Reply from the business owner, when present
    owner_response: str = ""
    owner_response_date: str = ""

    lang: str = ""
class GoogleMapsAPIInterceptor:
    """
    Intercepts Google Maps internal API calls to capture review data directly.

    Google Maps uses several internal endpoints for reviews:
    - /maps/preview/review/listentitiesreviews - Main reviews endpoint
    - /maps/rpc/placereview - Alternative review endpoint
    - /maps/preview/reviewsdata - Review data endpoint

    The responses are often in a custom protobuf-like JSON format that needs parsing.
    """

    # Patterns for review-related API endpoints.
    # Each entry is a regular expression that _is_review_api() applies with
    # re.search() to the lower-cased URL, so entries must be lower-case.
    # NOTE(review): 'listugcposts' is special-cased by _parse_response_body
    # but is not matched by any pattern below - confirm whether it should be.
    REVIEW_API_PATTERNS = [
        r'maps/preview/review',
        r'maps/rpc/placereview',
        r'maps/preview/reviewsdata',
        r'maps/preview/place',
        r'maps/api/place',
        r'/locationhistory/preview',
        r'batchexecute.*review',
    ]
def __init__(self, driver):
    """Bind the interceptor to a Selenium driver and reset all capture state."""
    self.driver = driver

    # Nothing is listening until one of the setup methods succeeds
    self._listening = False
    self._response_callback: Optional[Callable] = None

    # Held while appending to captured_responses
    self._lock = threading.Lock()

    # requestId -> {'url', 'status', 'headers'} for review-related requests
    self.request_map: Dict[str, Dict] = {}
    self.captured_responses: List[Dict[str, Any]] = []
    self.captured_reviews: List[InterceptedReview] = []
def setup_interception(self):
    """
    Enable network event tracking via CDP.

    Only ``Network.enable`` is issued. ``Network.setRequestInterception`` is
    deliberately not used: it pauses every matching request until the client
    answers with ``Network.continueInterceptedRequest``, and nothing in this
    class handles ``Network.requestIntercepted`` events, so the review
    requests themselves could stall. Response capture in this class relies
    on performance-log events plus ``Network.getResponseBody``, which only
    need the Network domain to be enabled.

    Returns:
        True when network tracking is enabled (directly or through the
        ``_setup_performance_logging`` fallback), False otherwise.
    """
    try:
        # Enable network domain
        self.driver.execute_cdp_cmd('Network.enable', {})
    except Exception as e:
        log.warning(f"Could not enable CDP interception: {e}")
        # Try alternative approach
        return self._setup_performance_logging()

    self._listening = True
    log.info("API interception enabled via CDP")
    return True
def _setup_performance_logging(self):
    """Fallback setup: enable the CDP Network domain with explicit buffer limits.

    Returns True and marks the interceptor as listening on success; logs an
    error and returns False when the CDP command fails.
    """
    buffer_limits = {
        'maxTotalBufferSize': 10000000,
        'maxResourceBufferSize': 5000000
    }
    try:
        self.driver.execute_cdp_cmd('Network.enable', buffer_limits)
    except Exception as e:
        log.error(f"Failed to setup performance logging: {e}")
        return False

    self._listening = True
    log.info("API interception enabled via performance logging")
    return True
def capture_network_responses(self, duration: float = 5.0):
    """
    Poll the driver's 'performance' log for ``duration`` seconds and collect
    the bodies of review-related responses.

    ``Network.responseReceived`` events whose URL passes ``_is_review_api``
    are remembered in ``self.request_map``; when the matching
    ``Network.loadingFinished`` event arrives, the body is fetched with
    ``_get_response_body``. Call this while scrolling/loading more reviews.

    Returns:
        The list of ``{'url', 'body', 'timestamp'}`` dicts captured during
        this call (also appended to ``self.captured_responses``). An empty
        list is returned when interception has not been set up.
    """
    if not self._listening:
        log.warning("Interception not set up, call setup_interception() first")
        return []

    collected = []
    began = time.time()
    while time.time() - began < duration:
        try:
            # Performance log entries wrap the CDP network events
            entries = self.driver.get_log('performance')
            for entry in entries:
                try:
                    event = json.loads(entry['message']).get('message', {})
                    kind = event.get('method', '')
                    params = event.get('params', {})

                    if kind == 'Network.responseReceived':
                        # Remember review-related requests by their ID
                        response = params.get('response', {})
                        target = response.get('url', '')
                        if self._is_review_api(target):
                            request_id = params.get('requestId')
                            self.request_map[request_id] = {
                                'url': target,
                                'status': response.get('status'),
                                'headers': response.get('headers', {})
                            }
                    elif kind == 'Network.loadingFinished':
                        # Body is only available once loading has finished
                        request_id = params.get('requestId')
                        if request_id not in self.request_map:
                            continue
                        payload = self._get_response_body(request_id)
                        if not payload:
                            continue
                        collected.append({
                            'url': self.request_map[request_id]['url'],
                            'body': payload,
                            'timestamp': time.time()
                        })
                except Exception as parse_error:
                    log.debug(f"Error parsing log entry: {parse_error}")
                    continue
        except Exception as e:
            # Performance logs might not be available; stop polling
            log.debug(f"Could not get performance logs: {e}")
            break
        time.sleep(0.1)

    with self._lock:
        self.captured_responses.extend(collected)
    return collected
def get_response_bodies_cdp(self):
    """
    Drain the responses queued in ``window.__interceptedResponses`` by the
    injected JavaScript interceptor.

    Fixes over the previous version:
    - The script passed to ``execute_script`` runs as a function body, so
      the queue has to be handed back with an explicit top-level ``return``.
      Without it the call always produced ``None`` while the script still
      emptied the browser-side queue, silently discarding captured data.
    - The unrelated ``Network.getAllCookies`` CDP call (its result was never
      used) is gone, so a CDP failure can no longer abort the drain.

    Returns:
        A list of response dicts; empty when nothing was queued or the
        driver call failed (the error is logged at debug level).
    """
    drain_script = """
    return (function() {
        if (window.__interceptedResponses) {
            var responses = window.__interceptedResponses;
            window.__interceptedResponses = [];
            return responses;
        }
        return [];
    })();
    """
    responses = []
    try:
        captured = self.driver.execute_script(drain_script)
        if captured:
            responses.extend(captured)
    except Exception as e:
        log.debug(f"CDP response capture error: {e}")
    return responses
def inject_response_interceptor(self):
    """
    Inject JavaScript that wraps ``window.fetch`` and ``window.XMLHttpRequest``
    so that matching responses are queued in ``window.__interceptedResponses``.

    The script is idempotent (guarded by ``window.__reviewInterceptorInjected``)
    and keeps counters in ``window.__interceptorStats``.

    Fixes over the previous version:
    - ``fetch()`` may be called with a ``Request`` object; its URL is now read
      from ``.url`` instead of stringifying the object (which produced
      "[object Request]" and therefore never matched the filter).
    - ``xhr.open()`` may be called with a ``URL`` object; the URL is coerced
      with ``String()`` before string methods are used, so the wrapper no
      longer throws inside the page's own request.
    - Unused ``originalSend`` / ``result`` locals removed.

    Returns:
        True when the script was executed, False when the driver raised.
    """
    intercept_script = """
    (function() {
        // Skip if already injected
        if (window.__reviewInterceptorInjected) {
            console.log('[API Interceptor] Already injected, skipping');
            return;
        }
        window.__reviewInterceptorInjected = true;
        window.__interceptedResponses = [];
        window.__interceptorStats = {
            totalFetch: 0,
            totalXHR: 0,
            capturedFetch: 0,
            capturedXHR: 0,
            lastCapture: null
        };
        console.log('[API Interceptor] Initializing...');

        // Store original fetch
        const originalFetch = window.fetch;

        // Override fetch
        window.fetch = async function(...args) {
            window.__interceptorStats.totalFetch++;
            // The first argument can be a string, a URL or a Request object;
            // only a Request carries its address in a .url property.
            const input = args[0];
            const url = (input && typeof input === 'object' && 'url' in input)
                ? String(input.url)
                : String(input);
            // Log ALL fetch requests for debugging
            console.debug('[API Interceptor] FETCH:', url.substring(0, 150));
            const response = await originalFetch.apply(this, args);
            // Check if this is a review-related API call
            if (url.includes('review') || url.includes('batchexecute') ||
                url.includes('place') || url.includes('maps') ||
                url.includes('listugcposts') || url.includes('getreviews')) {
                try {
                    const clone = response.clone();
                    const text = await clone.text();
                    console.log('[API Interceptor] ✅ CAPTURED FETCH:', url.substring(0, 100), 'Size:', text.length);
                    window.__interceptedResponses.push({
                        url: url,
                        body: text,
                        timestamp: Date.now(),
                        type: 'fetch',
                        size: text.length
                    });
                    window.__interceptorStats.capturedFetch++;
                    window.__interceptorStats.lastCapture = new Date().toISOString();
                    // Keep only last 100 responses to avoid memory issues
                    if (window.__interceptedResponses.length > 100) {
                        window.__interceptedResponses = window.__interceptedResponses.slice(-50);
                    }
                } catch (e) {
                    console.error('[API Interceptor] Response capture error:', e);
                }
            }
            return response;
        };

        // Store original XMLHttpRequest
        const originalXHR = window.XMLHttpRequest;

        // Create intercepting XHR
        window.XMLHttpRequest = function() {
            const xhr = new originalXHR();
            const originalOpen = xhr.open;
            let requestUrl = '';
            xhr.open = function(method, url, ...rest) {
                // url may be a URL object; keep a string copy for matching
                requestUrl = String(url);
                window.__interceptorStats.totalXHR++;
                console.debug('[API Interceptor] XHR:', method, requestUrl.substring(0, 150));
                return originalOpen.apply(this, [method, url, ...rest]);
            };
            xhr.addEventListener('load', function() {
                if (requestUrl.includes('review') || requestUrl.includes('batchexecute') ||
                    requestUrl.includes('place') || requestUrl.includes('maps') ||
                    requestUrl.includes('listugcposts') || requestUrl.includes('getreviews')) {
                    try {
                        console.log('[API Interceptor] ✅ CAPTURED XHR:', requestUrl.substring(0, 100), 'Size:', xhr.responseText.length);
                        window.__interceptedResponses.push({
                            url: requestUrl,
                            body: xhr.responseText,
                            timestamp: Date.now(),
                            type: 'xhr',
                            status: xhr.status,
                            size: xhr.responseText.length
                        });
                        window.__interceptorStats.capturedXHR++;
                        window.__interceptorStats.lastCapture = new Date().toISOString();
                        if (window.__interceptedResponses.length > 100) {
                            window.__interceptedResponses = window.__interceptedResponses.slice(-50);
                        }
                    } catch (e) {
                        console.error('[API Interceptor] XHR capture error:', e);
                    }
                }
            });
            return xhr;
        };

        // Copy static properties
        for (let prop of Object.getOwnPropertyNames(originalXHR)) {
            try {
                window.XMLHttpRequest[prop] = originalXHR[prop];
            } catch (e) {}
        }
        console.log('[API Interceptor] ✅ Injected successfully! Monitoring network requests...');

        // Log stats every 10 seconds
        setInterval(() => {
            if (window.__interceptorStats.totalFetch > 0 || window.__interceptorStats.totalXHR > 0) {
                console.log('[API Interceptor] Stats:',
                    'Fetch:', window.__interceptorStats.totalFetch, '/', window.__interceptorStats.capturedFetch,
                    'XHR:', window.__interceptorStats.totalXHR, '/', window.__interceptorStats.capturedXHR,
                    'Queue:', window.__interceptedResponses.length);
            }
        }, 10000);
        return true;
    })();
    """
    try:
        self.driver.execute_script(intercept_script)
        log.info("JavaScript response interceptor injected with enhanced debugging")
        # Get initial stats
        stats = self.get_interceptor_stats()
        log.debug(f"Interceptor stats: {stats}")
        return True
    except Exception as e:
        log.warning(f"Failed to inject interceptor: {e}")
        return False
def get_intercepted_responses(self):
    """Drain and return the responses queued by the injected JS interceptor.

    The browser-side queue (``window.__interceptedResponses``) is copied and
    then reset. Returns a list of response dicts, possibly empty; a driver
    error is logged at debug level and also yields an empty list.
    """
    drain_script = """
    if (window.__interceptedResponses) {
        var responses = window.__interceptedResponses.slice();
        window.__interceptedResponses = [];
        return responses;
    }
    return [];
    """
    try:
        drained = self.driver.execute_script(drain_script)
        if not drained:
            log.debug("No intercepted responses available")
        else:
            log.debug(f"Retrieved {len(drained)} intercepted responses from browser")
            # Only the first three are logged, to keep debug output short
            for resp in drained[:3]:
                log.debug(f" - {resp.get('type', '?').upper()}: {resp.get('url', '')[:100]} ({resp.get('size', 0)} bytes)")
        return drained or []
    except Exception as e:
        log.debug(f"Error getting intercepted responses: {e}")
        return []
def get_interceptor_stats(self):
    """Return ``window.__interceptorStats`` from the page.

    Yields None when the stats object does not exist in the page or when
    the driver call fails (the failure is logged at debug level).
    """
    stats_script = """
    if (window.__interceptorStats) {
        return window.__interceptorStats;
    }
    return null;
    """
    try:
        return self.driver.execute_script(stats_script)
    except Exception as e:
        log.debug(f"Error getting interceptor stats: {e}")
        return None
def get_browser_console_logs(self):
    """Fetch the driver's 'browser' log (for debugging).

    Returns whatever ``driver.get_log('browser')`` yields, or an empty list
    when the call fails.
    """
    try:
        return self.driver.get_log('browser')
    except Exception as e:
        log.debug(f"Could not get browser console logs: {e}")
        return []
def dump_responses_to_file(self, responses: List[Dict], output_dir: str = "debug_api_responses"):
    """
    Dump captured responses to files for debugging.

    Creates one JSON file per response, named
    ``<timestamp>_<type>_<index>.json``, holding a ``metadata`` object
    (url, type, timestamp, size, status) and the response ``body``.

    Fixes over the previous version:
    - ``output_dir`` may now be a nested path; missing parent directories
      are created instead of aborting the dump.
    - A response whose ``body`` is None no longer aborts the whole dump; it
      is written with an empty body and size 0.

    Args:
        responses: Response dicts as produced by the capture methods.
        output_dir: Directory to write into (created if missing).

    Returns:
        The output directory as a string, or None when writing failed
        (the error is logged).
    """
    try:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        for i, response in enumerate(responses):
            # Fall back to "now" in milliseconds when no timestamp was recorded
            timestamp = response.get('timestamp', int(time.time() * 1000))
            url = response.get('url', 'unknown')
            req_type = response.get('type', 'unknown')
            body = response.get('body') or ''

            # Create filename from timestamp and type; the index keeps
            # names unique when timestamps collide
            filepath = output_path / f"{timestamp}_{req_type}_{i}.json"

            # Write response with metadata
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump({
                    'metadata': {
                        'url': url,
                        'type': req_type,
                        'timestamp': timestamp,
                        'size': response.get('size', len(body)),
                        'status': response.get('status')
                    },
                    'body': body
                }, f, indent=2, ensure_ascii=False)
        log.info(f"Dumped {len(responses)} responses to {output_path}")
        return str(output_path)
    except Exception as e:
        log.error(f"Error dumping responses to file: {e}")
        return None
def _is_review_api(self, url: str) -> bool:
    """Return True when the lower-cased URL matches any REVIEW_API_PATTERNS regex."""
    lowered = url.lower()
    for pattern in self.REVIEW_API_PATTERNS:
        if re.search(pattern, lowered):
            return True
    return False
def _get_response_body(self, request_id: str) -> Optional[str]:
    """Fetch a response body through CDP ``Network.getResponseBody``.

    Base64-encoded bodies are decoded to UTF-8 text, ignoring undecodable
    bytes. Returns None when the CDP call (or decoding) fails.
    """
    try:
        reply = self.driver.execute_cdp_cmd('Network.getResponseBody', {
            'requestId': request_id
        })
        payload = reply.get('body', '')
        if not reply.get('base64Encoded'):
            return payload
        return base64.b64decode(payload).decode('utf-8', errors='ignore')
    except Exception as e:
        log.debug(f"Could not get response body for {request_id}: {e}")
        return None
def parse_reviews_from_responses(self, responses: List[Dict]) -> List[InterceptedReview]:
    """
    Parse review data from captured API responses.

    Each response's ``body`` is handed to ``_parse_response_body``; empty
    bodies and bodies starting with ``<!DOCTYPE`` are skipped, and a failure
    on one response is logged and does not stop the others.

    Returns:
        Reviews de-duplicated by ``review_id`` (first occurrence wins, input
        order preserved). Reviews with an empty ``review_id`` are dropped.
    """
    parsed_all = []
    for response in responses:
        try:
            body = response.get('body', '')
            url = response.get('url', '')
            # Only non-empty, non-HTML bodies are worth parsing
            if body and not body.startswith('<!DOCTYPE'):
                parsed_all.extend(self._parse_response_body(body, url))
        except Exception as e:
            log.debug(f"Error parsing response: {e}")
            continue

    # Deduplicate by review ID; dicts keep insertion order
    unique_by_id = {}
    for review in parsed_all:
        rid = review.review_id
        if rid and rid not in unique_by_id:
            unique_by_id[rid] = review
    return list(unique_by_id.values())
def _parse_response_body(self, body: str, url: str) -> List[InterceptedReview]:
    """
    Parse a single response body for review data.

    The body is decoded as JSON after stripping the ``)]}'`` prefix when
    present. If that fails, the span from the first ``[`` to the last ``]``
    is tried instead. The decoded data goes to
    ``_parse_listugcposts_response`` when the URL contains 'listugcposts'
    (case-insensitive), otherwise to ``_extract_reviews_recursive``.

    Change over the previous version: the bare ``except:`` around the
    fallback decode is narrowed, so KeyboardInterrupt/SystemExit are no
    longer swallowed.

    Returns:
        The reviews found; an empty list for empty, HTML or undecodable
        bodies.
    """
    reviews = []

    # Skip empty or HTML responses
    if not body or body.startswith(('<!DOCTYPE', '<html')):
        return reviews

    # Handle batch execute format (starts with )]}' prefix)
    if body.startswith(")]}'"):
        body = body[4:].strip()

    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        # Try to extract JSON from the response (greedy: first '[' to last ']')
        json_match = re.search(r'\[.*\]', body, re.DOTALL)
        if not json_match:
            log.debug("No JSON found in response")
            return reviews
        try:
            data = json.loads(json_match.group())
        except (json.JSONDecodeError, RecursionError):
            log.debug("Failed to parse JSON from response")
            return reviews

    # Special handling for listugcposts endpoint
    if 'listugcposts' in url.lower():
        reviews.extend(self._parse_listugcposts_response(data))
    else:
        # Generic recursive extraction
        reviews.extend(self._extract_reviews_recursive(data))
    return reviews
def _parse_listugcposts_response(self, data: Any) -> List[InterceptedReview]:
    """
    Parse Google Maps listugcposts API response.

    Structure discovered:
        data[2]       = array of review groups
        data[2][i]    = single review group [review_data, metadata, continuation_token]
        data[2][i][0] = review data (6-item array containing all review info)

    Groups that are not non-empty lists, or whose first element is not a
    list, are skipped. Each remaining first element is handed to
    ``_parse_google_review_array``.

    Returns:
        The reviews that could be parsed; empty when ``data`` does not have
        the expected shape.
    """
    parsed = []
    try:
        if not (isinstance(data, list) and len(data) >= 3):
            log.debug("Response doesn't match expected structure (not a list or too short)")
            return parsed

        # data[2] contains the review groups
        groups = data[2]
        if not isinstance(groups, list):
            log.debug("data[2] is not a list")
            return parsed
        log.debug(f"Found {len(groups)} reviews in data[2]")

        # Each group IS ONE REVIEW
        for position, group in enumerate(groups):
            if not isinstance(group, list) or not group:
                continue
            # group[0] is the review data array (6 items)
            payload = group[0]
            if not isinstance(payload, list):
                continue
            try:
                review = self._parse_google_review_array(payload)
                if review:
                    parsed.append(review)
                    log.debug(f"Parsed review {position}: {review.author} - {review.rating}")
            except Exception as e:
                # One malformed group must not discard the others
                log.debug(f"Error parsing review at group[{position}]: {e}")
    except Exception as e:
        log.debug(f"Error in _parse_listugcposts_response: {e}")
    return parsed
def _parse_google_review_array(self, review_data: List) -> Optional[InterceptedReview]:
    """
    Parse a single review from Google's 6-item array format.

    Positions read (all optional; anything missing or of the wrong type is
    left at its default):
        review_data[0]             = Review ID (string)
        review_data[1][4][5][0]    = Author name
        review_data[1][4][5][1]    = Profile picture URL
        review_data[1][6]          = Date text
        review_data[2][0][0]       = Rating (1-5)
        review_data[2][15][0][0]   = Review text (original)

    Returns:
        The review when it has a rating in 1-5 and an author or a text;
        None otherwise, including when an error occurs (logged at debug).
    """
    def dig(*path):
        # Walk review_data along the given indexes. Below the root every
        # level must be a list; any missing level yields None.
        node = review_data
        for level, index in enumerate(path):
            if level and not isinstance(node, list):
                return None
            if len(node) <= index:
                return None
            node = node[index]
        return node

    review = InterceptedReview()
    try:
        # Review ID
        if len(review_data) > 0 and isinstance(review_data[0], str):
            review.review_id = review_data[0]

        # Author name and avatar
        author_info = dig(1, 4, 5)
        if isinstance(author_info, list):
            if len(author_info) > 0 and isinstance(author_info[0], str):
                review.author = author_info[0]
            if len(author_info) > 1 and isinstance(author_info[1], str):
                review.avatar_url = author_info[1]

        # Date text
        date_text = dig(1, 6)
        if isinstance(date_text, str):
            review.date_text = date_text

        # Rating
        rating_val = dig(2, 0, 0)
        if isinstance(rating_val, (int, float)) and 1 <= rating_val <= 5:
            review.rating = float(rating_val)

        # Original-language review text
        text = dig(2, 15, 0, 0)
        if isinstance(text, str):
            review.text = text

        # Only return if we have minimum required data
        if review.rating > 0 and (review.author or review.text):
            return review
    except Exception as e:
        log.debug(f"Error parsing Google review array: {e}")
    return None
def _parse_review_array_v2(self, arr: List) -> Optional[InterceptedReview]:
    """
    Heuristically parse a review from Google's nested array format.

    Fields are guessed from value shapes rather than fixed positions:
    - review ID: first string longer than 30 chars among the first five
      items that is not a URL
    - rating: first number in 1-5, top-level or one level down
    - text: first top-level string longer than 50 chars that is neither a
      URL nor the review ID
    - author: first string of 3-100 chars (top-level or one level down)
      that is not the text or a URL (top-level also excludes the ID)
    - date: first top-level string matching one of the date patterns

    Returns:
        The review when it has a rating and an ID or author; None otherwise
        (errors are logged at debug level).
    """
    def in_rating_range(value):
        return isinstance(value, (int, float)) and 1 <= value <= 5

    def is_name_like(value):
        return isinstance(value, str) and 3 <= len(value) <= 100

    review = InterceptedReview()
    try:
        # Review ID (usually a long string in first few elements)
        for item in arr[:5]:
            if isinstance(item, str) and len(item) > 30 and not item.startswith('http'):
                review.review_id = item
                break

        # Rating (number between 1-5)
        for item in arr:
            if in_rating_range(item):
                review.rating = float(item)
                break
            if isinstance(item, list):
                hit = next((sub for sub in item if in_rating_range(sub)), None)
                if hit is not None:
                    review.rating = float(hit)
                    break

        # Review text (long string, not a URL, not the ID)
        for item in arr:
            if not isinstance(item, str) or len(item) <= 50 or item.startswith('http'):
                continue
            if review.review_id and item == review.review_id:
                continue
            review.text = item
            break

        # Author name (shorter string, not ID or text)
        for item in arr:
            if is_name_like(item):
                if item != review.review_id and item != review.text and not item.startswith('http'):
                    review.author = item
                    break
            elif isinstance(item, list):
                for subitem in item:
                    if is_name_like(subitem) and subitem != review.text and not subitem.startswith('http'):
                        review.author = subitem
                        break
                if review.author:
                    break

        # Dates (strings that look like dates)
        date_patterns = [r'\d{1,2}/\d{1,2}/\d{2,4}', r'\d{4}-\d{2}-\d{2}', r'hace \d+', r'\d+ days? ago']
        for item in arr:
            if isinstance(item, str) and any(
                    re.search(pattern, item, re.IGNORECASE) for pattern in date_patterns):
                review.date_text = item
                break

        # Only return if we have meaningful data
        if (review.review_id or review.author) and review.rating > 0:
            return review
    except Exception as e:
        log.debug(f"Error in _parse_review_array_v2: {e}")
    return None
def _extract_reviews_recursive(self, data: Any, depth: int = 0) -> List[InterceptedReview]:
    """Recursively search nested dicts/lists for review data.

    Every dict is offered to ``_try_parse_review_dict`` and every list to
    ``_try_parse_review_array``; then their children are searched the same
    way. Recursion stops below depth 20. A top-level ``InterceptedReview``
    is returned as-is, while ``InterceptedReview`` children are skipped.
    """
    # Prevent runaway recursion
    if depth > 20:
        return []

    # Already a parsed review
    if isinstance(data, InterceptedReview):
        return [data]

    if isinstance(data, dict):
        candidate = self._try_parse_review_dict(data)
        children = data.values()
    elif isinstance(data, list):
        candidate = self._try_parse_review_array(data)
        children = data
    else:
        # Scalars carry no nested reviews
        return []

    found = [candidate] if candidate else []
    for child in children:
        if isinstance(child, InterceptedReview):
            continue
        found.extend(self._extract_reviews_recursive(child, depth + 1))
    return found
def _try_parse_review_dict(self, data: Dict) -> Optional[InterceptedReview]:
    """
    Try to parse a dictionary as a review object.

    The dict is considered only when it has at least one of the keys
    reviewId, review_id, author, rating, text or comment. Several
    alternative key names are tried for each field.

    Fixes over the previous version:
    - Photo entries given as plain URL strings no longer raise and discard
      the whole review; entries may be strings or dicts with a 'url' key,
      and anything else is ignored.
    - When 'author' is a dict, the author name is taken from inside it
      rather than storing the dict in the string field.

    Returns:
        The review when it has an ID, or both an author and a text; None
        otherwise (errors are logged at debug level).
    """
    # Common keys in review objects
    review_keys = {'reviewId', 'review_id', 'author', 'rating', 'text', 'comment'}
    if not any(k in data for k in review_keys):
        return None
    try:
        review = InterceptedReview()

        # Try various key names for each field
        review.review_id = data.get('reviewId') or data.get('review_id') or data.get('id', '')

        raw_author = data.get('author')
        author_data = raw_author if isinstance(raw_author, dict) else {}
        if author_data:
            # NOTE(review): nested name keys are a guess - confirm against real payloads
            review.author = (author_data.get('name') or author_data.get('displayName')
                             or data.get('authorName') or data.get('name', ''))
        else:
            review.author = raw_author or data.get('authorName') or data.get('name', '')

        review.rating = float(data.get('rating') or data.get('starRating') or 0)
        review.text = data.get('text') or data.get('comment') or data.get('reviewText', '')
        review.date_text = data.get('publishTime') or data.get('relativePublishTime') or data.get('date', '')
        review.likes = int(data.get('thumbsUpCount') or data.get('likes') or 0)

        # Photos: accept URL strings and {'url': ...} dicts
        photos = data.get('photos') or data.get('reviewPhotos') or []
        if isinstance(photos, (list, tuple)):
            urls = []
            for photo in photos:
                url = photo.get('url') if isinstance(photo, dict) else photo
                if isinstance(url, str) and url:
                    urls.append(url)
            if urls:
                review.photos = urls

        # Profile
        review.profile_url = author_data.get('profileUrl') or data.get('profileUrl', '')
        review.avatar_url = author_data.get('profilePhotoUrl') or data.get('avatar', '')

        # Owner response
        owner_resp = data.get('ownerResponse') or data.get('ownerReply') or {}
        if isinstance(owner_resp, dict):
            review.owner_response = owner_resp.get('text', '')
            review.owner_response_date = owner_resp.get('publishTime', '')

        # Only return if we have meaningful data
        if review.review_id or (review.author and review.text):
            return review
    except Exception as e:
        log.debug(f"Error parsing review dict: {e}")
    return None
def _try_parse_review_array(self, data: List) -> Optional[InterceptedReview]:
    """
    Try to parse a nested array as a review (Google's protobuf-like format).

    Google often uses positional arrays like: [id, author, [rating], text, ...]
    Fields are guessed from value shapes:
    - review ID: data[0] when it is a string longer than 20 chars
    - rating: first number in 1-5, top-level or first element of a sub-list
    - text: first string longer than 30 chars, top-level or one level down
    - author: first 2-100 char string inside a sub-list
    - URLs: Google-hosted image URLs become avatar/photos

    Fixes over the previous version:
    - The text search skips the review ID and URLs; previously a long ID in
      data[0] was itself taken as the review text.
    - The text search stops at the first nested match instead of letting
      later sub-lists overwrite it.
    - Booleans are not accepted as ratings (``True`` counted as 1 star).
    - URLs are not accepted as author names.

    Returns:
        The review when it has a rating and either an ID or a text; None
        otherwise (errors are logged at debug level).
    """
    if not data or len(data) < 3:
        return None
    try:
        review = InterceptedReview()

        def is_rating(value):
            # bool is a subclass of int, so exclude it explicitly
            return (isinstance(value, (int, float)) and not isinstance(value, bool)
                    and 1 <= value <= 5)

        def is_text(value):
            return (isinstance(value, str) and len(value) > 30
                    and value != review.review_id and not value.startswith('http'))

        def is_author(value):
            return (isinstance(value, str) and 2 <= len(value) <= 100
                    and value != review.text and value != review.review_id
                    and not value.startswith('http'))

        # Check if first element looks like a review ID
        if isinstance(data[0], str) and len(data[0]) > 20:
            review.review_id = data[0]

        # Search for rating (usually a small number 1-5)
        for item in data:
            candidate = item[0] if isinstance(item, list) and item else item
            if is_rating(candidate):
                review.rating = float(candidate)
                break

        # Search for text (long string); first match wins
        for item in data:
            if is_text(item):
                review.text = item
            elif isinstance(item, list):
                review.text = next((sub for sub in item if is_text(sub)), "")
            if review.text:
                break

        # Search for author name (shorter string inside a sub-list)
        for item in data:
            if isinstance(item, list):
                review.author = next((sub for sub in item if is_author(sub)), "")
                if review.author:
                    break

        # Search for URLs (photos, profile)
        for item in data:
            if isinstance(item, str) and item.startswith('http'):
                if 'googleusercontent' in item or 'ggpht' in item:
                    # First Google-hosted image is treated as the avatar
                    if not review.avatar_url:
                        review.avatar_url = item
                    else:
                        review.photos.append(item)
            elif isinstance(item, list):
                self._extract_urls_from_array(item, review)

        # Only return if we have meaningful data
        if review.rating > 0 and (review.review_id or review.text):
            return review
    except Exception as e:
        log.debug(f"Error parsing review array: {e}")
    return None
def _extract_urls_from_array(self, arr: List, review: InterceptedReview, depth: int = 0):
    """
    Collect Google-hosted image URLs from a nested array into ``review``.

    URLs containing 'googleusercontent', 'ggpht' or 'lh3' are kept: those
    matching a profile-picture pattern ('w72-h72' or 'p-rp-mo') set
    ``review.avatar_url``, the rest are appended to ``review.photos``.
    Nested lists are followed up to depth 5.

    Fix over the previous version: the recursive call passed ``depth + 1``
    and ``review`` in swapped positions, so any nested list raised a
    TypeError on the depth comparison.

    Args:
        arr: The (possibly nested) list to scan.
        review: The review to update in place.
        depth: Current nesting level; callers normally leave it at 0.
    """
    if depth > 5:
        return
    for item in arr:
        if isinstance(item, list):
            self._extract_urls_from_array(item, review, depth + 1)
        elif isinstance(item, str) and item.startswith('http'):
            if 'googleusercontent' in item or 'ggpht' in item or 'lh3' in item:
                if 'w72-h72' in item or 'p-rp-mo' in item:  # Profile pic pattern
                    review.avatar_url = item
                else:
                    review.photos.append(item)
def convert_to_raw_review_format(self, intercepted: InterceptedReview) -> Dict[str, Any]:
    """Convert an InterceptedReview to the dict layout used by RawReview/storage.

    The review text and the owner response are filed under the 'en' key;
    both mappings are empty when the corresponding value is empty. The
    result is tagged with ``'_source': 'api_intercept'``.
    """
    description = {}
    if intercepted.text:
        description = {'en': intercepted.text}

    owner_responses = {}
    if intercepted.owner_response:
        owner_responses = {'en': {'text': intercepted.owner_response}}

    raw = {}
    raw['review_id'] = intercepted.review_id
    raw['author'] = intercepted.author
    raw['rating'] = intercepted.rating
    raw['description'] = description
    raw['likes'] = intercepted.likes
    raw['user_images'] = intercepted.photos
    raw['author_profile_url'] = intercepted.profile_url
    raw['profile_picture'] = intercepted.avatar_url
    raw['owner_responses'] = owner_responses
    raw['review_date'] = intercepted.date_text
    raw['_source'] = 'api_intercept'
    return raw
def cleanup(self):
    """
    Clean up interception resources.

    Disables the CDP Network domain on a best-effort basis, then clears the
    captured responses, captured reviews and request map, and marks the
    interceptor as no longer listening.

    Change over the previous version: the bare ``except: pass`` is narrowed
    to ``Exception`` and the failure is logged at debug level, so
    KeyboardInterrupt/SystemExit are no longer swallowed.
    """
    try:
        self.driver.execute_cdp_cmd('Network.disable', {})
    except Exception as e:
        # The browser may already be gone; local state is reset regardless
        log.debug(f"Could not disable CDP network domain: {e}")
    self.captured_responses.clear()
    self.captured_reviews.clear()
    self.request_map.clear()
    self._listening = False