Files
whyrating-engine-legacy/modules/scraper_clean.py
Alejandro Gutiérrez 0e8a711a9c Fix clean scraper: specific selectors, consent reload, DOM parsing
- Use div.jftiEf[data-review-id] selector to exclude button elements
- Reload original URL after consent (prevents URL corruption)
- Parse full DOM data after scrolling stops
- Deduplicate API reviews by author match
- Remove slow "More" button clicking for speed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 20:40:15 +00:00

515 lines
17 KiB
Python

"""
Clean Google Maps Reviews Scraper
- Simple downward scrolling
- DOM scraping + API interception
"""
import re
import json
import time
import threading
from selenium.webdriver.common.by import By
def parse_api_review(raw: list) -> "dict | None":
    """Parse one review from a positional API response array.

    Indices read from ``raw``: 0 = author name, 1 = timestamp,
    3 = review text, 4 = star rating, 9 or 18 = owner response given as
    ``[timestamp, text, ...]``.

    Args:
        raw: Candidate review array taken from the decoded API payload.

    Returns:
        A dict with the keys ``author``, ``text``, ``rating``,
        ``timestamp``, ``owner_response`` and ``source`` (always ``"api"``),
        or ``None`` when ``raw`` is not a list of at least five items or
        its rating is not an integer from 1 to 5.
    """
    if not isinstance(raw, list) or len(raw) < 5:
        return None

    rating = raw[4]
    # bool is a subclass of int: a JSON ``true`` must not count as one star.
    if isinstance(rating, bool) or not isinstance(rating, int):
        return None
    if not 1 <= rating <= 5:
        return None

    author = raw[0] if isinstance(raw[0], str) else ""
    text = raw[3] if isinstance(raw[3], str) else ""
    timestamp = raw[1]

    # The owner response is looked up at two alternative positions; the
    # first one holding a list with at least [timestamp, text] wins.
    owner_response = None
    for idx in (9, 18):
        if len(raw) <= idx:
            continue
        resp = raw[idx]
        if isinstance(resp, list) and len(resp) > 1:
            owner_response = {
                "text": resp[1],
                "timestamp": resp[0] if resp[0] else "",
            }
            break

    return {
        "author": author,
        "text": text,
        "rating": rating,
        "timestamp": timestamp,
        "owner_response": owner_response,
        "source": "api",
    }
def extract_reviews_from_api_body(body: str) -> list:
    """Extract every review found anywhere inside an API response body.

    The body is decoded as JSON (after removing a leading ``)]}'`` guard
    prefix when present) and walked recursively, at most 12 levels deep.
    Any list whose first item is a string and whose fifth item is an
    integer from 1 to 5 is treated as a review candidate and handed to
    ``parse_api_review``.

    Args:
        body: Raw response text captured from the page.

    Returns:
        A list of parsed review dicts that have a non-empty author. The
        list is empty when ``body`` is not a string, is not valid JSON, or
        contains no review-shaped arrays.
    """
    reviews = []
    if not isinstance(body, str):
        return reviews

    # Remove the )]}' prefix that precedes the JSON payload.
    if body.startswith(")]}'"):
        body = body[4:].strip()

    try:
        data = json.loads(body)
    except (ValueError, RecursionError):
        # Not JSON (or absurdly nested): nothing to extract.
        return reviews

    def looks_like_review(obj: list) -> bool:
        if len(obj) <= 4 or not isinstance(obj[0], str):
            return False
        rating = obj[4]
        # Exclude bool explicitly; it is an int subclass in Python.
        if isinstance(rating, bool) or not isinstance(rating, int):
            return False
        return 1 <= rating <= 5

    def find_reviews(obj, depth=0):
        if depth > 12:
            return
        if isinstance(obj, list):
            if looks_like_review(obj):
                rev = parse_api_review(obj)
                if rev and rev["author"]:
                    reviews.append(rev)
                    # A matched review is a leaf; do not descend into it.
                    return
            for item in obj:
                find_reviews(item, depth + 1)
        elif isinstance(obj, dict):
            for value in obj.values():
                find_reviews(value, depth + 1)

    find_reviews(data)
    return reviews
