"""
Clean Google Maps Reviews Scraper
- Simple down scrolling
- DOM scraping + API interception
"""

import json
import re
import threading
import time
from typing import Optional

from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Anti-XSSI prefix that precedes the JSON payload in intercepted responses.
_XSSI_PREFIX = ")]}'"

# Strings that show up in the "author" slot of arrays that are not reviews.
_GARBAGE_AUTHORS = frozenset(
    ['google', 'maps', 'reviews', 'es', 'en', 'it', 'no', 'de', 'fr', 'pt']
)

# Indices of a raw API review array that may hold the owner's reply.
_OWNER_RESPONSE_SLOTS = (9, 18)

# find_reviews() stops descending below this nesting depth.
_MAX_SEARCH_DEPTH = 12

_CONSENT_ACCEPT_KEYWORDS = ("accept", "aceptar", "alle akzeptieren")
_REVIEW_TAB_KEYWORDS = (
    "review", "reseña", "avis", "bewertung", "recensione", "opiniones",
)

_AUTHOR_SELECTORS = ('div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div')
_TEXT_SELECTORS = (
    'span[jsname="bN97Pc"]',
    'span[jsname="fbQN7e"]',
    'div.MyEned span.wiI7pd',
    '.wiI7pd',
)
_TIMESTAMP_SELECTORS = ('span[class*="rsqaWe"]',)
_REVIEW_CARD_SELECTOR = "div.jftiEf[data-review-id]"

# Tried in order; the first displayed element taller than 100px wins.
_SCROLL_CONTAINER_SELECTORS = (
    "div.m6QErb.DxyBCb.kA9KIf.dS8AEf",
    "div.m6QErb.DxyBCb.kA9KIf",
    "div.m6QErb.DxyBCb",
    "div.m6QErb[aria-label]",
    "div.DxyBCb.kA9KIf.dS8AEf",
    "div[role='main'] div.m6QErb",
)

# "XX reviews" / "1.2K reviews" / "XX reseñas", compiled once.
_TOTAL_REVIEW_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'(\d{1,3}(?:,\d{3})*)\s+reviews?',
        r'(\d+\.?\d*K)\s+reviews?',
        r'(\d{1,3}(?:,\d{3})*)\s+reseñas?',
    )
)

_RATING_NUMBER = re.compile(r'\d+(?:\.\d+)?')

_POLL_INTERVAL = 0.01        # 10ms - responsive but low CPU
_RECOVERY_INTERVAL = 3.0     # seconds between recovery attempts while stuck

# ---------------------------------------------------------------------------
# JavaScript snippets executed in the page
# ---------------------------------------------------------------------------

# Wraps fetch and XMLHttpRequest so that bodies of review-related responses
# are appended to window.__interceptedResponses.
_INTERCEPTOR_JS = """
if (window.__reviewInterceptorInjected) return;
window.__reviewInterceptorInjected = true;
window.__interceptedResponses = [];

function isReviewUrl(url) {
    return url.includes('listugcposts') || url.includes('review');
}

// Intercept fetch
const originalFetch = window.fetch;
window.fetch = async function(...args) {
    const response = await originalFetch.apply(this, args);
    try {
        // args[0] may be a string, a URL or a Request object.
        const target = args[0];
        const url = (target && typeof target.url === 'string')
            ? target.url
            : String(target);
        if (isReviewUrl(url)) {
            const text = await response.clone().text();
            window.__interceptedResponses.push({url: url, body: text});
        }
    } catch (e) {}
    return response;
};

// Intercept XHR
const originalXHR = window.XMLHttpRequest;
window.XMLHttpRequest = function() {
    const xhr = new originalXHR();
    const originalOpen = xhr.open;
    let reqUrl = '';
    xhr.open = function(method, url, ...rest) {
        reqUrl = String(url);
        return originalOpen.apply(this, [method, url, ...rest]);
    };
    xhr.addEventListener('load', function() {
        if (isReviewUrl(reqUrl)) {
            try {
                window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText});
            } catch (e) {}
        }
    });
    return xhr;
};
for (let prop of Object.getOwnPropertyNames(originalXHR)) {
    try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch (e) {}
}
"""

