""" Clean Google Maps Reviews Scraper - Simple down scrolling - DOM scraping + API interception """ import re import json import time import threading from selenium.webdriver.common.by import By def parse_api_review(raw: list) -> dict: """Parse a review from API response array.""" try: if not isinstance(raw, list) or len(raw) < 5: return None author = raw[0] if len(raw) > 0 and isinstance(raw[0], str) else "" timestamp = raw[1] if len(raw) > 1 else "" text = raw[3] if len(raw) > 3 and isinstance(raw[3], str) else "" rating = raw[4] if len(raw) > 4 and isinstance(raw[4], int) else 0 if not (1 <= rating <= 5): return None # Filter out garbage data (language codes, metadata, etc.) if len(author) <= 3: # Real names are longer than 3 chars return None if author.lower() in ['google', 'maps', 'reviews', 'es', 'en', 'it', 'no', 'de', 'fr', 'pt']: return None # Timestamp should look like a date, not a URL or language code if timestamp and ('http' in str(timestamp) or len(str(timestamp)) <= 3): return None # Owner response owner_response = None for idx in [9, 18]: if len(raw) > idx and raw[idx] and isinstance(raw[idx], list): resp = raw[idx] if len(resp) > 1: owner_response = {"text": resp[1], "timestamp": resp[0] if resp[0] else ""} break return { "author": author, "text": text, "rating": rating, "timestamp": timestamp, "owner_response": owner_response, "source": "api" } except: return None def extract_reviews_from_api_body(body: str) -> list: """Extract reviews from API response body.""" reviews = [] try: # Remove )]}' prefix if body.startswith(")]}'"): body = body[4:].strip() data = json.loads(body) # Recursively find review arrays def find_reviews(obj, depth=0): if depth > 12: return if isinstance(obj, list): # Check if this looks like a review if len(obj) > 4 and isinstance(obj[0], str) and isinstance(obj[4], int): if 1 <= obj[4] <= 5: rev = parse_api_review(obj) if rev and rev["author"]: reviews.append(rev) return for item in obj: find_reviews(item, depth + 1) elif isinstance(obj, dict): for v in obj.values(): find_reviews(v, depth + 1) find_reviews(data) except: pass return reviews def parse_dom_review(card) -> dict: """Parse a review from DOM element.""" try: # Get review ID review_id = card.get_attribute("data-review-id") or "" if not review_id: try: id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]") review_id = id_el.get_attribute("data-review-id") or "" except: pass # Author - multiple selectors author = "" for sel in ['div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div']: try: author_el = card.find_element(By.CSS_SELECTOR, sel) author = author_el.text.strip() if author: break except: pass # Rating from aria-label on span[role="img"] rating = 0 try: stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]') aria = stars_el.get_attribute("aria-label") or "" # Extract number from label (handles "5 stars", "5 estrellas", etc.) num = re.search(r'[\d\.]+', aria.replace(',', '.')) if num: rating = int(float(num.group())) except: pass # Review text - try multiple selectors text = "" for sel in ['span[jsname="bN97Pc"]', 'span[jsname="fbQN7e"]', 'div.MyEned span.wiI7pd', '.wiI7pd']: try: text_el = card.find_element(By.CSS_SELECTOR, sel) text = text_el.text.strip() if text: break except: pass # Note: "More" button clicking removed for speed # Full text can be expanded later if needed # Timestamp timestamp = "" try: time_el = card.find_element(By.CSS_SELECTOR, 'span[class*="rsqaWe"]') timestamp = time_el.text.strip() except: pass # Owner response owner_response = None try: resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd") if resp_box: resp_text = "" resp_date = "" try: resp_text_el = resp_box.find_element(By.CSS_SELECTOR, "div.wiI7pd") resp_text = resp_text_el.text.strip() except: pass try: resp_date_el = resp_box.find_element(By.CSS_SELECTOR, "span.DZSIDd") resp_date = resp_date_el.text.strip() except: pass if resp_text: owner_response = {"text": resp_text, "timestamp": resp_date} except: pass if not review_id and not author: return None return { "id": review_id, "author": author, "text": text, "rating": rating, "timestamp": timestamp, "owner_response": owner_response, "source": "dom" } except Exception: return None def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15) -> dict: """ Scrape Google Maps reviews. Args: driver: Selenium WebDriver instance url: Google Maps place URL max_reviews: Maximum reviews to collect timeout_no_new: Seconds to wait with no new reviews before stopping Returns: dict with reviews list and metadata """ # Storage - use review ID as key reviews = {} # review_id -> review # Force English language if "hl=" not in url: url = url + ("&" if "?" in url else "?") + "hl=en" # Navigate to URL print(f"🌐 Loading: {url[:80]}...") driver.get(url) # Handle consent popup if redirected (poll with tiny sleep) start = time.time() while time.time() - start < 5: # Max 5s for consent if "consent.google" in driver.current_url: print(" Handling consent popup...") try: for btn in driver.find_elements(By.CSS_SELECTOR, "button"): txt = btn.text.lower() if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt: btn.click() # Reload original URL after consent print(" Reloading after consent...") driver.get(url) break except: pass break # Check if we're already on the target page if "maps/place" in driver.current_url and "consent" not in driver.current_url: break time.sleep(0.01) # 10ms - responsive but low CPU # Click reviews tab - poll until found review_keywords = ["review", "reseΓ±a", "avis", "bewertung", "recensione", "opiniones"] start = time.time() while time.time() - start < 5: # Max 5s for tabs try: tabs = driver.find_elements(By.CSS_SELECTOR, "button[role='tab']") for tab in tabs: tab_text = tab.text.lower() if any(kw in tab_text for kw in review_keywords): print(f" Clicking reviews tab: '{tab.text}'") tab.click() break else: time.sleep(0.01) # 10ms between polls continue break # Found and clicked except: time.sleep(0.01) # Find scrollable reviews container def find_scroll_container(): selectors = [ "div.m6QErb.DxyBCb.kA9KIf.dS8AEf", "div.m6QErb.DxyBCb.kA9KIf", "div.m6QErb.DxyBCb", "div.m6QErb[aria-label]", "div.DxyBCb.kA9KIf.dS8AEf", "div[role='main'] div.m6QErb", ] for sel in selectors: try: els = driver.find_elements(By.CSS_SELECTOR, sel) for el in els: if el.is_displayed() and el.size['height'] > 100: return el except: pass return None # Poll for scroll container (10ms intervals - fast but low CPU) scroll_container = None start = time.time() last_print = 0 while time.time() - start < 10: # Max 10s scroll_container = find_scroll_container() if scroll_container: break elapsed = int(time.time() - start) if elapsed > last_print: print(f" Waiting for reviews panel... ({elapsed}s)") last_print = elapsed time.sleep(0.01) # 10ms - responsive but low CPU if not scroll_container: print("❌ Could not find reviews scroll container") # Debug: print page source snippet try: print("Page title:", driver.title) print("Current URL:", driver.current_url[:100]) except: pass return {"reviews": [], "total": 0, "scrolls": 0, "error": "No scroll container found"} print("βœ… Found scroll container") # Extract total review count from page total_reviews = None try: page_text = driver.page_source # Look for "XX reviews" pattern patterns = [ r'(\d{1,3}(?:,\d{3})*)\s+reviews?', r'(\d+\.?\d*K)\s+reviews?', r'(\d{1,3}(?:,\d{3})*)\s+reseΓ±as?', ] for pattern in patterns: matches = re.findall(pattern, page_text, re.IGNORECASE) if matches: count_str = matches[0] if 'K' in count_str.upper(): total_reviews = int(float(count_str.upper().replace('K', '')) * 1000) else: total_reviews = int(count_str.replace(',', '')) print(f"πŸ“Š Total reviews on page: {total_reviews}") break except: pass # PHASE 2: Inject API interceptor for scroll-loaded reviews print("πŸ”Œ Injecting API interceptor...") driver.execute_script(""" if (window.__reviewInterceptorInjected) return; window.__reviewInterceptorInjected = true; window.__interceptedResponses = []; // Intercept fetch const originalFetch = window.fetch; window.fetch = async function(...args) { const url = args[0].toString(); const response = await originalFetch.apply(this, args); if (url.includes('listugcposts') || url.includes('review')) { try { const clone = response.clone(); const text = await clone.text(); window.__interceptedResponses.push({url: url, body: text}); } catch(e) {} } return response; }; // Intercept XHR const originalXHR = window.XMLHttpRequest; window.XMLHttpRequest = function() { const xhr = new originalXHR(); const originalOpen = xhr.open; let reqUrl = ''; xhr.open = function(method, url, ...rest) { reqUrl = url; return originalOpen.apply(this, [method, url, ...rest]); }; xhr.addEventListener('load', function() { if (reqUrl.includes('listugcposts') || reqUrl.includes('review')) { try { window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText}); } catch(e) {} } }); return xhr; }; for (let prop of Object.getOwnPropertyNames(originalXHR)) { try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch(e) {} } """) def get_api_reviews(): """Get reviews from intercepted API responses.""" api_revs = [] try: responses = driver.execute_script(""" var r = window.__interceptedResponses || []; window.__interceptedResponses = []; return r; """) for resp in (responses or []): body = resp.get("body", "") api_revs.extend(extract_reviews_from_api_body(body)) except: pass return api_revs # Store pane in window for scroll thread driver.execute_script("window.scrollablePane = arguments[0];", scroll_container) # Background scroll thread (fast, continuous) stop_scrolling = threading.Event() def scroll_worker(): while not stop_scrolling.is_set(): try: driver.execute_script(""" var p = window.scrollablePane; if (p) p.scrollTop = p.scrollHeight; """) except: pass time.sleep(0.1) # 10x per second scroll_thread = threading.Thread(target=scroll_worker, daemon=True) scroll_thread.start() # Main collection loop last_new_time = time.time() last_count = len(reviews) check_num = 0 print(f"πŸ”„ Scrolling... (timeout: {timeout_no_new}s with no new)", flush=True) while True: check_num += 1 time.sleep(1.0) # Check every second # Collect from API (doesn't affect scroll) for rev in get_api_reviews(): if not any(r.get("author") == rev["author"] for r in reviews.values()): key = f"api_{rev['author'][:20]}_{rev['rating']}" reviews[key] = rev # Collect review IDs via JavaScript (doesn't affect scroll position!) # Use specific selector to only get actual review cards, not buttons try: review_ids = driver.execute_script(""" var ids = []; document.querySelectorAll('div.jftiEf[data-review-id]').forEach(function(el) { ids.push(el.getAttribute('data-review-id')); }); return ids; """) for rid in (review_ids or []): if rid and rid not in reviews: reviews[rid] = {"id": rid, "source": "dom", "_needs_parse": True} except: pass current_count = len(reviews) # Check for new reviews if current_count > last_count: last_new_time = time.time() last_count = current_count # Progress update elapsed = time.time() - last_new_time if total_reviews: pct = (current_count / total_reviews) * 100 print(f" πŸ“Š {current_count}/{total_reviews} ({pct:.0f}%) | idle: {elapsed:.1f}s", flush=True) else: print(f" πŸ“Š {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", flush=True) # Stop conditions if current_count >= max_reviews: print(f"βœ… Reached max: {current_count}") stop_scrolling.set() break if total_reviews and current_count >= total_reviews: print(f"βœ… Got all {total_reviews} reviews!") stop_scrolling.set() break if time.time() - last_new_time >= timeout_no_new: print(f"⏱️ Timeout: no new reviews for {timeout_no_new}s") stop_scrolling.set() break # FINAL PHASE: Parse full review data from DOM (scroll is stopped) print("πŸ“ Parsing full review data...") api_reviews_collected = {k: v for k, v in reviews.items() if v.get("source") == "api"} reviews.clear() # Parse all DOM cards now that scrolling is done # Use specific selector to only get actual review cards (div.jftiEf), not buttons try: cards = driver.find_elements(By.CSS_SELECTOR, "div.jftiEf[data-review-id]") for card in cards: review = parse_dom_review(card) if review and review.get("id"): reviews[review["id"]] = review except Exception as e: print(f" Warning: DOM parse error: {e}") # Merge API reviews (only add if not already in DOM) api_added = 0 for key, api_rev in api_reviews_collected.items(): # Check if this author already exists in DOM reviews author = api_rev.get("author", "") if author and not any(r.get("author") == author for r in reviews.values()): reviews[f"api_{key}"] = api_rev api_added += 1 # Final results review_list = list(reviews.values()) dom_count = sum(1 for r in review_list if r.get("source") == "dom") api_count = sum(1 for r in review_list if r.get("source") == "api") print(f"\nπŸ“‹ Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})") return { "reviews": review_list, "total": len(review_list), "checks": check_num, "url": url } # Test function if __name__ == "__main__": from seleniumbase import Driver # Test URL - 79 reviews TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8" print("πŸš€ Starting clean scraper test...") # Set up driver driver = Driver(uc=True, headless=False) driver.set_window_size(1200, 900) try: result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15) print(f"\nβœ… Got {result['total']} reviews in {result['checks']} checks") # Show sample if result["reviews"]: print("\nπŸ“ Sample review:") sample = result["reviews"][0] print(f" Author: {sample['author']}") print(f" Rating: {sample['rating']}⭐") print(f" Text: {sample['text'][:100]}..." if sample['text'] else " Text: (none)") finally: driver.quit() print("\n🏁 Done")