def click_reviews_tab(self, driver: Chrome):
    """
    Navigate to the reviews section by clicking the Reviews tab/button.

    Two strategies are tried in order:

    1. Text search: for every keyword in ``REVIEW_WORDS`` find elements whose
       text contains the keyword (case-insensitively), then click either the
       element itself (button / link / tab-like role) or, for an
       ``aria-hidden="true"`` div, its clickable parent.
    2. Fallback: find ``div[aria-hidden="true"]`` elements whose text is
       exactly a review keyword and click their parent.

    After every click ``verify_reviews_tab_clicked`` decides whether the
    reviews section is showing.

    Args:
        driver: Selenium driver positioned on the place page.

    Returns:
        True as soon as a click is verified.

    Raises:
        TimeoutException: if neither strategy produced a verified click.
    """
    max_timeout = 15  # seconds budgeted for the text-based search
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + max_timeout

    def xpath_literal(text):
        """Quote *text* as an XPath 1.0 string literal, even with embedded quotes."""
        if "'" not in text:
            return f"'{text}'"
        if '"' not in text:
            return f'"{text}"'
        pieces = ", \"'\", ".join(f"'{part}'" for part in text.split("'"))
        return f"concat({pieces})"

    def text_xpath(keyword):
        """Build a case-insensitive 'contains text' XPath for *keyword*."""
        lowered = keyword.lower()
        upper_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        lower_chars = 'abcdefghijklmnopqrstuvwxyz'
        # XPath 1.0 translate() only folds the characters it is given. Add the
        # keyword's own non-ASCII letters so e.g. Cyrillic or accented capitals
        # on the page fold to the lowercased keyword as well.
        for ch in lowered:
            up = ch.upper()
            if len(up) == 1 and up != ch and up not in upper_chars:
                upper_chars += up
                lower_chars += ch
        return (
            f"//*[contains(translate(., '{upper_chars}', '{lower_chars}'), "
            f"{xpath_literal(lowered)})]"
        )

    def looks_clickable(tag, role):
        """Return True for buttons, links and tab/button-like ARIA roles."""
        return tag in ('button', 'a') or 'tab' in role or 'button' in role

    def click_and_verify(target, smooth, settle_seconds):
        """Scroll *target* into view, JS-click it and check for the reviews view."""
        options = "{block:'center', behavior:'smooth'}" if smooth else "{block:'center'}"
        driver.execute_script(f"arguments[0].scrollIntoView({options});", target)
        time.sleep(0.5)
        driver.execute_script("arguments[0].click();", target)
        time.sleep(settle_seconds)
        return self.verify_reviews_tab_clicked(driver)

    def try_candidate(element):
        """Attempt one text-matched element; return True on a verified click."""
        element_text = (element.text or '').strip()
        # Long text means a container matched, not the tab label itself.
        if not element_text or len(element_text) > 50:
            return False

        tag_name = element.tag_name.lower()
        role = element.get_attribute('role') or ''

        if tag_name == 'div' and element.get_attribute('aria-hidden') == 'true':
            log.info(f"Found aria-hidden div with text: '{element_text}', looking for clickable parent")
            parent = driver.execute_script("return arguments[0].parentElement;", element)
            if not parent:
                return False
            parent_tag = parent.tag_name.lower()
            # get_attribute returns None when the attribute is absent.
            parent_role = parent.get_attribute('role') or ''
            if not looks_clickable(parent_tag, parent_role):
                return False
            log.info(f"Found clickable parent: {parent_tag} with role={parent_role}")
            if click_and_verify(parent, True, 3):
                log.info("✅ Successfully clicked Reviews via aria-hidden parent")
                return True
            return False

        if looks_clickable(tag_name, role):
            log.info(f"Found clickable Reviews element: '{element_text}' (tag: {tag_name}, role: {role})")
            if click_and_verify(element, True, 3):
                log.info(f"✅ Successfully clicked Reviews via text: '{element_text}'")
                return True
        return False

    # PRIMARY METHOD: look for the text-based "Reviews" button/tab.
    log.info("Trying to find Reviews tab by visible text...")
    for language_keyword in REVIEW_WORDS:
        if time.monotonic() > deadline:
            break
        try:
            elements = driver.find_elements(By.XPATH, text_xpath(language_keyword))
        except Exception as e:
            log.debug(f"XPath lookup failed for keyword '{language_keyword}': {e}")
            continue

        for element in elements:
            # Each attempt can sleep for seconds, so enforce the budget per
            # element rather than only once per keyword.
            if time.monotonic() > deadline:
                break
            try:
                if try_candidate(element):
                    return True
            except Exception as e:
                # Typically a stale element after an earlier click re-rendered the page.
                log.debug(f"Skipping candidate for keyword '{language_keyword}': {e}")

    # FALLBACK METHOD: aria-hidden divs whose text is exactly a review keyword.
    log.info("Trying aria-hidden div detection as fallback...")
    review_words = {word.lower() for word in REVIEW_WORDS}
    try:
        divs = driver.find_elements(By.CSS_SELECTOR, 'div[aria-hidden="true"]')
    except Exception as e:
        log.debug(f"Error in aria-hidden fallback: {e}")
        divs = []

    for div in divs:
        try:
            div_text = (div.text or '').strip()
            if div_text.lower() not in review_words:
                continue
            log.info(f"Found aria-hidden div with exact text: '{div_text}'")

            # The parent is expected to be the clickable tab/button.
            parent = driver.execute_script("return arguments[0].parentElement;", div)
            if not parent:
                continue
            parent_tag = parent.tag_name.lower()
            parent_role = parent.get_attribute('role') or ''
            log.info(f"Parent element: tag={parent_tag}, role={parent_role}")

            if click_and_verify(parent, False, 2):
                log.info("✅ Successfully clicked Reviews via aria-hidden fallback")
                return True
        except Exception as e:
            log.debug(f"Error clicking parent of aria-hidden div: {e}")

    # If all methods failed
    log.warning("Failed to navigate to reviews after trying all methods")
    raise TimeoutException("Could not navigate to reviews section")