# Returns and clears the captured responses. The activity counter is reset
# together with the buffer so the next "loading" probe compares against 0.
_DRAIN_RESPONSES_JS = """
var r = window.__interceptedResponses || [];
window.__interceptedResponses = [];
window.__lastResponseCount = 0;
return r;
"""

_FIND_SORT_BUTTON_JS = """
var btns = document.querySelectorAll('button[data-value="sort"]');
if (btns.length) return btns[0];
// Try aria-label
var all = document.querySelectorAll('button[aria-label*="Sort"]');
if (all.length) return all[0];
return null;
"""

_CLICK_NEWEST_JS = """
var items = document.querySelectorAll('[role="menuitemradio"], [data-index="1"]');
for (var i = 0; i < items.length; i++) {
    var txt = items[i].textContent.toLowerCase();
    if (txt.includes('newest') || txt.includes('recent') || txt.includes('más reciente')) {
        items[i].click();
        break;
    }
}
"""

_SCROLL_TO_BOTTOM_JS = """
var p = window.scrollablePane;
if (p) p.scrollTop = p.scrollHeight;
"""

_SCROLL_UP_JS = """
var p = window.scrollablePane;
if (p) p.scrollTop = Math.max(0, p.scrollTop - 2000);
"""

_CLICK_LAST_CARD_JS = """
var cards = document.querySelectorAll('div.jftiEf[data-review-id]');
if (cards.length > 0) {
    cards[cards.length - 1].scrollIntoView({block: 'end'});
    cards[cards.length - 1].click();
}
"""

_LOADING_STATUS_JS = """
var status = {spinner: false, network: false};
// Check for Google's loading indicators
var spinner = document.querySelector('div[role="progressbar"]');
if (spinner && spinner.offsetParent !== null) status.spinner = true;
var loading = document.querySelector('.qjESne, .loading');
if (loading && loading.offsetParent !== null) status.spinner = true;
// Check for recent network activity (API interceptor)
var responses = window.__interceptedResponses || [];
var lastCount = window.__lastResponseCount || 0;
if (responses.length > lastCount) {
    status.network = true;
    window.__lastResponseCount = responses.length;
}
return status;
"""

_SCROLL_STATE_JS = """
var p = window.scrollablePane;
if (!p) return {atBottom: true, height: 0};
var atBottom = (p.scrollTop + p.clientHeight >= p.scrollHeight - 50);
var height = p.scrollHeight;
var lastHeight = window.__lastScrollHeight || 0;
var growing = height > lastHeight;
window.__lastScrollHeight = height;
return {atBottom: atBottom, height: height, growing: growing};
"""


# ---------------------------------------------------------------------------
# API response parsing (pure functions, no browser needed)
# ---------------------------------------------------------------------------

def parse_api_review(raw: list) -> Optional[dict]:
    """Parse a review from an API response array.

    Slots read: ``raw[0]`` author, ``raw[1]`` timestamp, ``raw[3]`` text,
    ``raw[4]`` rating, and ``raw[9]`` / ``raw[18]`` for an owner reply.

    Returns:
        A review dict with ``source == "api"``, or ``None`` when ``raw`` does
        not look like a real review (too short, rating outside 1..5, author
        of three characters or fewer or a known metadata token, timestamp
        that is a URL or at most three characters long).
    """
    if not isinstance(raw, list) or len(raw) < 5:
        return None

    author = raw[0] if isinstance(raw[0], str) else ""
    timestamp = raw[1]
    text = raw[3] if isinstance(raw[3], str) else ""
    rating = raw[4]

    # bool is a subclass of int; True must not be accepted as a 1-star rating.
    if isinstance(rating, bool) or not isinstance(rating, int):
        return None
    if not 1 <= rating <= 5:
        return None

    # Filter out garbage data (language codes, metadata, etc.)
    if len(author) <= 3 or author.lower() in _GARBAGE_AUTHORS:
        return None

    # Timestamp should look like a date, not a URL or language code
    if timestamp and ('http' in str(timestamp) or len(str(timestamp)) <= 3):
        return None

    # Owner response: first candidate slot holding a list with 2+ entries.
    owner_response = None
    for idx in _OWNER_RESPONSE_SLOTS:
        if len(raw) > idx and raw[idx] and isinstance(raw[idx], list):
            resp = raw[idx]
            if len(resp) > 1:
                owner_response = {
                    "text": resp[1],
                    "timestamp": resp[0] if resp[0] else "",
                }
                break

    return {
        "author": author,
        "text": text,
        "rating": rating,
        "timestamp": timestamp,
        "owner_response": owner_response,
        "source": "api",
    }