def _first_nonempty_text(root, selectors) -> str:
    """Return the stripped text of the first selector match that has text.

    Selectors that match nothing (or whose lookup fails for any other
    reason) are skipped; an empty string is returned when none yields text.
    """
    for sel in selectors:
        try:
            value = root.find_element(By.CSS_SELECTOR, sel).text.strip()
        except Exception:
            # Best effort: a missing or stale element just means "try next".
            continue
        if value:
            return value
    return ""


def parse_dom_review(card) -> "dict | None":
    """Parse a review from a DOM review-card element.

    Every field is extracted independently and on a best-effort basis: a
    field whose element cannot be found keeps its default (empty string,
    ``0`` for the rating, ``None`` for the owner response).

    Args:
        card: Element exposing ``get_attribute`` and ``find_element``
            (a Selenium WebElement in the caller visible in this file).

    Returns:
        A dict with the keys ``id``, ``author``, ``text``, ``rating``,
        ``timestamp``, ``owner_response`` and ``source`` (always ``"dom"``),
        or ``None`` when neither a review id nor an author could be read,
        or when the card cannot be accessed at all.
    """
    try:
        # Review id: on the card itself, otherwise on a descendant.
        review_id = card.get_attribute("data-review-id") or ""
        if not review_id:
            try:
                id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]")
                review_id = id_el.get_attribute("data-review-id") or ""
            except Exception:
                pass

        # Author - multiple selectors
        author = _first_nonempty_text(
            card,
            ['div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div'],
        )

        # Rating from aria-label on span[role="img"]
        rating = 0
        try:
            stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]')
            aria = stars_el.get_attribute("aria-label") or ""
            # Extract the number from the label ("5 stars", "5 estrellas",
            # "4,5 Sterne", ...). The pattern requires a leading digit so a
            # stray "." in the label cannot be picked up as the number.
            num = re.search(r'\d+(?:\.\d+)?', aria.replace(',', '.'))
            if num:
                rating = int(float(num.group()))
        except Exception:
            pass

        # Review text - try multiple selectors.
        # Note: "More" button clicking removed for speed;
        # full text can be expanded later if needed.
        text = _first_nonempty_text(
            card,
            [
                'span[jsname="bN97Pc"]',
                'span[jsname="fbQN7e"]',
                'div.MyEned span.wiI7pd',
                '.wiI7pd',
            ],
        )

        # Timestamp
        timestamp = _first_nonempty_text(card, ['span[class*="rsqaWe"]'])

        # Owner response (only reported when it has text)
        owner_response = None
        try:
            resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd")
        except Exception:
            resp_box = None
        if resp_box is not None:
            resp_text = _first_nonempty_text(resp_box, ["div.wiI7pd"])
            resp_date = _first_nonempty_text(resp_box, ["span.DZSIDd"])
            if resp_text:
                owner_response = {"text": resp_text, "timestamp": resp_date}

        if not review_id and not author:
            return None

        return {
            "id": review_id,
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "dom",
        }
    except Exception:
        # The card itself is unusable (e.g. it went stale mid-parse).
        return None
# Substrings that identify the "reviews" tab in several UI languages.
_REVIEW_TAB_KEYWORDS = ("review", "reseña", "avis", "bewertung", "recensione", "opiniones")

# Candidate selectors for the scrollable reviews pane, most specific first.
_SCROLL_CONTAINER_SELECTORS = (
    "div.m6QErb.DxyBCb.kA9KIf.dS8AEf",
    "div.m6QErb.DxyBCb.kA9KIf",
    "div.m6QErb.DxyBCb",
    "div.m6QErb[aria-label]",
    "div.DxyBCb.kA9KIf.dS8AEf",
    "div[role='main'] div.m6QErb",
)

# "XX reviews" patterns used to read the advertised total from the page.
_TOTAL_REVIEW_PATTERNS = (
    r'(\d{1,3}(?:,\d{3})*)\s+reviews?',
    r'(\d+\.?\d*K)\s+reviews?',
    r'(\d{1,3}(?:,\d{3})*)\s+reseñas?',
)