""" try: - # Common elements that appear when reviews tab is active (Updated January 2026) - verification_selectors = [ - # Reviews container (current) - 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde', - 'div.m6QErb.WNBkOb.XiKgde', + # METHOD 1: Check for text-based indicators (most robust) + # Look for common review-related text that appears regardless of CSS changes + page_text = driver.page_source.lower() - # Review cards (current) - 'div.jftiEf', - 'div[data-review-id]', - - # Sort button (usually appears with reviews) - 'button[aria-label*="Sort" i]', - - # Review rating elements - 'span[role="img"][aria-label*="star" i]', - - # Other indicators - 'div.m6QErb div.jftiEf', - '.HlvSq' + # These text patterns appear when reviews section is active + review_indicators = [ + 'sort reviews', + 'most relevant', + 'newest', + 'highest rating', + 'lowest rating', ] - # Check if any verification selector is present - for selector in verification_selectors: + for indicator in review_indicators: + if indicator in page_text: + log.debug(f"Found review indicator: '{indicator}'") + return True + + # METHOD 2: Check for semantic attributes (stable) + # Look for elements with review-specific attributes + semantic_selectors = [ + 'div[data-review-id]', # Review cards have data-review-id + 'button[aria-label*="Sort" i]', # Sort button + 'span[role="img"][aria-label*="star" i]', # Star ratings + ] + + for selector in semantic_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) if elements and len(elements) > 0: + log.debug(f"Found semantic element: {selector}") return True # URL check - if "review" appears in the URL @@ -608,39 +567,34 @@ class GoogleReviewsScraper: log.info(f"Attempting to set sort order to '{method}'") try: - # 1. 
Find and click the sort button - sort_button_selectors = [ - # Exact selectors based on recent HTML structure - 'button.HQzyZ[aria-haspopup="true"]', - 'div.m6QErb button.HQzyZ', - 'button[jsaction*="pane.wfvdle84"]', - 'div.fontBodyLarge.k5lwKb', # The text element inside sort button + # 1. Find and click the sort button using ROBUST TEXT-BASED DETECTION + # Multi-language sort button keywords (what humans see) + sort_keywords = { + 'en': ['sort', 'Sort', 'SORT'], + 'he': ['סדר', 'סידור'], + 'th': ['เรียง'], + 'zh': ['排序'], + 'fr': ['trier', 'Trier'], + 'es': ['ordenar', 'Ordenar'], + 'de': ['sortieren', 'Sortieren'], + 'pt': ['Classificar'], + 'it': ['Ordina'], + 'ru': ['Сортировать'] + } - # Common attribute-based selectors - 'button[aria-label*="Sort" i]', - 'button[aria-label*="sort" i]', - 'button[aria-expanded="false"][aria-haspopup="true"]', + # Flatten all keywords + all_sort_keywords = [kw for keywords in sort_keywords.values() for kw in keywords] - # Multilingual selectors - 'button[aria-label*="סדר" i]', # Hebrew - 'button[aria-label*="เรียง" i]', # Thai - 'button[aria-label*="排序" i]', # Chinese - 'button[aria-label*="Trier" i]', # French - 'button[aria-label*="Ordenar" i]', # Spanish/Portuguese - 'button[aria-label*="Sortieren" i]', # German - - # Parent container-based selectors - 'div.m6QErb.Hk4XGb.XiKgde.tLjsW button', - 'div.m6QErb div.XiKgde button' - ] - - # Attempt to find the sort button + # PRIMARY METHOD: Find buttons by text or aria-label (robust) sort_button = None + log.info("Looking for sort button using text-based detection...") - # Try each selector - for selector in sort_button_selectors: + for keyword in all_sort_keywords: try: - elements = driver.find_elements(By.CSS_SELECTOR, selector) + # XPath to find buttons containing the keyword (case-sensitive for non-English) + xpath = f"//button[contains(text(), '{keyword}') or contains(@aria-label, '{keyword}')]" + elements = driver.find_elements(By.XPATH, xpath) + for element in elements: 
try: # Skip invisible/disabled elements @@ -650,108 +604,53 @@ class GoogleReviewsScraper: # Get button text and attributes for verification button_text = element.text.strip() if element.text else "" button_aria = element.get_attribute("aria-label") or "" - button_class = element.get_attribute("class") or "" # Skip buttons that are clearly not sort buttons negative_keywords = ["back", "next", "previous", "close", "cancel", "חזרה", "סגור", "ปิด"] - if any(keyword in button_text.lower() or keyword in button_aria.lower() - for keyword in negative_keywords): + if any(neg in button_text.lower() or neg in button_aria.lower() for neg in negative_keywords): continue - # Positive detection for sort buttons - sort_keywords = ["sort", "Sort", "SORT", "סידור", "เรียง", "排序", "trier", "ordenar", "sortieren"] - has_sort_keyword = any(keyword in button_text or keyword in button_aria - for keyword in sort_keywords) - - # Check for common sort button classes - has_sort_class = "HQzyZ" in button_class or "sort" in button_class.lower() - - # Check for aria attributes that indicate a dropdown - has_dropdown_attrs = (element.get_attribute("aria-haspopup") == "true" or - element.get_attribute("aria-expanded") is not None) + # Verify it has dropdown attributes (sort buttons are typically dropdowns) + has_dropdown = (element.get_attribute("aria-haspopup") == "true" or + element.get_attribute("aria-expanded") is not None) - if has_sort_keyword or has_sort_class or has_dropdown_attrs: - # Found a potential sort button + if has_dropdown or keyword in button_text or keyword in button_aria: sort_button = element - log.info(f"Found sort button with selector: {selector}") - log.info(f"Button text: '{button_text}', aria-label: '{button_aria}'") + log.info(f"✅ Found sort button with text: '{button_text}' or aria-label: '{button_aria}'") break + except Exception as e: log.debug(f"Error checking element: {e}") continue if sort_button: break + except Exception as e: - log.debug(f"Error with selector 
'{selector}': {e}") + log.debug(f"Error with keyword '{keyword}': {e}") continue - # If no button found with CSS selectors, try finding it from its container + # FALLBACK METHOD: Find any button with dropdown attributes near review content if not sort_button: + log.info("Trying fallback: finding buttons with dropdown attributes...") try: - # Look for the sort container by its distinctive classes - containers = driver.find_elements(By.CSS_SELECTOR, 'div.m6QErb.Hk4XGb, div.XiKgde.tLjsW') - for container in containers: - try: - # Find buttons within this container - buttons = container.find_elements(By.TAG_NAME, 'button') - for button in buttons: - if button.is_displayed() and button.is_enabled(): - sort_button = button - log.info("Found sort button through container element") - break - except: - continue - if sort_button: - break - except Exception as e: - log.debug(f"Error finding button via container: {e}") + buttons = driver.find_elements(By.CSS_SELECTOR, 'button[aria-haspopup="true"]') - # If still no button found, try XPath approach with keywords - if not sort_button: - xpath_terms = ["sort", "Sort", "סדר", "סידור", "เรียง", "排序", "Trier", "Ordenar", "Sortieren"] - for term in xpath_terms: - try: - xpath = f"//*[contains(text(), '{term}') or contains(@aria-label, '{term}')]" - elements = driver.find_elements(By.XPATH, xpath) - for element in elements: - try: - if element.is_displayed() and element.is_enabled(): - sort_button = element - log.info(f"Found sort button with XPath term: '{term}'") - break - except: - continue - if sort_button: - break - except: - continue - - # Final fallback: look for any button in the reviews area that might open a dropdown - if not sort_button: - try: - # Look specifically in the reviews container area - reviews_container = driver.find_elements(By.CSS_SELECTOR, 'div.m6QErb, div.DxyBCb') - for container in reviews_container: - try: - # Find all buttons in this container - buttons = container.find_elements(By.TAG_NAME, 'button') - 
for button in buttons: - try: - if (button.is_displayed() and button.is_enabled() and - (button.get_attribute("aria-haspopup") == "true" or - "dropdown" in (button.get_attribute("class") or "").lower())): - sort_button = button - log.info("Found potential sort button via fallback dropdown detection") - break - except: - continue - if sort_button: - break - except: + for button in buttons: + if not button.is_displayed() or not button.is_enabled(): continue + + button_text = (button.text or '').strip().lower() + button_aria = (button.get_attribute("aria-label") or '').lower() + + # Look for any sort-related keywords + if any(kw.lower() in button_text or kw.lower() in button_aria for kw in all_sort_keywords): + sort_button = button + log.info(f"✅ Found sort button via fallback: {button.text}") + break + except Exception as e: - log.debug(f"Error in fallback sort button detection: {e}") + log.debug(f"Error in fallback method: {e}") # Final check - do we have a sort button? if not sort_button: @@ -819,30 +718,18 @@ class GoogleReviewsScraper: return False # 3. 
Find and click the desired sort option in the menu - - # Selectors for menu items with focus on the exact HTML structure - menu_item_selectors = [ - # Exact Google Maps menu item selectors - 'div[role="menuitemradio"]', - 'div.fxNQSd[role="menuitemradio"]', - 'div[role="menuitemradio"] div.mLuXec', # Inner text container - - # Generic menu item selectors (fallback) - '[role="menuitemradio"]', - '[role="menuitem"]', - 'div[role="menu"] > div' - ] - - # Combined selector for efficiency - combined_selector = ", ".join(menu_item_selectors) + # Uses ROBUST SEMANTIC SELECTORS (role attributes), not CSS classes try: - # Wait for menu items to appear + # PRIMARY METHOD: Find menu items by role attribute (semantic, stable) + # menuitemradio is the standard role for radio menu items + log.info("Looking for menu items using semantic role attributes...") + menu_items = WebDriverWait(driver, 5).until( - EC.presence_of_all_elements_located((By.CSS_SELECTOR, combined_selector)) + EC.presence_of_all_elements_located((By.CSS_SELECTOR, '[role="menuitemradio"], [role="menuitem"]')) ) - # Process menu items to find matches + # Process menu items to extract text visible_items = [] for item in menu_items: @@ -851,39 +738,16 @@ class GoogleReviewsScraper: if not item.is_displayed(): continue - # Handle different element types - if item.get_attribute('role') == 'menuitemradio': - # This is a top-level menu item - try: - # Try to find text in the inner div.mLuXec element first - text_elements = item.find_elements(By.CSS_SELECTOR, 'div.mLuXec') - if text_elements and text_elements[0].is_displayed(): - text = text_elements[0].text.strip() - visible_items.append((item, text)) - else: - # Fall back to the item's own text - text = item.text.strip() - visible_items.append((item, text)) - except: - # Last resort - use the item's own text - text = item.text.strip() - visible_items.append((item, text)) - elif 'mLuXec' in (item.get_attribute('class') or ''): - # This is the text container element - 
def wait_for_api_response(self, driver: "Chrome", timeout: float = 2.0,
                          poll_interval: float = 0.05,
                          settle_delay: float = 0.2,
                          fallback_delay: float = 0.6) -> bool:
    """
    Wait until a new review-API request shows up in the page, or time out.

    The page is polled for the number of entries in ``window.__allRequests``
    whose URL contains ``listugcposts``; an increase over the count taken at
    entry is treated as "a new response arrived".
    NOTE(review): ``window.__allRequests`` is presumably filled by the API
    interceptor script - confirm against the interceptor implementation.

    Args:
        driver: Selenium driver for the reviews page.
        timeout: Maximum number of seconds to poll.
        poll_interval: Seconds to sleep between polls.
        settle_delay: Seconds to sleep after a new response is seen, giving
            the DOM time to update.
        fallback_delay: Fixed sleep used when interception is disabled or
            polling raises.

    Returns:
        True if a new response was detected; False on timeout, when API
        interception is disabled, or when polling failed.
    """
    if not self.enable_api_intercept or not self.api_interceptor:
        # Fallback to fixed wait if API interception disabled
        time.sleep(fallback_delay)
        return False

    count_script = """
        return (window.__allRequests || []).filter(r =>
            r.url && r.url.toLowerCase().includes('listugcposts')
        ).length;
    """

    try:
        # `or 0` guards against a null/undefined script result.
        initial_count = driver.execute_script(count_script) or 0

        # Monotonic clock: elapsed time must not be affected by wall-clock changes.
        start = time.monotonic()
        while (time.monotonic() - start) < timeout:
            current_count = driver.execute_script(count_script) or 0

            if current_count > initial_count:
                elapsed = time.monotonic() - start
                log.debug(f"New API response detected after {elapsed:.2f}s")
                time.sleep(settle_delay)  # Small delay for DOM to update
                return True

            time.sleep(poll_interval)

        # Timeout - no new response
        log.debug(f"No API response after {timeout}s (might be at end of reviews)")
        return False

    except Exception as e:
        # Best-effort helper: never let a polling failure abort the scrape.
        log.debug(f"Error waiting for API response: {e}")
        time.sleep(fallback_delay)  # Fallback to fixed wait
        return False
def extract_total_reviews(self, driver: "Chrome") -> Tuple[Optional[int], Optional[str]]:
    """
    Extract the total review count shown on the Google Maps page.

    Recognised forms include "247 reviews", "1,234 reviews", "12345 reviews",
    "5.2K reviews", "1.234 reseñas" and "1,234 评论". The page source is
    searched first; if nothing matches, the ``aria-label`` and text of every
    ``<button>`` are searched with the same patterns.

    Args:
        driver: Selenium driver positioned on the place/reviews page.

    Returns:
        ``(total_count, count_string)`` where ``count_string`` is the number
        exactly as it appeared on the page, or ``(None, None)`` if no count
        was found or an error occurred.
    """
    # The (?<![\d.,]) lookbehind stops a match from starting in the middle of
    # a longer number. Without it "12345 reviews" is read as 345, and the
    # caller would stop scrolling far too early.
    number = r'(?<![\d.,])(\d{1,3}(?:,\d{3})+|\d+)'
    count_patterns = [
        number + r'\s+reviews?\b',                               # "244 reviews", "1,234 reviews"
        r'(?<![\d.,])(\d+(?:\.\d+)?[KM])\s+reviews?\b',          # "5.2K reviews"
        r'(?<![\d.,])(\d{1,3}(?:[.,]\d{3})+|\d+)\s+reseñas?\b',  # Spanish, "." or "," grouping
        number + r'\s+评论',                                      # Chinese
    ]
    multipliers = {'K': 1_000, 'M': 1_000_000}

    def parse_count(count_str):
        """Convert a matched count such as '1,234' or '5.2K' to an int."""
        suffix = count_str[-1].upper()
        if suffix in multipliers:
            # round() rather than int(): avoid truncating float error.
            return int(round(float(count_str[:-1]) * multipliers[suffix]))
        return int(re.sub(r'[.,]', '', count_str))

    def find_count(text):
        """Return (total, raw_string) for the first pattern matching *text*."""
        for pattern in count_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                count_str = match.group(1)
                return parse_count(count_str), count_str
        return None

    try:
        # Method 1: look for "<count> reviews" text in the page source.
        found = find_count(driver.page_source or '')
        if found:
            return found

        # Method 2: look at button aria-labels and button text.
        for btn in driver.find_elements(By.TAG_NAME, 'button'):
            aria_label = btn.get_attribute('aria-label') or ''
            text = btn.text or ''
            for content in (aria_label, text):
                found = find_count(content)
                if found:
                    return found

        return None, None

    except Exception as e:
        # Best-effort: the caller falls back to scrolling until no new reviews appear.
        log.debug(f"Error extracting total review count: {e}")
        return None, None
time.sleep(1) - # Use try-except to handle cases where the pane is not found - # Try multiple selectors for the reviews pane (Updated January 2026) + # Find the scrollable reviews pane using robust detection + # Uses JavaScript to find elements by their scrollable properties, not CSS classes pane = None - pane_selectors = [ - PANE_SEL, # Primary selector with XiKgde - 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde', # Without role="main" prefix - 'div.m6QErb.WNBkOb.XiKgde', # Alternative class combination - 'div[role="main"] div.m6QErb.XiKgde', # Simplified with XiKgde - 'div.m6QErb.DxyBCb.XiKgde', # Another variant - 'div[role="main"] div.m6QErb', # Simplified version - 'div.m6QErb.DxyBCb', # Even more simplified - 'div[role="main"]' # Most generic - ] - for selector in pane_selectors: + try: + log.info("Finding scrollable reviews pane using robust detection...") + + # JavaScript to find scrollable container (no CSS classes needed!) + find_scrollable_script = """ + function findScrollablePane() { + // Find all divs that might be scrollable + const allDivs = document.querySelectorAll('div'); + + for (let div of allDivs) { + const style = window.getComputedStyle(div); + const overflowY = style.overflowY; + + // Check if element is scrollable + if ((overflowY === 'auto' || overflowY === 'scroll') && + div.scrollHeight > div.clientHeight && + div.clientHeight > 200) { // Must be tall enough to be main pane + + // Additional checks: should contain review-like content + const text = div.textContent || ''; + const hasReviewIndicators = + text.includes('star') || + text.includes('rating') || + text.includes('review') || + div.querySelector('[data-review-id]') || + div.querySelector('[role="img"][aria-label*="star"]'); + + if (hasReviewIndicators) { + return div; + } + } + } + + // Fallback: return main element if found + return document.querySelector('[role="main"]'); + } + return findScrollablePane(); + """ + + pane = driver.execute_script(find_scrollable_script) + + if pane: + 
log.info("✅ Found scrollable reviews pane using robust JavaScript detection") + else: + log.warning("❌ Could not find scrollable reviews pane") + + except Exception as e: + log.warning(f"Error finding scrollable pane with JavaScript: {e}") + # Fallback to simple div[role="main"] if JS fails try: - log.info(f"Trying to find reviews pane with selector: {selector}") - pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, selector))) - if pane: - log.info(f"Found reviews pane with selector: {selector}") - break - except TimeoutException: - log.debug(f"Pane not found with selector: {selector}") - continue + pane = driver.find_element(By.CSS_SELECTOR, 'div[role="main"]') + log.info("Using fallback: div[role='main']") + except: + pass if not pane: - log.warning("Could not find reviews pane with any selector. Page structure might have changed.") + log.error("Could not find reviews pane. Page structure might have changed.") return False # Initialize API interceptor AFTER reviews page is loaded (if enabled) @@ -1261,30 +1270,62 @@ class GoogleReviewsScraper: log.warning(f"Error setting up scroll script: {e}") scroll_script = "window.scrollBy(0, 300);" # Fallback to simple scrolling - max_attempts = 50 # Increased from 10 to 50 for very patient scrolling - attempts = 0 - max_idle = 15 # Increased from 3 to 15 - much more patience for lazy-loaded reviews - consecutive_no_cards = 0 # Track how many times we find zero cards - last_scroll_position = 0 - scroll_stuck_count = 0 - - # Card selectors to try (Updated January 2026) + # Card selectors to try (ROBUST - semantic attributes only, no CSS classes!) 
+ # Only use data-review-id attribute which is stable and won't break with Google updates card_selectors = [ - CARD_SEL, # Primary: div.jftiEf - "div[data-review-id]", # Alternative: direct data-review-id - ".jftiEf", # Without div prefix - "div.WMbnJf", # Another common review card class - "[data-review-id]", # Any element with review ID + "[data-review-id]", # PRIMARY: Any element with review ID (most robust) + "div[data-review-id]", # Fallback: Div with review ID ] + # REMOVED FRAGILE CSS CLASS SELECTORS: + # - CARD_SEL (div.jftiEf) - Google's obfuscated class, breaks on updates + # - .jftiEf - Same as above + # - div.WMbnJf - Another obfuscated class + # We now rely on semantic [data-review-id] attribute + API interceptor + + # CONTINUOUS SCROLLING APPROACH + # Scroll NON-STOP in background thread while extracting reviews in main thread + stop_scrolling = threading.Event() + scroll_count = [0] # Use list to make it mutable in thread + load_times = [] # Track when new reviews are loaded for smart timeout + + def continuous_scroll_worker(): + """Background thread that scrolls continuously without stopping""" + while not stop_scrolling.is_set(): + try: + driver.execute_script(scroll_script) + scroll_count[0] += 1 + time.sleep(0.005) # 5ms = ultra fast continuous scrolling! + except: + pass + + # Start continuous scrolling thread + scroll_thread = threading.Thread(target=continuous_scroll_worker, daemon=True) + scroll_thread.start() + log.info("🚀 Started continuous NON-STOP scrolling thread") + + check_num = 0 + max_checks = 100 # Maximum safety limit + + while check_num < max_checks: + check_num += 1 + + # Check if we've collected all reviews + if total_reviews and len(seen) >= total_reviews: + percent = (len(seen) / total_reviews) * 100 + log.info(f"✅ Got all {total_reviews} reviews ({percent:.1f}%)! 
Stopping scrolling.") + stop_scrolling.set() + break + + # Wait between checks while scrolling continues in background + time.sleep(2.0) # Check every 2 seconds - while attempts < max_attempts: try: # Try multiple card selectors within the pane cards = [] for card_sel in card_selectors: cards = pane.find_elements(By.CSS_SELECTOR, card_sel) if cards: - if attempts == 0: # Only log once + if check_num == 1: # Only log once log.info(f"Found {len(cards)} cards with selector: {card_sel}") break @@ -1293,31 +1334,12 @@ class GoogleReviewsScraper: for card_sel in card_selectors: cards = driver.find_elements(By.CSS_SELECTOR, card_sel) if cards: - if attempts == 0: + if check_num == 1: log.info(f"Found {len(cards)} cards in document with selector: {card_sel}") break fresh_cards: List[WebElement] = [] - - # Check for valid cards - if len(cards) == 0: - consecutive_no_cards += 1 - log.info(f"No review cards found in this iteration (consecutive: {consecutive_no_cards})") - - # If we keep finding no cards, might have hit the end - if consecutive_no_cards > 5: - log.warning("No cards found for 5+ iterations - might be at end of reviews") - break - - attempts += 1 - # Try aggressive scrolling - driver.execute_script(scroll_script) - time.sleep(1) - driver.execute_script("window.scrollBy(0, 1000);") # Extra scroll - time.sleep(1.5) - continue - else: - consecutive_no_cards = 0 # Reset counter when we find cards + previous_count = len(seen) for c in cards: try: @@ -1342,10 +1364,11 @@ class GoogleReviewsScraper: log.debug(f"Error getting review ID: {e}") continue + # Process fresh cards for card in fresh_cards: try: raw = RawReview.from_card(card) - processed_ids.add(raw.id) # Track this ID to avoid re-processing + processed_ids.add(raw.id) except StaleElementReferenceException: continue except Exception: @@ -1361,59 +1384,67 @@ class GoogleReviewsScraper: docs[raw.id] = merge_review(docs.get(raw.id), raw) seen.add(raw.id) pbar.update(1) - idle = 0 - attempts = 0 # Reset attempts 
counter when we successfully process a review - if idle >= max_idle: - log.info(f"Stopping: No new reviews found after {max_idle} scroll attempts") - break + # Calculate how many new reviews we got + new_count = len(seen) - previous_count - if not fresh_cards: - idle += 1 - attempts += 1 - log.info(f"No new reviews in this iteration (idle: {idle}/{max_idle}, attempts: {attempts}/{max_attempts}, total seen: {len(seen)})") + # Track load times for smart timeout + if new_count > 0: + current_time = time.time() + load_times.append(current_time) - # When no new reviews, scroll more aggressively - try: - # Try multiple scroll methods - driver.execute_script(scroll_script) - time.sleep(0.5) - driver.execute_script("window.scrollBy(0, 500);") # Extra scroll - time.sleep(0.5) - except Exception as e: - log.warning(f"Error scrolling: {e}") - else: - log.info(f"Found {len(fresh_cards)} new reviews in this iteration") - - # Check if we're actually scrolling or stuck - try: - current_scroll = driver.execute_script("return arguments[0].scrollTop;", pane) - if current_scroll == last_scroll_position and len(fresh_cards) == 0: - scroll_stuck_count += 1 - log.warning(f"Scroll position hasn't changed (stuck at {current_scroll}px, stuck count: {scroll_stuck_count})") - - if scroll_stuck_count > 5: - log.warning("Scroll is stuck - trying alternative scroll method") - # Try clicking the last visible review to force loading - try: - driver.execute_script("arguments[0].lastElementChild.scrollIntoView();", pane) - time.sleep(2) - except: - pass - scroll_stuck_count = 0 + if total_reviews: + percent = (len(seen) / total_reviews) * 100 + log.info(f"Check {check_num:2d}: {len(seen):3d}/{total_reviews} ({percent:5.1f}%) | +{new_count} new") else: - scroll_stuck_count = 0 - last_scroll_position = current_scroll - except: - pass + log.info(f"Check {check_num:2d}: {len(seen):3d} total | +{new_count} new") + else: + # No new reviews in this check + if total_reviews: + percent = (len(seen) / 
total_reviews) * 100 + log.info(f"Check {check_num:2d}: {len(seen):3d}/{total_reviews} ({percent:5.1f}%) | +0 new") + else: + log.info(f"Check {check_num:2d}: {len(seen):3d} total | +0 new") - # Use JavaScript for smoother scrolling - try: - driver.execute_script(scroll_script) - except Exception as e: - log.warning(f"Error scrolling: {e}") - # Try a simpler scroll method - driver.execute_script("window.scrollBy(0, 300);") + # Smart timeout: stop if no new reviews for 3x average gap + if new_count == 0: + if len(load_times) >= 3: + # Calculate average gap between individual review loads + gaps = [load_times[i] - load_times[i-1] for i in range(1, len(load_times))] + avg_gap = sum(gaps) / len(gaps) + timeout_threshold = avg_gap * 3 + timeout_type = f"gap-based (avg gap: {avg_gap:.1f}s)" + elif len(load_times) > 0: + # Initial timeout: use 3x time since first load started + time_since_first = time.time() - load_times[0] + timeout_threshold = max(10.0, time_since_first * 3) # At least 10s + timeout_type = f"initial (time since first: {time_since_first:.1f}s)" + else: + # No loads yet - use default initial timeout + timeout_threshold = 15.0 + timeout_type = "default (no loads yet)" + + # Check time since last load + if len(load_times) > 0: + time_since_last = time.time() - load_times[-1] + + # Log timeout status every check when no new reviews + log.debug(f" Timeout check: {time_since_last:.1f}s / {timeout_threshold:.1f}s ({timeout_type})") + + if time_since_last > timeout_threshold: + log.info(f"⏱️ No new reviews for {time_since_last:.1f}s (threshold: {timeout_threshold:.1f}s, {timeout_type}) - stopping") + stop_scrolling.set() + break + + # Fallback: stop if no new reviews for 10 consecutive checks + if new_count == 0: + idle += 1 + if idle >= 10: + log.info(f"⏱️ No new reviews for {idle} checks - stopping") + stop_scrolling.set() + break + else: + idle = 0 # Collect API responses if interception is enabled if self.enable_api_intercept and self.api_interceptor: @@ 
-1470,8 +1501,8 @@ class GoogleReviewsScraper: if parsed: log.info(f"API interceptor captured {len(parsed)} reviews (total unique API: {len(api_reviews)})") - # Log stats every 10 iterations - if attempts % 10 == 0: + # Log stats every 10 checks + if check_num % 10 == 0: stats = self.api_interceptor.get_interceptor_stats() if stats: log.debug(f"Interceptor stats - Fetch: {stats.get('totalFetch', 0)}/{stats.get('capturedFetch', 0)}, " @@ -1480,15 +1511,6 @@ class GoogleReviewsScraper: except Exception as api_err: log.warning(f"API interception error: {api_err}", exc_info=True) - # Dynamic sleep: sleep less when processing many reviews, more when finding none - if len(fresh_cards) > 5: - sleep_time = 0.7 - elif len(fresh_cards) == 0: - sleep_time = 2.0 # Wait longer when finding nothing (let page load) - else: - sleep_time = 1.0 - time.sleep(sleep_time) - except StaleElementReferenceException: # The pane or other element went stale, try to re-find log.debug("Stale element encountered, re-finding elements") @@ -1500,11 +1522,15 @@ class GoogleReviewsScraper: break except Exception as e: log.warning(f"Error during review processing: {e}") - attempts += 1 time.sleep(1) pbar.close() + # Stop continuous scrolling thread + stop_scrolling.set() + scroll_thread.join(timeout=2.0) + log.info(f"🛑 Stopped scrolling thread after {scroll_count[0]} total scrolls") + # Merge API-captured reviews if any if self.enable_api_intercept and api_reviews: log.info(f"Merging {len(api_reviews)} reviews captured via API interception") @@ -1562,7 +1588,18 @@ class GoogleReviewsScraper: self.json_storage.save_json_docs(docs) self.json_storage.save_seen(seen) - log.info("✅ Finished – total unique reviews: %s", len(docs)) + # Final summary with completion percentage + if total_reviews: + percent = (len(docs) / total_reviews) * 100 + missing = total_reviews - len(docs) + if missing <= 0: + log.info(f"✅ Finished – Got all {total_reviews} reviews ({percent:.1f}%)") + elif percent >= 95.0: + 
log.info(f"✅ Finished – Got {len(docs)}/{total_reviews} reviews ({percent:.1f}%) - missing {missing}") + else: + log.info(f"⚠️ Finished – Got {len(docs)}/{total_reviews} reviews ({percent:.1f}%) - missing {missing}") + else: + log.info("✅ Finished – total unique reviews: %s", len(docs)) end_time = time.time() elapsed_time = end_time - start_time