def extract_reviews_from_api_body(body: str) -> list:
    """Extract reviews from an API response body.

    Strips the ``)]}'`` prefix if present, decodes the JSON and walks nested
    lists/dicts (up to 12 levels deep) looking for arrays shaped like a
    review. Returns an empty list for non-string or undecodable bodies.
    """
    if not isinstance(body, str):
        return []
    if body.startswith(_XSSI_PREFIX):
        body = body[len(_XSSI_PREFIX):].strip()

    try:
        data = json.loads(body)
    except (ValueError, RecursionError):
        # Not JSON (or pathologically nested): nothing to extract.
        return []

    reviews = []

    def looks_like_review(obj: list) -> bool:
        return (
            len(obj) > 4
            and isinstance(obj[0], str)
            and isinstance(obj[4], int)
            and not isinstance(obj[4], bool)
            and 1 <= obj[4] <= 5
        )

    # Recursively find review arrays
    def find_reviews(obj, depth=0):
        if depth > _MAX_SEARCH_DEPTH:
            return
        if isinstance(obj, list):
            if looks_like_review(obj):
                rev = parse_api_review(obj)
                if rev and rev["author"]:
                    reviews.append(rev)
                    return  # accepted: do not descend into its fields
            for item in obj:
                find_reviews(item, depth + 1)
        elif isinstance(obj, dict):
            for value in obj.values():
                find_reviews(value, depth + 1)

    find_reviews(data)
    return reviews


# ---------------------------------------------------------------------------
# DOM parsing
# ---------------------------------------------------------------------------

def _first_text(root, selectors) -> str:
    """Return the stripped text of the first selector that yields non-empty text."""
    for sel in selectors:
        try:
            text = root.find_element(By.CSS_SELECTOR, sel).text.strip()
        except Exception:
            # Selector absent or element went stale: try the next one.
            continue
        if text:
            return text
    return ""


def _rating_from_label(label: str) -> int:
    """Return the integer part of the first number in an aria-label, else 0.

    Handles "5 stars", "5 estrellas" and decimal commas ("4,5").
    """
    match = _RATING_NUMBER.search(label.replace(',', '.'))
    return int(float(match.group())) if match else 0


def parse_dom_review(card) -> Optional[dict]:
    """Parse a review from a DOM element.

    Every field is best-effort: a missing sub-element leaves that field at
    its default. Returns ``None`` when neither a review id nor an author
    could be read, or when the card itself can no longer be queried.
    """
    try:
        # Get review ID (on the card itself, else on a descendant)
        review_id = card.get_attribute("data-review-id") or ""
        if not review_id:
            try:
                id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]")
                review_id = id_el.get_attribute("data-review-id") or ""
            except Exception:
                pass

        author = _first_text(card, _AUTHOR_SELECTORS)

        # Rating from aria-label on span[role="img"]
        rating = 0
        try:
            stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]')
            rating = _rating_from_label(stars_el.get_attribute("aria-label") or "")
        except Exception:
            pass

        # Note: "More" button clicking removed for speed
        # Full text can be expanded later if needed
        text = _first_text(card, _TEXT_SELECTORS)
        timestamp = _first_text(card, _TIMESTAMP_SELECTORS)

        # Owner response
        owner_response = None
        try:
            resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd")
        except Exception:
            resp_box = None
        if resp_box:
            resp_text = _first_text(resp_box, ("div.wiI7pd",))
            if resp_text:
                owner_response = {
                    "text": resp_text,
                    "timestamp": _first_text(resp_box, ("span.DZSIDd",)),
                }

        if not review_id and not author:
            return None

        return {
            "id": review_id,
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "dom",
        }
    except Exception:
        return None


# ---------------------------------------------------------------------------
# Page preparation helpers
# ---------------------------------------------------------------------------

