""" Selenium scraping logic for Google Maps Reviews. """ import logging import os import platform import re import time import traceback from typing import Dict, Any, List import undetected_chromedriver as uc from selenium.common.exceptions import TimeoutException, StaleElementReferenceException from selenium.webdriver import Chrome from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.remote.webelement import WebElement from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from tqdm import tqdm from modules.data_storage import MongoDBStorage, JSONStorage, merge_review, merge_review_with_translation from modules.models import RawReview # Logger log = logging.getLogger("scraper") # CSS Selectors PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' CARD_SEL = "div[data-review-id]" COOKIE_BTN = ('button[aria-label*="Accept" i],' 'button[jsname="hZCF7e"],' 'button[data-mdc-dialog-action="accept"]') SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' SORT_OPTIONS = { "newest": ( "Newest", "החדשות ביותר", "ใหม่ที่สุด", "最新", "Más recientes", "最近", "Mais recentes", "Neueste", "Plus récent", "Più recenti", "Nyeste", "Новые", "Nieuwste", "جديد", "Nyeste", "Uusimmat", "Najnowsze", "Senaste", "Terbaru", "Yakın zamanlı", "Mới nhất", "नवीनतम" ), "highest": ( "Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด", "最高評価", "Calificación más alta", "最高评分", "Melhor avaliação", "Höchste Bewertung", "Note la plus élevée", "Valutazione più alta", "Høyeste vurdering", "Наивысший рейтинг", "Hoogste waardering", "أعلى تقييم", "Højeste vurdering", "Korkein arvostelu", "Najwyższa ocena", "Högsta betyg", "Peringkat tertinggi", "En yüksek puan", "Đánh giá cao nhất", "उच्चतम रेटिंग", "Top rating" ), "lowest": ( "Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด", "最低評価", "Calificación más baja", "最低评分", "Pior avaliação", "Niedrigste Bewertung", "Note la plus basse", "Valutazione più bassa", "Laveste vurdering", "Наименьший рейтинг", "Laagste waardering", "أقل تقييم", "Laveste vurdering", "Alhaisin arvostelu", "Najniższa ocena", "Lägsta betyg", "Peringkat terendah", "En düşük puan", "Đánh giá thấp nhất", "निम्नतम रेटिंग", "Worst rating" ), "relevance": ( "Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด", "関連性", "Más relevantes", "最相关", "Mais relevantes", "Relevanteste", "Plus pertinents", "Più pertinenti", "Mest relevante", "Наиболее релевантные", "Meest relevant", "الأكثر صلة", "Mest relevante", "Olennaisimmat", "Najbardziej trafne", "Mest relevanta", "Paling relevan", "En alakalı", "Liên quan nhất", "सबसे प्रासंगिक", "Relevance" ) } # Comprehensive multi-language review keywords REVIEW_WORDS = { # English "reviews", "review", "ratings", "rating", # Hebrew "ביקורות", "ביקורת", "ביקורות על", "דירוגים", "דירוג", # Thai "รีวิว", "บทวิจารณ์", "คะแนน", "ความคิดเห็น", # Spanish "reseñas", "opiniones", "valoraciones", "críticas", "calificaciones", # French "avis", "commentaires", "évaluations", "critiques", "notes", # German "bewertungen", "rezensionen", "beurteilungen", "meinungen", "kritiken", # Italian "recensioni", "valutazioni", "opinioni", "giudizi", "commenti", # Portuguese "avaliações", "comentários", "opiniões", "análises", "críticas", # Russian "отзывы", "рецензии", "обзоры", "оценки", "комментарии", # Japanese "レビュー", "口コミ", "評価", "批評", "感想", # Korean "리뷰", "평가", "후기", "댓글", "의견", # Chinese (Simplified and Traditional) "评论", "評論", "点评", "點評", "评价", "評價", "意见", "意見", "回顾", "回顧", # Arabic "مراجعات", "تقييمات", "آراء", "تعليقات", "نقد", # Hindi "समीक्षा", "रिव्यू", "राय", "मूल्यांकन", "प्रतिक्रिया", # Turkish "yorumlar", "değerlendirmeler", "incelemeler", "görüşler", "puanlar", # Dutch "beoordelingen", "recensies", "meningen", "opmerkingen", "waarderingen", # Polish "recenzje", "opinie", "oceny", "komentarze", "uwagi", # Vietnamese "đánh giá", "nhận xét", "bình luận", "phản hồi", "bài đánh giá", # Indonesian "ulasan", "tinjauan", "komentar", "penilaian", "pendapat", # Swedish "recensioner", "betyg", "omdömen", "åsikter", "kommentarer", # Norwegian "anmeldelser", "vurderinger", "omtaler", "meninger", "tilbakemeldinger", # Danish "anmeldelser", "bedømmelser", "vurderinger", "meninger", "kommentarer", # Finnish "arvostelut", "arviot", "kommentit", "mielipiteet", "palautteet", # Greek "κριτικές", "αξιολογήσεις", "σχόλια", "απόψεις", "βαθμολογίες", # Czech "recenze", "hodnocení", "názory", "komentáře", "posudky", # Romanian "recenzii", "evaluări", "opinii", "comentarii", "note", # Hungarian "vélemények", "értékelések", "kritikák", "hozzászólások", "megjegyzések", # Bulgarian "отзиви", "ревюта", "мнения", "коментари", "оценки" } class GoogleReviewsScraper: """Main scraper class for Google Maps reviews""" def __init__(self, config: Dict[str, Any]): """Initialize scraper with configuration""" self.config = config self.use_mongodb = config.get("use_mongodb", True) self.mongodb = MongoDBStorage(config) if self.use_mongodb else None self.json_storage = JSONStorage(config) self.backup_to_json = config.get("backup_to_json", True) self.overwrite_existing = config.get("overwrite_existing", False) # Translation feature settings self.append_translations = config.get("append_translations", False) self.translation_language = config.get("translation_language", "auto") self.force_full_scan = config.get("force_full_scan", False) def setup_driver(self, headless: bool) -> Chrome: """ Set up and configure Chrome driver with flexibility for different environments. Works in both Docker containers and on regular OS installations (Windows, Mac, Linux). """ # Determine if we're running in a container in_container = os.environ.get('CHROME_BIN') is not None # Create Chrome options opts = uc.ChromeOptions() opts.add_argument("--window-size=1400,900") opts.add_argument("--ignore-certificate-errors") opts.add_argument("--disable-gpu") # Improves performance opts.add_argument("--disable-dev-shm-usage") # Helps with stability opts.add_argument("--no-sandbox") # More stable in some environments # Use headless mode if requested if headless: opts.add_argument("--headless=new") # Log platform information for debugging log.info(f"Platform: {platform.platform()}") log.info(f"Python version: {platform.python_version()}") # If in container, use environment-provided binaries if in_container: chrome_binary = os.environ.get('CHROME_BIN') chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') log.info(f"Container environment detected") log.info(f"Chrome binary: {chrome_binary}") log.info(f"ChromeDriver path: {chromedriver_path}") if chrome_binary and os.path.exists(chrome_binary): log.info(f"Using Chrome binary from environment: {chrome_binary}") opts.binary_location = chrome_binary try: # Try creating Chrome driver with undetected_chromedriver log.info("Attempting to create undetected_chromedriver instance") driver = uc.Chrome(options=opts) log.info("Successfully created undetected_chromedriver instance") except Exception as e: # Fall back to regular Selenium if undetected_chromedriver fails log.warning(f"Failed to create undetected_chromedriver instance: {e}") log.info("Falling back to regular Selenium Chrome") # Import Selenium webdriver here to avoid potential import issues from selenium import webdriver from selenium.webdriver.chrome.service import Service if chromedriver_path and os.path.exists(chromedriver_path): log.info(f"Using ChromeDriver from path: {chromedriver_path}") service = Service(executable_path=chromedriver_path) driver = webdriver.Chrome(service=service, options=opts) else: log.info("Using default ChromeDriver") driver = webdriver.Chrome(options=opts) else: # On regular OS, use default undetected_chromedriver log.info("Using standard undetected_chromedriver setup") driver = uc.Chrome(options=opts) # Set page load timeout to avoid hanging driver.set_page_load_timeout(30) log.info("Chrome driver setup completed successfully") return driver def dismiss_cookies(self, driver: Chrome): """ Dismiss cookie consent dialogs if present. Handles stale element references by re-finding elements if needed. """ try: # Use WebDriverWait with expected_conditions to handle stale elements WebDriverWait(driver, 3).until( EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN)) ) log.info("Cookie consent dialog found, attempting to dismiss") # Get elements again after waiting to avoid stale references elements = driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN) for elem in elements: try: if elem.is_displayed(): elem.click() log.info("Cookie dialog dismissed") return True except Exception as e: log.debug(f"Error clicking cookie button: {e}") continue except TimeoutException: # This is expected if no cookie dialog is present log.debug("No cookie consent dialog detected") except Exception as e: log.debug(f"Error handling cookie dialog: {e}") return False def is_reviews_tab(self, tab: WebElement) -> bool: """ Dynamically detect if an element is the reviews tab across multiple languages and layouts. Uses multiple detection approaches for maximum reliability. """ try: # Strategy 1: Data attribute detection (most reliable across languages) tab_index = tab.get_attribute("data-tab-index") if tab_index in ["1", "2", "reviews"]: # Reviews can be index 1 or 2 depending on layout # Double-check this is actually a reviews tab by checking text content aria_label = (tab.get_attribute("aria-label") or "").lower() tab_text = (tab.text or "").lower() if any(word.lower() in aria_label or word.lower() in tab_text for word in REVIEW_WORDS): log.debug(f"Found reviews tab by data-tab-index: {tab_index}") return True # Strategy 2: Role and aria attributes (accessibility detection) role = tab.get_attribute("role") aria_selected = tab.get_attribute("aria-selected") aria_label = (tab.get_attribute("aria-label") or "").lower() # Many review tabs have role="tab" and data attributes if role == "tab" and any(word.lower() in aria_label for word in REVIEW_WORDS): log.debug(f"Found reviews tab by aria-label: {aria_label}") return True # Strategy 3: Text content detection (multiple sources) tab_text = tab.text.lower() if tab.text else "" inner_html = tab.get_attribute("innerHTML").lower() or "" text_content = tab.get_attribute("textContent").lower() or "" sources = [ tab_text, # Direct text aria_label, # ARIA label inner_html, # Inner HTML text_content # Text content ] # Check all sources against our comprehensive keyword list for i, source in enumerate(sources): if any(word.lower() in source for word in REVIEW_WORDS): source_names = ["text", "aria-label", "innerHTML", "textContent"] log.debug(f"Found reviews tab by {source_names[i]}: '{source}' (contains review word)") return True # Strategy 4: Nested element detection try: # Check text in all child elements for child in tab.find_elements(By.CSS_SELECTOR, "*"): try: child_text = child.text.lower() if child.text else "" child_content = child.get_attribute("textContent").lower() or "" if any(word.lower() in child_text for word in REVIEW_WORDS) or any( word.lower() in child_content for word in REVIEW_WORDS): log.debug(f"Found reviews tab by child element text: '{child_text}' or '{child_content}'") return True except: continue except: pass # Strategy 5: URL detection (some tabs have hrefs or data-hrefs with tell-tale values) for attr in ["href", "data-href", "data-url", "data-target"]: attr_value = (tab.get_attribute(attr) or "").lower() if attr_value and ("review" in attr_value or "rating" in attr_value): log.debug(f"Found reviews tab by {attr}: {attr_value}") return True # Strategy 6: Class detection (some review tabs have specific classes) tab_class = tab.get_attribute("class") or "" review_classes = ["review", "reviews", "rating", "ratings", "comments", "feedback", "g4jrve"] if any(cls in tab_class.lower() for cls in review_classes): log.debug(f"Found reviews tab by class: {tab_class}") return True # Log what we found for debugging log.debug(f"Tab not identified as reviews tab - role: {role}, index: {tab_index}, aria-label: '{aria_label}', text: '{tab_text}', class: '{tab_class}'") return False except StaleElementReferenceException: return False except Exception as e: log.debug(f"Error in is_reviews_tab: {e}") return False def click_reviews_tab(self, driver: Chrome): """ Highly dynamic reviews tab detection and clicking with multiple fallback strategies. Works across different languages, layouts, and browser environments. """ max_timeout = 30 # Maximum seconds to try end_time = time.time() + max_timeout attempts = 0 # First, wait for the business panel to load log.info("Waiting for business information panel to load...") business_panel_loaded = False panel_wait_end = time.time() + 15 # Wait up to 15 seconds for business panel while time.time() < panel_wait_end: try: # Look for indicators that the business panel has loaded business_indicators = [ # Business name or rating elements '[role="main"] h1', '[role="main"] .DUwDvf', # Business name class '[role="main"] .F7nice', # Rating class '[role="main"] .fontHeadlineSmall', # Or any elements with review-related text '//*[contains(translate(text(), "REVIEWS", "reviews"), "review")]', ] for indicator in business_indicators[:-1]: # CSS selectors first elements = driver.find_elements(By.CSS_SELECTOR, indicator) if elements: log.info(f"Business panel detected using selector: {indicator}") business_panel_loaded = True break if not business_panel_loaded: # Try XPath selector for review text elements = driver.find_elements(By.XPATH, business_indicators[-1]) if elements: log.info("Business panel detected using review text search") business_panel_loaded = True if business_panel_loaded: break time.sleep(1) except Exception as e: log.debug(f"Error checking for business panel: {e}") time.sleep(1) if business_panel_loaded: log.info("Business panel loaded successfully") # Give it a bit more time for tabs to appear time.sleep(2) else: log.warning("Business panel may not have loaded completely, continuing with tab search...") # If no business panel is detected, try URL manipulation to force reviews view if not business_panel_loaded: try: current_url = driver.current_url if "/place/" in current_url and "/reviews" not in current_url: # Try to navigate directly to reviews by modifying URL if "?" in current_url: base_url = current_url.split("?")[0] params = current_url.split("?")[1] new_url = f"{base_url}/reviews?{params}" else: new_url = f"{current_url}/reviews" log.info(f"Attempting to navigate directly to reviews: {new_url}") driver.get(new_url) time.sleep(3) # Check if this worked by looking for review content review_cards = driver.find_elements(By.CSS_SELECTOR, 'div[data-review-id]') if review_cards: log.info("Successfully navigated to reviews page") return True except Exception as e: log.debug(f"URL manipulation failed: {e}") # Define different selectors to try in order of reliability tab_selectors = [ # Direct tab selectors - try both common indexes '[data-tab-index="1"]', # Common tab index for reviews '[data-tab-index="2"]', # Alternative tab index for reviews 'button[role="tab"][data-tab-index="1"]', # Exact match from HTML 'button[role="tab"][data-tab-index="2"]', # Alternative exact match '[role="tab"][data-tab-index]', # Any tab with index 'button[role="tab"]', # Button tabs 'div[role="tab"]', # Div tabs 'a[role="tab"]', # Link tabs # Common Google Maps review tab selectors '.fontTitleSmall[role="tab"]', # Google Maps title font tabs '.hh2c6[role="tab"]', # Common Google Maps class '.m6QErb [role="tab"]', # Maps container tabs # Text-based selectors for various languages 'button:contains("reviews")', # Button containing "reviews" 'div[role="tablist"] > *', # Any tab in a tab list 'div.m6QErb div[role="tablist"] > *', # Google Maps specific tablist # Fallback selectors '[role="tab"]', # Any tab element ] # Record successful clicks for debugging successful_method = None successful_selector = None # Try each selector in turn for selector in tab_selectors: if time.time() > end_time: break try: elements = driver.find_elements(By.CSS_SELECTOR, selector) if not elements: continue # Try each element found with this selector for element in elements: attempts += 1 # First check if this is actually a reviews tab if not self.is_reviews_tab(element): continue # Found a reviews tab, attempt to click it with multiple methods log.info(f"Found potential reviews tab ({selector}): '{element.text}', attempting to click") # Ensure visibility driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", element) time.sleep(0.7) # Wait for scroll # Try different click methods in order of reliability click_methods = [ # Method 1: JavaScript click (most reliable) lambda: driver.execute_script("arguments[0].click();", element), # Method 2: Direct click lambda: element.click(), # Method 3: ActionChains click lambda: ActionChains(driver).move_to_element(element).click().perform(), # Method 4: Send RETURN key lambda: element.send_keys(Keys.RETURN), # Method 5: Center click with ActionChains lambda: ActionChains(driver).move_to_element_with_offset( element, element.size['width'] // 2, element.size['height'] // 2).click().perform(), ] # Try each click method for i, click_method in enumerate(click_methods): try: click_method() time.sleep(1.5) # Wait for click to take effect # Verify if click worked (check for new content) if self.verify_reviews_tab_clicked(driver): successful_method = i + 1 successful_selector = selector log.info( f"Successfully clicked reviews tab using method {i + 1} and selector '{selector}'") return True except Exception as click_error: log.debug(f"Click method {i + 1} failed: {click_error}") continue except Exception as selector_error: log.debug(f"Error with selector '{selector}': {selector_error}") continue # If we reach here, try XPath as a last resort if time.time() <= end_time: log.info("Trying XPath-based text matching for review tabs...") for language_keyword in REVIEW_WORDS: try: # Try different XPath patterns for review text xpath_patterns = [ f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language_keyword.lower()}')]", f"//button[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language_keyword.lower()}')]", f"//*[@role='tab' and contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language_keyword.lower()}')]", f"//*[contains(translate(@aria-label, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{language_keyword.lower()}')]", ] for xpath in xpath_patterns: try: elements = driver.find_elements(By.XPATH, xpath) for element in elements: try: # Skip if it's a script tag or hidden element if element.tag_name.lower() in ['script', 'style', 'noscript']: continue if not element.is_displayed(): continue log.info(f"Trying XPath pattern with keyword '{language_keyword}': {xpath}") driver.execute_script("arguments[0].scrollIntoView({block:'center'});", element) time.sleep(0.7) driver.execute_script("arguments[0].click();", element) time.sleep(1.5) if self.verify_reviews_tab_clicked(driver): log.info(f"Successfully clicked element with keyword '{language_keyword}'") return True except: continue except: continue except: continue # Final attempt: try to navigate directly to reviews by URL try: current_url = driver.current_url if "?hl=" in current_url: # Preserve language setting if present lang_param = re.search(r'\?hl=([^&]*)', current_url) if lang_param: lang_code = lang_param.group(1) # Try to replace the current part with 'reviews' or append it if '/place/' in current_url: parts = current_url.split('/place/') new_url = f"{parts[0]}/place/{parts[1].split('/')[0]}/reviews?hl={lang_code}" driver.get(new_url) time.sleep(2) if "review" in driver.current_url.lower(): log.info("Navigated directly to reviews page via URL") return True # Try to identify reviews link in URL if '/place/' in current_url and '/reviews' not in current_url: parts = current_url.split('/place/') new_url = f"{parts[0]}/place/{parts[1].split('/')[0]}/reviews" driver.get(new_url) time.sleep(2) if "review" in driver.current_url.lower(): log.info("Navigated directly to reviews page via URL") return True except Exception as url_error: log.warning(f"Failed to navigate to reviews via URL: {url_error}") log.warning(f"Failed to find/click reviews tab after {attempts} attempts") raise TimeoutException("Reviews tab not found or could not be clicked") def verify_reviews_tab_clicked(self, driver: Chrome) -> bool: """ Verify that the reviews tab was successfully clicked by checking for characteristic elements that appear on the reviews page. """ try: # Common elements that appear when reviews tab is active verification_selectors = [ # Reviews container 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf', # Review cards 'div[data-review-id]', # Sort button (usually appears with reviews) 'button[aria-label*="Sort" i]', # Review rating elements 'span[role="img"][aria-label*="star" i]', # Other indicators 'div.m6QErb div.jftiEf', '.HlvSq' ] # Check if any verification selector is present for selector in verification_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) if elements and len(elements) > 0: return True # URL check - if "review" appears in the URL if "review" in driver.current_url.lower(): return True return False except Exception as e: log.debug(f"Error verifying reviews tab click: {e}") return False def set_sort(self, driver: Chrome, method: str): """ Set the sorting method for reviews with enhanced detection for the latest Google Maps UI. Works across different languages and UI variations, with robust error handling. """ if method == "relevance": log.info("Using default 'relevance' sort - no need to change sort order") return True # Default order, no need to change log.info(f"Attempting to set sort order to '{method}'") try: # 1. Find and click the sort button sort_button_selectors = [ # Exact selectors based on recent HTML structure 'button.HQzyZ[aria-haspopup="true"]', 'div.m6QErb button.HQzyZ', 'button[jsaction*="pane.wfvdle84"]', 'div.fontBodyLarge.k5lwKb', # The text element inside sort button # Common attribute-based selectors 'button[aria-label*="Sort" i]', 'button[aria-label*="sort" i]', 'button[aria-expanded="false"][aria-haspopup="true"]', # Multilingual selectors 'button[aria-label*="סדר" i]', # Hebrew 'button[aria-label*="เรียง" i]', # Thai 'button[aria-label*="排序" i]', # Chinese 'button[aria-label*="Trier" i]', # French 'button[aria-label*="Ordenar" i]', # Spanish/Portuguese 'button[aria-label*="Sortieren" i]', # German # Parent container-based selectors 'div.m6QErb.Hk4XGb.XiKgde.tLjsW button', 'div.m6QErb div.XiKgde button' ] # Attempt to find the sort button sort_button = None # Try each selector for selector in sort_button_selectors: try: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: # Skip invisible/disabled elements if not element.is_displayed() or not element.is_enabled(): continue # Get button text and attributes for verification button_text = element.text.strip() if element.text else "" button_aria = element.get_attribute("aria-label") or "" # Skip buttons that are clearly not sort buttons negative_keywords = ["back", "next", "previous", "close", "cancel", "חזרה", "סגור", "ปิด"] if any(keyword in button_text.lower() or keyword in button_aria.lower() for keyword in negative_keywords): continue # Additional check - make sure this is actually a sort button for reviews if ("sort" not in button_text.lower() and "sort" not in button_aria.lower() and "סדר" not in button_text.lower() and "เรียง" not in button_text.lower() and "ordenar" not in button_text.lower() and "trier" not in button_text.lower()): log.debug(f"Button doesn't appear to be a sort button, skipping: '{button_text}' / '{button_aria}'") continue # Found a potential sort button sort_button = element log.info(f"Found sort button with selector: {selector}") log.info(f"Button text: '{button_text}', aria-label: '{button_aria}'") break except Exception as e: log.debug(f"Error checking element: {e}") continue if sort_button: break except Exception as e: log.debug(f"Error with selector '{selector}': {e}") continue # If no button found with CSS selectors, try finding it from its container if not sort_button: try: # Look for the sort container by its distinctive classes containers = driver.find_elements(By.CSS_SELECTOR, 'div.m6QErb.Hk4XGb, div.XiKgde.tLjsW') for container in containers: try: # Find buttons within this container buttons = container.find_elements(By.TAG_NAME, 'button') for button in buttons: if button.is_displayed() and button.is_enabled(): sort_button = button log.info("Found sort button through container element") break except: continue if sort_button: break except Exception as e: log.debug(f"Error finding button via container: {e}") # If still no button found, try XPath approach with keywords if not sort_button: xpath_terms = ["sort", "Sort", "סדר", "סידור", "เรียง", "排序", "Trier", "Ordenar", "Sortieren"] for term in xpath_terms: try: xpath = f"//*[contains(text(), '{term}') or contains(@aria-label, '{term}')]" elements = driver.find_elements(By.XPATH, xpath) for element in elements: try: if element.is_displayed() and element.is_enabled(): sort_button = element log.info(f"Found sort button with XPath term: '{term}'") break except: continue if sort_button: break except: continue # Final check - do we have a sort button? if not sort_button: log.warning("No sort button found with any method - keeping default sort order") return False # 2. Click the sort button to open dropdown menu # First ensure the button is in view driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", sort_button) time.sleep(0.8) # Wait for scroll # Try multiple click methods click_methods = [ # Method 1: JavaScript click lambda: driver.execute_script("arguments[0].click();", sort_button), # Method 2: Direct click lambda: sort_button.click(), # Method 3: ActionChains click with move first lambda: ActionChains(driver).move_to_element(sort_button).pause(0.3).click().perform(), # Method 4: Click on center of element lambda: ActionChains(driver).move_to_element_with_offset( sort_button, sort_button.size['width'] // 2, sort_button.size['height'] // 2 ).click().perform(), # Method 5: JavaScript focus and click lambda: driver.execute_script( "arguments[0].focus(); setTimeout(function() { arguments[0].click(); }, 100);", sort_button ), # Method 6: Send RETURN key after focusing lambda: ActionChains(driver).move_to_element(sort_button).click().send_keys(Keys.RETURN).perform() ] # Try each click method menu_opened = False for i, click_method in enumerate(click_methods): try: log.info(f"Trying click method {i + 1} for sort button...") click_method() time.sleep(1) # Wait for menu to appear # Check if menu opened menu_opened = self.check_if_menu_opened(driver) if menu_opened: log.info(f"Sort menu opened with click method {i + 1}") break except Exception as e: log.debug(f"Click method {i + 1} failed: {e}") continue # If menu not opened, abort if not menu_opened: log.warning("Failed to open sort menu - keeping default sort order") # Try to reset state by clicking elsewhere try: ActionChains(driver).move_by_offset(50, 50).click().perform() except: pass return False # 3. Find and click the desired sort option in the menu # Selectors for menu items with focus on the exact HTML structure menu_item_selectors = [ # Exact Google Maps menu item selectors 'div[role="menuitemradio"]', 'div.fxNQSd[role="menuitemradio"]', 'div[role="menuitemradio"] div.mLuXec', # Inner text container # Generic menu item selectors (fallback) '[role="menuitemradio"]', '[role="menuitem"]', 'div[role="menu"] > div' ] # Combined selector for efficiency combined_selector = ", ".join(menu_item_selectors) try: # Wait for menu items to appear menu_items = WebDriverWait(driver, 5).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, combined_selector)) ) # Process menu items to find matches visible_items = [] for item in menu_items: try: # Skip invisible items if not item.is_displayed(): continue # Handle different element types if item.get_attribute('role') == 'menuitemradio': # This is a top-level menu item try: # Try to find text in the inner div.mLuXec element first text_elements = item.find_elements(By.CSS_SELECTOR, 'div.mLuXec') if text_elements and text_elements[0].is_displayed(): text = text_elements[0].text.strip() visible_items.append((item, text)) else: # Fall back to the item's own text text = item.text.strip() visible_items.append((item, text)) except: # Last resort - use the item's own text text = item.text.strip() visible_items.append((item, text)) elif 'mLuXec' in (item.get_attribute('class') or ''): # This is the text container element - get its parent menuitemradio try: text = item.text.strip() parent = driver.execute_script( "return arguments[0].closest('[role=\"menuitemradio\"]');", item ) if parent: visible_items.append((parent, text)) except: continue else: # Generic menu item handling text = item.text.strip() visible_items.append((item, text)) except Exception as e: log.debug(f"Error processing menu item: {e}") continue log.info(f"Found {len(visible_items)} visible menu items") for i, (_, text) in enumerate(visible_items): log.debug(f" Menu item {i + 1}: '{text}'") # Determine the target menu item based on sort method target_item = None matched_text = None # 1. First try direct text matching wanted_labels = SORT_OPTIONS.get(method, []) for item, text in visible_items: for label in wanted_labels: if (label in text or text in label or (len(text) > 0 and len(label) > 0 and text.lower().startswith(label.lower()[:3]))): target_item = item matched_text = text log.info(f"Found matching menu item: '{text}' for '{label}'") break if target_item: break # 2. If no match found, try position-based selection if not target_item and visible_items: position_map = { "relevance": 0, # Usually the first option "newest": 1, # Usually the second option "highest": 2, # Usually the third option "lowest": 3 # Usually the fourth option } pos = position_map.get(method, -1) if pos >= 0 and pos < len(visible_items): target_item, matched_text = visible_items[pos] log.info(f"Using position-based selection (position {pos}) for '{method}'") # 3. If target found, click it if target_item: # Ensure item is in view driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", target_item) time.sleep(0.3) # Try multiple click methods click_success = False click_methods = [ # Method 1: JavaScript click lambda: driver.execute_script("arguments[0].click();", target_item), # Method 2: Direct click lambda: target_item.click(), # Method 3: ActionChains click lambda: ActionChains(driver).move_to_element(target_item).click().perform(), # Method 4: Center click lambda: ActionChains(driver).move_to_element_with_offset( target_item, target_item.size['width'] // 2, target_item.size['height'] // 2 ).click().perform(), # Method 5: JavaScript click with custom event lambda: driver.execute_script(""" var el = arguments[0]; var evt = new MouseEvent('click', { bubbles: true, cancelable: true, view: window }); el.dispatchEvent(evt); """, target_item) ] for i, click_method in enumerate(click_methods): try: click_method() time.sleep(1.5) # Wait for sort to take effect # Try to verify sort happened by checking if menu closed still_open = self.check_if_menu_opened(driver) if not still_open: click_success = True log.info(f"Successfully clicked menu item with method {i + 1}") break except Exception as e: log.debug(f"Menu item click method {i + 1} failed: {e}") continue if click_success: log.info(f"Successfully set sort order to '{method}'") return True else: log.warning(f"Failed to click menu item - keeping default sort order") else: log.warning(f"No matching menu item found for '{method}'") # If we get here, we failed - try to close the menu by clicking elsewhere try: ActionChains(driver).move_by_offset(50, 50).click().perform() except: pass return False except TimeoutException: log.warning("Timeout waiting for menu items") return False except Exception as e: log.warning(f"Error in menu item selection: {e}") return False except Exception as e: log.warning(f"Error in set_sort method: {e}") return False def check_if_menu_opened(self, driver): """ Check if a sort menu has been opened after clicking the sort button. Uses multiple detection strategies optimized for Google Maps dropdowns. Returns True if menu is detected, False otherwise. """ try: # 1. First check for exact menu container selectors from the latest Google Maps UI specific_menu_selectors = [ 'div[role="menu"][id="action-menu"]', # Exact match from provided HTML 'div.fontBodyLarge.yu5kgd[role="menu"]', # Classes from provided HTML 'div.fxNQSd[role="menuitemradio"]', # Menu item class 'div.yu5kgd[role="menu"]' # Alternate class ] for selector in specific_menu_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): return True except: continue # 2. Check for generic menu containers generic_menu_selectors = [ 'div[role="menu"]', 'ul[role="menu"]', '[role="listbox"]' ] for selector in generic_menu_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): return True except: continue # 3. Look for menu items menu_item_selectors = [ 'div[role="menuitemradio"]', # Google Maps specific 'div.fxNQSd', # Class-based detection 'div.mLuXec', # Text container class '[role="menuitem"]', # Generic menu items '[role="option"]' # Alternative role ] visible_items = 0 for selector in menu_item_selectors: elements = driver.find_elements(By.CSS_SELECTOR, selector) for element in elements: try: if element.is_displayed(): visible_items += 1 if visible_items >= 2: # At least 2 menu items should be visible return True except: continue # 4. Advanced detection with JavaScript # Checks if there are newly visible elements with menu-related roles or classes try: js_detection = """ return (function() { // Check for visible menu elements var menuElements = document.querySelectorAll('div[role="menu"], div[role="menuitemradio"], div.fxNQSd'); for (var i = 0; i < menuElements.length; i++) { var style = window.getComputedStyle(menuElements[i]); if (style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0') { return true; } } // Check for any recently appeared elements that might be a menu var possibleMenus = document.querySelectorAll('div.yu5kgd, div.fontBodyLarge'); for (var i = 0; i < possibleMenus.length; i++) { var style = window.getComputedStyle(possibleMenus[i]); var rect = possibleMenus[i].getBoundingClientRect(); // Check if element is visible and has a meaningful size if (style.display !== 'none' && style.visibility !== 'hidden' && rect.width > 50 && rect.height > 50) { return true; } } return false; })(); """ menu_detected = driver.execute_script(js_detection) if menu_detected: return True except Exception as js_error: log.debug(f"Error in JavaScript menu detection: {js_error}") # 5. Last resort: check if any positioning styles were applied to elements # This can detect menu containers that have been positioned absolutely try: position_check = """ return (function() { // Look for absolutely positioned elements that appeared recently var elements = document.querySelectorAll('div[style*="position: absolute"]'); for (var i = 0; i < elements.length; i++) { var el = elements[i]; var style = window.getComputedStyle(el); var hasMenuItems = el.querySelectorAll('div[role="menuitemradio"], div.fxNQSd').length > 0; if (style.display !== 'none' && style.visibility !== 'hidden' && hasMenuItems) { return true; } } return false; })(); """ position_detected = driver.execute_script(position_check) if position_detected: return True except: pass return False except Exception as e: log.debug(f"Error checking menu state: {e}") return False def scrape(self): """Main scraper method""" start_time = time.time() url = self.config.get("url") headless = self.config.get("headless", True) sort_by = self.config.get("sort_by", "relevance") stop_on_match = self.config.get("stop_on_match", False) # Override stop_on_match if translation mode is enabled if self.append_translations or self.force_full_scan: stop_on_match = False log.info("Translation mode enabled - forcing full scan of all reviews") log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") log.info(f"Translation mode: append_translations={self.append_translations}, language={self.translation_language}") log.info(f"URL: {url}") # Initialize storage # If not overwriting, load existing data if self.overwrite_existing: docs = {} seen = set() else: # Try to get from MongoDB first if enabled docs = {} if self.use_mongodb and self.mongodb: docs = self.mongodb.fetch_existing_reviews() # If backup_to_json is enabled, also load from JSON for merging if self.backup_to_json: json_docs = self.json_storage.load_json_docs() # Merge JSON docs with MongoDB docs for review_id, review in json_docs.items(): if review_id not in docs: docs[review_id] = review # Load seen IDs from file seen = self.json_storage.load_seen() driver = None try: driver = self.setup_driver(headless) wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout driver.get(url) wait.until(lambda d: "google.com/maps" in d.current_url) self.dismiss_cookies(driver) self.click_reviews_tab(driver) self.set_sort(driver, sort_by) # Add a wait after setting sort to allow results to load time.sleep(3) # Increased wait time for reviews to load # Use try-except to handle cases where the pane is not found pane = None pane_selectors = [ PANE_SEL, # Original selector 'div[role="main"]', # Simpler main container 'body', # Ultimate fallback ] for pane_selector in pane_selectors: try: pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, pane_selector))) log.info(f"Found scrollable pane using selector: {pane_selector}") break except TimeoutException: log.debug(f"Pane selector '{pane_selector}' not found") continue if not pane: log.warning("Could not find any scrollable pane. Page structure might have changed.") return False pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) idle = 0 processed_ids = set() # Track processed IDs in current session # Prefetch selector to avoid repeated lookups try: driver.execute_script("window.scrollablePane = arguments[0];", pane) scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" except Exception as e: log.warning(f"Error setting up scroll script: {e}") scroll_script = "window.scrollBy(0, 300);" # Fallback to simple scrolling max_attempts = 10 # Limit the number of attempts to find reviews attempts = 0 # In translation mode, track total unique reviews to detect when we stop finding new ones all_review_ids_seen = set() last_unique_count = 0 no_new_reviews_count = 0 while attempts < max_attempts: try: cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) fresh_cards: List[WebElement] = [] # Check for valid cards if len(cards) == 0: log.info(f"No review cards found in iteration {attempts + 1} using selector '{CARD_SEL}'") # Try alternative selectors alternative_selectors = [ 'div[data-review-id]', '.jftiEf[data-review-id]', '[data-review-id]' ] for alt_sel in alternative_selectors: alt_cards = pane.find_elements(By.CSS_SELECTOR, alt_sel) log.info(f"Alternative selector '{alt_sel}': Found {len(alt_cards)} cards") if alt_cards: cards = alt_cards break if len(cards) == 0: # If no cards found in pane, try searching the entire page log.info("No cards found in pane, searching entire page...") page_cards = driver.find_elements(By.CSS_SELECTOR, 'div[data-review-id]') log.info(f"Found {len(page_cards)} review cards on entire page") if page_cards: cards = page_cards log.info("Using review cards found on entire page") else: attempts += 1 # Try scrolling anyway driver.execute_script(scroll_script) time.sleep(1) continue else: log.info(f"Found {len(cards)} review cards in iteration {attempts + 1}") for c in cards: try: cid = c.get_attribute("data-review-id") if not cid: continue # In translation mode, process all cards even if seen before if self.append_translations: # In translation mode, we process all reviews to add potential translations # We don't use processed_ids to track, so process all cards if cid in seen: log.debug(f"Translation mode: Processing {cid} again (was seen before, adding translation)") else: log.debug(f"Translation mode: Processing {cid} (new review)") fresh_cards.append(c) else: # Normal mode: skip seen reviews if cid in seen or cid in processed_ids: if stop_on_match and cid and (cid in seen or cid in processed_ids): idle = 999 break continue fresh_cards.append(c) except StaleElementReferenceException: continue except Exception as e: log.debug(f"Error getting review ID: {e}") continue # In translation mode, track all unique review IDs to detect when we've seen all reviews if self.append_translations: current_review_ids = set() for c in cards: try: cid = c.get_attribute("data-review-id") if cid: current_review_ids.add(cid) all_review_ids_seen.add(cid) except: continue # Check if we found new unique reviews current_unique_count = len(all_review_ids_seen) if current_unique_count == last_unique_count: no_new_reviews_count += 1 log.info(f"Translation mode: No new reviews found ({no_new_reviews_count}/5) - total unique: {current_unique_count}") else: no_new_reviews_count = 0 log.info(f"Translation mode: Found new reviews - total unique: {current_unique_count} (was {last_unique_count})") last_unique_count = current_unique_count # If we haven't found new reviews for 5 iterations, we're done if no_new_reviews_count >= 5: log.info("Translation mode: No new reviews found for 5 iterations - stopping") break # Log how many fresh cards we found log.info(f"Found {len(fresh_cards)} fresh cards out of {len(cards)} total cards (translation_mode={self.append_translations})") for card in fresh_cards: try: raw = RawReview.from_card(card) # In translation mode, don't add to processed_ids to allow re-processing if not self.append_translations: processed_ids.add(raw.id) # Track this ID to avoid re-processing except StaleElementReferenceException: continue except Exception: log.warning("⚠️ parse error – storing stub\n%s", traceback.format_exc(limit=1).strip()) try: raw_id = card.get_attribute("data-review-id") or "" raw = RawReview(id=raw_id, text="", lang="und") # In translation mode, don't add to processed_ids to allow re-processing if not self.append_translations: processed_ids.add(raw_id) except StaleElementReferenceException: continue # Use translation-aware merge if translation mode is enabled if self.append_translations: docs[raw.id] = merge_review_with_translation(docs.get(raw.id), raw, append_translations=True) else: docs[raw.id] = merge_review(docs.get(raw.id), raw) seen.add(raw.id) pbar.update(1) idle = 0 attempts = 0 # Reset attempts counter when we successfully process a review # In translation mode, be more patient before giving up max_idle = 10 if self.append_translations else 3 if idle >= max_idle: log.info(f"Stopping after {max_idle} idle iterations") break if not fresh_cards: idle += 1 attempts += 1 # In translation mode, log why we're not finding fresh cards if self.append_translations: log.debug(f"No fresh cards in translation mode - idle: {idle}/{max_idle}, attempts: {attempts}") else: # Reset idle counter when we have fresh cards idle = 0 # Use JavaScript for smoother scrolling try: driver.execute_script(scroll_script) except Exception as e: log.warning(f"Error scrolling: {e}") # Try a simpler scroll method driver.execute_script("window.scrollBy(0, 300);") # Dynamic sleep: sleep less when processing many reviews sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 time.sleep(sleep_time) except StaleElementReferenceException: # The pane or other element went stale, try to re-find log.debug("Stale element encountered, re-finding elements") try: pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) driver.execute_script("window.scrollablePane = arguments[0];", pane) except Exception: log.warning("Could not re-find reviews pane after stale element") break except Exception as e: log.warning(f"Error during review processing: {e}") attempts += 1 time.sleep(1) pbar.close() # Save to MongoDB if enabled if self.use_mongodb and self.mongodb: log.info("Saving reviews to MongoDB...") self.mongodb.save_reviews(docs) # Backup to JSON if enabled if self.backup_to_json: log.info("Backing up to JSON...") self.json_storage.save_json_docs(docs) self.json_storage.save_seen(seen) log.info("✅ Finished – total unique reviews: %s", len(docs)) end_time = time.time() elapsed_time = end_time - start_time log.info(f"Execution completed in {elapsed_time:.2f} seconds") return True except Exception as e: log.error(f"Error during scraping: {e}") log.error(traceback.format_exc()) return False finally: if driver is not None: try: driver.quit() except Exception: pass if self.mongodb: try: self.mongodb.close() except Exception: pass # """ # Selenium scraping logic for Google Maps Reviews. # """ # # import os # import time # import logging # import traceback # import platform # from typing import Dict, Any, List # # import undetected_chromedriver as uc # from selenium.common.exceptions import TimeoutException, StaleElementReferenceException # from selenium.webdriver import Chrome # from selenium.webdriver.common.by import By # from selenium.webdriver.remote.webelement import WebElement # from selenium.webdriver.support import expected_conditions as EC # from selenium.webdriver.support.ui import WebDriverWait # from tqdm import tqdm # # from modules.models import RawReview # from modules.data_storage import MongoDBStorage, JSONStorage, merge_review, merge_review_with_translation # # # Logger # log = logging.getLogger("scraper") # # # CSS Selectors # PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' # CARD_SEL = "div[data-review-id]" # COOKIE_BTN = ('button[aria-label*="Accept" i],' # 'button[jsname="hZCF7e"],' # 'button[data-mdc-dialog-action="accept"]') # SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' # MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' # # SORT_LABELS = { # text shown in Google Maps' menu # "newest": ("Newest", "החדשות ביותר", "ใหม่ที่สุด"), # "highest": ("Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด"), # "lowest": ("Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด"), # "relevance": ("Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด"), # } # # REVIEW_WORDS = {"reviews", "review", "ביקורות", "รีวิว", "avis", "reseñas", # "recensioni", "bewertungen", "口コミ", "レビュー", # "리뷰", "評論", "评论", "рецензии", "ביקורת"} # # # class GoogleReviewsScraper: # """Main scraper class for Google Maps reviews""" # # def __init__(self, config: Dict[str, Any]): # """Initialize scraper with configuration""" # self.config = config # self.use_mongodb = config.get("use_mongodb", True) # self.mongodb = MongoDBStorage(config) if self.use_mongodb else None # self.json_storage = JSONStorage(config) # self.backup_to_json = config.get("backup_to_json", True) # self.overwrite_existing = config.get("overwrite_existing", False) # # def setup_driver(self, headless: bool) -> Chrome: # """ # Set up and configure Chrome driver with flexibility for different environments. # Works in both Docker containers and on regular OS installations (Windows, Mac, Linux). # """ # # Determine if we're running in a container # in_container = os.environ.get('CHROME_BIN') is not None # # # Create Chrome options # opts = uc.ChromeOptions() # opts.add_argument("--window-size=1400,900") # opts.add_argument("--ignore-certificate-errors") # opts.add_argument("--disable-gpu") # Improves performance # opts.add_argument("--disable-dev-shm-usage") # Helps with stability # opts.add_argument("--no-sandbox") # More stable in some environments # # # Use headless mode if requested # if headless: # opts.add_argument("--headless=new") # # # Log platform information for debugging # log.info(f"Platform: {platform.platform()}") # log.info(f"Python version: {platform.python_version()}") # # # If in container, use environment-provided binaries # if in_container: # chrome_binary = os.environ.get('CHROME_BIN') # chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') # # log.info(f"Container environment detected") # log.info(f"Chrome binary: {chrome_binary}") # log.info(f"ChromeDriver path: {chromedriver_path}") # # if chrome_binary and os.path.exists(chrome_binary): # log.info(f"Using Chrome binary from environment: {chrome_binary}") # opts.binary_location = chrome_binary # # try: # # Try creating Chrome driver with undetected_chromedriver # log.info("Attempting to create undetected_chromedriver instance") # driver = uc.Chrome(options=opts) # log.info("Successfully created undetected_chromedriver instance") # except Exception as e: # # Fall back to regular Selenium if undetected_chromedriver fails # log.warning(f"Failed to create undetected_chromedriver instance: {e}") # log.info("Falling back to regular Selenium Chrome") # # # Import Selenium webdriver here to avoid potential import issues # from selenium import webdriver # from selenium.webdriver.chrome.service import Service # # if chromedriver_path and os.path.exists(chromedriver_path): # log.info(f"Using ChromeDriver from path: {chromedriver_path}") # service = Service(executable_path=chromedriver_path) # driver = webdriver.Chrome(service=service, options=opts) # else: # log.info("Using default ChromeDriver") # driver = webdriver.Chrome(options=opts) # else: # # On regular OS, use default undetected_chromedriver # log.info("Using standard undetected_chromedriver setup") # driver = uc.Chrome(options=opts) # # # Set page load timeout to avoid hanging # driver.set_page_load_timeout(30) # log.info("Chrome driver setup completed successfully") # return driver # # def dismiss_cookies(self, driver: Chrome): # """ # Dismiss cookie consent dialogs if present. # Handles stale element references by re-finding elements if needed. # """ # try: # # Use WebDriverWait with expected_conditions to handle stale elements # WebDriverWait(driver, 3).until( # EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN)) # ) # log.info("Cookie consent dialog found, attempting to dismiss") # # # Get elements again after waiting to avoid stale references # elements = driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN) # for elem in elements: # try: # if elem.is_displayed(): # elem.click() # log.info("Cookie dialog dismissed") # return True # except Exception as e: # log.debug(f"Error clicking cookie button: {e}") # continue # except TimeoutException: # # This is expected if no cookie dialog is present # log.debug("No cookie consent dialog detected") # except Exception as e: # log.debug(f"Error handling cookie dialog: {e}") # # return False # # def is_reviews_tab(self, tab: WebElement) -> bool: # """Check if a tab is the reviews tab""" # try: # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) # except StaleElementReferenceException: # return False # except Exception as e: # log.debug(f"Error checking if tab is reviews tab: {e}") # return False # # def click_reviews_tab(self, driver: Chrome): # """ # Click on the reviews tab in Google Maps with improved stale element handling. # """ # end = time.time() + 15 # Timeout after 15 seconds # while time.time() < end: # try: # # Find all tab elements # tabs = driver.find_elements(By.CSS_SELECTOR, '[role="tab"], button[aria-label]') # # for tab in tabs: # try: # # Check if this is the reviews tab # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # is_review_tab = tab.get_attribute("data-tab-index") == "1" or any( # w in label for w in REVIEW_WORDS) # # if is_review_tab: # # Scroll the tab into view # driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) # time.sleep(0.2) # Small wait after scrolling # # # Try to click the tab # log.info("Found reviews tab, attempting to click") # tab.click() # log.info("Successfully clicked reviews tab") # return True # except Exception as e: # # Element might be stale or not clickable, try the next one # log.debug(f"Error with tab element: {str(e)}") # continue # # # If we get here, we didn't find a suitable tab in this iteration # log.debug("No reviews tab found in this iteration, waiting...") # time.sleep(0.5) # Wait before next attempt # # except Exception as e: # # General exception handling # log.debug(f"Exception while looking for reviews tab: {str(e)}") # time.sleep(0.5) # # # If we exit the loop, we've timed out # log.warning("Timeout while looking for reviews tab") # raise TimeoutException("Reviews tab not found") # # def set_sort(self, driver: Chrome, method: str): # """ # Set the sorting method for reviews with improved error handling. # """ # if method == "relevance": # return True # Default order, no need to change # # log.info(f"Attempting to set sort order to '{method}'") # # try: # # First try to find and click the sort button # sort_buttons = driver.find_elements(By.CSS_SELECTOR, SORT_BTN) # if not sort_buttons: # log.warning(f"Sort button not found - keeping default sort order") # return False # # # Try to click the first visible sort button # for sort_button in sort_buttons: # try: # if sort_button.is_displayed() and sort_button.is_enabled(): # sort_button.click() # log.info("Clicked sort button") # time.sleep(0.5) # Wait for menu to appear # break # except Exception as e: # log.debug(f"Error clicking sort button: {e}") # continue # else: # log.warning("No clickable sort button found") # return False # # # Now find and click the menu item for the desired sort method # wanted = SORT_LABELS[method] # menu_items = WebDriverWait(driver, 3).until( # EC.presence_of_all_elements_located((By.CSS_SELECTOR, MENU_ITEMS)) # ) # # for item in menu_items: # try: # label = item.text.strip() # if label in wanted: # item.click() # log.info(f"Selected sort option: {label}") # time.sleep(0.5) # Wait for sorting to take effect # return True # except Exception as e: # log.debug(f"Error clicking menu item: {e}") # continue # # log.warning(f"Sort option '{method}' not found in menu - keeping default") # return False # # except Exception as e: # log.warning(f"Error setting sort order: {e}") # return False # # def scrape(self): # """Main scraper method""" # start_time = time.time() # # url = self.config.get("url") # headless = self.config.get("headless", True) # sort_by = self.config.get("sort_by", "relevance") # stop_on_match = self.config.get("stop_on_match", False) # # log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") # log.info(f"URL: {url}") # # # Initialize storage # # If not overwriting, load existing data # if self.overwrite_existing: # docs = {} # seen = set() # else: # # Try to get from MongoDB first if enabled # docs = {} # if self.use_mongodb and self.mongodb: # docs = self.mongodb.fetch_existing_reviews() # # # If backup_to_json is enabled, also load from JSON for merging # if self.backup_to_json: # json_docs = self.json_storage.load_json_docs() # # Merge JSON docs with MongoDB docs # for review_id, review in json_docs.items(): # if review_id not in docs: # docs[review_id] = review # # # Load seen IDs from file # seen = self.json_storage.load_seen() # # driver = None # try: # driver = self.setup_driver(headless) # wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout # # driver.get(url) # wait.until(lambda d: "google.com/maps" in d.current_url) # # self.dismiss_cookies(driver) # self.click_reviews_tab(driver) # self.set_sort(driver, sort_by) # # # Add a wait after setting sort to allow results to load # time.sleep(1) # # # Use try-except to handle cases where the pane is not found # try: # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # except TimeoutException: # log.warning("Could not find reviews pane. Page structure might have changed.") # return False # # pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) # idle = 0 # processed_ids = set() # Track processed IDs in current session # # # Prefetch selector to avoid repeated lookups # try: # driver.execute_script("window.scrollablePane = arguments[0];", pane) # scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" # except Exception as e: # log.warning(f"Error setting up scroll script: {e}") # scroll_script = "window.scrollBy(0, 300);" # Fallback to simple scrolling # # max_attempts = 10 # Limit the number of attempts to find reviews # attempts = 0 # # while attempts < max_attempts: # try: # cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) # fresh_cards: List[WebElement] = [] # # # Check for valid cards # if len(cards) == 0: # log.debug("No review cards found in this iteration") # attempts += 1 # # Try scrolling anyway # driver.execute_script(scroll_script) # time.sleep(1) # continue # # for c in cards: # try: # cid = c.get_attribute("data-review-id") # if not cid or cid in seen or cid in processed_ids: # if stop_on_match and cid and (cid in seen or cid in processed_ids): # idle = 999 # break # continue # fresh_cards.append(c) # except StaleElementReferenceException: # continue # except Exception as e: # log.debug(f"Error getting review ID: {e}") # continue # # for card in fresh_cards: # try: # raw = RawReview.from_card(card) # processed_ids.add(raw.id) # Track this ID to avoid re-processing # except StaleElementReferenceException: # continue # except Exception: # log.warning("⚠️ parse error – storing stub\n%s", # traceback.format_exc(limit=1).strip()) # try: # raw_id = card.get_attribute("data-review-id") or "" # raw = RawReview(id=raw_id, text="", lang="und") # processed_ids.add(raw_id) # except StaleElementReferenceException: # continue # # docs[raw.id] = merge_review(docs.get(raw.id), raw) # seen.add(raw.id) # pbar.update(1) # idle = 0 # attempts = 0 # Reset attempts counter when we successfully process a review # # if idle >= 3: # break # # if not fresh_cards: # idle += 1 # attempts += 1 # # # Use JavaScript for smoother scrolling # try: # driver.execute_script(scroll_script) # except Exception as e: # log.warning(f"Error scrolling: {e}") # # Try a simpler scroll method # driver.execute_script("window.scrollBy(0, 300);") # # # Dynamic sleep: sleep less when processing many reviews # sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 # time.sleep(sleep_time) # # except StaleElementReferenceException: # # The pane or other element went stale, try to re-find # log.debug("Stale element encountered, re-finding elements") # try: # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # driver.execute_script("window.scrollablePane = arguments[0];", pane) # except Exception: # log.warning("Could not re-find reviews pane after stale element") # break # except Exception as e: # log.warning(f"Error during review processing: {e}") # attempts += 1 # time.sleep(1) # # pbar.close() # # # Save to MongoDB if enabled # if self.use_mongodb and self.mongodb: # log.info("Saving reviews to MongoDB...") # self.mongodb.save_reviews(docs) # # # Backup to JSON if enabled # if self.backup_to_json: # log.info("Backing up to JSON...") # self.json_storage.save_json_docs(docs) # self.json_storage.save_seen(seen) # # log.info("✅ Finished – total unique reviews: %s", len(docs)) # # end_time = time.time() # elapsed_time = end_time - start_time # log.info(f"Execution completed in {elapsed_time:.2f} seconds") # # return True # # except Exception as e: # log.error(f"Error during scraping: {e}") # log.error(traceback.format_exc()) # return False # # finally: # if driver is not None: # try: # driver.quit() # except Exception: # pass # # if self.mongodb: # try: # self.mongodb.close() # except Exception: # pass # # # """ # # Selenium scraping logic for Google Maps Reviews. # # """ # # # # import re # # import time # # import logging # # import traceback # # from typing import Dict, Any, Set, List # # # # import undetected_chromedriver as uc # # from selenium.common.exceptions import TimeoutException # # from selenium.webdriver import Chrome # # from selenium.webdriver.common.by import By # # from selenium.webdriver.remote.webelement import WebElement # # from selenium.webdriver.support import expected_conditions as EC # # from selenium.webdriver.support.ui import WebDriverWait # # from tqdm import tqdm # # # # from modules.models import RawReview # # from modules.data_storage import MongoDBStorage, JSONStorage, merge_review, merge_review_with_translation # # from modules.utils import click_if # # # # # Logger # # log = logging.getLogger("scraper") # # # # # CSS Selectors # # PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' # # CARD_SEL = "div[data-review-id]" # # COOKIE_BTN = ('button[aria-label*="Accept" i],' # # 'button[jsname="hZCF7e"],' # # 'button[data-mdc-dialog-action="accept"]') # # SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' # # MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' # # # # SORT_LABELS = { # text shown in Google Maps' menu # # "newest": ("Newest", "החדשות ביותר", "ใหม่ที่สุด"), # # "highest": ("Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด"), # # "lowest": ("Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด"), # # "relevance": ("Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด"), # # } # # # # REVIEW_WORDS = {"reviews", "review", "ביקורות", "รีวิว", "avis", "reseñas", # # "recensioni", "bewertungen", "口コミ", "レビュー", # # "리뷰", "評論", "评论", "рецензии"} # # # # # # class GoogleReviewsScraper: # # """Main scraper class for Google Maps reviews""" # # # # def __init__(self, config: Dict[str, Any]): # # """Initialize scraper with configuration""" # # self.config = config # # self.use_mongodb = config.get("use_mongodb", True) # # self.mongodb = MongoDBStorage(config) if self.use_mongodb else None # # self.json_storage = JSONStorage(config) # # self.backup_to_json = config.get("backup_to_json", True) # # self.overwrite_existing = config.get("overwrite_existing", False) # # # # def setup_driver(self, headless: bool) -> Chrome: # # """Set up and configure Chrome driver""" # # opts = uc.ChromeOptions() # # opts.add_argument("--window-size=1400,900") # # opts.add_argument("--ignore-certificate-errors") # # opts.add_argument("--disable-gpu") # Improves performance # # opts.add_argument("--disable-dev-shm-usage") # Helps with stability # # opts.add_argument("--no-sandbox") # More stable in some environments # # # # if headless: # # opts.add_argument("--headless=new") # # # # driver = uc.Chrome(options=opts) # # # Set page load timeout to avoid hanging # # driver.set_page_load_timeout(30) # # return driver # # # # def dismiss_cookies(self, driver: Chrome): # # """Dismiss cookie consent dialogs""" # # click_if(driver, COOKIE_BTN, timeout=3.0) # Reduced timeout for faster operation # # # # def is_reviews_tab(self, tab: WebElement) -> bool: # # """Check if a tab is the reviews tab""" # # label = (tab.get_attribute("aria-label") or tab.text or "").lower() # # return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) # # # # def click_reviews_tab(self, driver: Chrome): # # """Click on the reviews tab in Google Maps""" # # end = time.time() + 15 # Reduced timeout from 30 to 15 seconds # # while time.time() < end: # # for tab in driver.find_elements(By.CSS_SELECTOR, # # '[role="tab"], button[aria-label]'): # # if self.is_reviews_tab(tab): # # driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) # # try: # # tab.click() # # return # # except Exception: # # continue # # time.sleep(.2) # Reduced sleep time from 0.4 to 0.2 # # raise TimeoutException("Reviews tab not found") # # # # def set_sort(self, driver: Chrome, method: str): # # """Set the sorting method for reviews""" # # if method == "relevance": # # return # default order # # if not click_if(driver, SORT_BTN): # # return # # # # wanted = SORT_LABELS[method] # # # # for item in driver.find_elements(By.CSS_SELECTOR, MENU_ITEMS): # # label = item.text.strip() # # if label in wanted: # # item.click() # # time.sleep(0.5) # Reduced wait time from 1.0 to 0.5 # # return # # log.warning("⚠️ sort option %s not found – keeping default", method) # # # # def scrape(self): # # """Main scraper method""" # # start_time = time.time() # # # # url = self.config.get("url") # # headless = self.config.get("headless", True) # # sort_by = self.config.get("sort_by", "relevance") # # stop_on_match = self.config.get("stop_on_match", False) # # # # log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") # # log.info(f"URL: {url}") # # # # # Initialize storage # # # If not overwriting, load existing data # # if self.overwrite_existing: # # docs = {} # # seen = set() # # else: # # # Try to get from MongoDB first if enabled # # docs = {} # # if self.use_mongodb and self.mongodb: # # docs = self.mongodb.fetch_existing_reviews() # # # # # If backup_to_json is enabled, also load from JSON for merging # # if self.backup_to_json: # # json_docs = self.json_storage.load_json_docs() # # # Merge JSON docs with MongoDB docs # # for review_id, review in json_docs.items(): # # if review_id not in docs: # # docs[review_id] = review # # # # # Load seen IDs from file # # seen = self.json_storage.load_seen() # # # # driver = self.setup_driver(headless) # # wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout # # # # try: # # driver.get(url) # # wait.until(lambda d: "google.com/maps" in d.current_url) # # # # self.dismiss_cookies(driver) # # self.click_reviews_tab(driver) # # self.set_sort(driver, sort_by) # # # # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) # # pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) # # idle = 0 # # processed_ids = set() # Track processed IDs in current session # # # # # Prefetch selector to avoid repeated lookups # # driver.execute_script("window.scrollablePane = arguments[0];", pane) # # scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" # # # # while True: # # cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) # # fresh_cards: List[WebElement] = [] # # # # for c in cards: # # cid = c.get_attribute("data-review-id") # # if cid in seen or cid in processed_ids: # # if stop_on_match: # # idle = 999 # # break # # continue # # fresh_cards.append(c) # # # # for card in fresh_cards: # # try: # # raw = RawReview.from_card(card) # # processed_ids.add(raw.id) # Track this ID to avoid re-processing # # except Exception: # # log.warning("⚠️ parse error – storing stub\n%s", # # traceback.format_exc(limit=1).strip()) # # raw_id = card.get_attribute("data-review-id") or "" # # raw = RawReview(id=raw_id, text="", lang="und") # # processed_ids.add(raw_id) # # # # docs[raw.id] = merge_review(docs.get(raw.id), raw) # # seen.add(raw.id) # # pbar.update(1) # # idle = 0 # # # # if idle >= 3: # # break # # # # if not fresh_cards: # # idle += 1 # # # # # Use JavaScript for smoother scrolling # # driver.execute_script(scroll_script) # # # # # Dynamic sleep: sleep less when processing many reviews # # sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 # # time.sleep(sleep_time) # # # # pbar.close() # # # # # Save to MongoDB if enabled # # if self.use_mongodb and self.mongodb: # # log.info("Saving reviews to MongoDB...") # # self.mongodb.save_reviews(docs) # # # # # Backup to JSON if enabled # # if self.backup_to_json: # # log.info("Backing up to JSON...") # # self.json_storage.save_json_docs(docs) # # self.json_storage.save_seen(seen) # # # # log.info("✅ Finished – total unique reviews: %s", len(docs)) # # # # end_time = time.time() # # elapsed_time = end_time - start_time # # log.info(f"Execution completed in {elapsed_time:.2f} seconds") # # # # finally: # # driver.quit() # # if self.mongodb: # # self.mongodb.close()