# Wraps fetch and XMLHttpRequest so review-related responses are copied into
# window.__interceptedResponses. Injected at most once per page.
_INTERCEPTOR_JS = """
if (window.__reviewInterceptorInjected) return;
window.__reviewInterceptorInjected = true;
window.__interceptedResponses = [];
// Intercept fetch
const originalFetch = window.fetch;
window.fetch = async function(...args) {
    // fetch() accepts a string, a URL or a Request; only a Request has .url
    const target = args[0];
    const url = (target && typeof target === 'object' && 'url' in target)
        ? String(target.url)
        : String(target);
    const response = await originalFetch.apply(this, args);
    if (url.includes('listugcposts') || url.includes('review')) {
        try {
            const clone = response.clone();
            const text = await clone.text();
            window.__interceptedResponses.push({url: url, body: text});
        } catch(e) {}
    }
    return response;
};
// Intercept XHR
const originalXHR = window.XMLHttpRequest;
window.XMLHttpRequest = function() {
    const xhr = new originalXHR();
    const originalOpen = xhr.open;
    let reqUrl = '';
    xhr.open = function(method, url, ...rest) {
        // open() may receive a URL object; keep a string for .includes()
        reqUrl = String(url);
        return originalOpen.apply(this, [method, url, ...rest]);
    };
    xhr.addEventListener('load', function() {
        if (reqUrl.includes('listugcposts') || reqUrl.includes('review')) {
            try {
                window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText});
            } catch(e) {}
        }
    });
    return xhr;
};
for (let prop of Object.getOwnPropertyNames(originalXHR)) {
    try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch(e) {}
}
"""

# Returns and clears the responses captured so far.
_DRAIN_RESPONSES_JS = """
var r = window.__interceptedResponses || [];
window.__interceptedResponses = [];
return r;
"""

# Jumps the stored pane to its bottom.
_SCROLL_TO_BOTTOM_JS = """
var p = window.scrollablePane;
if (p) p.scrollTop = p.scrollHeight;
"""

# Reads ids of actual review cards only (div.jftiEf), not buttons.
_COLLECT_REVIEW_IDS_JS = """
var ids = [];
document.querySelectorAll('div.jftiEf[data-review-id]').forEach(function(el) {
    ids.push(el.getAttribute('data-review-id'));
});
return ids;
"""


def _accept_consent(driver, url):
    """Click an "accept" button on the consent page, then reload ``url``."""
    print(" Handling consent popup...")
    try:
        for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
            txt = btn.text.lower()
            if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
                btn.click()
                time.sleep(2)
                break
    except Exception:
        pass
    # Reload original URL after consent (redirect can corrupt URL)
    print(" Reloading after consent...")
    driver.get(url)
    time.sleep(3)


def _open_reviews_tab(driver):
    """Click the first tab whose label contains a known "reviews" keyword."""
    try:
        for tab in driver.find_elements(By.CSS_SELECTOR, "button[role='tab']"):
            tab_text = tab.text.lower()
            if any(kw in tab_text for kw in _REVIEW_TAB_KEYWORDS):
                print(f" Clicking reviews tab: '{tab.text}'")
                tab.click()
                time.sleep(2)
                break
    except Exception:
        pass


def _find_scroll_container(driver):
    """Return the first displayed candidate pane taller than 100px, or None."""
    for sel in _SCROLL_CONTAINER_SELECTORS:
        try:
            for el in driver.find_elements(By.CSS_SELECTOR, sel):
                if el.is_displayed() and el.size['height'] > 100:
                    return el
        except Exception:
            pass
    return None


def _detect_total_reviews(page_text):
    """Return the first "N reviews" count found in ``page_text``, or None."""
    for pattern in _TOTAL_REVIEW_PATTERNS:
        matches = re.findall(pattern, page_text, re.IGNORECASE)
        if not matches:
            continue
        count_str = matches[0]
        if 'K' in count_str.upper():
            return int(float(count_str.upper().replace('K', '')) * 1000)
        return int(count_str.replace(',', ''))
    return None