def _handle_consent(driver, url: str, timeout: float = 5.0) -> None:
    """Accept the consent page if redirected to it, then reload ``url``.

    Polls until the browser is on a maps/place URL, a consent page has been
    handled, or ``timeout`` seconds have passed.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        current = driver.current_url
        if "consent.google" in current:
            print(" Handling consent popup...")
            try:
                for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
                    label = btn.text.lower()
                    if any(kw in label for kw in _CONSENT_ACCEPT_KEYWORDS):
                        btn.click()
                        # Reload original URL after consent
                        print(" Reloading after consent...")
                        driver.get(url)
                        break
            except Exception:
                pass
            return
        # Check if we're already on the target page
        if "maps/place" in current and "consent" not in current:
            return
        time.sleep(_POLL_INTERVAL)


def _open_reviews_tab(driver, timeout: float = 5.0) -> bool:
    """Poll for a tab whose label contains a review keyword and click it."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            for tab in driver.find_elements(By.CSS_SELECTOR, "button[role='tab']"):
                if any(kw in tab.text.lower() for kw in _REVIEW_TAB_KEYWORDS):
                    print(f" Clicking reviews tab: '{tab.text}'")
                    tab.click()
                    return True
        except Exception:
            pass  # tabs re-rendered mid-iteration: poll again
        time.sleep(_POLL_INTERVAL)
    return False


def _find_scroll_container(driver):
    """Return the first displayed candidate pane taller than 100px, or None."""
    for sel in _SCROLL_CONTAINER_SELECTORS:
        try:
            for el in driver.find_elements(By.CSS_SELECTOR, sel):
                if el.is_displayed() and el.size['height'] > 100:
                    return el
        except Exception:
            pass
    return None


def _wait_for_scroll_container(driver, timeout: float = 10.0):
    """Poll for the reviews pane, logging once per elapsed second."""
    start = time.time()
    last_print = 0
    while time.time() - start < timeout:
        container = _find_scroll_container(driver)
        if container:
            return container
        elapsed = int(time.time() - start)
        if elapsed > last_print:
            print(f" Waiting for reviews panel... ({elapsed}s)")
            last_print = elapsed
        time.sleep(_POLL_INTERVAL)
    return None


def _parse_total_reviews(page_text: str) -> Optional[int]:
    """Return the review count advertised in ``page_text``, if one is found.

    Uses the first pattern that matches and its first occurrence; "1.2K" is
    expanded to 1200.
    """
    for pattern in _TOTAL_REVIEW_PATTERNS:
        match = pattern.search(page_text)
        if not match:
            continue
        count_str = match.group(1).upper()
        try:
            if 'K' in count_str:
                return int(float(count_str.replace('K', '')) * 1000)
            return int(count_str.replace(',', ''))
        except ValueError:
            return None
    return None


def _sort_by_newest(driver) -> None:
    """Best-effort: open the sort menu and pick the "newest" entry."""
    try:
        sort_btn = driver.execute_script(_FIND_SORT_BUTTON_JS)
        if sort_btn:
            sort_btn.click()
            time.sleep(0.3)
            # Click "Newest" option
            driver.execute_script(_CLICK_NEWEST_JS)
            time.sleep(0.5)
            print(" 📅 Sorted by newest")
    except Exception:
        pass


def _block_images(driver) -> None:
    """Best-effort: block image URLs through CDP to speed up scrolling."""
    try:
        driver.execute_cdp_cmd('Network.setBlockedURLs', {
            'urls': ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp', '*.svg',
                     '*googleusercontent.com/*']
        })
        driver.execute_cdp_cmd('Network.enable', {})
        print(" 🚫 Blocking images for faster scrolling")
    except Exception:
        pass  # CDP might not be available in all setups


# ---------------------------------------------------------------------------
# Collection-loop helpers
# ---------------------------------------------------------------------------

def _drain_api_reviews(driver) -> list:
    """Fetch and clear intercepted responses; return the reviews they contain."""
    api_revs = []
    try:
        responses = driver.execute_script(_DRAIN_RESPONSES_JS)
        for resp in (responses or []):
            api_revs.extend(extract_reviews_from_api_body(resp.get("body", "")))
    except Exception:
        pass
    return api_revs


def _is_loading(driver) -> bool:
    """True when a spinner is visible or new responses were just captured."""
    try:
        status = driver.execute_script(_LOADING_STATUS_JS)
        return bool(status.get('spinner') or status.get('network'))
    except Exception:
        return False


