Key improvements:
- Background thread scrolling at 10 Hz (0.1 s intervals) for smooth, continuous scrolling
- JavaScript-based review ID collection (doesn't affect scroll position)
- API interception via an injected fetch/XHR interceptor
- Total review count extraction from the page
- Auto-stop when all reviews are collected or the timeout is reached

The scroll issue was caused by Selenium's find_elements() affecting the scroll position. Using pure JavaScript for data collection keeps the scroll pinned to the bottom.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
501 lines
17 KiB
Python
501 lines
17 KiB
Python
"""
|
|
Clean Google Maps Reviews Scraper
|
|
- Simple down scrolling
|
|
- DOM scraping + API interception
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import time
|
|
import threading
|
|
from selenium.webdriver.common.by import By
|
|
|
|
|
|
def parse_api_review(raw: list) -> "dict | None":
    """Parse a single review from a positional API response array.

    The array layout read here is: index 0 author, 1 timestamp, 3 review
    text, 4 star rating, and an optional owner-response list at index 9 or 18
    (``[timestamp, text, ...]``).

    Args:
        raw: Candidate review array taken from the decoded API payload.

    Returns:
        A review dict with keys ``author``, ``text``, ``rating``,
        ``timestamp``, ``owner_response`` and ``source`` (always ``"api"``),
        or ``None`` when ``raw`` is not a list of at least five items or the
        rating is not an integer in 1..5.
    """
    if not isinstance(raw, list) or len(raw) < 5:
        return None

    rating = raw[4]
    # bool is a subclass of int; without this guard ``True`` would be
    # accepted as a 1-star rating.
    if isinstance(rating, bool) or not isinstance(rating, int):
        return None
    if not 1 <= rating <= 5:
        return None

    author = raw[0] if isinstance(raw[0], str) else ""
    text = raw[3] if isinstance(raw[3], str) else ""
    timestamp = raw[1]

    # The owner response is looked up at index 9 first, then at index 18;
    # the first usable entry wins.
    owner_response = None
    for idx in (9, 18):
        if len(raw) <= idx:
            continue
        resp = raw[idx]
        if isinstance(resp, list) and len(resp) > 1:
            owner_response = {"text": resp[1], "timestamp": resp[0] or ""}
            break

    return {
        "author": author,
        "text": text,
        "rating": rating,
        "timestamp": timestamp,
        "owner_response": owner_response,
        "source": "api",
    }
|
|
|
|
|
|
def extract_reviews_from_api_body(body: str) -> list:
    """Extract every review found in a raw API response body.

    The body is expected to be JSON, optionally preceded by the ``)]}'``
    anti-hijacking prefix. The decoded structure is walked recursively (to a
    maximum depth of 12) looking for arrays shaped like a review: a string at
    index 0 and an integer 1..5 at index 4.

    Args:
        body: Response text captured from the page.

    Returns:
        A list of review dicts as produced by ``parse_api_review``; empty when
        the body is missing, is not a string, or is not valid JSON.
    """
    if not isinstance(body, str) or not body:
        return []

    # Remove the )]}' prefix
    if body.startswith(")]}'"):
        body = body[4:].strip()

    try:
        data = json.loads(body)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; pathological nesting can
        # raise RecursionError inside the decoder.
        return []

    reviews = []

    def find_reviews(obj, depth=0):
        """Collect review-shaped arrays below ``obj`` into ``reviews``."""
        if depth > 12:
            return
        if isinstance(obj, list):
            looks_like_review = (
                len(obj) > 4
                and isinstance(obj[0], str)
                and isinstance(obj[4], int)
                and not isinstance(obj[4], bool)  # True/False are not ratings
                and 1 <= obj[4] <= 5
            )
            if looks_like_review:
                rev = parse_api_review(obj)
                if rev and rev["author"]:
                    reviews.append(rev)
                    # A recorded review is not searched for nested reviews.
                    return
            for item in obj:
                find_reviews(item, depth + 1)
        elif isinstance(obj, dict):
            for value in obj.values():
                find_reviews(value, depth + 1)

    find_reviews(data)
    return reviews
|
|
|
|
def _first_text(root, selectors, min_length=0):
    """Return the stripped text of the first match longer than ``min_length``.

    Each CSS selector is tried in order under ``root``; lookup failures are
    skipped. Returns ``""`` when no selector yields long-enough text.
    """
    for sel in selectors:
        try:
            found = root.find_element(By.CSS_SELECTOR, sel).text.strip()
        except Exception:
            # Best effort: a missing or stale element just means "try the
            # next selector".
            continue
        if len(found) > min_length:
            return found
    return ""


def parse_dom_review(card) -> "dict | None":
    """Parse a review from a DOM review-card element.

    Args:
        card: Selenium element for one review card.

    Returns:
        A review dict with keys ``id``, ``author``, ``text``, ``rating``,
        ``timestamp``, ``owner_response`` and ``source`` (always ``"dom"``),
        or ``None`` when neither a review id nor an author could be read, or
        when the card cannot be inspected at all.
    """
    try:
        # Review ID: on the card itself, otherwise on a descendant.
        review_id = card.get_attribute("data-review-id") or ""
        if not review_id:
            try:
                id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]")
                review_id = id_el.get_attribute("data-review-id") or ""
            except Exception:
                pass

        # Author - multiple selectors
        author = _first_text(
            card,
            ['div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div'],
        )

        # Rating from aria-label on span[role="img"]
        rating = 0
        try:
            stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]')
            aria = stars_el.get_attribute("aria-label") or ""
            # Extract number from label (handles "5 stars", "5 estrellas",
            # and decimal commas such as "4,5"). Requiring a leading digit
            # avoids matching a lone "." from the label's punctuation.
            num = re.search(r'\d+(?:\.\d+)?', aria.replace(',', '.'))
            if num:
                rating = int(float(num.group()))
        except Exception:
            pass

        # Review text - try multiple selectors
        text = _first_text(
            card,
            [
                'span[jsname="bN97Pc"]',
                'span[jsname="fbQN7e"]',
                'div.MyEned span.wiI7pd',
                '.wiI7pd',
            ],
        )

        # Click "More" button to expand text if truncated, then keep the
        # re-read text only when it is longer than what was already captured.
        try:
            more_btn = card.find_element(By.CSS_SELECTOR, "button.kyuRq")
            if more_btn.is_displayed():
                more_btn.click()
                expanded = _first_text(
                    card,
                    ['span[jsname="bN97Pc"]', 'span[jsname="fbQN7e"]', '.wiI7pd'],
                    min_length=len(text),
                )
                if expanded:
                    text = expanded
        except Exception:
            pass

        # Timestamp
        timestamp = _first_text(card, ['span[class*="rsqaWe"]'])

        # Owner response (only reported when it has text)
        owner_response = None
        try:
            resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd")
        except Exception:
            resp_box = None
        if resp_box is not None:
            resp_text = _first_text(resp_box, ["div.wiI7pd"])
            resp_date = _first_text(resp_box, ["span.DZSIDd"])
            if resp_text:
                owner_response = {"text": resp_text, "timestamp": resp_date}

        if not review_id and not author:
            return None

        return {
            "id": review_id,
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "dom",
        }
    except Exception:
        # The card itself was unusable (e.g. it went stale mid-parse).
        return None
|
|
|
|
|
|
def _parse_total_review_count(page_text: str):
    """Return the first "<N> reviews" count found in ``page_text``, or None.

    Handles plain ("1234"), comma-grouped ("1,234") and abbreviated ("1.2K")
    counts. The look-behind stops a match from starting in the middle of a
    longer number, which would otherwise turn "1234 reviews" into 234.
    """
    # NOTE(review): the first textual match is assumed to be the place's
    # total rather than some other "<N> reviews" string - confirm.
    patterns = [
        r'(?<![\d.,])(\d{1,3}(?:,\d{3})+|\d+)\s+reviews?',
        r'(?<![\d.,])(\d+(?:\.\d+)?K)\s+reviews?',
        r'(?<![\d.,])(\d{1,3}(?:,\d{3})+|\d+)\s+reseñas?',
    ]
    for pattern in patterns:
        match = re.search(pattern, page_text, re.IGNORECASE)
        if not match:
            continue
        count_str = match.group(1).upper()
        if count_str.endswith('K'):
            # round() avoids float truncation (2.3 * 1000 == 2299.99...).
            return int(round(float(count_str[:-1]) * 1000))
        return int(count_str.replace(',', ''))
    return None


def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15) -> dict:
    """
    Scrape Google Maps reviews.

    Loads the place page, opens the reviews tab, then keeps the reviews pane
    scrolled to the bottom from a background thread while the main loop
    polls once per second for intercepted API responses and for
    ``data-review-id`` attributes in the DOM.

    Args:
        driver: Selenium WebDriver instance
        url: Google Maps place URL
        max_reviews: Stop once at least this many entries were collected
        timeout_no_new: Seconds to wait with no new reviews before stopping

    Returns:
        dict with keys ``reviews`` (list), ``total``, ``checks`` and ``url``.
        When the reviews pane cannot be found the list is empty and the dict
        additionally carries ``error`` and ``scrolls`` (always 0).
        DOM-sourced entries only contain ``id`` and ``source``.
    """
    # Storage - use review ID as key
    reviews = {}  # review_id -> review

    # Force English language
    if "hl=" not in url:
        url = url + ("&" if "?" in url else "?") + "hl=en"

    # Navigate to URL
    print(f"🌐 Loading: {url[:80]}...")
    driver.get(url)
    time.sleep(3)

    # Handle consent popup if present
    if "consent.google" in driver.current_url:
        print(" Handling consent popup...")
        try:
            for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
                txt = btn.text.lower()
                if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
                    btn.click()
                    time.sleep(2)
                    break
        except Exception:
            pass
        time.sleep(2)

    # Click reviews tab if present (multilingual support)
    review_keywords = ["review", "reseña", "avis", "bewertung", "recensione", "opiniones"]
    try:
        for tab in driver.find_elements(By.CSS_SELECTOR, "button[role='tab']"):
            tab_text = tab.text.lower()
            if any(kw in tab_text for kw in review_keywords):
                print(f" Clicking reviews tab: '{tab.text}'")
                tab.click()
                time.sleep(2)
                break
    except Exception:
        pass

    def find_scroll_container():
        """Return the first displayed candidate pane taller than 100px."""
        selectors = [
            "div.m6QErb.DxyBCb.kA9KIf.dS8AEf",
            "div.m6QErb.DxyBCb.kA9KIf",
            "div.m6QErb.DxyBCb",
            "div.m6QErb[aria-label]",
            "div.DxyBCb.kA9KIf.dS8AEf",
            "div[role='main'] div.m6QErb",
        ]
        for sel in selectors:
            try:
                for el in driver.find_elements(By.CSS_SELECTOR, sel):
                    if el.is_displayed() and el.size['height'] > 100:
                        return el
            except Exception:
                pass
        return None

    # Wait and retry for scroll container
    scroll_container = None
    for attempt in range(10):
        scroll_container = find_scroll_container()
        if scroll_container:
            break
        print(f" Waiting for reviews panel... ({attempt+1}/10)")
        time.sleep(1)

    if not scroll_container:
        print("❌ Could not find reviews scroll container")
        # Debug: print page title and URL
        try:
            print("Page title:", driver.title)
            print("Current URL:", driver.current_url[:100])
        except Exception:
            pass
        # "checks" and "url" mirror the success result so callers can read
        # the same keys on both paths; "scrolls" is kept for compatibility.
        return {
            "reviews": [],
            "total": 0,
            "scrolls": 0,
            "checks": 0,
            "url": url,
            "error": "No scroll container found",
        }

    print("✅ Found scroll container")

    # Extract total review count from page
    total_reviews = None
    try:
        total_reviews = _parse_total_review_count(driver.page_source)
    except Exception:
        pass
    if total_reviews is not None:
        print(f"📊 Total reviews on page: {total_reviews}")

    # Wait for reviews to fully load after tab click
    time.sleep(2)

    # PHASE 2: Inject API interceptor for scroll-loaded reviews
    print("🔌 Injecting API interceptor...")
    driver.execute_script("""
        if (window.__reviewInterceptorInjected) return;
        window.__reviewInterceptorInjected = true;
        window.__interceptedResponses = [];

        // Intercept fetch
        const originalFetch = window.fetch;
        window.fetch = async function(...args) {
            const url = args[0].toString();
            const response = await originalFetch.apply(this, args);
            if (url.includes('listugcposts') || url.includes('review')) {
                try {
                    const clone = response.clone();
                    const text = await clone.text();
                    window.__interceptedResponses.push({url: url, body: text});
                } catch(e) {}
            }
            return response;
        };

        // Intercept XHR
        const originalXHR = window.XMLHttpRequest;
        window.XMLHttpRequest = function() {
            const xhr = new originalXHR();
            const originalOpen = xhr.open;
            let reqUrl = '';
            xhr.open = function(method, url, ...rest) {
                reqUrl = url;
                return originalOpen.apply(this, [method, url, ...rest]);
            };
            xhr.addEventListener('load', function() {
                if (reqUrl.includes('listugcposts') || reqUrl.includes('review')) {
                    try {
                        window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText});
                    } catch(e) {}
                }
            });
            return xhr;
        };
        for (let prop of Object.getOwnPropertyNames(originalXHR)) {
            try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch(e) {}
        }
    """)

    # The driver is shared between the scroll thread and the main loop, and
    # a WebDriver session must not be used concurrently, so every call made
    # while the thread is alive goes through this lock.
    driver_lock = threading.Lock()

    def get_api_reviews():
        """Drain intercepted API responses and parse their reviews."""
        try:
            with driver_lock:
                responses = driver.execute_script("""
                    var r = window.__interceptedResponses || [];
                    window.__interceptedResponses = [];
                    return r;
                """)
        except Exception:
            return []
        api_revs = []
        for resp in (responses or []):
            body = resp.get("body", "") if isinstance(resp, dict) else ""
            api_revs.extend(extract_reviews_from_api_body(body))
        return api_revs

    # Store pane in window for scroll thread
    driver.execute_script("window.scrollablePane = arguments[0];", scroll_container)

    # Background scroll thread (fast, continuous)
    stop_scrolling = threading.Event()

    def scroll_worker():
        """Pin the reviews pane to its bottom until asked to stop."""
        while not stop_scrolling.is_set():
            try:
                with driver_lock:
                    driver.execute_script("""
                        var p = window.scrollablePane;
                        if (p) p.scrollTop = p.scrollHeight;
                    """)
            except Exception:
                pass
            # 10x per second; wait() returns early once stop is requested.
            stop_scrolling.wait(0.1)

    scroll_thread = threading.Thread(target=scroll_worker, daemon=True)
    scroll_thread.start()

    # Main collection loop
    last_new_time = time.time()
    last_count = len(reviews)
    check_num = 0
    # Authors already stored from the API; only API entries carry an author,
    # so this set replaces a scan over every stored review.
    seen_api_authors = set()

    print(f"🔄 Scrolling... (timeout: {timeout_no_new}s with no new)", flush=True)

    try:
        while True:
            check_num += 1
            time.sleep(1.0)  # Check every second

            # Collect from API (doesn't affect scroll)
            # NOTE(review): API entries and DOM ids are stored under
            # different keys, so one review may be counted twice - confirm.
            for rev in get_api_reviews():
                author = rev["author"]
                if author in seen_api_authors:
                    continue
                seen_api_authors.add(author)
                # Full author in the key: a 20-character prefix could
                # collide and overwrite another author's entry.
                reviews[f"api_{author}_{rev['rating']}"] = rev

            # Collect review IDs via JavaScript (doesn't affect scroll position!)
            try:
                with driver_lock:
                    review_ids = driver.execute_script("""
                        var ids = [];
                        document.querySelectorAll('[data-review-id]').forEach(function(el) {
                            ids.push(el.getAttribute('data-review-id'));
                        });
                        return ids;
                    """)
                for rid in (review_ids or []):
                    if rid and rid not in reviews:
                        reviews[rid] = {"id": rid, "source": "dom"}
            except Exception:
                pass

            current_count = len(reviews)

            # Check for new reviews
            if current_count > last_count:
                last_new_time = time.time()
                last_count = current_count

            # Progress update
            elapsed = time.time() - last_new_time
            if total_reviews:
                pct = (current_count / total_reviews) * 100
                print(f" 📊 {current_count}/{total_reviews} ({pct:.0f}%) | idle: {elapsed:.1f}s", flush=True)
            else:
                print(f" 📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", flush=True)

            # Stop conditions
            if current_count >= max_reviews:
                print(f"✅ Reached max: {current_count}")
                break

            if total_reviews and current_count >= total_reviews:
                print(f"✅ Got all {total_reviews} reviews!")
                break

            if time.time() - last_new_time >= timeout_no_new:
                print(f"⏱️ Timeout: no new reviews for {timeout_no_new}s")
                break
    finally:
        # Always stop the scroller - also when the loop raises - and wait
        # for it so it is no longer using the driver once we return. The
        # timeout keeps a hung browser call from blocking us forever.
        stop_scrolling.set()
        scroll_thread.join(timeout=5)

    # Final results
    review_list = list(reviews.values())
    dom_count = sum(1 for r in review_list if r.get("source") == "dom")
    api_count = sum(1 for r in review_list if r.get("source") == "api")
    print(f"\n📋 Total: {len(review_list)} reviews (DOM: {dom_count}, API: {api_count})")

    return {
        "reviews": review_list,
        "total": len(review_list),
        "checks": check_num,
        "url": url,
    }
|
|
|
|
|
|
# Test function
|
|
if __name__ == "__main__":
    # Manual smoke test: opens a real browser and scrapes one known place.
    from seleniumbase import Driver

    # Test URL - 79 reviews
    TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8"

    print("🚀 Starting clean scraper test...")

    # Set up driver
    driver = Driver(uc=True, headless=False)
    driver.set_window_size(1200, 900)

    try:
        result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15)
        # The error result may not carry "checks", so read it defensively.
        print(f"\n✅ Got {result['total']} reviews in {result.get('checks', 0)} checks")

        # Show sample
        if result["reviews"]:
            print("\n📝 Sample review:")
            sample = result["reviews"][0]
            # DOM-sourced entries only hold "id" and "source"; fall back to
            # placeholders instead of raising KeyError.
            sample_author = sample.get("author") or "(unknown)"
            sample_rating = sample.get("rating", 0)
            sample_text = sample.get("text") or ""
            print(f" Author: {sample_author}")
            print(f" Rating: {sample_rating}⭐")
            print(f" Text: {sample_text[:100]}..." if sample_text else " Text: (none)")

    finally:
        driver.quit()
        print("\n🏁 Done")
|