def _drain_api_reviews(driver):
    """Parse and return reviews from responses intercepted since last call."""
    api_revs = []
    try:
        responses = driver.execute_script(_DRAIN_RESPONSES_JS)
        for resp in (responses or []):
            api_revs.extend(extract_reviews_from_api_body(resp.get("body", "")))
    except Exception:
        pass
    return api_revs


def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15) -> dict:
    """
    Scrape Google Maps reviews.

    The page is loaded (adding ``hl=en`` when the URL carries no ``hl=``),
    a consent redirect is handled, the reviews tab is opened and the
    reviews pane is scrolled to the bottom by a background thread. Once a
    second the main loop counts the review cards present in the DOM plus
    the reviews seen in intercepted API responses. After scrolling stops,
    every DOM card is parsed in full and API-only reviews (author not
    present among the DOM reviews) are merged in.

    Args:
        driver: Selenium WebDriver instance
        url: Google Maps place URL
        max_reviews: Maximum reviews to collect
        timeout_no_new: Seconds to wait with no new reviews before stopping

    Returns:
        dict with ``reviews`` (list of review dicts, at most
        ``max_reviews``), ``total`` (its length), ``checks`` (number of
        one-second polling rounds) and ``url`` (the URL actually loaded).
        When the reviews pane cannot be found the dict holds an empty list
        plus ``scrolls`` (0) and an ``error`` message.
    """
    # Force English language
    if "hl=" not in url:
        url = url + ("&" if "?" in url else "?") + "hl=en"

    # Navigate to URL
    print(f"🌐 Loading: {url[:80]}...")
    driver.get(url)
    time.sleep(3)

    # Handle consent popup if present
    if "consent.google" in driver.current_url:
        _accept_consent(driver, url)

    # Click reviews tab if present (multilingual support)
    _open_reviews_tab(driver)

    # Wait and retry for scroll container
    scroll_container = None
    for attempt in range(10):
        scroll_container = _find_scroll_container(driver)
        if scroll_container:
            break
        print(f" Waiting for reviews panel... ({attempt+1}/10)")
        time.sleep(1)

    if not scroll_container:
        print("❌ Could not find reviews scroll container")
        # Debug: print page title and URL
        try:
            print("Page title:", driver.title)
            print("Current URL:", driver.current_url[:100])
        except Exception:
            pass
        # Same keys as the success result ("checks", "url") so callers can
        # read them unconditionally; "scrolls" is kept for compatibility.
        return {
            "reviews": [],
            "total": 0,
            "checks": 0,
            "scrolls": 0,
            "url": url,
            "error": "No scroll container found",
        }
    print("✅ Found scroll container")

    # Extract total review count from page
    total_reviews = None
    try:
        total_reviews = _detect_total_reviews(driver.page_source)
    except Exception:
        pass
    if total_reviews is not None:
        print(f"📊 Total reviews on page: {total_reviews}")

    # Wait for reviews to fully load after tab click
    time.sleep(2)

    # PHASE 2: Inject API interceptor for scroll-loaded reviews
    print("🔌 Injecting API interceptor...")
    driver.execute_script(_INTERCEPTOR_JS)

    # Store pane in window for scroll thread
    driver.execute_script("window.scrollablePane = arguments[0];", scroll_container)

    # Background scroll thread (fast, continuous)
    stop_scrolling = threading.Event()

    def scroll_worker():
        while not stop_scrolling.is_set():
            try:
                driver.execute_script(_SCROLL_TO_BOTTOM_JS)
            except Exception:
                pass
            # ~10x per second; wakes immediately once the stop flag is set
            stop_scrolling.wait(0.1)

    scroll_thread = threading.Thread(target=scroll_worker, daemon=True)
    scroll_thread.start()

    # Collected while scrolling: DOM card ids, and API reviews keyed by
    # author (full author in the key so similar names cannot overwrite
    # each other).
    dom_ids = set()
    api_reviews = {}
    seen_api_authors = set()

    # Main collection loop
    last_new_time = time.time()
    last_count = 0
    check_num = 0
    print(f"🔄 Scrolling... (timeout: {timeout_no_new}s with no new)", flush=True)
    try:
        while True:
            check_num += 1
            time.sleep(1.0)  # Check every second

            # Collect from API (doesn't affect scroll)
            for rev in _drain_api_reviews(driver):
                author = rev["author"]
                if author not in seen_api_authors:
                    seen_api_authors.add(author)
                    api_reviews[f"api_{author}"] = rev

            # Collect review IDs via JavaScript (doesn't affect scroll position!)
            try:
                review_ids = driver.execute_script(_COLLECT_REVIEW_IDS_JS)
                dom_ids.update(rid for rid in (review_ids or []) if rid)
            except Exception:
                pass

            current_count = len(dom_ids) + len(api_reviews)

            # Check for new reviews
            if current_count > last_count:
                last_new_time = time.time()
                last_count = current_count

            # Progress update
            elapsed = time.time() - last_new_time
            if total_reviews:
                pct = (current_count / total_reviews) * 100
                print(f" 📊 {current_count}/{total_reviews} ({pct:.0f}%) | idle: {elapsed:.1f}s", flush=True)
            else:
                print(f" 📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", flush=True)

            # Stop conditions
            if current_count >= max_reviews:
                print(f"✅ Reached max: {current_count}")
                break
            if total_reviews and current_count >= total_reviews:
                print(f"✅ Got all {total_reviews} reviews!")
                break
            if time.time() - last_new_time >= timeout_no_new:
                print(f"⏱️ Timeout: no new reviews for {timeout_no_new}s")
                break
    finally:
        # Always stop the scroller - also on errors/interrupts - and wait
        # for it, so it cannot use the driver while the DOM is parsed below
        # or after the caller quits the browser. The join is bounded in
        # case a driver call inside the worker hangs.
        stop_scrolling.set()
        scroll_thread.join(timeout=5)

    # FINAL PHASE: Parse full review data from DOM (scroll is stopped)
    print("📝 Parsing full review data...")
    reviews = {}  # review_id -> review
    try:
        # Use specific selector to only get actual review cards (div.jftiEf), not buttons
        for card in driver.find_elements(By.CSS_SELECTOR, "div.jftiEf[data-review-id]"):
            review = parse_dom_review(card)
            if review and review.get("id"):
                reviews[review["id"]] = review
    except Exception as e:
        print(f" Warning: DOM parse error: {e}")

    # Merge API reviews (only add if the author is not already present)
    known_authors = {r.get("author") for r in reviews.values()}
    for key, api_rev in api_reviews.items():
        author = api_rev.get("author", "")
        if author and author not in known_authors:
            known_authors.add(author)
            reviews[key] = api_rev

    # Final results, capped at the requested maximum
    review_list = list(reviews.values())[:max(max_reviews, 0)]
    dom_count = sum(1 for r in review_list if r.get("source") == "dom")
    api_count = sum(1 for r in review_list if r.get("source") == "api")
    print(f"\n📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})")
    return {
        "reviews": review_list,
        "total": len(review_list),
        "checks": check_num,
        "url": url
    }
# Manual smoke test: opens a real browser and scrapes one known place.
if __name__ == "__main__":
    from seleniumbase import Driver

    # Test URL - 79 reviews
    TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8"
    print("🚀 Starting clean scraper test...")

    # Set up driver
    driver = Driver(uc=True, headless=False)
    driver.set_window_size(1200, 900)
    try:
        result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15)
        # The error result of scrape_reviews may not carry "checks", so read
        # it defensively instead of raising KeyError on a failed run.
        checks = result.get("checks", 0)
        print(f"\n✅ Got {result['total']} reviews in {checks} checks")
        if result.get("error"):
            print(f"Error: {result['error']}")

        # Show sample
        if result["reviews"]:
            print("\n📝 Sample review:")
            sample = result["reviews"][0]
            print(f" Author: {sample['author']}")
            print(f" Rating: {sample['rating']}")
            print(f" Text: {sample['text'][:100]}..." if sample['text'] else " Text: (none)")
    finally:
        # Close the browser even when scraping raised.
        driver.quit()
    print("\n🏁 Done")