def _scroll_state(driver) -> tuple:
    """Return ``(at_bottom, content_growing)`` for the scroll pane.

    Falls back to ``(True, False)`` if the page cannot be queried.
    """
    try:
        state = driver.execute_script(_SCROLL_STATE_JS)
        return state.get('atBottom', True), state.get('growing', False)
    except Exception:
        return True, False


def _unstick_scroll(driver, scroll_container, attempt: int) -> None:
    """Nudge a stalled list; the strategy rotates with ``attempt % 4``."""
    method = attempt % 4
    try:
        if method == 1:
            # Method 1: Click pane and send Page Down keys
            scroll_container.click()
            ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
            ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
        elif method == 2:
            # Method 2: Real mouse wheel scroll
            ActionChains(driver).move_to_element(scroll_container) \
                .scroll_by_amount(0, 800).perform()
        elif method == 3:
            # Method 3: Scroll up significantly then back down (force reload)
            driver.execute_script(_SCROLL_UP_JS)
            time.sleep(0.3)
            driver.execute_script(_SCROLL_TO_BOTTOM_JS)
        else:
            # Method 4: Click last review card to focus, then scroll
            driver.execute_script(_CLICK_LAST_CARD_JS)
            time.sleep(0.2)
            driver.execute_script(_SCROLL_TO_BOTTOM_JS)
    except Exception:
        pass


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------

