""" Selenium scraping logic for Google Maps Reviews. Uses SeleniumBase UC Mode for enhanced anti-detection and better Chrome version management. """ import logging import os import platform import re import time import traceback import threading from typing import Dict, Any, List, Optional, Tuple from seleniumbase import Driver from selenium.common.exceptions import TimeoutException, StaleElementReferenceException from selenium.webdriver import Chrome from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from tqdm import tqdm from modules.data_storage import MongoDBStorage, JSONStorage, merge_review from modules.models import RawReview from modules.api_interceptor import GoogleMapsAPIInterceptor # Logger log = logging.getLogger("scraper") # CSS Selectors (Updated January 2026 for current Google Maps structure) PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde' CARD_SEL = "div.jftiEf" # Review card container # Cookie/consent dialog selectors (Updated January 2026) COOKIE_BTN = ('button[aria-label*="Accept" i],' 'button[aria-label*="Aceptar" i],' 'button[aria-label*="Akzeptieren" i],' 'button[aria-label*="Aceitar" i],' 'button[jsname="higCR"],' # Google's "Accept all" button 'button[jsname="hZCF7e"],' 'button[data-mdc-dialog-action="accept"],' 'form[action*="consent"] button,' 'div[role="dialog"] button[jsname],' '.VfPpkd-LgbsSe[data-mdc-dialog-action="accept"]') SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' SORT_OPTIONS = { "newest": ( "Newest", "החדשות ביותר", "ใหม่ที่สุด", "最新", "Más recientes", "最近", "Mais recentes", "Neueste", "Plus récent", "Più recenti", 
"Nyeste", "Новые", "Nieuwste", "جديد", "Nyeste", "Uusimmat", "Najnowsze", "Senaste", "Terbaru", "Yakın zamanlı", "Mới nhất", "नवीनतम" ), "highest": ( "Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด", "最高評価", "Calificación más alta", "最高评分", "Melhor avaliação", "Höchste Bewertung", "Note la plus élevée", "Valutazione più alta", "Høyeste vurdering", "Наивысший рейтинг", "Hoogste waardering", "أعلى تقييم", "Højeste vurdering", "Korkein arvostelu", "Najwyższa ocena", "Högsta betyg", "Peringkat tertinggi", "En yüksek puan", "Đánh giá cao nhất", "उच्चतम रेटिंग", "Top rating" ), "lowest": ( "Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด", "最低評価", "Calificación más baja", "最低评分", "Pior avaliação", "Niedrigste Bewertung", "Note la plus basse", "Valutazione più bassa", "Laveste vurdering", "Наименьший рейтинг", "Laagste waardering", "أقل تقييم", "Laveste vurdering", "Alhaisin arvostelu", "Najniższa ocena", "Lägsta betyg", "Peringkat terendah", "En düşük puan", "Đánh giá thấp nhất", "निम्नतम रेटिंग", "Worst rating" ), "relevance": ( "Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด", "関連性", "Más relevantes", "最相关", "Mais relevantes", "Relevanteste", "Plus pertinents", "Più pertinenti", "Mest relevante", "Наиболее релевантные", "Meest relevant", "الأكثر صلة", "Mest relevante", "Olennaisimmat", "Najbardziej trafne", "Mest relevanta", "Paling relevan", "En alakalı", "Liên quan nhất", "सबसे प्रासंगिक", "Relevance" ) } # Comprehensive multi-language review keywords REVIEW_WORDS = { # English "reviews", "review", "ratings", "rating", # Hebrew "ביקורות", "ביקורת", "ביקורות על", "דירוגים", "דירוג", # Thai "รีวิว", "บทวิจารณ์", "คะแนน", "ความคิดเห็น", # Spanish "reseñas", "opiniones", "valoraciones", "críticas", "calificaciones", # French "avis", "commentaires", "évaluations", "critiques", "notes", # German "bewertungen", "rezensionen", "beurteilungen", "meinungen", "kritiken", # Italian "recensioni", "valutazioni", "opinioni", "giudizi", "commenti", # Portuguese 
"avaliações", "comentários", "opiniões", "análises", "críticas", # Russian "отзывы", "рецензии", "обзоры", "оценки", "комментарии", # Japanese "レビュー", "口コミ", "評価", "批評", "感想", # Korean "리뷰", "평가", "후기", "댓글", "의견", # Chinese (Simplified and Traditional) "评论", "評論", "点评", "點評", "评价", "評價", "意见", "意見", "回顾", "回顧", # Arabic "مراجعات", "تقييمات", "آراء", "تعليقات", "نقد", # Hindi "समीक्षा", "रिव्यू", "राय", "मूल्यांकन", "प्रतिक्रिया", # Turkish "yorumlar", "değerlendirmeler", "incelemeler", "görüşler", "puanlar", # Dutch "beoordelingen", "recensies", "meningen", "opmerkingen", "waarderingen", # Polish "recenzje", "opinie", "oceny", "komentarze", "uwagi", # Vietnamese "đánh giá", "nhận xét", "bình luận", "phản hồi", "bài đánh giá", # Indonesian "ulasan", "tinjauan", "komentar", "penilaian", "pendapat", # Swedish "recensioner", "betyg", "omdömen", "åsikter", "kommentarer", # Norwegian "anmeldelser", "vurderinger", "omtaler", "meninger", "tilbakemeldinger", # Danish "anmeldelser", "bedømmelser", "vurderinger", "meninger", "kommentarer", # Finnish "arvostelut", "arviot", "kommentit", "mielipiteet", "palautteet", # Greek "κριτικές", "αξιολογήσεις", "σχόλια", "απόψεις", "βαθμολογίες", # Czech "recenze", "hodnocení", "názory", "komentáře", "posudky", # Romanian "recenzii", "evaluări", "opinii", "comentarii", "note", # Hungarian "vélemények", "értékelések", "kritikák", "hozzászólások", "megjegyzések", # Bulgarian "отзиви", "ревюта", "мнения", "коментари", "оценки" } class GoogleReviewsScraper: """Main scraper class for Google Maps reviews""" def __init__(self, config: Dict[str, Any]): """Initialize scraper with configuration""" self.config = config self.use_mongodb = config.get("use_mongodb", True) self.mongodb = MongoDBStorage(config) if self.use_mongodb else None self.json_storage = JSONStorage(config) self.backup_to_json = config.get("backup_to_json", True) self.overwrite_existing = config.get("overwrite_existing", False) self.enable_api_intercept = 
config.get("enable_api_intercept", False) self.api_interceptor = None # Will be initialized when driver is ready def setup_driver(self, headless: bool): """ Set up and configure Chrome driver using SeleniumBase UC Mode. SeleniumBase provides enhanced anti-detection and automatic Chrome/ChromeDriver version management. Works in both Docker containers and on regular OS installations (Windows, Mac, Linux). """ # Log platform information for debugging log.info(f"Platform: {platform.platform()}") log.info(f"Python version: {platform.python_version()}") log.info("Using SeleniumBase UC Mode for enhanced anti-detection") # Determine if we're running in a container in_container = os.environ.get('CHROME_BIN') is not None if in_container: chrome_binary = os.environ.get('CHROME_BIN') log.info(f"Container environment detected") log.info(f"Chrome binary: {chrome_binary}") # Create driver with custom binary location for containers if chrome_binary and os.path.exists(chrome_binary): try: driver = Driver( uc=True, headless=headless, binary_location=chrome_binary, page_load_strategy="normal" ) log.info("Successfully created SeleniumBase UC driver with custom binary") except Exception as e: log.warning(f"Failed to create driver with custom binary: {e}") # Fall back to default driver = Driver( uc=True, headless=headless, page_load_strategy="normal" ) log.info("Successfully created SeleniumBase UC driver with defaults") else: driver = Driver( uc=True, headless=headless, page_load_strategy="normal" ) log.info("Successfully created SeleniumBase UC driver") else: # Regular OS environment - SeleniumBase handles version matching automatically log.info("Creating SeleniumBase UC Mode driver") try: driver = Driver( uc=True, headless=headless, page_load_strategy="normal", incognito=True # Use incognito mode for better stealth ) log.info("Successfully created SeleniumBase UC driver") except Exception as e: log.error(f"Failed to create SeleniumBase driver: {e}") raise # Set page load timeout to 
avoid hanging driver.set_page_load_timeout(30) # Set window size driver.set_window_size(1400, 900) # Add additional stealth settings try: # Disable automation flags driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { 'source': ''' Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]}); Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']}); ''' }) log.info("Additional stealth settings applied") except Exception as e: log.debug(f"Could not apply additional stealth settings: {e}") log.info("SeleniumBase UC driver setup completed successfully") return driver def dismiss_cookies(self, driver: Chrome): """ Dismiss cookie consent dialogs if present. Handles stale element references by re-finding elements if needed. Updated January 2026 to handle current Google consent dialogs. """ dismissed = False # Try multiple approaches to dismiss consent dialogs consent_selectors = [ COOKIE_BTN, # Additional Google consent selectors 'button[aria-label*="Accept all" i]', 'button[aria-label*="Aceptar todo" i]', 'button[aria-label*="Reject all" i]', # Sometimes we need to reject 'button:has-text("Accept")', 'button:has-text("Aceptar")', '[role="dialog"] button:first-of-type', 'form[action*="consent"] button:first-of-type', ] for selector in consent_selectors: try: elements = driver.find_elements(By.CSS_SELECTOR, selector) for elem in elements: try: if elem.is_displayed() and elem.is_enabled(): # Try JavaScript click first (more reliable) driver.execute_script("arguments[0].click();", elem) log.info(f"Cookie/consent dialog dismissed with selector: {selector}") time.sleep(0.3) # Reduced from 1s to 0.3s dismissed = True break except Exception as e: log.debug(f"Error clicking consent button: {e}") continue if dismissed: break except Exception as e: log.debug(f"Error finding consent elements with {selector}: {e}") continue # Also try to find and click any visible 
modal close buttons if not dismissed: try: close_btns = driver.find_elements(By.CSS_SELECTOR, '[role="dialog"] button[aria-label*="close" i], ' '[role="dialog"] button[aria-label*="cerrar" i], ' '.modal-close, .dialog-close') for btn in close_btns: if btn.is_displayed(): driver.execute_script("arguments[0].click();", btn) log.info("Closed modal dialog") dismissed = True break except Exception: pass return dismissed def is_reviews_tab(self, tab: WebElement) -> bool: """ Dynamically detect if an element is the reviews tab across multiple languages and layouts. Uses multiple detection approaches for maximum reliability. """ try: # Strategy 1: Data attribute detection (most reliable across languages) tab_index = tab.get_attribute("data-tab-index") if tab_index == "1" or tab_index == "reviews": return True # Strategy 2: Role and aria attributes (accessibility detection) role = tab.get_attribute("role") aria_selected = tab.get_attribute("aria-selected") aria_label = (tab.get_attribute("aria-label") or "").lower() # Many review tabs have role="tab" and data attributes if role == "tab" and any(word in aria_label for word in REVIEW_WORDS): return True # Strategy 3: Text content detection (multiple sources) sources = [ tab.text.lower() if tab.text else "", # Direct text aria_label, # ARIA label tab.get_attribute("innerHTML").lower() or "", # Inner HTML tab.get_attribute("textContent").lower() or "" # Text content ] # Check all sources against our comprehensive keyword list for source in sources: if any(word in source for word in REVIEW_WORDS): return True # Strategy 4: Nested element detection try: # Check text in all child elements for child in tab.find_elements(By.CSS_SELECTOR, "*"): try: child_text = child.text.lower() if child.text else "" child_content = child.get_attribute("textContent").lower() or "" if any(word in child_text for word in REVIEW_WORDS) or any( word in child_content for word in REVIEW_WORDS): return True except: continue except: pass # Strategy 5: URL 
detection (some tabs have hrefs or data-hrefs with tell-tale values) for attr in ["href", "data-href", "data-url", "data-target"]: attr_value = (tab.get_attribute(attr) or "").lower() if attr_value and ("review" in attr_value or "rating" in attr_value): return True # Strategy 6: Class detection (some review tabs have specific classes) tab_class = tab.get_attribute("class") or "" review_classes = ["review", "reviews", "rating", "ratings", "comments", "feedback", "g4jrve"] if any(cls in tab_class for cls in review_classes): return True return False except StaleElementReferenceException: return False except Exception as e: log.debug(f"Error in is_reviews_tab: {e}") return False def click_reviews_tab(self, driver: Chrome): """ Navigate to reviews section by clicking the Reviews tab/button on the page. Uses text-based detection (what humans see) as primary method for robustness. """ current_url = driver.current_url # PRIMARY METHOD: Look for text-based "Reviews" button/tab (what humans see) log.info("Trying to find Reviews tab by visible text...") max_timeout = 15 end_time = time.time() + max_timeout for language_keyword in REVIEW_WORDS: if time.time() > end_time: break try: # Try XPath that finds elements containing the text (case-insensitive) # This includes divs with aria-hidden="true" that contain "Reviews" xpath = f"//*[contains(translate(., 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language_keyword.lower()}')]" elements = driver.find_elements(By.XPATH, xpath) for element in elements: try: element_text = (element.text or '').strip() if not element_text or len(element_text) > 50: continue tag_name = element.tag_name.lower() role = element.get_attribute('role') or '' aria_hidden = element.get_attribute('aria-hidden') # If this is a div with aria-hidden="true" containing "Reviews", # try to click its parent button/clickable element if tag_name == 'div' and aria_hidden == 'true': log.info(f"Found aria-hidden div with text: '{element_text}', looking 
for clickable parent") # Try parent element try: parent = driver.execute_script("return arguments[0].parentElement;", element) parent_tag = parent.tag_name.lower() if parent else '' parent_role = parent.get_attribute('role') if parent else '' if parent and (parent_tag in ['button', 'a'] or 'tab' in parent_role or 'button' in parent_role): log.info(f"Found clickable parent: {parent_tag} with role={parent_role}") driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", parent) time.sleep(0.5) driver.execute_script("arguments[0].click();", parent) time.sleep(3) if self.verify_reviews_tab_clicked(driver): log.info(f"✅ Successfully clicked Reviews via aria-hidden parent") return True except: pass # Try clicking the element directly if it's clickable elif tag_name in ['button', 'a'] or 'tab' in role or 'button' in role: log.info(f"Found clickable Reviews element: '{element_text}' (tag: {tag_name}, role: {role})") driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", element) time.sleep(0.5) driver.execute_script("arguments[0].click();", element) time.sleep(3) if self.verify_reviews_tab_clicked(driver): log.info(f"✅ Successfully clicked Reviews via text: '{element_text}'") return True except: continue except: continue # FALLBACK METHOD: Find aria-hidden divs with exact text "Reviews" (or language variants) log.info("Trying aria-hidden div detection as fallback...") try: # Look for divs with aria-hidden="true" that contain ONLY the review word (no extra text) divs = driver.find_elements(By.CSS_SELECTOR, 'div[aria-hidden="true"]') for div in divs: div_text = (div.text or '').strip() # Check if this div contains ONLY a review keyword (exact match, case-insensitive) for keyword in REVIEW_WORDS: if div_text.lower() == keyword.lower(): log.info(f"Found aria-hidden div with exact text: '{div_text}'") # Get the parent element (should be the clickable tab/button) try: parent = driver.execute_script("return 
arguments[0].parentElement;", div) if parent: parent_tag = parent.tag_name.lower() parent_role = parent.get_attribute('role') or '' log.info(f"Parent element: tag={parent_tag}, role={parent_role}") # Click the parent if it looks clickable driver.execute_script("arguments[0].scrollIntoView({block:'center'});", parent) time.sleep(0.5) driver.execute_script("arguments[0].click();", parent) time.sleep(2) if self.verify_reviews_tab_clicked(driver): log.info(f"✅ Successfully clicked Reviews via aria-hidden fallback") return True except Exception as e: log.debug(f"Error clicking parent of aria-hidden div: {e}") continue except Exception as e: log.debug(f"Error in aria-hidden fallback: {e}") # If all methods failed log.warning("Failed to navigate to reviews after trying all methods") raise TimeoutException("Could not navigate to reviews section") def verify_reviews_tab_clicked(self, driver: Chrome) -> bool: """ Verify that the reviews tab was successfully clicked. Uses robust verification methods that don't depend on fragile CSS classes. 
""" try: # METHOD 1: Check for text-based indicators (most robust) # Look for common review-related text that appears regardless of CSS changes page_text = driver.page_source.lower() # These text patterns appear when reviews section is active review_indicators = [ 'sort reviews', 'most relevant', 'newest', 'highest rating', 'lowest rating', ] for indicator in review_indicators: if indicator in page_text: log.debug(f"Found review indicator: '{indicator}'") return True # METHOD 2: Check for semantic attributes (stable) # Look for elements with review-specific attributes semantic_selectors = [ 'div[data-review-id]', # Review cards have data-review-id 'button[aria-label*="Sort" i]', # Sort button 'span[role="img"][aria-label*="star" i]', # Star ratings ] for selector in semantic_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) if elements and len(elements) > 0: log.debug(f"Found semantic element: {selector}") return True # URL check - if "review" appears in the URL if "review" in driver.current_url.lower(): return True return False except Exception as e: log.debug(f"Error verifying reviews tab click: {e}") return False def set_sort(self, driver: Chrome, method: str): """ Set the sorting method for reviews with enhanced detection for the latest Google Maps UI. Works across different languages and UI variations, with robust error handling. """ if method == "relevance": log.info("Using default 'relevance' sort - no need to change sort order") return True # Default order, no need to change log.info(f"Attempting to set sort order to '{method}'") try: # 1. 
Find and click the sort button using ROBUST TEXT-BASED DETECTION # Multi-language sort button keywords (what humans see) sort_keywords = { 'en': ['sort', 'Sort', 'SORT'], 'he': ['סדר', 'סידור'], 'th': ['เรียง'], 'zh': ['排序'], 'fr': ['trier', 'Trier'], 'es': ['ordenar', 'Ordenar'], 'de': ['sortieren', 'Sortieren'], 'pt': ['Classificar'], 'it': ['Ordina'], 'ru': ['Сортировать'] } # Flatten all keywords all_sort_keywords = [kw for keywords in sort_keywords.values() for kw in keywords] # PRIMARY METHOD: Find buttons by text or aria-label (robust) sort_button = None log.info("Looking for sort button using text-based detection...") for keyword in all_sort_keywords: try: # XPath to find buttons containing the keyword (case-sensitive for non-English) xpath = f"//button[contains(text(), '{keyword}') or contains(@aria-label, '{keyword}')]" elements = driver.find_elements(By.XPATH, xpath) for element in elements: try: # Skip invisible/disabled elements if not element.is_displayed() or not element.is_enabled(): continue # Get button text and attributes for verification button_text = element.text.strip() if element.text else "" button_aria = element.get_attribute("aria-label") or "" # Skip buttons that are clearly not sort buttons negative_keywords = ["back", "next", "previous", "close", "cancel", "חזרה", "סגור", "ปิด"] if any(neg in button_text.lower() or neg in button_aria.lower() for neg in negative_keywords): continue # Verify it has dropdown attributes (sort buttons are typically dropdowns) has_dropdown = (element.get_attribute("aria-haspopup") == "true" or element.get_attribute("aria-expanded") is not None) if has_dropdown or keyword in button_text or keyword in button_aria: sort_button = element log.info(f"✅ Found sort button with text: '{button_text}' or aria-label: '{button_aria}'") break except Exception as e: log.debug(f"Error checking element: {e}") continue if sort_button: break except Exception as e: log.debug(f"Error with keyword '{keyword}': {e}") continue # 
FALLBACK METHOD: Find any button with dropdown attributes near review content if not sort_button: log.info("Trying fallback: finding buttons with dropdown attributes...") try: buttons = driver.find_elements(By.CSS_SELECTOR, 'button[aria-haspopup="true"]') for button in buttons: if not button.is_displayed() or not button.is_enabled(): continue button_text = (button.text or '').strip().lower() button_aria = (button.get_attribute("aria-label") or '').lower() # Look for any sort-related keywords if any(kw.lower() in button_text or kw.lower() in button_aria for kw in all_sort_keywords): sort_button = button log.info(f"✅ Found sort button via fallback: {button.text}") break except Exception as e: log.debug(f"Error in fallback method: {e}") # Final check - do we have a sort button? if not sort_button: log.warning("No sort button found with any method - keeping default sort order") return False # 2. Click the sort button to open dropdown menu # First ensure the button is in view driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", sort_button) time.sleep(0.8) # Wait for scroll # Try multiple click methods click_methods = [ # Method 1: JavaScript click lambda: driver.execute_script("arguments[0].click();", sort_button), # Method 2: Direct click lambda: sort_button.click(), # Method 3: ActionChains click with move first lambda: ActionChains(driver).move_to_element(sort_button).pause(0.3).click().perform(), # Method 4: Click on center of element lambda: ActionChains(driver).move_to_element_with_offset( sort_button, sort_button.size['width'] // 2, sort_button.size['height'] // 2 ).click().perform(), # Method 5: JavaScript focus and click lambda: driver.execute_script( "arguments[0].focus(); setTimeout(function() { arguments[0].click(); }, 100);", sort_button ), # Method 6: Send RETURN key after focusing lambda: ActionChains(driver).move_to_element(sort_button).click().send_keys(Keys.RETURN).perform() ] # Try each click method menu_opened 
= False for i, click_method in enumerate(click_methods): try: log.info(f"Trying click method {i + 1} for sort button...") click_method() time.sleep(1) # Wait for menu to appear # Check if menu opened menu_opened = self.check_if_menu_opened(driver) if menu_opened: log.info(f"Sort menu opened with click method {i + 1}") break except Exception as e: log.debug(f"Click method {i + 1} failed: {e}") continue # If menu not opened, abort if not menu_opened: log.warning("Failed to open sort menu - keeping default sort order") # Try to reset state by clicking elsewhere try: ActionChains(driver).move_by_offset(50, 50).click().perform() except: pass return False # 3. Find and click the desired sort option in the menu # Uses ROBUST SEMANTIC SELECTORS (role attributes), not CSS classes try: # PRIMARY METHOD: Find menu items by role attribute (semantic, stable) # menuitemradio is the standard role for radio menu items log.info("Looking for menu items using semantic role attributes...") menu_items = WebDriverWait(driver, 5).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, '[role="menuitemradio"], [role="menuitem"]')) ) # Process menu items to extract text visible_items = [] for item in menu_items: try: # Skip invisible items if not item.is_displayed(): continue # Get the menu item text # Try innerText first (most reliable), then textContent, then .text text = driver.execute_script(""" const elem = arguments[0]; return elem.innerText || elem.textContent || elem.text || ''; """, item).strip() if text: # Only add items with text visible_items.append((item, text)) except Exception as e: log.debug(f"Error processing menu item: {e}") continue log.info(f"Found {len(visible_items)} visible menu items") for i, (_, text) in enumerate(visible_items): log.debug(f" Menu item {i + 1}: '{text}'") # Determine the target menu item based on sort method target_item = None matched_text = None # Log all available menu items for debugging log.info(f"Available menu items: {[text for _, text 
in visible_items]}") # Use position-based selection (most reliable for Google Maps) position_map = { "relevance": 0, # Usually the first option "newest": 1, # Usually the second option "highest": 2, # Usually the third option "lowest": 3 # Usually the fourth option } pos = position_map.get(method, -1) if pos >= 0 and pos < len(visible_items): target_item, matched_text = visible_items[pos] log.info(f"Selected menu item at position {pos + 1}: '{matched_text}' for sort method '{method}'") # Validate the selection makes sense wanted_labels = SORT_OPTIONS.get(method, []) text_clean = matched_text.lower() # Check if selected text contains any of the expected keywords valid_selection = False for label in wanted_labels: if label.lower() in text_clean or text_clean in label.lower(): valid_selection = True break if not valid_selection: log.warning(f"WARNING: Selected '{matched_text}' doesn't match expected '{method}' - might be wrong sort!") else: log.warning(f"Position {pos} not available in menu (only {len(visible_items)} items)") # 3. 
If target found, click it if target_item: # Ensure item is in view driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", target_item) time.sleep(0.3) # Try multiple click methods click_success = False click_methods = [ # Method 1: JavaScript click lambda: driver.execute_script("arguments[0].click();", target_item), # Method 2: Direct click lambda: target_item.click(), # Method 3: ActionChains click lambda: ActionChains(driver).move_to_element(target_item).click().perform(), # Method 4: Center click lambda: ActionChains(driver).move_to_element_with_offset( target_item, target_item.size['width'] // 2, target_item.size['height'] // 2 ).click().perform(), # Method 5: JavaScript click with custom event lambda: driver.execute_script(""" var el = arguments[0]; var evt = new MouseEvent('click', { bubbles: true, cancelable: true, view: window }); el.dispatchEvent(evt); """, target_item) ] for i, click_method in enumerate(click_methods): try: click_method() time.sleep(1.5) # Wait for sort to take effect # Try to verify sort happened by checking if menu closed still_open = self.check_if_menu_opened(driver) if not still_open: click_success = True log.info(f"Successfully clicked menu item with method {i + 1}") break except Exception as e: log.debug(f"Menu item click method {i + 1} failed: {e}") continue if click_success: log.info(f"Successfully set sort order to '{method}'") return True else: log.warning(f"Failed to click menu item - keeping default sort order") else: log.warning(f"No matching menu item found for '{method}'") # If we get here, we failed - try to close the menu by clicking elsewhere try: ActionChains(driver).move_by_offset(50, 50).click().perform() except: pass return False except TimeoutException: log.warning("Timeout waiting for menu items") return False except Exception as e: log.warning(f"Error in menu item selection: {e}") return False except Exception as e: log.warning(f"Error in set_sort method: {e}") return False def 
check_if_menu_opened(self, driver): """ Check if a sort menu has been opened after clicking the sort button. Uses multiple detection strategies optimized for Google Maps dropdowns. Returns True if menu is detected, False otherwise. """ try: # 1. First check for exact menu container selectors from the latest Google Maps UI specific_menu_selectors = [ 'div[role="menu"][id="action-menu"]', # Exact match from provided HTML 'div.fontBodyLarge.yu5kgd[role="menu"]', # Classes from provided HTML 'div.fxNQSd[role="menuitemradio"]', # Menu item class 'div.yu5kgd[role="menu"]' # Alternate class ] for selector in specific_menu_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): return True except: continue # 2. Check for generic menu containers generic_menu_selectors = [ 'div[role="menu"]', 'ul[role="menu"]', '[role="listbox"]' ] for selector in generic_menu_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): return True except: continue # 3. Look for menu items menu_item_selectors = [ 'div[role="menuitemradio"]', # Google Maps specific 'div.fxNQSd', # Class-based detection 'div.mLuXec', # Text container class '[role="menuitem"]', # Generic menu items '[role="option"]' # Alternative role ] visible_items = 0 for selector in menu_item_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): visible_items += 1 if visible_items >= 2: # At least 2 menu items should be visible return True except: continue # 4. 
Advanced detection with JavaScript # Checks if there are newly visible elements with menu-related roles or classes try: js_detection = """ return (function() { // Check for visible menu elements var menuElements = document.querySelectorAll('div[role="menu"], div[role="menuitemradio"], div.fxNQSd'); for (var i = 0; i < menuElements.length; i++) { var style = window.getComputedStyle(menuElements[i]); if (style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0') { return true; } } // Check for any recently appeared elements that might be a menu var possibleMenus = document.querySelectorAll('div.yu5kgd, div.fontBodyLarge'); for (var i = 0; i < possibleMenus.length; i++) { var style = window.getComputedStyle(possibleMenus[i]); var rect = possibleMenus[i].getBoundingClientRect(); // Check if element is visible and has a meaningful size if (style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 50 && rect.height > 50) { return true; } } return false; })(); """ menu_detected = driver.execute_script(js_detection) if menu_detected: return True except Exception as js_error: log.debug(f"Error in JavaScript menu detection: {js_error}") # 5. 
Last resort: check if any positioning styles were applied to elements # This can detect menu containers that have been positioned absolutely try: position_check = """ return (function() { // Look for absolutely positioned elements that appeared recently var elements = document.querySelectorAll('div[style*="position: absolute"]'); for (var i = 0; i < elements.length; i++) { var el = elements[i]; var style = window.getComputedStyle(el); var hasMenuItems = el.querySelectorAll('div[role="menuitemradio"], div.fxNQSd').length > 0; if (style.display !== 'none' && style.visibility !== 'hidden' && hasMenuItems) { return true; } } return false; })(); """ position_detected = driver.execute_script(position_check) if position_detected: return True except: pass return False except Exception as e: log.debug(f"Error checking menu state: {e}") return False def wait_for_api_response(self, driver: Chrome, timeout: float = 2.0) -> bool: """ Smart wait that detects when new API response has arrived. Much faster and more reliable than fixed time.sleep(). Returns True if new response detected, False if timeout. """ if not self.enable_api_intercept or not self.api_interceptor: # Fallback to fixed wait if API interception disabled time.sleep(0.6) return False try: # Get current response count initial_count = driver.execute_script(""" return (window.__allRequests || []).filter(r => r.url && r.url.toLowerCase().includes('listugcposts') ).length; """) # Wait for new response with timeout start = time.time() while (time.time() - start) < timeout: current_count = driver.execute_script(""" return (window.__allRequests || []).filter(r => r.url && r.url.toLowerCase().includes('listugcposts') ).length; """) if current_count > initial_count: # New API response arrived! 
def extract_total_reviews(self, driver: "Chrome") -> Tuple[Optional[int], Optional[str]]:
    """
    Extract the total review count from the Google Maps page.

    Method 1 searches ``driver.page_source`` for texts such as
    "247 reviews", "1,234 reviews", "5.2K reviews", "1.234 reseñas" or
    "1,234 评论" (patterns are tried in that order; the first hit in the
    page wins for each pattern). Method 2 falls back to the ``aria-label``
    and visible text of every ``<button>``.

    Args:
        driver: WebDriver positioned on the place / reviews page.

    Returns:
        tuple: (total_count, count_string) where ``count_string`` is the
        number exactly as it appeared on the page, or (None, None) if no
        count was found or an error occurred.
    """
    # A count must start a number: not glued to a previous digit
    # ("1234 reviews" must not be read as "234") and not the fractional /
    # grouped tail of another number ("4.5 reviews" must not be read as "5").
    number_start = r'(?<!\d)(?<!\d[.,])'
    comma_grouped = r'\d{1,3}(?:,\d{3})+|\d+'
    abbreviated = r'\d+(?:\.\d+)?K'

    def parse_count(token: str) -> int:
        """Convert a matched token ("1,234", "1.234", "5.2K") to an int."""
        if token[-1] in "Kk":
            # round() instead of int(): float error must not truncate
            # e.g. 1004.9999999 down to 1004.
            return round(float(token[:-1]) * 1000)
        # Without a K suffix, '.' and ',' can only be thousands separators
        return int(token.replace(',', '').replace('.', ''))

    try:
        # Method 1: Look for "XXX reviews" text in the page source
        page_text = driver.page_source

        patterns = [
            number_start + r'(' + comma_grouped + r')\s+reviews?\b',       # "244 reviews", "1,234 reviews"
            number_start + r'(' + abbreviated + r')\s+reviews?\b',         # "5.2K reviews"
            number_start + r'(\d{1,3}(?:[.,]\d{3})+|\d+)\s+reseñas?\b',    # Spanish, "1.234 reseñas"
            number_start + r'(' + comma_grouped + r')\s+评论',              # Chinese
        ]

        for pattern in patterns:
            match = re.search(pattern, page_text, re.IGNORECASE)
            if match:
                count_str = match.group(1)
                return parse_count(count_str), count_str

        # Method 2: Look for aria-label / text with review count on buttons
        button_pattern = re.compile(
            number_start + r'(' + comma_grouped + r'|' + abbreviated + r')\s+reviews?\b',
            re.IGNORECASE,
        )
        for btn in driver.find_elements(By.TAG_NAME, 'button'):
            try:
                candidates = (btn.get_attribute('aria-label') or '', btn.text or '')
            except Exception as btn_error:
                # A single stale/detached button must not abort the scan
                log.debug(f"Skipping button while extracting review count: {btn_error}")
                continue

            # Check both aria-label and text
            for content in candidates:
                match = button_pattern.search(content)
                if match:
                    count_str = match.group(1)
                    return parse_count(count_str), count_str

        return None, None

    except Exception as e:
        log.debug(f"Error extracting total review count: {e}")
        return None, None
def scrape(self) -> bool:
    """
    Run one complete scraping session for the configured Google Maps URL.

    Steps: load previously stored reviews (unless ``overwrite_existing``),
    open the page, dismiss consent dialogs, open the reviews tab, apply the
    requested sort order, locate the scrollable reviews pane, then scroll it
    continuously from a background thread while the main thread collects
    review cards every 2 seconds. Reviews captured by the optional API
    interceptor are merged in afterwards and everything is saved to MongoDB
    and/or JSON depending on configuration.

    Config keys read: ``url``, ``headless``, ``sort_by``, ``stop_on_match``.

    Returns:
        True if the session ran to completion and results were saved,
        False if the reviews pane could not be found or any error occurred.
    """
    # Function-scope imports: only needed for the debug dump of API responses
    import json
    from pathlib import Path

    start_time = time.time()

    url = self.config.get("url")
    headless = self.config.get("headless", True)
    sort_by = self.config.get("sort_by", "relevance")
    stop_on_match = self.config.get("stop_on_match", False)

    log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}")
    log.info(f"URL: {url}")

    # Initialize storage
    # If not overwriting, load existing data
    if self.overwrite_existing:
        docs = {}
        seen = set()
    else:
        # Try to get from MongoDB first if enabled
        docs = {}
        if self.use_mongodb and self.mongodb:
            docs = self.mongodb.fetch_existing_reviews()

        # If backup_to_json is enabled, also load from JSON for merging
        if self.backup_to_json:
            json_docs = self.json_storage.load_json_docs()
            # Merge JSON docs with MongoDB docs (MongoDB wins on conflicts)
            for review_id, review in json_docs.items():
                if review_id not in docs:
                    docs[review_id] = review

        # Load seen IDs from file
        seen = self.json_storage.load_seen()

    # IDs stored by earlier runs. stop_on_match must only react to these:
    # cards scraped in this run stay in the DOM, so matching against the
    # live `seen` set would stop every run on its second check.
    known_before_run = frozenset(seen)

    driver = None
    api_reviews = {}  # Store reviews captured from API
    # Created before the try so the finally block can always stop the worker
    stop_scrolling = threading.Event()
    try:
        driver = self.setup_driver(headless)
        wait = WebDriverWait(driver, 20)  # Reduced from 40 to 20 for faster timeout

        driver.get(url)
        wait.until(lambda d: "google.com/maps" in d.current_url)

        # Wait briefly for consent dialogs to appear (optimized from 3s to 1s)
        time.sleep(1)

        # Try to dismiss any consent/cookie dialogs
        if not self.dismiss_cookies(driver):
            # Quick retry (optimized from 2s to 0.5s)
            time.sleep(0.5)
            self.dismiss_cookies(driver)

        self.click_reviews_tab(driver)

        # Reduced wait after clicking reviews tab (optimized from 3s to 1s)
        log.info("Waiting for reviews page to fully load...")
        time.sleep(1)

        # Wait for page to be fully interactive
        try:
            wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
            log.info("Page DOM is ready")
        except Exception:
            log.debug("Could not verify page ready state")

        # Extract total review count from the page
        total_reviews, total_str = self.extract_total_reviews(driver)
        if total_reviews:
            log.info(f"✅ Google shows {total_str} ({total_reviews} total reviews)")
        else:
            log.warning("⚠️ Could not extract total review count - will scroll until no new reviews")
            total_reviews = None

        # Verify we're on a reviews page before proceeding
        if "review" not in driver.current_url.lower():
            log.warning("URL doesn't contain 'review' - might not be on reviews page")

        # Try to set sort - but don't fail if it doesn't work
        try:
            self.set_sort(driver, sort_by)
        except Exception as sort_error:
            log.warning(f"Sort failed but continuing: {sort_error}")

        # Reduced wait after setting sort (optimized from 3s to 1s)
        log.info("Waiting for reviews to render...")
        time.sleep(1)

        # Find the scrollable reviews pane using robust detection
        # Uses JavaScript to find elements by their scrollable properties, not CSS classes
        pane = None
        try:
            log.info("Finding scrollable reviews pane using robust detection...")

            # JavaScript to find scrollable container (no CSS classes needed!)
            find_scrollable_script = """
                function findScrollablePane() {
                    // Find all divs that might be scrollable
                    const allDivs = document.querySelectorAll('div');

                    for (let div of allDivs) {
                        const style = window.getComputedStyle(div);
                        const overflowY = style.overflowY;

                        // Check if element is scrollable
                        if ((overflowY === 'auto' || overflowY === 'scroll') &&
                            div.scrollHeight > div.clientHeight &&
                            div.clientHeight > 200) {  // Must be tall enough to be main pane

                            // Additional checks: should contain review-like content
                            const text = div.textContent || '';
                            const hasReviewIndicators =
                                text.includes('star') ||
                                text.includes('rating') ||
                                text.includes('review') ||
                                div.querySelector('[data-review-id]') ||
                                div.querySelector('[role="img"][aria-label*="star"]');

                            if (hasReviewIndicators) {
                                return div;
                            }
                        }
                    }

                    // Fallback: return main element if found
                    return document.querySelector('[role="main"]');
                }
                return findScrollablePane();
            """

            pane = driver.execute_script(find_scrollable_script)

            if pane:
                log.info("✅ Found scrollable reviews pane using robust JavaScript detection")
            else:
                log.warning("❌ Could not find scrollable reviews pane")

        except Exception as e:
            log.warning(f"Error finding scrollable pane with JavaScript: {e}")
            # Fallback to simple div[role="main"] if JS fails
            try:
                pane = driver.find_element(By.CSS_SELECTOR, 'div[role="main"]')
                log.info("Using fallback: div[role='main']")
            except Exception:
                # pane stays None and is reported just below
                pass

        if not pane:
            log.error("Could not find reviews pane. Page structure might have changed.")
            return False

        # Initialize API interceptor AFTER reviews page is loaded (if enabled)
        # This prevents CDP interception from affecting initial page load and tab detection
        if self.enable_api_intercept:
            log.info("Setting up API interception for reviews capture")
            self.api_interceptor = GoogleMapsAPIInterceptor(driver)
            self.api_interceptor.setup_interception()
            self.api_interceptor.inject_response_interceptor()
            log.info("API interceptor ready - capturing network responses")

        pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen))
        idle = 0
        processed_ids = set()  # Track processed IDs in current session

        # Store the pane on window once so the scroll script needs no lookup
        try:
            driver.execute_script("window.scrollablePane = arguments[0];", pane)
            scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);"
        except Exception as e:
            log.warning(f"Error setting up scroll script: {e}")
            scroll_script = "window.scrollBy(0, 300);"  # Fallback to simple scrolling

        # Card selectors to try: only the data-review-id attribute is used,
        # obfuscated CSS classes (div.jftiEf, div.WMbnJf) were dropped on purpose.
        card_selectors = [
            "[data-review-id]",     # PRIMARY: Any element with review ID
            "div[data-review-id]",  # Fallback: Div with review ID
        ]

        # CONTINUOUS SCROLLING APPROACH
        # Scroll in a background thread while extracting reviews in the main thread.
        # NOTE(review): both threads issue commands through the same driver
        # object; Selenium does not document WebDriver as thread-safe - confirm
        # this is acceptable or serialize access with a lock.
        scroll_count = [0]  # Use list to make it mutable in thread
        load_times = []     # Track when new reviews are loaded for smart timeout

        def continuous_scroll_worker():
            """Background thread that keeps scrolling until asked to stop."""
            while not stop_scrolling.is_set():
                try:
                    driver.execute_script(scroll_script)
                    scroll_count[0] += 1
                except Exception as scroll_error:
                    # Back off instead of spinning at full speed when the
                    # driver is busy, the pane went stale or the browser died.
                    log.debug(f"Scroll attempt failed: {scroll_error}")
                    stop_scrolling.wait(0.5)
                    continue
                # 5ms pause between scrolls; returns at once when stop is set
                stop_scrolling.wait(0.005)

        # Start continuous scrolling thread
        scroll_thread = threading.Thread(target=continuous_scroll_worker, daemon=True)
        scroll_thread.start()
        log.info("🚀 Started continuous NON-STOP scrolling thread")

        check_num = 0
        max_checks = 100  # Maximum safety limit

        while check_num < max_checks:
            check_num += 1

            # Check if we've collected all reviews
            if total_reviews and len(seen) >= total_reviews:
                percent = (len(seen) / total_reviews) * 100
                log.info(f"✅ Got all {total_reviews} reviews ({percent:.1f}%)! Stopping scrolling.")
                stop_scrolling.set()
                break

            # Wait between checks while scrolling continues in background
            time.sleep(2.0)  # Check every 2 seconds

            try:
                # Try multiple card selectors within the pane
                cards = []
                for card_sel in card_selectors:
                    cards = pane.find_elements(By.CSS_SELECTOR, card_sel)
                    if cards:
                        if check_num == 1:  # Only log once
                            log.info(f"Found {len(cards)} cards with selector: {card_sel}")
                        break

                # If no cards found in pane, try searching the entire document
                if not cards:
                    for card_sel in card_selectors:
                        cards = driver.find_elements(By.CSS_SELECTOR, card_sel)
                        if cards:
                            if check_num == 1:
                                log.info(f"Found {len(cards)} cards in document with selector: {card_sel}")
                            break

                fresh_cards: List[WebElement] = []
                # IDs already queued in this pass: "[data-review-id]" can match
                # several elements of the same review, parse only the first one.
                queued_ids = set()
                hit_known_review = False
                previous_count = len(seen)

                for c in cards:
                    try:
                        # Try to get data-review-id from the card itself
                        cid = c.get_attribute("data-review-id")

                        # If not found on card, try to find it in a child element
                        if not cid:
                            try:
                                review_id_elem = c.find_element(By.CSS_SELECTOR, "[data-review-id]")
                                cid = review_id_elem.get_attribute("data-review-id")
                            except Exception:
                                # No child carries an ID either; card is skipped below
                                pass

                        if not cid:
                            continue

                        if stop_on_match and cid in known_before_run:
                            # Reached a review stored by a previous run
                            hit_known_review = True
                            break

                        if cid in seen or cid in processed_ids or cid in queued_ids:
                            continue

                        queued_ids.add(cid)
                        fresh_cards.append(c)
                    except StaleElementReferenceException:
                        continue
                    except Exception as e:
                        log.debug(f"Error getting review ID: {e}")
                        continue

                # Process fresh cards
                for card in fresh_cards:
                    try:
                        raw = RawReview.from_card(card)
                        processed_ids.add(raw.id)
                    except StaleElementReferenceException:
                        continue
                    except Exception:
                        log.warning("⚠️ parse error – storing stub\n%s",
                                    traceback.format_exc(limit=1).strip())
                        try:
                            raw_id = card.get_attribute("data-review-id") or ""
                            raw = RawReview(id=raw_id, text="", lang="und")
                            processed_ids.add(raw_id)
                        except StaleElementReferenceException:
                            continue

                    docs[raw.id] = merge_review(docs.get(raw.id), raw)
                    seen.add(raw.id)
                    pbar.update(1)

                # Calculate how many new reviews we got
                new_count = len(seen) - previous_count

                # Track load times for smart timeout
                if new_count > 0:
                    load_times.append(time.time())

                if total_reviews:
                    percent = (len(seen) / total_reviews) * 100
                    log.info(f"Check {check_num:2d}: {len(seen):3d}/{total_reviews} ({percent:5.1f}%) | +{new_count} new")
                else:
                    log.info(f"Check {check_num:2d}: {len(seen):3d} total | +{new_count} new")

                if hit_known_review:
                    log.info("stop_on_match: reached a review stored by a previous run - stopping")
                    stop_scrolling.set()
                    break

                # Smart timeout: stop if no new reviews for 3x average gap
                if new_count == 0:
                    if len(load_times) >= 3:
                        # Average gap between checks that yielded new reviews
                        gaps = [later - earlier for earlier, later in zip(load_times, load_times[1:])]
                        avg_gap = sum(gaps) / len(gaps)
                        timeout_threshold = avg_gap * 3
                        timeout_type = f"gap-based (avg gap: {avg_gap:.1f}s)"
                    elif len(load_times) > 0:
                        # Initial timeout: use 3x time since first load started
                        time_since_first = time.time() - load_times[0]
                        timeout_threshold = max(10.0, time_since_first * 3)  # At least 10s
                        timeout_type = f"initial (time since first: {time_since_first:.1f}s)"
                    else:
                        # No loads yet - use default initial timeout
                        timeout_threshold = 15.0
                        timeout_type = "default (no loads yet)"

                    # Check time since last load (only possible once something loaded;
                    # before that only the idle counter below can stop the loop)
                    if len(load_times) > 0:
                        time_since_last = time.time() - load_times[-1]

                        # Log timeout status every check when no new reviews
                        log.debug(f" Timeout check: {time_since_last:.1f}s / {timeout_threshold:.1f}s ({timeout_type})")

                        if time_since_last > timeout_threshold:
                            log.info(f"⏱️ No new reviews for {time_since_last:.1f}s (threshold: {timeout_threshold:.1f}s, {timeout_type}) - stopping")
                            stop_scrolling.set()
                            break

                # Fallback: stop if no new reviews for 10 consecutive checks
                if new_count == 0:
                    idle += 1
                    if idle >= 10:
                        log.info(f"⏱️ No new reviews for {idle} checks - stopping")
                        stop_scrolling.set()
                        break
                else:
                    idle = 0

                # Collect API responses if interception is enabled
                if self.enable_api_intercept and self.api_interceptor:
                    try:
                        responses = self.api_interceptor.get_intercepted_responses()
                        if responses:
                            log.debug(f"Collected {len(responses)} network responses from browser")

                            # Dump first few responses for analysis
                            self._dumped_responses = getattr(self, '_dumped_responses', 0)

                            if self._dumped_responses < 5:  # Dump first 5 responses
                                output_dir = Path("api_response_samples")
                                output_dir.mkdir(exist_ok=True)

                                for resp in responses:
                                    if self._dumped_responses >= 5:
                                        break

                                    idx = self._dumped_responses
                                    body = resp.get('body', '')

                                    # Save full response
                                    full_file = output_dir / f"response_{idx:02d}_full.json"
                                    with open(full_file, 'w', encoding='utf-8') as f:
                                        json.dump(resp, f, indent=2, ensure_ascii=False)

                                    # Save body
                                    body_file = output_dir / f"response_{idx:02d}_body.txt"
                                    with open(body_file, 'w', encoding='utf-8') as f:
                                        f.write(body)

                                    # Try to parse and save (strip the )]}' anti-JSON-hijacking prefix)
                                    clean_body = body[4:].strip() if body.startswith(")]}'") else body
                                    try:
                                        parsed_data = json.loads(clean_body)
                                        parsed_file = output_dir / f"response_{idx:02d}_parsed.json"
                                        with open(parsed_file, 'w', encoding='utf-8') as f:
                                            json.dump(parsed_data, f, indent=2, ensure_ascii=False)
                                        log.info(f"Dumped API response {idx} to {output_dir}/ ({len(body)} bytes)")
                                    except Exception:
                                        log.debug(f"Response {idx} is not JSON")

                                    self._dumped_responses += 1

                            parsed = self.api_interceptor.parse_reviews_from_responses(responses)
                            log.debug(f"Parsed {len(parsed)} reviews from responses")
                            for intercepted in parsed:
                                if intercepted.review_id and intercepted.review_id not in api_reviews:
                                    api_reviews[intercepted.review_id] = self.api_interceptor.convert_to_raw_review_format(intercepted)
                            if parsed:
                                log.info(f"API interceptor captured {len(parsed)} reviews (total unique API: {len(api_reviews)})")

                        # Log stats every 10 checks
                        if check_num % 10 == 0:
                            stats = self.api_interceptor.get_interceptor_stats()
                            if stats:
                                log.debug(f"Interceptor stats - Fetch: {stats.get('totalFetch', 0)}/{stats.get('capturedFetch', 0)}, "
                                          f"XHR: {stats.get('totalXHR', 0)}/{stats.get('capturedXHR', 0)}, "
                                          f"Last: {stats.get('lastCapture', 'never')}")
                    except Exception as api_err:
                        log.warning(f"API interception error: {api_err}", exc_info=True)

            except StaleElementReferenceException:
                # The pane or other element went stale, try to re-find
                log.debug("Stale element encountered, re-finding elements")
                try:
                    pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL)))
                    driver.execute_script("window.scrollablePane = arguments[0];", pane)
                except Exception:
                    log.warning("Could not re-find reviews pane after stale element")
                    break
            except Exception as e:
                log.warning(f"Error during review processing: {e}")
                time.sleep(1)

        pbar.close()

        # Stop continuous scrolling thread
        stop_scrolling.set()
        scroll_thread.join(timeout=2.0)
        log.info(f"🛑 Stopped scrolling thread after {scroll_count[0]} total scrolls")

        # Merge API-captured reviews if any
        if self.enable_api_intercept and api_reviews:
            log.info(f"Merging {len(api_reviews)} reviews captured via API interception")
            for review_id, api_review in api_reviews.items():
                if review_id not in docs:
                    # New review from API only
                    docs[review_id] = api_review
                    seen.add(review_id)
                else:
                    # Merge API data with existing DOM data (API might have more details)
                    existing = docs[review_id]
                    # Only update fields that are missing or empty
                    for key, value in api_review.items():
                        if key not in existing or not existing.get(key):
                            existing[key] = value
            log.info(f"After merge: {len(docs)} total reviews")
        elif self.enable_api_intercept:
            # Log final stats even if no reviews captured
            if self.api_interceptor:
                stats = self.api_interceptor.get_interceptor_stats()
                if stats:
                    log.warning(f"⚠️ API interception was enabled but captured 0 reviews. "
                                f"Network stats - Fetch requests: {stats.get('capturedFetch', 0)}/{stats.get('totalFetch', 0)}, "
                                f"XHR requests: {stats.get('capturedXHR', 0)}/{stats.get('totalXHR', 0)}")

                    # Get browser console logs for debugging
                    console_logs = self.api_interceptor.get_browser_console_logs()
                    api_logs = [log_entry for log_entry in console_logs if 'API Interceptor' in log_entry.get('message', '')]
                    if api_logs:
                        log.info(f"Found {len(api_logs)} API interceptor console messages")
                        for entry in api_logs[:10]:  # Show first 10
                            log.debug(f" Console: {entry.get('message', '')[:200]}")
                    else:
                        log.debug("No API interceptor console messages found")

                    # In debug mode, try to dump any responses that were collected.
                    # isEnabledFor() honours the effective level; the raw
                    # `log.level` is 0 (NOTSET) unless set on this very logger.
                    if log.isEnabledFor(logging.DEBUG):
                        all_responses = self.api_interceptor.get_intercepted_responses()
                        if all_responses:
                            dump_path = self.api_interceptor.dump_responses_to_file(all_responses)
                            if dump_path:
                                log.info(f"Raw responses dumped to: {dump_path}")
                else:
                    log.warning("API interceptor stats not available")

        # Save to MongoDB if enabled
        if self.use_mongodb and self.mongodb:
            log.info("Saving reviews to MongoDB...")
            self.mongodb.save_reviews(docs)

        # Backup to JSON if enabled
        if self.backup_to_json:
            log.info("Backing up to JSON...")
            self.json_storage.save_json_docs(docs)
            self.json_storage.save_seen(seen)

        # Final summary with completion percentage
        if total_reviews:
            percent = (len(docs) / total_reviews) * 100
            missing = total_reviews - len(docs)

            if missing <= 0:
                log.info(f"✅ Finished – Got all {total_reviews} reviews ({percent:.1f}%)")
            elif percent >= 95.0:
                log.info(f"✅ Finished – Got {len(docs)}/{total_reviews} reviews ({percent:.1f}%) - missing {missing}")
            else:
                log.info(f"⚠️ Finished – Got {len(docs)}/{total_reviews} reviews ({percent:.1f}%) - missing {missing}")
        else:
            log.info("✅ Finished – total unique reviews: %s", len(docs))

        end_time = time.time()
        elapsed_time = end_time - start_time
        log.info(f"Execution completed in {elapsed_time:.2f} seconds")

        return True

    except Exception as e:
        log.error(f"Error during scraping: {e}")
        log.error(traceback.format_exc())
        return False

    finally:
        # Make sure the background scroller ends on every exit path,
        # before the browser it talks to is shut down
        stop_scrolling.set()

        # Cleanup API interceptor
        if self.api_interceptor:
            try:
                self.api_interceptor.cleanup()
            except Exception:
                pass

        if driver is not None:
            try:
                driver.quit()
            except Exception:
                pass

        if self.mongodb:
            try:
                self.mongodb.close()
            except Exception:
                pass
