@dataclass
class InterceptedReview:
    """One review recovered from an intercepted API response.

    Every field carries a default, so an empty instance can be created
    first and populated field by field afterwards.
    """

    # --- identity ---
    review_id: str = ""
    author: str = ""

    # --- review content ---
    rating: float = 0.0
    text: str = ""
    date_text: str = ""
    timestamp: int = 0
    likes: int = 0
    # default_factory: each instance gets its own list, never a shared one.
    photos: List[str] = field(default_factory=list)

    # --- author links ---
    profile_url: str = ""
    avatar_url: str = ""

    # --- owner reply ---
    owner_response: str = ""
    owner_response_date: str = ""

    # --- language tag ---
    lang: str = ""
+ """ + + # Patterns for review-related API endpoints + REVIEW_API_PATTERNS = [ + r'maps/preview/review', + r'maps/rpc/placereview', + r'maps/preview/reviewsdata', + r'maps/preview/place', + r'maps/api/place', + r'/locationhistory/preview', + r'batchexecute.*review', + ] + + def __init__(self, driver): + """Initialize the interceptor with a Selenium driver""" + self.driver = driver + self.captured_responses: List[Dict[str, Any]] = [] + self.captured_reviews: List[InterceptedReview] = [] + self.request_map: Dict[str, Dict] = {} # Map request IDs to URLs + self._lock = threading.Lock() + self._listening = False + self._response_callback: Optional[Callable] = None + + def setup_interception(self): + """Enable network interception via CDP""" + try: + # Enable network domain + self.driver.execute_cdp_cmd('Network.enable', {}) + + # Set up request interception patterns + self.driver.execute_cdp_cmd('Network.setRequestInterception', { + 'patterns': [ + {'urlPattern': '*maps*review*', 'resourceType': 'XHR'}, + {'urlPattern': '*maps*review*', 'resourceType': 'Fetch'}, + {'urlPattern': '*batchexecute*', 'resourceType': 'XHR'}, + {'urlPattern': '*batchexecute*', 'resourceType': 'Fetch'}, + ] + }) + + self._listening = True + log.info("API interception enabled via CDP") + return True + + except Exception as e: + log.warning(f"Could not enable CDP interception: {e}") + # Try alternative approach + return self._setup_performance_logging() + + def _setup_performance_logging(self): + """Alternative approach using Performance logging""" + try: + self.driver.execute_cdp_cmd('Network.enable', { + 'maxTotalBufferSize': 10000000, + 'maxResourceBufferSize': 5000000 + }) + self._listening = True + log.info("API interception enabled via performance logging") + return True + except Exception as e: + log.error(f"Failed to setup performance logging: {e}") + return False + + def capture_network_responses(self, duration: float = 5.0): + """ + Capture network responses for a specified duration. 
+ Call this while scrolling/loading more reviews. + """ + if not self._listening: + log.warning("Interception not set up, call setup_interception() first") + return [] + + captured = [] + start_time = time.time() + + while time.time() - start_time < duration: + try: + # Get performance logs which contain network events + logs = self.driver.get_log('performance') + + for entry in logs: + try: + log_data = json.loads(entry['message']) + message = log_data.get('message', {}) + method = message.get('method', '') + params = message.get('params', {}) + + # Capture response received events + if method == 'Network.responseReceived': + response = params.get('response', {}) + url = response.get('url', '') + + if self._is_review_api(url): + request_id = params.get('requestId') + self.request_map[request_id] = { + 'url': url, + 'status': response.get('status'), + 'headers': response.get('headers', {}) + } + + # Capture response body when loading is finished + elif method == 'Network.loadingFinished': + request_id = params.get('requestId') + if request_id in self.request_map: + body = self._get_response_body(request_id) + if body: + captured.append({ + 'url': self.request_map[request_id]['url'], + 'body': body, + 'timestamp': time.time() + }) + + except Exception as parse_error: + log.debug(f"Error parsing log entry: {parse_error}") + continue + + except Exception as e: + # Performance logs might not be available + log.debug(f"Could not get performance logs: {e}") + break + + time.sleep(0.1) + + with self._lock: + self.captured_responses.extend(captured) + + return captured + + def get_response_bodies_cdp(self): + """Get response bodies using CDP directly (more reliable method)""" + responses = [] + + try: + # Use CDP to get all responses + result = self.driver.execute_cdp_cmd('Network.getAllCookies', {}) + + # Execute JavaScript to intercept fetch/XHR responses + intercept_script = """ + (function() { + if (window.__interceptedResponses) { + var responses = 
window.__interceptedResponses; + window.__interceptedResponses = []; + return responses; + } + return []; + })(); + """ + + captured = self.driver.execute_script(intercept_script) + if captured: + responses.extend(captured) + + except Exception as e: + log.debug(f"CDP response capture error: {e}") + + return responses + + def inject_response_interceptor(self): + """ + Inject JavaScript to intercept XHR/Fetch responses at the browser level. + This is the most reliable method for capturing API responses. + """ + intercept_script = """ + (function() { + // Skip if already injected + if (window.__reviewInterceptorInjected) return; + window.__reviewInterceptorInjected = true; + window.__interceptedResponses = []; + + // Store original fetch + const originalFetch = window.fetch; + + // Override fetch + window.fetch = async function(...args) { + const response = await originalFetch.apply(this, args); + const url = args[0].toString(); + + // Check if this is a review-related API call + if (url.includes('review') || url.includes('batchexecute') || + url.includes('place') || url.includes('maps')) { + try { + const clone = response.clone(); + const text = await clone.text(); + + window.__interceptedResponses.push({ + url: url, + body: text, + timestamp: Date.now(), + type: 'fetch' + }); + + // Keep only last 100 responses to avoid memory issues + if (window.__interceptedResponses.length > 100) { + window.__interceptedResponses = window.__interceptedResponses.slice(-50); + } + } catch (e) { + console.debug('Response capture error:', e); + } + } + + return response; + }; + + // Store original XMLHttpRequest + const originalXHR = window.XMLHttpRequest; + + // Create intercepting XHR + window.XMLHttpRequest = function() { + const xhr = new originalXHR(); + const originalOpen = xhr.open; + const originalSend = xhr.send; + let requestUrl = ''; + + xhr.open = function(method, url, ...rest) { + requestUrl = url; + return originalOpen.apply(this, [method, url, ...rest]); + }; + + 
xhr.addEventListener('load', function() { + if (requestUrl.includes('review') || requestUrl.includes('batchexecute') || + requestUrl.includes('place') || requestUrl.includes('maps')) { + try { + window.__interceptedResponses.push({ + url: requestUrl, + body: xhr.responseText, + timestamp: Date.now(), + type: 'xhr' + }); + + if (window.__interceptedResponses.length > 100) { + window.__interceptedResponses = window.__interceptedResponses.slice(-50); + } + } catch (e) { + console.debug('XHR capture error:', e); + } + } + }); + + return xhr; + }; + + // Copy static properties + for (let prop of Object.getOwnPropertyNames(originalXHR)) { + try { + window.XMLHttpRequest[prop] = originalXHR[prop]; + } catch (e) {} + } + + console.log('Review API interceptor injected'); + return true; + })(); + """ + + try: + result = self.driver.execute_script(intercept_script) + log.info("JavaScript response interceptor injected") + return True + except Exception as e: + log.warning(f"Failed to inject interceptor: {e}") + return False + + def get_intercepted_responses(self): + """Retrieve intercepted responses from the browser""" + try: + script = """ + if (window.__interceptedResponses) { + var responses = window.__interceptedResponses.slice(); + window.__interceptedResponses = []; + return responses; + } + return []; + """ + responses = self.driver.execute_script(script) + return responses or [] + except Exception as e: + log.debug(f"Error getting intercepted responses: {e}") + return [] + + def _is_review_api(self, url: str) -> bool: + """Check if URL matches review API patterns""" + url_lower = url.lower() + return any(re.search(pattern, url_lower) for pattern in self.REVIEW_API_PATTERNS) + + def _get_response_body(self, request_id: str) -> Optional[str]: + """Get response body for a request ID using CDP""" + try: + result = self.driver.execute_cdp_cmd('Network.getResponseBody', { + 'requestId': request_id + }) + + body = result.get('body', '') + if result.get('base64Encoded'): + 
body = base64.b64decode(body).decode('utf-8', errors='ignore') + + return body + except Exception as e: + log.debug(f"Could not get response body for {request_id}: {e}") + return None + + def parse_reviews_from_responses(self, responses: List[Dict]) -> List[InterceptedReview]: + """ + Parse review data from captured API responses. + Google's API responses use a custom nested array format. + """ + reviews = [] + + for response in responses: + try: + body = response.get('body', '') + url = response.get('url', '') + + # Skip non-JSON responses + if not body or body.startswith(' List[InterceptedReview]: + """Parse a single response body for review data""" + reviews = [] + + # Handle batch execute format (starts with )]}' prefix) + if body.startswith(")]}'"): + body = body[4:].strip() + + try: + data = json.loads(body) + except json.JSONDecodeError: + # Try to extract JSON from the response + json_match = re.search(r'\[.*\]', body, re.DOTALL) + if json_match: + try: + data = json.loads(json_match.group()) + except: + return reviews + else: + return reviews + + # Extract reviews from nested structure + reviews.extend(self._extract_reviews_recursive(data)) + + return reviews + + def _extract_reviews_recursive(self, data: Any, depth: int = 0) -> List[InterceptedReview]: + """Recursively search for review data in nested structures""" + reviews = [] + + if depth > 20: # Prevent infinite recursion + return reviews + + if isinstance(data, dict): + # Check if this looks like a review object + review = self._try_parse_review_dict(data) + if review: + reviews.append(review) + + # Recurse into dict values + for value in data.values(): + reviews.extend(self._extract_reviews_recursive(value, depth + 1)) + + elif isinstance(data, list): + # Check if this array looks like a review array + review = self._try_parse_review_array(data) + if review: + reviews.append(review) + + # Recurse into list items + for item in data: + reviews.extend(self._extract_reviews_recursive(item, depth + 1)) 
+ + return reviews + + def _try_parse_review_dict(self, data: Dict) -> Optional[InterceptedReview]: + """Try to parse a dictionary as a review object""" + # Common keys in review objects + review_keys = {'reviewId', 'review_id', 'author', 'rating', 'text', 'comment'} + + if not any(k in data for k in review_keys): + return None + + try: + review = InterceptedReview() + + # Try various key names for each field + review.review_id = data.get('reviewId') or data.get('review_id') or data.get('id', '') + review.author = data.get('author') or data.get('authorName') or data.get('name', '') + review.rating = float(data.get('rating') or data.get('starRating') or 0) + review.text = data.get('text') or data.get('comment') or data.get('reviewText', '') + review.date_text = data.get('publishTime') or data.get('relativePublishTime') or data.get('date', '') + review.likes = int(data.get('thumbsUpCount') or data.get('likes') or 0) + + # Photos + photos = data.get('photos') or data.get('reviewPhotos') or [] + if photos: + review.photos = [p.get('url') or p for p in photos if p] + + # Profile + author_data = data.get('author') if isinstance(data.get('author'), dict) else {} + review.profile_url = author_data.get('profileUrl') or data.get('profileUrl', '') + review.avatar_url = author_data.get('profilePhotoUrl') or data.get('avatar', '') + + # Owner response + owner_resp = data.get('ownerResponse') or data.get('ownerReply') or {} + if isinstance(owner_resp, dict): + review.owner_response = owner_resp.get('text', '') + review.owner_response_date = owner_resp.get('publishTime', '') + + # Only return if we have meaningful data + if review.review_id or (review.author and review.text): + return review + + except Exception as e: + log.debug(f"Error parsing review dict: {e}") + + return None + + def _try_parse_review_array(self, data: List) -> Optional[InterceptedReview]: + """ + Try to parse a nested array as a review (Google's protobuf-like format). 
+ Google often uses positional arrays like: [id, author, [rating], text, ...] + """ + if not data or len(data) < 3: + return None + + try: + # Look for patterns that indicate this is a review array + # Pattern 1: [review_id, [author_info], rating_array, text, ...] + + review = InterceptedReview() + + # Check if first element looks like a review ID + if isinstance(data[0], str) and len(data[0]) > 20: + review.review_id = data[0] + + # Search for rating (usually a small number 1-5) + for item in data: + if isinstance(item, (int, float)) and 1 <= item <= 5: + review.rating = float(item) + break + elif isinstance(item, list) and len(item) >= 1: + if isinstance(item[0], (int, float)) and 1 <= item[0] <= 5: + review.rating = float(item[0]) + break + + # Search for text (long string) + for item in data: + if isinstance(item, str) and len(item) > 30: + review.text = item + break + elif isinstance(item, list): + for subitem in item: + if isinstance(subitem, str) and len(subitem) > 30: + review.text = subitem + break + + # Search for author name (shorter string) + for item in data: + if isinstance(item, list) and len(item) >= 1: + for subitem in item: + if isinstance(subitem, str) and 2 <= len(subitem) <= 100 and subitem != review.text: + review.author = subitem + break + if review.author: + break + + # Search for URLs (photos, profile) + for item in data: + if isinstance(item, str) and item.startswith('http'): + if 'googleusercontent' in item or 'ggpht' in item: + if not review.avatar_url: + review.avatar_url = item + else: + review.photos.append(item) + elif isinstance(item, list): + self._extract_urls_from_array(item, review) + + # Only return if we have meaningful data + if review.review_id and review.rating > 0: + return review + if review.text and review.rating > 0: + return review + + except Exception as e: + log.debug(f"Error parsing review array: {e}") + + return None + + def _extract_urls_from_array(self, arr: List, review: InterceptedReview, depth: int = 0): + 
"""Extract URLs from nested arrays""" + if depth > 5: + return + + for item in arr: + if isinstance(item, str) and item.startswith('http'): + if 'googleusercontent' in item or 'ggpht' in item or 'lh3' in item: + if 'w72-h72' in item or 'p-rp-mo' in item: # Profile pic pattern + review.avatar_url = item + else: + review.photos.append(item) + elif isinstance(item, list): + self._extract_urls_from_array(item, depth + 1, review) + + def convert_to_raw_review_format(self, intercepted: InterceptedReview) -> Dict[str, Any]: + """Convert an InterceptedReview to the format used by RawReview/storage""" + return { + 'review_id': intercepted.review_id, + 'author': intercepted.author, + 'rating': intercepted.rating, + 'description': {'en': intercepted.text} if intercepted.text else {}, + 'likes': intercepted.likes, + 'user_images': intercepted.photos, + 'author_profile_url': intercepted.profile_url, + 'profile_picture': intercepted.avatar_url, + 'owner_responses': { + 'en': {'text': intercepted.owner_response} + } if intercepted.owner_response else {}, + 'review_date': intercepted.date_text, + '_source': 'api_intercept' + } + + def cleanup(self): + """Clean up interception resources""" + try: + self.driver.execute_cdp_cmd('Network.disable', {}) + except: + pass + + self.captured_responses.clear() + self.captured_reviews.clear() + self.request_map.clear() + self._listening = False diff --git a/modules/cli.py b/modules/cli.py index 6b8aef4..d05c480 100644 --- a/modules/cli.py +++ b/modules/cli.py @@ -57,6 +57,10 @@ def parse_arguments(): ap.add_argument("--custom-params", type=str, default=None, help="JSON string with custom parameters to add to each document (e.g. 
'{\"company\":\"Thaitours\"}')") + # API interception option + ap.add_argument("--api-intercept", action="store_true", dest="enable_api_intercept", + help="enable API response interception for faster data capture (experimental)") + args = ap.parse_args() # Handle config path diff --git a/modules/models.py b/modules/models.py index b4b64a2..b30c1ec 100644 --- a/modules/models.py +++ b/modules/models.py @@ -47,7 +47,13 @@ class RawReview: except Exception: pass + # Try to get data-review-id from the card itself, or from a child element rid = card.get_attribute("data-review-id") or "" + if not rid: + # Try to find it in a child element + review_id_elem = try_find(card, "[data-review-id]") + if review_id_elem: + rid = review_id_elem[0].get_attribute("data-review-id") or "" author = first_text(card, 'div[class*="d4r55"]') profile = first_attr(card, 'button[data-review-id]', "data-href") avatar = first_attr(card, 'button[data-review-id] img', "src") diff --git a/modules/scraper.py b/modules/scraper.py index cc20469..a2e36b4 100644 --- a/modules/scraper.py +++ b/modules/scraper.py @@ -24,16 +24,25 @@ from tqdm import tqdm from modules.data_storage import MongoDBStorage, JSONStorage, merge_review from modules.models import RawReview +from modules.api_interceptor import GoogleMapsAPIInterceptor # Logger log = logging.getLogger("scraper") -# CSS Selectors -PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' -CARD_SEL = "div[data-review-id]" +# CSS Selectors (Updated January 2026 for current Google Maps structure) +PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde' +CARD_SEL = "div.jftiEf" # Review card container +# Cookie/consent dialog selectors (Updated January 2026) COOKIE_BTN = ('button[aria-label*="Accept" i],' + 'button[aria-label*="Aceptar" i],' + 'button[aria-label*="Akzeptieren" i],' + 'button[aria-label*="Aceitar" i],' + 'button[jsname="higCR"],' # Google's "Accept all" button 'button[jsname="hZCF7e"],' - 
'button[data-mdc-dialog-action="accept"]') + 'button[data-mdc-dialog-action="accept"],' + 'form[action*="consent"] button,' + 'div[role="dialog"] button[jsname],' + '.VfPpkd-LgbsSe[data-mdc-dialog-action="accept"]') SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' @@ -169,6 +178,8 @@ class GoogleReviewsScraper: self.json_storage = JSONStorage(config) self.backup_to_json = config.get("backup_to_json", True) self.overwrite_existing = config.get("overwrite_existing", False) + self.enable_api_intercept = config.get("enable_api_intercept", False) + self.api_interceptor = None # Will be initialized when driver is ready def setup_driver(self, headless: bool): """ @@ -257,32 +268,61 @@ class GoogleReviewsScraper: """ Dismiss cookie consent dialogs if present. Handles stale element references by re-finding elements if needed. + Updated January 2026 to handle current Google consent dialogs. """ - try: - # Use WebDriverWait with expected_conditions to handle stale elements - WebDriverWait(driver, 3).until( - EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN)) - ) - log.info("Cookie consent dialog found, attempting to dismiss") + dismissed = False - # Get elements again after waiting to avoid stale references - elements = driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN) - for elem in elements: - try: - if elem.is_displayed(): - elem.click() - log.info("Cookie dialog dismissed") - return True - except Exception as e: - log.debug(f"Error clicking cookie button: {e}") - continue - except TimeoutException: - # This is expected if no cookie dialog is present - log.debug("No cookie consent dialog detected") - except Exception as e: - log.debug(f"Error handling cookie dialog: {e}") + # Try multiple approaches to dismiss consent dialogs + consent_selectors = [ + COOKIE_BTN, + # Additional Google consent selectors + 'button[aria-label*="Accept all" i]', + 
'button[aria-label*="Aceptar todo" i]', + 'button[aria-label*="Reject all" i]', # Sometimes we need to reject + 'button:has-text("Accept")', + 'button:has-text("Aceptar")', + '[role="dialog"] button:first-of-type', + 'form[action*="consent"] button:first-of-type', + ] - return False + for selector in consent_selectors: + try: + elements = driver.find_elements(By.CSS_SELECTOR, selector) + for elem in elements: + try: + if elem.is_displayed() and elem.is_enabled(): + # Try JavaScript click first (more reliable) + driver.execute_script("arguments[0].click();", elem) + log.info(f"Cookie/consent dialog dismissed with selector: {selector}") + time.sleep(1) # Wait for dialog to close + dismissed = True + break + except Exception as e: + log.debug(f"Error clicking consent button: {e}") + continue + if dismissed: + break + except Exception as e: + log.debug(f"Error finding consent elements with {selector}: {e}") + continue + + # Also try to find and click any visible modal close buttons + if not dismissed: + try: + close_btns = driver.find_elements(By.CSS_SELECTOR, + '[role="dialog"] button[aria-label*="close" i], ' + '[role="dialog"] button[aria-label*="cerrar" i], ' + '.modal-close, .dialog-close') + for btn in close_btns: + if btn.is_displayed(): + driver.execute_script("arguments[0].click();", btn) + log.info("Closed modal dialog") + dismissed = True + break + except Exception: + pass + + return dismissed def is_reviews_tab(self, tab: WebElement) -> bool: """ @@ -364,6 +404,10 @@ class GoogleReviewsScraper: # Define different selectors to try in order of reliability tab_selectors = [ + # Current Google Maps tab selectors (January 2026) + '.LRkQ2', # Main tab button class in current Google Maps + '.hh2c6', # Alternative tab button class + # Direct tab selectors '[data-tab-index="1"]', # Most common tab index '[role="tab"][data-tab-index]', # Any tab with index @@ -373,7 +417,6 @@ class GoogleReviewsScraper: # Common Google Maps review tab selectors 
'.fontTitleSmall[role="tab"]', # Google Maps title font tabs - '.hh2c6[role="tab"]', # Common Google Maps class '.m6QErb [role="tab"]', # Maps container tabs # Text-based selectors for various languages @@ -517,12 +560,14 @@ class GoogleReviewsScraper: characteristic elements that appear on the reviews page. """ try: - # Common elements that appear when reviews tab is active + # Common elements that appear when reviews tab is active (Updated January 2026) verification_selectors = [ - # Reviews container - 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf', + # Reviews container (current) + 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde', + 'div.m6QErb.WNBkOb.XiKgde', - # Review cards + # Review cards (current) + 'div.jftiEf', 'div[data-review-id]', # Sort button (usually appears with reviews) @@ -1122,6 +1167,7 @@ class GoogleReviewsScraper: seen = self.json_storage.load_seen() driver = None + api_reviews = {} # Store reviews captured from API try: driver = self.setup_driver(headless) wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout @@ -1129,7 +1175,15 @@ class GoogleReviewsScraper: driver.get(url) wait.until(lambda d: "google.com/maps" in d.current_url) - self.dismiss_cookies(driver) + # Wait for page to load and consent dialogs to appear + time.sleep(3) + + # Try to dismiss any consent/cookie dialogs + if not self.dismiss_cookies(driver): + # Wait a bit more and try again + time.sleep(2) + self.dismiss_cookies(driver) + self.click_reviews_tab(driver) # Extra wait after clicking reviews tab to ensure page loads @@ -1158,10 +1212,14 @@ class GoogleReviewsScraper: time.sleep(3) # Use try-except to handle cases where the pane is not found - # Try multiple selectors for the reviews pane + # Try multiple selectors for the reviews pane (Updated January 2026) pane = None pane_selectors = [ - PANE_SEL, # Primary selector + PANE_SEL, # Primary selector with XiKgde + 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde', # Without role="main" prefix + 'div.m6QErb.WNBkOb.XiKgde', # 
Alternative class combination + 'div[role="main"] div.m6QErb.XiKgde', # Simplified with XiKgde + 'div.m6QErb.DxyBCb.XiKgde', # Another variant 'div[role="main"] div.m6QErb', # Simplified version 'div.m6QErb.DxyBCb', # Even more simplified 'div[role="main"]' # Most generic @@ -1182,6 +1240,15 @@ class GoogleReviewsScraper: log.warning("Could not find reviews pane with any selector. Page structure might have changed.") return False + # Initialize API interceptor AFTER reviews page is loaded (if enabled) + # This prevents CDP interception from affecting initial page load and tab detection + if self.enable_api_intercept: + log.info("Setting up API interception for reviews capture") + self.api_interceptor = GoogleMapsAPIInterceptor(driver) + self.api_interceptor.setup_interception() + self.api_interceptor.inject_response_interceptor() + log.info("API interceptor ready - capturing network responses") + pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) idle = 0 processed_ids = set() # Track processed IDs in current session @@ -1201,9 +1268,35 @@ class GoogleReviewsScraper: last_scroll_position = 0 scroll_stuck_count = 0 + # Card selectors to try (Updated January 2026) + card_selectors = [ + CARD_SEL, # Primary: div.jftiEf + "div[data-review-id]", # Alternative: direct data-review-id + ".jftiEf", # Without div prefix + "div.WMbnJf", # Another common review card class + "[data-review-id]", # Any element with review ID + ] + while attempts < max_attempts: try: - cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) + # Try multiple card selectors within the pane + cards = [] + for card_sel in card_selectors: + cards = pane.find_elements(By.CSS_SELECTOR, card_sel) + if cards: + if attempts == 0: # Only log once + log.info(f"Found {len(cards)} cards with selector: {card_sel}") + break + + # If no cards found in pane, try searching the entire document + if not cards: + for card_sel in card_selectors: + cards = driver.find_elements(By.CSS_SELECTOR, card_sel) + if cards: + 
if attempts == 0: + log.info(f"Found {len(cards)} cards in document with selector: {card_sel}") + break + fresh_cards: List[WebElement] = [] # Check for valid cards @@ -1228,7 +1321,15 @@ class GoogleReviewsScraper: for c in cards: try: + # Try to get data-review-id from the card itself cid = c.get_attribute("data-review-id") + # If not found on card, try to find it in a child element + if not cid: + try: + review_id_elem = c.find_element(By.CSS_SELECTOR, "[data-review-id]") + cid = review_id_elem.get_attribute("data-review-id") + except: + pass if not cid or cid in seen or cid in processed_ids: if stop_on_match and cid and (cid in seen or cid in processed_ids): idle = 999 @@ -1314,6 +1415,20 @@ class GoogleReviewsScraper: # Try a simpler scroll method driver.execute_script("window.scrollBy(0, 300);") + # Collect API responses if interception is enabled + if self.enable_api_intercept and self.api_interceptor: + try: + responses = self.api_interceptor.get_intercepted_responses() + if responses: + parsed = self.api_interceptor.parse_reviews_from_responses(responses) + for intercepted in parsed: + if intercepted.review_id and intercepted.review_id not in api_reviews: + api_reviews[intercepted.review_id] = self.api_interceptor.convert_to_raw_review_format(intercepted) + if parsed: + log.debug(f"API interceptor captured {len(parsed)} reviews (total unique: {len(api_reviews)})") + except Exception as api_err: + log.debug(f"API interception error: {api_err}") + # Dynamic sleep: sleep less when processing many reviews, more when finding none if len(fresh_cards) > 5: sleep_time = 0.7 @@ -1339,6 +1454,23 @@ class GoogleReviewsScraper: pbar.close() + # Merge API-captured reviews if any + if self.enable_api_intercept and api_reviews: + log.info(f"Merging {len(api_reviews)} reviews captured via API interception") + for review_id, api_review in api_reviews.items(): + if review_id not in docs: + # New review from API only + docs[review_id] = api_review + seen.add(review_id) + 
else: + # Merge API data with existing DOM data (API might have more details) + existing = docs[review_id] + # Only update fields that are missing or empty + for key, value in api_review.items(): + if key not in existing or not existing.get(key): + existing[key] = value + log.info(f"After merge: {len(docs)} total reviews") + # Save to MongoDB if enabled if self.use_mongodb and self.mongodb: log.info("Saving reviews to MongoDB...") @@ -1364,6 +1496,13 @@ class GoogleReviewsScraper: return False finally: + # Cleanup API interceptor + if self.api_interceptor: + try: + self.api_interceptor.cleanup() + except Exception: + pass + if driver is not None: try: driver.quit() diff --git a/start.py b/start.py index e0a070d..87cc4bf 100644 --- a/start.py +++ b/start.py @@ -64,6 +64,10 @@ def main(): # Update config with the provided custom parameters config["custom_params"].update(args.custom_params) + # Handle API interception option + if args.enable_api_intercept: + config["enable_api_intercept"] = True + # Initialize and run scraper scraper = GoogleReviewsScraper(config) scraper.scrape()