def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15) -> dict:
    """
    Scrape Google Maps reviews.

    Args:
        driver: Selenium WebDriver instance
        url: Google Maps place URL
        max_reviews: Maximum reviews to collect (the returned list is capped
            at this size)
        timeout_no_new: Seconds to wait with no new reviews before stopping

    Returns:
        dict with keys ``reviews`` (list of review dicts), ``total``,
        ``checks`` (loop iterations) and ``url``. When the reviews pane is
        not found the dict also carries ``error`` and ``scrolls``, with an
        empty ``reviews`` list.
    """
    # Storage: DOM reviews keyed by review id, API-only reviews by "api_<author>".
    reviews = {}
    known_authors = set()      # every non-empty author currently stored
    api_key_by_author = {}     # author -> key of the API-sourced entry

    # Don't force language - let Google show all reviews in user's locale
    print(f"🌐 Loading: {url[:80]}...")
    driver.get(url)

    _handle_consent(driver, url)
    _open_reviews_tab(driver)

    scroll_container = _wait_for_scroll_container(driver)
    if not scroll_container:
        print("❌ Could not find reviews scroll container")
        # Debug: print page title and URL
        try:
            print("Page title:", driver.title)
            print("Current URL:", driver.current_url[:100])
        except Exception:
            pass
        return {
            "reviews": [],
            "total": 0,
            "scrolls": 0,
            "checks": 0,
            "url": url,
            "error": "No scroll container found",
        }
    print("✅ Found scroll container")

    # Extract total review count from page (only used for progress output)
    total_reviews = None
    try:
        total_reviews = _parse_total_reviews(driver.page_source)
    except Exception:
        pass
    if total_reviews is not None:
        print(f"📊 Total reviews on page: {total_reviews}")

    # PHASE 2: Inject API interceptor for scroll-loaded reviews
    print("🔌 Injecting API interceptor...")
    driver.execute_script(_INTERCEPTOR_JS)

    _sort_by_newest(driver)
    _block_images(driver)

    # Simple scroll - scrollTop = scrollHeight
    driver.execute_script("window.scrollablePane = arguments[0];", scroll_container)
    stop_scrolling = threading.Event()

    def scroll_worker():
        # NOTE(review): this thread shares `driver` with the main loop;
        # confirm the WebDriver in use tolerates concurrent commands.
        while not stop_scrolling.is_set():
            try:
                driver.execute_script(_SCROLL_TO_BOTTOM_JS)
            except Exception:
                pass
            time.sleep(0.1)

    scroll_thread = threading.Thread(target=scroll_worker, daemon=True)
    scroll_thread.start()

    recovery_count = 0
    last_recovery_time = 0.0
    last_new_time = time.time()
    last_count = len(reviews)
    check_num = 0

    print(f"🔄 Scrolling... (timeout: {timeout_no_new}s with no new)", flush=True)

    try:
        while True:
            check_num += 1
            time.sleep(1.0)  # Check every second

            # Collect from API (doesn't affect scroll); one entry per author.
            for rev in _drain_api_reviews(driver):
                author = rev["author"]
                if author in known_authors:
                    continue
                key = f"api_{author}"
                reviews[key] = rev
                api_key_by_author[author] = key
                known_authors.add(author)

            # Parse reviews in real-time (Google Maps uses virtual scroll -
            # elements get removed!). We must parse NOW, not later.
            try:
                for card in driver.find_elements(By.CSS_SELECTOR, _REVIEW_CARD_SELECTOR):
                    rid = card.get_attribute("data-review-id")
                    if not rid or rid in reviews:
                        continue
                    review = parse_dom_review(card)
                    if not review:
                        continue
                    # The DOM copy carries the review id, so it replaces an
                    # API copy from the same author instead of duplicating it.
                    stale_key = api_key_by_author.pop(review["author"], None)
                    if stale_key is not None:
                        reviews.pop(stale_key, None)
                    reviews[rid] = review
                    if review["author"]:
                        known_authors.add(review["author"])
            except Exception:
                pass

            current_count = len(reviews)

            # Check for new reviews
            if current_count > last_count:
                last_new_time = time.time()
                last_count = current_count

            # Reset timer while loading (spinner visible OR network activity)
            if _is_loading(driver):
                last_new_time = time.time()

            # Progress update
            now = time.time()
            elapsed = now - last_new_time
            if total_reviews:
                pct = (current_count / total_reviews) * 100
                print(f" 📊 {current_count}/{total_reviews} ({pct:.0f}%) | idle: {elapsed:.1f}s", flush=True)
            else:
                print(f" 📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", flush=True)

            # STUCK DETECTION: after 3s idle, retry every 3s while stuck.
            # Timed against the previous attempt so a slow iteration cannot
            # skip over an attempt.
            if elapsed >= 3 and now - last_recovery_time >= _RECOVERY_INTERVAL:
                print(f" 🔧 Recovery attempt #{recovery_count + 1}...", flush=True)
                recovery_count += 1
                last_recovery_time = now
                _unstick_scroll(driver, scroll_container, recovery_count)

            # Stop conditions
            if current_count >= max_reviews:
                print(f"✅ Reached max: {current_count}")
                break

            # Check scroll state - track if content is still being added
            at_bottom, content_growing = _scroll_state(driver)

            # Reset timer if content is growing (new reviews loading)
            if content_growing:
                last_new_time = time.time()

            # Dynamic timeout based on state and recovery attempts
            # - 5s if at bottom AND content stopped growing AND multiple
            #   recovery attempts failed
            # - timeout_no_new seconds otherwise (keep trying)
            recovery_failed = recovery_count >= 5 and elapsed >= 5
            truly_done = at_bottom and not content_growing and recovery_failed
            timeout_hit = elapsed >= timeout_no_new

            if truly_done or timeout_hit:
                print(f"✅ All reviews loaded: {current_count}")
                break
    finally:
        # Always stop the background scroller, including on errors/Ctrl-C.
        stop_scrolling.set()
        scroll_thread.join(timeout=2.0)

    # Reviews already parsed during scrolling (real-time parsing)
    print("📝 Finalizing review data...")

    # Final results, capped at max_reviews
    review_list = list(reviews.values())[:max(max_reviews, 0)]
    dom_count = sum(1 for r in review_list if r.get("source") == "dom")
    api_count = sum(1 for r in review_list if r.get("source") == "api")

    print(f"\n📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})")

    return {
        "reviews": review_list,
        "total": len(review_list),
        "checks": check_num,
        "url": url,
    }


# Test function
if __name__ == "__main__":
    from seleniumbase import Driver

    # Test URL - 79 reviews
    TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8"

    print("🚀 Starting clean scraper test...")

    # Set up driver
    driver = Driver(uc=True, headless=False)
    driver.set_window_size(1200, 900)

    try:
        result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15)
        print(f"\n✅ Got {result['total']} reviews in {result['checks']} checks")

        # Show sample
        if result["reviews"]:
            print("\n📝 Sample review:")
            sample = result["reviews"][0]
            print(f" Author: {sample['author']}")
            print(f" Rating: {sample['rating']}⭐")
            print(f" Text: {sample['text'][:100]}..." if sample['text'] else " Text: (none)")
    finally:
        driver.quit()
        print("\n🏁 Done")