"ביקורת"} # # # class GoogleReviewsScraper: # """Main scraper class for Google Maps reviews""" # # def __init__(self, config: Dict[str, Any]): # """Initialize scraper with configuration""" # self.config = config # self.use_mongodb = config.get("use_mongodb", True) # self.mongodb = MongoDBStorage(config) if self.use_mongodb else None # self.json_storage = JSONStorage(config) # self.backup_to_json = config.get("backup_to_json", True) # self.overwrite_existing = config.get("overwrite_existing", False) # # def setup_driver(self, headless: bool) -> Chrome: # """ # Set up and configure Chrome driver with flexibility for different environments. # Works in both Docker containers and on regular OS installations (Windows, Mac, Linux). # """ # # Determine if we're running in a container # in_container = os.environ.get('CHROME_BIN') is not None # # # Create Chrome options # opts = uc.ChromeOptions() # opts.add_argument("--window-size=1400,900") # opts.add_argument("--ignore-certificate-errors") # opts.add_argument("--disable-gpu") # Improves performance # opts.add_argument("--disable-dev-shm-usage") # Helps with stability # opts.add_argument("--no-sandbox") # More stable in some environments # # # Use headless mode if requested # if headless: # opts.add_argument("--headless=new") # # # Log platform information for debugging # log.info(f"Platform: {platform.platform()}") # log.info(f"Python version: {platform.python_version()}") # # # If in container, use environment-provided binaries # if in_container: # chrome_binary = os.environ.get('CHROME_BIN') # chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') # # log.info(f"Container environment detected") # log.info(f"Chrome binary: {chrome_binary}") # log.info(f"ChromeDriver path: {chromedriver_path}") # # if chrome_binary and os.path.exists(chrome_binary): # log.info(f"Using Chrome binary from environment: {chrome_binary}") # opts.binary_location = chrome_binary # # try: # # Try creating Chrome driver with 
undetected_chromedriver # log.info("Attempting to create undetected_chromedriver instance") # driver = uc.Chrome(options=opts) # log.info("Successfully created undetected_chromedriver instance") # except Exception as e: # # Fall back to regular Selenium if undetected_chromedriver fails # log.warning(f"Failed to create undetected_chromedriver instance: {e}") # log.info("Falling back to regular Selenium Chrome") # # # Import Selenium webdriver here to avoid potential import issues # from selenium import webdriver # from selenium.webdriver.chrome.service import Service # # if chromedriver_path and os.path.exists(chromedriver_path): # log.info(f"Using ChromeDriver from path: {chromedriver_path}") # service = Service(executable_path=chromedriver_path) # driver = webdriver.Chrome(service=service, options=opts) # else: # log.info("Using default ChromeDriver") # driver = webdriver.Chrome(options=opts) # else: # # On regular OS, use default undetected_chromedriver # log.info("Using standard undetected_chromedriver setup") # driver = uc.Chrome(options=opts) # # # Set page load timeout to avoid hanging # driver.set_page_load_timeout(30) # log.info("Chrome driver setup completed successfully") # return driver # # def dismiss_cookies(self, driver: Chrome): # """ # Dismiss cookie consent dialogs if present. # Handles stale element references by re-finding elements if needed. 
# """ # try: # # Use WebDriverWait with expected_conditions to handle stale elements # WebDriverWait(driver, 3).until( # EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN)) # ) # log.info("Cookie consent dialog found, attempting to dismiss") # # # Get elements again after waiting to avoid stale references # elements = driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN) # for elem in elements: # try: # if elem.is_displayed(): # elem.click() # log.info("Cookie dialog dismissed") # return True # except Exception as e: # log.debug(f"Error clicking cookie button: {e}") # continue # except TimeoutException: # # This is expected if no cookie dialog is present # log.debug("No cookie consent dialog detected") # except Exception as e: # log.debug(f"Error handling cookie dialog: {e}") # # return False # # def is_reviews_tab(self, tab: WebElement) -> bool: # """Check if a tab is the reviews tab""" # try: # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) # except StaleElementReferenceException: # return False # except Exception as e: # log.debug(f"Error checking if tab is reviews tab: {e}") # return False # # def click_reviews_tab(self, driver: Chrome): # """ # Click on the reviews tab in Google Maps with improved stale element handling. 
# """ # end = time.time() + 15 # Timeout after 15 seconds # while time.time() < end: # try: # # Find all tab elements # tabs = driver.find_elements(By.CSS_SELECTOR, '[role="tab"], button[aria-label]') # # for tab in tabs: # try: # # Check if this is the reviews tab # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # is_review_tab = tab.get_attribute("data-tab-index") == "1" or any( # w in label for w in REVIEW_WORDS) # # if is_review_tab: # # Scroll the tab into view # driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) # time.sleep(0.2) # Small wait after scrolling # # # Try to click the tab # log.info("Found reviews tab, attempting to click") # tab.click() # log.info("Successfully clicked reviews tab") # return True # except Exception as e: # # Element might be stale or not clickable, try the next one # log.debug(f"Error with tab element: {str(e)}") # continue # # # If we get here, we didn't find a suitable tab in this iteration # log.debug("No reviews tab found in this iteration, waiting...") # time.sleep(0.5) # Wait before next attempt # # except Exception as e: # # General exception handling # log.debug(f"Exception while looking for reviews tab: {str(e)}") # time.sleep(0.5) # # # If we exit the loop, we've timed out # log.warning("Timeout while looking for reviews tab") # raise TimeoutException("Reviews tab not found") # # def set_sort(self, driver: Chrome, method: str): # """ # Set the sorting method for reviews with improved error handling. 
# """ # if method == "relevance": # return True # Default order, no need to change # # log.info(f"Attempting to set sort order to '{method}'") # # try: # # First try to find and click the sort button # sort_buttons = driver.find_elements(By.CSS_SELECTOR, SORT_BTN) # if not sort_buttons: # log.warning(f"Sort button not found - keeping default sort order") # return False # # # Try to click the first visible sort button # for sort_button in sort_buttons: # try: # if sort_button.is_displayed() and sort_button.is_enabled(): # sort_button.click() # log.info("Clicked sort button") # time.sleep(0.5) # Wait for menu to appear # break # except Exception as e: # log.debug(f"Error clicking sort button: {e}") # continue # else: # log.warning("No clickable sort button found") # return False # # # Now find and click the menu item for the desired sort method # wanted = SORT_LABELS[method] # menu_items = WebDriverWait(driver, 3).until( # EC.presence_of_all_elements_located((By.CSS_SELECTOR, MENU_ITEMS)) # ) # # for item in menu_items: # try: # label = item.text.strip() # if label in wanted: # item.click() # log.info(f"Selected sort option: {label}") # time.sleep(0.5) # Wait for sorting to take effect # return True # except Exception as e: # log.debug(f"Error clicking menu item: {e}") # continue # # log.warning(f"Sort option '{method}' not found in menu - keeping default") # return False # # except Exception as e: # log.warning(f"Error setting sort order: {e}") # return False # # def scrape(self): # """Main scraper method""" # start_time = time.time() # # url = self.config.get("url") # headless = self.config.get("headless", True) # sort_by = self.config.get("sort_by", "relevance") # stop_on_match = self.config.get("stop_on_match", False) # # log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") # log.info(f"URL: {url}") # # # Initialize storage # # If not overwriting, load existing data # if self.overwrite_existing: # docs = {} # seen = set() # else: # 
# Try to get from MongoDB first if enabled # docs = {} # if self.use_mongodb and self.mongodb: # docs = self.mongodb.fetch_existing_reviews() # # # If backup_to_json is enabled, also load from JSON for merging # if self.backup_to_json: # json_docs = self.json_storage.load_json_docs() # # Merge JSON docs with MongoDB docs # for review_id, review in json_docs.items(): # if review_id not in docs: # docs[review_id] = review # # # Load seen IDs from file # seen = self.json_storage.load_seen() # # driver = None # try: # driver = self.setup_driver(headless) # wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout # # driver.get(url) # wait.until(lambda d: "google.com/maps" in d.current_url) # # self.dismiss_cookies(driver) # self.click_reviews_tab(driver) # self.set_sort(driver, sort_by) # # # Add a wait after setting sort to allow results to load # time.sleep(1) # # # Use try-except to handle cases where the pane is not found # try: # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # except TimeoutException: # log.warning("Could not find reviews pane. 
Page structure might have changed.") # return False # # pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) # idle = 0 # processed_ids = set() # Track processed IDs in current session # # # Prefetch selector to avoid repeated lookups # try: # driver.execute_script("window.scrollablePane = arguments[0];", pane) # scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" # except Exception as e: # log.warning(f"Error setting up scroll script: {e}") # scroll_script = "window.scrollBy(0, 300);" # Fallback to simple scrolling # # max_attempts = 10 # Limit the number of attempts to find reviews # attempts = 0 # # while attempts < max_attempts: # try: # cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) # fresh_cards: List[WebElement] = [] # # # Check for valid cards # if len(cards) == 0: # log.debug("No review cards found in this iteration") # attempts += 1 # # Try scrolling anyway # driver.execute_script(scroll_script) # time.sleep(1) # continue # # for c in cards: # try: # cid = c.get_attribute("data-review-id") # if not cid or cid in seen or cid in processed_ids: # if stop_on_match and cid and (cid in seen or cid in processed_ids): # idle = 999 # break # continue # fresh_cards.append(c) # except StaleElementReferenceException: # continue # except Exception as e: # log.debug(f"Error getting review ID: {e}") # continue # # for card in fresh_cards: # try: # raw = RawReview.from_card(card) # processed_ids.add(raw.id) # Track this ID to avoid re-processing # except StaleElementReferenceException: # continue # except Exception: # log.warning("⚠️ parse error – storing stub\n%s", # traceback.format_exc(limit=1).strip()) # try: # raw_id = card.get_attribute("data-review-id") or "" # raw = RawReview(id=raw_id, text="", lang="und") # processed_ids.add(raw_id) # except StaleElementReferenceException: # continue # # docs[raw.id] = merge_review(docs.get(raw.id), raw) # seen.add(raw.id) # pbar.update(1) # idle = 0 # attempts = 0 # Reset 
attempts counter when we successfully process a review # # if idle >= 3: # break # # if not fresh_cards: # idle += 1 # attempts += 1 # # # Use JavaScript for smoother scrolling # try: # driver.execute_script(scroll_script) # except Exception as e: # log.warning(f"Error scrolling: {e}") # # Try a simpler scroll method # driver.execute_script("window.scrollBy(0, 300);") # # # Dynamic sleep: sleep less when processing many reviews # sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 # time.sleep(sleep_time) # # except StaleElementReferenceException: # # The pane or other element went stale, try to re-find # log.debug("Stale element encountered, re-finding elements") # try: # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # driver.execute_script("window.scrollablePane = arguments[0];", pane) # except Exception: # log.warning("Could not re-find reviews pane after stale element") # break # except Exception as e: # log.warning(f"Error during review processing: {e}") # attempts += 1 # time.sleep(1) # # pbar.close() # # # Save to MongoDB if enabled # if self.use_mongodb and self.mongodb: # log.info("Saving reviews to MongoDB...") # self.mongodb.save_reviews(docs) # # # Backup to JSON if enabled # if self.backup_to_json: # log.info("Backing up to JSON...") # self.json_storage.save_json_docs(docs) # self.json_storage.save_seen(seen) # # log.info("✅ Finished – total unique reviews: %s", len(docs)) # # end_time = time.time() # elapsed_time = end_time - start_time # log.info(f"Execution completed in {elapsed_time:.2f} seconds") # # return True # # except Exception as e: # log.error(f"Error during scraping: {e}") # log.error(traceback.format_exc()) # return False # # finally: # if driver is not None: # try: # driver.quit() # except Exception: # pass # # if self.mongodb: # try: # self.mongodb.close() # except Exception: # pass # # # """ # # Selenium scraping logic for Google Maps Reviews. 
# # """ # # # # import re # # import time # # import logging # # import traceback # # from typing import Dict, Any, Set, List # # # # import undetected_chromedriver as uc # # from selenium.common.exceptions import TimeoutException # # from selenium.webdriver import Chrome # # from selenium.webdriver.common.by import By # # from selenium.webdriver.remote.webelement import WebElement # # from selenium.webdriver.support import expected_conditions as EC # # from selenium.webdriver.support.ui import WebDriverWait # # from tqdm import tqdm # # # # from modules.models import RawReview # # from modules.data_storage import MongoDBStorage, JSONStorage, merge_review # # from modules.utils import click_if # # # # # Logger # # log = logging.getLogger("scraper") # # # # # CSS Selectors # # PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' # # CARD_SEL = "div[data-review-id]" # # COOKIE_BTN = ('button[aria-label*="Accept" i],' # # 'button[jsname="hZCF7e"],' # # 'button[data-mdc-dialog-action="accept"]') # # SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' # # MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' # # # # SORT_LABELS = { # text shown in Google Maps' menu # # "newest": ("Newest", "החדשות ביותר", "ใหม่ที่สุด"), # # "highest": ("Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด"), # # "lowest": ("Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด"), # # "relevance": ("Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด"), # # } # # # # REVIEW_WORDS = {"reviews", "review", "ביקורות", "รีวิว", "avis", "reseñas", # # "recensioni", "bewertungen", "口コミ", "レビュー", # # "리뷰", "評論", "评论", "рецензии"} # # # # # # class GoogleReviewsScraper: # # """Main scraper class for Google Maps reviews""" # # # # def __init__(self, config: Dict[str, Any]): # # """Initialize scraper with configuration""" # # self.config = config # # self.use_mongodb = config.get("use_mongodb", True) # # self.mongodb = MongoDBStorage(config) if 
self.use_mongodb else None # # self.json_storage = JSONStorage(config) # # self.backup_to_json = config.get("backup_to_json", True) # # self.overwrite_existing = config.get("overwrite_existing", False) # # # # def setup_driver(self, headless: bool) -> Chrome: # # """Set up and configure Chrome driver""" # # opts = uc.ChromeOptions() # # opts.add_argument("--window-size=1400,900") # # opts.add_argument("--ignore-certificate-errors") # # opts.add_argument("--disable-gpu") # Improves performance # # opts.add_argument("--disable-dev-shm-usage") # Helps with stability # # opts.add_argument("--no-sandbox") # More stable in some environments # # # # if headless: # # opts.add_argument("--headless=new") # # # # driver = uc.Chrome(options=opts) # # # Set page load timeout to avoid hanging # # driver.set_page_load_timeout(30) # # return driver # # # # def dismiss_cookies(self, driver: Chrome): # # """Dismiss cookie consent dialogs""" # # click_if(driver, COOKIE_BTN, timeout=3.0) # Reduced timeout for faster operation # # # # def is_reviews_tab(self, tab: WebElement) -> bool: # # """Check if a tab is the reviews tab""" # # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # # return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) # # # # def click_reviews_tab(self, driver: Chrome): # # """Click on the reviews tab in Google Maps""" # # end = time.time() + 15 # Reduced timeout from 30 to 15 seconds # # while time.time() < end: # # for tab in driver.find_elements(By.CSS_SELECTOR, # # '[role="tab"], button[aria-label]'): # # if self.is_reviews_tab(tab): # # driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) # # try: # # tab.click() # # return # # except Exception: # # continue # # time.sleep(.2) # Reduced sleep time from 0.4 to 0.2 # # raise TimeoutException("Reviews tab not found") # # # # def set_sort(self, driver: Chrome, method: str): # # """Set the sorting method for reviews""" # # if method == 
"relevance": # # return # default order # # if not click_if(driver, SORT_BTN): # # return # # # # wanted = SORT_LABELS[method] # # # # for item in driver.find_elements(By.CSS_SELECTOR, MENU_ITEMS): # # label = item.text.strip() # # if label in wanted: # # item.click() # # time.sleep(0.5) # Reduced wait time from 1.0 to 0.5 # # return # # log.warning("⚠️ sort option %s not found – keeping default", method) # # # # def scrape(self): # # """Main scraper method""" # # start_time = time.time() # # # # url = self.config.get("url") # # headless = self.config.get("headless", True) # # sort_by = self.config.get("sort_by", "relevance") # # stop_on_match = self.config.get("stop_on_match", False) # # # # log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") # # log.info(f"URL: {url}") # # # # # Initialize storage # # # If not overwriting, load existing data # # if self.overwrite_existing: # # docs = {} # # seen = set() # # else: # # # Try to get from MongoDB first if enabled # # docs = {} # # if self.use_mongodb and self.mongodb: # # docs = self.mongodb.fetch_existing_reviews() # # # # # If backup_to_json is enabled, also load from JSON for merging # # if self.backup_to_json: # # json_docs = self.json_storage.load_json_docs() # # # Merge JSON docs with MongoDB docs # # for review_id, review in json_docs.items(): # # if review_id not in docs: # # docs[review_id] = review # # # # # Load seen IDs from file # # seen = self.json_storage.load_seen() # # # # driver = self.setup_driver(headless) # # wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout # # # # try: # # driver.get(url) # # wait.until(lambda d: "google.com/maps" in d.current_url) # # # # self.dismiss_cookies(driver) # # self.click_reviews_tab(driver) # # self.set_sort(driver, sort_by) # # # # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # # pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) # # idle = 0 # # processed_ids = set() 
# Track processed IDs in current session # # # # # Prefetch selector to avoid repeated lookups # # driver.execute_script("window.scrollablePane = arguments[0];", pane) # # scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" # # # # while True: # # cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) # # fresh_cards: List[WebElement] = [] # # # # for c in cards: # # cid = c.get_attribute("data-review-id") # # if cid in seen or cid in processed_ids: # # if stop_on_match: # # idle = 999 # # break # # continue # # fresh_cards.append(c) # # # # for card in fresh_cards: # # try: # # raw = RawReview.from_card(card) # # processed_ids.add(raw.id) # Track this ID to avoid re-processing # # except Exception: # # log.warning("⚠️ parse error – storing stub\n%s", # # traceback.format_exc(limit=1).strip()) # # raw_id = card.get_attribute("data-review-id") or "" # # raw = RawReview(id=raw_id, text="", lang="und") # # processed_ids.add(raw_id) # # # # docs[raw.id] = merge_review(docs.get(raw.id), raw) # # seen.add(raw.id) # # pbar.update(1) # # idle = 0 # # # # if idle >= 3: # # break # # # # if not fresh_cards: # # idle += 1 # # # # # Use JavaScript for smoother scrolling # # driver.execute_script(scroll_script) # # # # # Dynamic sleep: sleep less when processing many reviews # # sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 # # time.sleep(sleep_time) # # # # pbar.close() # # # # # Save to MongoDB if enabled # # if self.use_mongodb and self.mongodb: # # log.info("Saving reviews to MongoDB...") # # self.mongodb.save_reviews(docs) # # # # # Backup to JSON if enabled # # if self.backup_to_json: # # log.info("Backing up to JSON...") # # self.json_storage.save_json_docs(docs) # # self.json_storage.save_seen(seen) # # # # log.info("✅ Finished – total unique reviews: %s", len(docs)) # # # # end_time = time.time() # # elapsed_time = end_time - start_time # # log.info(f"Execution completed in {elapsed_time:.2f} seconds") # # # # finally: # # 
driver.quit() # # if self.mongodb: # # self.mongodb.close()