Files
whyrating-engine-legacy/modules/scraper_clean.py
Alejandro Gutiérrez 0682c0ec61 Add get_business_card_info to scraper_clean with multilingual support
Replaces fast_scraper validation with efficient polling-based extraction
using the same navigation pattern as scrape_reviews:
- 10ms polling for consent handling (no fixed waits)
- 100ms polling for data extraction
- Exits early when data found

Supports multiple languages:
- Rating: stars/estrellas/étoiles/sterne/stelle
- Reviews: reviews/reseñas/avis/bewertungen/recensioni
- Handles comma decimals (4,8 -> 4.8)

Result: 6.3s to extract name, address, rating, total_reviews

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 17:52:06 +00:00

1275 lines
50 KiB
Python

"""
Clean Google Maps Reviews Scraper
- Simple down scrolling
- DOM scraping + API interception
"""
import json
import re
import threading
import time
from datetime import datetime, timezone

from selenium.webdriver.common.by import By
class LogCapture:
    """Collect scraper log entries in memory while echoing them to stdout.

    Every entry is a dict with the keys ``timestamp`` (UTC, ISO-8601 with a
    trailing ``Z``), ``level``, ``source`` and ``message``.
    """

    def __init__(self):
        # Entries in the order they were logged.
        self.logs = []

    def log(self, message: str, level: str = "INFO", source: str = "scraper"):
        """Append one entry stamped with the current UTC time and print it.

        Args:
            message: Text to record and print.
            level: Severity label stored with the entry.
            source: Name of the component that produced the message.
        """
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # time and drop tzinfo so isoformat() keeps the historical
        # "YYYY-MM-DDTHH:MM:SS[.ffffff]Z" shape instead of "+00:00".
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        entry = {
            "timestamp": now_utc.isoformat() + "Z",
            "level": level,
            "source": source,
            "message": message,
        }
        self.logs.append(entry)
        # Also print for console visibility
        print(message, flush=True)

    def info(self, message: str, source: str = "scraper"):
        """Record *message* at INFO level."""
        self.log(message, "INFO", source)

    def warning(self, message: str, source: str = "scraper"):
        """Record *message* at WARNING level."""
        self.log(message, "WARNING", source)

    def error(self, message: str, source: str = "scraper"):
        """Record *message* at ERROR level."""
        self.log(message, "ERROR", source)

    def get_logs(self):
        """Return the list of recorded entries (the live list, not a copy)."""
        return self.logs
def parse_api_review(raw: list) -> dict:
    """Parse one review from a flat API response array.

    Fields are read positionally: ``raw[0]`` author, ``raw[1]`` timestamp,
    ``raw[3]`` text, ``raw[4]`` integer rating; an owner reply is looked up at
    index 9 and then 18 as a ``[timestamp, text]`` list.

    Args:
        raw: Candidate review array.

    Returns:
        A dict with ``author``, ``text``, ``rating``, ``timestamp``,
        ``owner_response`` and ``source`` ("api"), or ``None`` when *raw* does
        not look like a real review.
    """
    if not isinstance(raw, list) or len(raw) < 5:
        return None

    try:
        author = raw[0] if isinstance(raw[0], str) else ""
        timestamp = raw[1]
        text = raw[3] if isinstance(raw[3], str) else ""
        rating = raw[4]

        # bool is a subclass of int, so True would otherwise pass as a 1-star rating.
        if isinstance(rating, bool) or not isinstance(rating, int):
            return None
        if not (1 <= rating <= 5):
            return None

        # Filter out garbage data (language codes, metadata, etc.)
        if len(author) <= 3:  # Real names are longer than 3 chars
            return None
        if author.lower() in {'google', 'maps', 'reviews', 'es', 'en', 'it', 'no', 'de', 'fr', 'pt'}:
            return None

        # Timestamp should look like a date, not a URL or language code
        if timestamp:
            stamp_text = str(timestamp)
            if 'http' in stamp_text or len(stamp_text) <= 3:
                return None

        # Owner response: the first populated candidate slot wins.
        owner_response = None
        for idx in (9, 18):
            if len(raw) > idx and raw[idx] and isinstance(raw[idx], list):
                resp = raw[idx]
                if len(resp) > 1:
                    owner_response = {"text": resp[1], "timestamp": resp[0] if resp[0] else ""}
                    break

        return {
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "api"
        }
    except Exception:
        # Best-effort parser: malformed input yields None, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
def _dig(node, path, default):
    """Follow *path* (a sequence of list indexes) into *node*; return *default* on a miss."""
    try:
        for index in path:
            node = node[index]
    except (IndexError, TypeError):
        return default
    return node


def extract_reviews_from_api_body(body: str) -> list:
    """Extract reviews from an intercepted API response body.

    The body is JSON, optionally prefixed with ``)]}'``. ``data[2]`` holds the
    review entries; for each entry ``item[0]`` is the review array, read as:

        Review ID: [0]   (same format as the DOM's data-review-id)
        Author:    [1][4][5][0]
        Rating:    [2][0][0]
        Text:      [2][15][0][0]
        Time:      [1][6]

    Args:
        body: Raw response text.

    Returns:
        A list of dicts with ``review_id``, ``author``, ``text``, ``rating``,
        ``timestamp`` and ``source`` ("api"). Empty when the body is not a
        string, is not valid JSON, or has an unexpected shape.
    """
    reviews = []
    if not isinstance(body, str):
        return reviews

    # Remove )]}' prefix
    if body.startswith(")]}'"):
        body = body[4:].strip()

    try:
        data = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return reviews

    if not isinstance(data, list) or len(data) < 3:
        return reviews
    reviews_area = data[2]
    if not isinstance(reviews_area, list):
        return reviews

    for item in reviews_area:
        try:
            if not isinstance(item, list) or len(item) < 1:
                continue
            review_data = item[0]
            if not isinstance(review_data, list) or len(review_data) < 3:
                continue

            review_id = review_data[0]
            # Callers use the ID as a dict/set key; anything that is not a
            # string (e.g. a nested list) would be unhashable or meaningless.
            if not isinstance(review_id, str):
                review_id = ""

            author = _dig(review_data, (1, 4, 5, 0), "")
            rating = _dig(review_data, (2, 0, 0), 0)
            text = _dig(review_data, (2, 15, 0, 0), "")
            timestamp = _dig(review_data, (1, 6), "")

            # bool is a subclass of int; reject it so True is not read as 1 star.
            rating_ok = (
                isinstance(rating, int)
                and not isinstance(rating, bool)
                and 1 <= rating <= 5
            )
            # Validate and add (include review_id for deduplication)
            if author and isinstance(author, str) and rating_ok:
                reviews.append({
                    "review_id": review_id,
                    "author": author,
                    "text": text or "",
                    "rating": rating,
                    "timestamp": timestamp or "",
                    "source": "api"
                })
        except Exception:
            # One malformed entry must not discard the rest of the response.
            continue
    return reviews
def _first_nonempty_text(element, selectors):
    """Return the stripped text of the first selector match that has text, else ''."""
    for selector in selectors:
        try:
            found = element.find_element(By.CSS_SELECTOR, selector).text.strip()
        except Exception:
            # Selector not present (or element went stale): try the next one.
            continue
        if found:
            return found
    return ""


def parse_dom_review(card) -> dict:
    """Parse a review from a DOM element.

    Args:
        card: Selenium element for one review card.

    Returns:
        A dict with ``id``, ``author``, ``text``, ``rating``, ``timestamp``,
        ``owner_response`` and ``source`` ("dom"), or ``None`` when neither a
        review ID nor an author could be read (or the card could not be
        inspected at all).
    """
    try:
        # Review ID: on the card itself, otherwise on a descendant.
        review_id = card.get_attribute("data-review-id") or ""
        if not review_id:
            try:
                id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]")
                review_id = id_el.get_attribute("data-review-id") or ""
            except Exception:
                pass

        # Author - multiple selectors
        author = _first_nonempty_text(
            card, ['div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div']
        )

        # Rating from aria-label on span[role="img"]
        rating = 0
        try:
            stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]')
            aria = stars_el.get_attribute("aria-label") or ""
            # Extract number from label (handles "5 stars", "5 estrellas", "4,5 ...").
            # The pattern requires a leading digit so a lone "." in the label
            # cannot be picked up and discard the rating.
            num = re.search(r'\d+(?:\.\d+)?', aria.replace(',', '.'))
            if num:
                rating = int(float(num.group()))
        except Exception:
            pass

        # Review text - try multiple selectors.
        # Note: "More" button clicking removed for speed; full text can be
        # expanded later if needed.
        text = _first_nonempty_text(
            card,
            ['span[jsname="bN97Pc"]', 'span[jsname="fbQN7e"]', 'div.MyEned span.wiI7pd', '.wiI7pd'],
        )

        # Timestamp
        timestamp = _first_nonempty_text(card, ['span[class*="rsqaWe"]'])

        # Owner response (only reported when it has text)
        owner_response = None
        try:
            resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd")
        except Exception:
            resp_box = None
        if resp_box is not None:
            resp_text = _first_nonempty_text(resp_box, ["div.wiI7pd"])
            resp_date = _first_nonempty_text(resp_box, ["span.DZSIDd"])
            if resp_text:
                owner_response = {"text": resp_text, "timestamp": resp_date}

        if not review_id and not author:
            return None

        return {
            "id": review_id,
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "dom"
        }
    except Exception:
        return None
def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15,
                   flush_callback=None, flush_batch_size: int = 500, log_capture: LogCapture = None,
                   progress_callback=None) -> dict:
    """
    Scrape Google Maps reviews.

    Loads the place page, opens the reviews tab, sorts by newest, then keeps a
    background thread scrolling the reviews pane while the main loop (once per
    second) collects reviews from intercepted API responses and from the DOM.

    Args:
        driver: Selenium WebDriver instance
        url: Google Maps place URL
        max_reviews: Maximum reviews to collect
        timeout_no_new: Seconds to wait with no new reviews before stopping
        flush_callback: Optional callback(reviews_list) called every flush_batch_size reviews
            This allows streaming data to disk and freeing memory
        flush_batch_size: Number of reviews to collect before flushing (default 500)
        log_capture: Optional LogCapture instance for storing logs
        progress_callback: Optional callback(current_count, total_count) called every iteration

    Returns:
        dict with:
            reviews: reviews not yet handed to flush_callback (all of them when
                no flush_callback is given), sorted by DOM order
            total: number of unique reviews collected (flushed + in memory)
            total_flushed: number of reviews handed to flush_callback
            checks: number of main-loop iterations
            url: the URL that was scraped
            logs: captured log entries
        When the reviews panel cannot be found the dict additionally carries
        ``error`` and ``scrolls`` and reports zero reviews.
    """
    # Use provided log_capture or create a dummy that just prints
    log = log_capture or LogCapture()

    # Storage - use review ID as key
    reviews = {}  # review_id -> review
    seen_ids = set()  # Track all IDs we've seen (persists after flush)
    total_flushed = [0]  # Use list for closure mutation
    review_order = {}  # review_id -> position (DOM visual order for sorting)
    order_counter = [0]  # Current order position

    # Track total reviews (persists across refreshes)
    total_reviews = [None]  # Use list for closure mutation

    # Hard refresh counter
    hard_refresh_count = [0]
    max_hard_refreshes = 3  # Max number of hard refreshes before giving up

    # Find scrollable reviews container helper
    def find_scroll_container():
        selectors = [
            "div.m6QErb.DxyBCb.kA9KIf.dS8AEf",
            "div.m6QErb.DxyBCb.kA9KIf",
            "div.m6QErb.DxyBCb",
            "div.m6QErb[aria-label]",
            "div.DxyBCb.kA9KIf.dS8AEf",
            "div[role='main'] div.m6QErb",
        ]
        for sel in selectors:
            try:
                els = driver.find_elements(By.CSS_SELECTOR, sel)
                for el in els:
                    if el.is_displayed() and el.size['height'] > 100:
                        return el
            except Exception:
                pass
        return None

    def setup_reviews_page(is_refresh=False):
        """
        Setup the reviews page for scraping.
        Returns (scroll_container, stop_scrolling_event) or (None, None) on failure.
        Can be called after initial load or after a hard refresh.
        """
        refresh_label = " (after refresh)" if is_refresh else ""

        # Navigate to URL (both on initial load and on refresh)
        if not is_refresh:
            log.info(f"🌐 Loading: {url[:80]}...")
        else:
            log.info(f"🔄 Hard refresh #{hard_refresh_count[0]}: reloading page...")
        driver.get(url)

        # Handle consent popup if redirected (poll with tiny sleep)
        start = time.time()
        while time.time() - start < 5:  # Max 5s for consent
            if "consent.google" in driver.current_url:
                log.info(" Handling consent popup...")
                try:
                    for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
                        txt = btn.text.lower()
                        if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
                            btn.click()
                            # Reload original URL after consent
                            log.info(" Reloading after consent...")
                            driver.get(url)
                            break
                except Exception:
                    pass
                break
            # Check if we're already on the target page
            if "maps/place" in driver.current_url and "consent" not in driver.current_url:
                break
            time.sleep(0.01)  # 10ms - responsive but low CPU

        # Extract total review count BEFORE clicking reviews tab (it's on Overview)
        # Only on first load (don't overwrite if we already have it)
        if total_reviews[0] is None:
            start = time.time()
            while time.time() - start < 5:
                try:
                    count = driver.execute_script("""
                        var reviewSpans = document.querySelectorAll('span[role="img"]');
                        for (var i = 0; i < reviewSpans.length; i++) {
                            var label = reviewSpans[i].getAttribute('aria-label') || '';
                            var match = label.match(/^([\\d,\\.]+)\\s*review/i);
                            if (match) {
                                return parseInt(match[1].replace(/[,\\.]/g, ''));
                            }
                        }
                        return null;
                    """)
                    if count:
                        total_reviews[0] = count
                        log.info(f"📊 Total reviews on page: {count}")
                        break
                except Exception:
                    pass
                time.sleep(0.1)

        # Click reviews tab - poll until found
        review_keywords = ["review", "reseña", "avis", "bewertung", "recensione", "opiniones"]
        start = time.time()
        tab_clicked = False
        while time.time() - start < 5:  # Max 5s for tabs
            try:
                tabs = driver.find_elements(By.CSS_SELECTOR, "button[role='tab']")
                for tab in tabs:
                    tab_text = tab.text.lower()
                    if any(kw in tab_text for kw in review_keywords):
                        if not is_refresh:
                            log.info(f" Clicking reviews tab: '{tab.text}'")
                        tab.click()
                        tab_clicked = True
                        break
                if tab_clicked:
                    break
                time.sleep(0.01)  # 10ms between polls
            except Exception:
                time.sleep(0.01)

        # Poll for scroll container (10ms intervals - fast but low CPU)
        scroll_container = None
        start = time.time()
        last_print = 0
        while time.time() - start < 10:  # Max 10s
            scroll_container = find_scroll_container()
            if scroll_container:
                break
            elapsed = int(time.time() - start)
            if elapsed > last_print:
                log.info(f" Waiting for reviews panel...{refresh_label} ({elapsed}s)")
                last_print = elapsed
            time.sleep(0.01)  # 10ms - responsive but low CPU

        if not scroll_container:
            log.error(f"❌ Could not find reviews scroll container{refresh_label}")
            try:
                log.error(f"Page title: {driver.title}")
                log.error(f"Current URL: {driver.current_url[:100]}")
            except Exception:
                pass
            return None, None

        log.info(f"✅ Found scroll container{refresh_label}")

        # Inject API interceptor (needs to be re-injected after refresh)
        if not is_refresh:
            log.info("🔌 Injecting API interceptor...")
        driver.execute_script("""
            // Always re-setup on refresh
            window.__reviewInterceptorInjected = true;
            window.__interceptedResponses = window.__interceptedResponses || [];

            // Intercept fetch (only if not already patched)
            if (!window.__fetchPatched) {
                window.__fetchPatched = true;
                const originalFetch = window.fetch;
                window.fetch = async function(...args) {
                    const url = args[0].toString();
                    const response = await originalFetch.apply(this, args);
                    if (url.includes('listugcposts') || url.includes('review')) {
                        try {
                            const clone = response.clone();
                            const text = await clone.text();
                            window.__interceptedResponses.push({url: url, body: text});
                        } catch(e) {}
                    }
                    return response;
                };
            }

            // Intercept XHR (only if not already patched)
            if (!window.__xhrPatched) {
                window.__xhrPatched = true;
                const originalXHR = window.XMLHttpRequest;
                window.XMLHttpRequest = function() {
                    const xhr = new originalXHR();
                    const originalOpen = xhr.open;
                    let reqUrl = '';
                    xhr.open = function(method, url, ...rest) {
                        reqUrl = url;
                        return originalOpen.apply(this, [method, url, ...rest]);
                    };
                    xhr.addEventListener('load', function() {
                        if (reqUrl.includes('listugcposts') || reqUrl.includes('review')) {
                            try {
                                window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText});
                            } catch(e) {}
                        }
                    });
                    return xhr;
                };
                for (let prop of Object.getOwnPropertyNames(originalXHR)) {
                    try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch(e) {}
                }
            }
        """)

        # Sort by newest first
        try:
            sort_btn = driver.execute_script("""
                var btns = document.querySelectorAll('button[data-value="sort"]');
                if (btns.length) return btns[0];
                var all = document.querySelectorAll('button[aria-label*="Sort"]');
                if (all.length) return all[0];
                return null;
            """)
            if sort_btn:
                sort_btn.click()
                time.sleep(0.3)
                driver.execute_script("""
                    var items = document.querySelectorAll('[role="menuitemradio"], [data-index="1"]');
                    for (var i = 0; i < items.length; i++) {
                        var txt = items[i].textContent.toLowerCase();
                        if (txt.includes('newest') || txt.includes('recent') || txt.includes('más reciente')) {
                            items[i].click();
                            break;
                        }
                    }
                """)
                time.sleep(0.5)
                log.info(" 📅 Sorted by newest")
                # Re-find scroll container after sorting (DOM may be recreated)
                new_container = find_scroll_container()
                if new_container:
                    scroll_container = new_container
                    log.info(" 🔄 Refreshed scroll container reference")
        except Exception:
            pass

        # Expand "More" buttons for full text
        try:
            expanded = driver.execute_script("""
                var buttons = document.querySelectorAll('button.w8nwRe.kyuRq');
                var count = 0;
                for (var i = 0; i < buttons.length; i++) {
                    if (buttons[i].textContent.trim() === 'More') {
                        buttons[i].click();
                        count++;
                    }
                }
                return count;
            """)
            if expanded > 0:
                log.info(f" 📝 Expanded {expanded} truncated reviews")
        except Exception:
            pass

        # Block images to speed up scrolling (use CDP)
        try:
            driver.execute_cdp_cmd('Network.setBlockedURLs', {
                'urls': ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp', '*.svg', '*googleusercontent.com/*']
            })
            driver.execute_cdp_cmd('Network.enable', {})
            if not is_refresh:
                log.info(" 🚫 Blocking images for faster scrolling")
        except Exception:
            pass

        # Setup scrollable pane reference
        driver.execute_script("window.scrollablePane = arguments[0];", scroll_container)

        # Create scroll worker: keeps the pane pinned to the bottom until the
        # returned event is set.
        # NOTE(review): this thread issues driver calls concurrently with the
        # main loop - confirm the driver in use tolerates that.
        stop_scrolling = threading.Event()

        def scroll_worker():
            while not stop_scrolling.is_set():
                try:
                    driver.execute_script("""
                        var p = window.scrollablePane;
                        if (p) p.scrollTop = p.scrollHeight;
                    """)
                except Exception:
                    pass
                time.sleep(0.1)

        scroll_thread = threading.Thread(target=scroll_worker, daemon=True)
        scroll_thread.start()
        return scroll_container, stop_scrolling

    # Initial page setup
    scroll_container, stop_scrolling = setup_reviews_page(is_refresh=False)
    if not scroll_container:
        # Same keys as the normal return (plus error/scrolls) so callers can
        # read either result uniformly.
        return {
            "reviews": [],
            "total": 0,
            "scrolls": 0,
            "error": "No scroll container found",
            "total_flushed": 0,
            "checks": 0,
            "url": url,
            "logs": log.get_logs()
        }

    def get_api_reviews():
        """Get reviews from intercepted API responses (drains the browser-side buffer)."""
        api_revs = []
        try:
            responses = driver.execute_script("""
                var r = window.__interceptedResponses || [];
                window.__interceptedResponses = [];
                return r;
            """)
            for resp in (responses or []):
                body = resp.get("body", "")
                api_revs.extend(extract_reviews_from_api_body(body))
        except Exception:
            pass
        return api_revs

    # Recovery function - use real mouse actions when stuck
    from selenium.webdriver.common.action_chains import ActionChains
    from selenium.webdriver.common.keys import Keys

    recovery_count = [0]

    def unstick_scroll():
        """Try one of four scroll-recovery methods, rotating on each call."""
        recovery_count[0] += 1
        method = recovery_count[0] % 4
        try:
            if method == 1:
                # Method 1: Click pane and send Page Down keys
                scroll_container.click()
                ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
                ActionChains(driver).send_keys(Keys.PAGE_DOWN).perform()
            elif method == 2:
                # Method 2: Real mouse wheel scroll
                ActionChains(driver).move_to_element(scroll_container)\
                    .scroll_by_amount(0, 800).perform()
            elif method == 3:
                # Method 3: Scroll up significantly then back down (force reload)
                driver.execute_script("""
                    var p = window.scrollablePane;
                    if (p) p.scrollTop = Math.max(0, p.scrollTop - 2000);
                """)
                time.sleep(0.3)
                driver.execute_script("""
                    var p = window.scrollablePane;
                    if (p) p.scrollTop = p.scrollHeight;
                """)
            else:
                # Method 4: Scroll last card into view, then scroll pane (no click to avoid opening profile)
                driver.execute_script("""
                    var cards = document.querySelectorAll('[data-review-id]');
                    if (cards.length > 0) {
                        cards[cards.length - 1].scrollIntoView({block: 'end', behavior: 'smooth'});
                    }
                """)
                time.sleep(0.3)
                driver.execute_script("""
                    var p = window.scrollablePane;
                    if (p) p.scrollTop = p.scrollHeight;
                """)
        except Exception:
            pass

    def do_hard_refresh():
        """Hard refresh the page and re-setup everything. Returns True on success."""
        nonlocal scroll_container, stop_scrolling
        hard_refresh_count[0] += 1
        if hard_refresh_count[0] > max_hard_refreshes:
            log.warning(f" ⚠️ Max hard refreshes ({max_hard_refreshes}) reached, giving up")
            return False

        # Stop current scroll worker
        stop_scrolling.set()
        time.sleep(0.2)

        # Re-setup page
        new_container, new_stop = setup_reviews_page(is_refresh=True)
        if new_container:
            scroll_container = new_container
            stop_scrolling = new_stop
            recovery_count[0] = 0  # Reset recovery count after successful refresh
            log.info(f" ✅ Hard refresh successful, resuming with {len(seen_ids)} reviews already collected")
            return True
        else:
            log.error(" ❌ Hard refresh failed to find scroll container")
            return False

    # Main collection loop
    last_new_time = time.time()
    last_count = len(reviews)
    check_num = 0
    log.info(f"🔄 Scrolling... (timeout: {timeout_no_new}s with no new)")
    cycle_start = time.time()

    try:
        while True:
            check_num += 1
            time.sleep(1.0)  # Check every second

            # TIMING: Track cycle performance
            t0 = time.time()
            cycle_delta = t0 - cycle_start
            cycle_start = t0

            # Collect from API (doesn't affect scroll) - API has FULL TEXT in original language
            # Use review_id as key to avoid duplicates with DOM
            t1 = time.time()
            for rev in get_api_reviews():
                rid = rev.get('review_id', '')
                # Only string IDs are usable as keys; an unhashable value
                # would raise TypeError in the set lookup below.
                if isinstance(rid, str) and rid and rid not in seen_ids:
                    reviews[rid] = rev
                    seen_ids.add(rid)
            api_time = time.time() - t1

            # Expand any new "More" buttons for full text (batch click, fast)
            try:
                driver.execute_script("""
                    var buttons = document.querySelectorAll('button.w8nwRe.kyuRq');
                    for (var i = 0; i < buttons.length; i++) {
                        if (buttons[i].textContent.trim() === 'More') {
                            buttons[i].click();
                        }
                    }
                """)
            except Exception:
                pass

            # Parse reviews using ROBUST selectors (no class names - uses data/aria attributes)
            # This survives Google's CSS class name changes
            # Also removes separators from previously-hidden cards to keep DOM light
            t2 = time.time()
            dom_cards = 0
            try:
                seen_list = list(seen_ids)
                parsed_reviews = driver.execute_script("""
                    var seenSet = new Set(arguments[0]);
                    var results = [];
                    var processedIds = new Set();
                    var sepsRemoved = 0;

                    // ROBUST: Find cards by data attribute only (not class names)
                    var cards = document.querySelectorAll('[data-review-id]');
                    for (var i = 0; i < cards.length; i++) {
                        var card = cards[i];
                        var rid = card.getAttribute('data-review-id');
                        var isHidden = card.style.display === 'none';

                        // CLEANUP: Remove separators adjacent to already-hidden cards
                        // This keeps DOM light without breaking Google's virtual scroll
                        if (isHidden) {
                            var sibling = card.nextElementSibling;
                            while (sibling) {
                                var nextSib = sibling.nextElementSibling;
                                var classes = sibling.className || '';
                                if (classes.includes('AyRUI') || classes.includes('TFQHme')) {
                                    sibling.remove();
                                    sepsRemoved++;
                                    sibling = nextSib;
                                } else {
                                    break;
                                }
                            }
                            continue;
                        }

                        // Skip if no ID or already processed this cycle
                        if (!rid || processedIds.has(rid)) continue;

                        // Only process top-level review cards (have aria-label with author name)
                        if (!card.getAttribute('aria-label')) continue;
                        processedIds.add(rid);

                        // Already seen from API - just track order, skip content
                        // BUT still hide the card to keep DOM light!
                        if (seenSet.has(rid)) {
                            results.push({id: rid, orderOnly: true});
                            // Hide this card since we already have its data from API
                            card.style.display = 'none';
                            card.innerHTML = '';
                            continue;
                        }

                        var author = '', text = '', rating = 0, timestamp = '';

                        // AUTHOR: Extract from "Photo of {Name}" button aria-label
                        var photoBtn = card.querySelector('button[aria-label^="Photo of"]');
                        if (photoBtn) {
                            author = photoBtn.getAttribute('aria-label').replace('Photo of ', '').trim();
                        }
                        // Fallback: card's own aria-label is the author name
                        if (!author) {
                            author = card.getAttribute('aria-label') || '';
                        }

                        // RATING: span with role="img" and aria-label containing "star"
                        var ratingEl = card.querySelector('span[role="img"][aria-label*="star"]');
                        if (ratingEl) {
                            var match = ratingEl.getAttribute('aria-label').match(/(\\d)/);
                            if (match) rating = parseInt(match[1]);
                        }

                        // TIMESTAMP: Find span with "X time ago" pattern
                        var spans = card.querySelectorAll('span');
                        for (var j = 0; j < spans.length; j++) {
                            var spanText = spans[j].textContent.trim();
                            if (spanText.match(/^(\\d+|a|an)\\s+(second|minute|hour|day|week|month|year)s?\\s+ago$/i)) {
                                timestamp = spanText;
                                break;
                            }
                        }

                        // TEXT: Find longest text span (not timestamp/UI elements)
                        var longestText = '';
                        for (var j = 0; j < spans.length; j++) {
                            var spanText = spans[j].textContent.trim();
                            if (spanText === timestamp) continue;
                            if (spanText.match(/^\\d+ stars?$/i)) continue;
                            if (spanText === 'More' || spanText === 'Less') continue;
                            if (spanText.match(/^(Like\\d*|Share)$/)) continue;
                            if (spanText.length > longestText.length && spanText.length > 10) {
                                longestText = spanText;
                            }
                        }
                        text = longestText;

                        if (author && rating >= 1 && rating <= 5) {
                            results.push({
                                id: rid,
                                orderOnly: false,
                                author: author,
                                text: text,
                                rating: rating,
                                timestamp: timestamp,
                                source: 'dom'
                            });
                        }

                        // ALWAYS hide processed cards to keep DOM light
                        // (even if extraction failed - we've seen this card)
                        card.style.display = 'none';
                        card.innerHTML = '';
                    }
                    return {reviews: results, cardCount: cards.length, sepsRemoved: sepsRemoved};
                """, seen_list)
                dom_cards = parsed_reviews.get('cardCount', 0) if parsed_reviews else 0
                new_reviews = parsed_reviews.get('reviews', []) if parsed_reviews else []
                for rev in new_reviews:
                    rid = rev.pop('id')
                    order_only = rev.pop('orderOnly', False)
                    # Track DOM order for ALL reviews (for sorting output)
                    if rid not in review_order:
                        review_order[rid] = order_counter[0]
                        order_counter[0] += 1
                    # Only add content for new reviews (not already from API)
                    if not order_only:
                        # Carry the ID under the same key API-sourced reviews use,
                        # so consumers can deduplicate both kinds uniformly.
                        rev['review_id'] = rid
                        reviews[rid] = rev
                        seen_ids.add(rid)
            except Exception as e:
                log.error(f" ❌ DOM parse error: {e}")
            dom_time = time.time() - t2

            # BATCH FLUSH: If we have enough reviews, flush to callback and clear memory
            # Sort by DOM order before flushing
            t3 = time.time()
            if flush_callback and len(reviews) >= flush_batch_size:
                log.info(f" 💾 Flushing {len(reviews)} reviews to disk...")
                sorted_reviews = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf')))
                flush_callback([r for _, r in sorted_reviews])
                total_flushed[0] += len(reviews)
                reviews.clear()  # Free memory, but keep seen_ids and review_order
            flush_time = time.time() - t3

            current_count = total_flushed[0] + len(reviews)

            # TIMING: Print if cycle is slow (>2s)
            if cycle_delta > 2.0:
                log.warning(f" ⚠️ SLOW cycle: {cycle_delta:.1f}s (api:{api_time:.1f}s dom:{dom_time:.1f}s/{dom_cards}cards flush:{flush_time:.1f}s seen:{len(seen_ids)})")

            # Check for new reviews
            if current_count > last_count:
                last_new_time = time.time()
                last_count = current_count

            # Check if loading (spinner visible OR network activity)
            try:
                loading_status = driver.execute_script("""
                    var status = {spinner: false, network: false};
                    // Check for Google's loading indicators
                    var spinner = document.querySelector('div[role="progressbar"]');
                    if (spinner && spinner.offsetParent !== null) status.spinner = true;
                    var loading = document.querySelector('.qjESne, .loading');
                    if (loading && loading.offsetParent !== null) status.spinner = true;
                    // Check for recent network activity (API interceptor).
                    // The buffer is drained at the start of every cycle, so
                    // anything in it now arrived during this cycle.
                    var responses = window.__interceptedResponses || [];
                    if (responses.length > 0) status.network = true;
                    return status;
                """)
                is_loading = loading_status.get('spinner') or loading_status.get('network')
                if is_loading:
                    last_new_time = time.time()  # Reset timer while loading
            except Exception:
                is_loading = False

            # Progress update
            elapsed = time.time() - last_new_time
            if total_reviews[0]:
                pct = (current_count / total_reviews[0]) * 100
                log.info(f" 📊 {current_count}/{total_reviews[0]} ({pct:.0f}%) | idle: {elapsed:.1f}s")
            else:
                log.info(f" 📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s")

            # Call progress callback on every iteration (for real-time log updates)
            if progress_callback:
                progress_callback(current_count, total_reviews[0])

            # Stop conditions - check BEFORE recovery attempts
            # (the scroll worker is stopped by the finally clause below)
            if current_count >= max_reviews:
                log.info(f"✅ Reached max: {current_count}")
                break

            # Also stop if we have all reviews from the page
            if total_reviews[0] and current_count >= total_reviews[0]:
                log.info(f"✅ All {current_count} reviews collected")
                break

            # STUCK DETECTION: If no new reviews for 3s+, try to unstick
            # Only if we haven't collected all reviews yet
            if elapsed >= 3 and int(elapsed) % 3 == 0:
                # After 8+ failed recovery attempts, try hard refresh
                if recovery_count[0] >= 8 and hard_refresh_count[0] < max_hard_refreshes:
                    log.info(f" 🔄 Soft recovery failed {recovery_count[0]} times, trying hard refresh...")
                    if do_hard_refresh():
                        last_new_time = time.time()  # Reset timer after refresh
                        continue  # Skip to next iteration
                else:
                    log.info(f" 🔧 Recovery attempt #{recovery_count[0] + 1}...")
                    unstick_scroll()

            # Check scroll state - track if content is still being added
            try:
                scroll_state = driver.execute_script("""
                    var p = window.scrollablePane;
                    if (!p) return {atBottom: true, height: 0};
                    var atBottom = (p.scrollTop + p.clientHeight >= p.scrollHeight - 50);
                    var height = p.scrollHeight;
                    var lastHeight = window.__lastScrollHeight || 0;
                    var growing = height > lastHeight;
                    window.__lastScrollHeight = height;
                    return {atBottom: atBottom, height: height, growing: growing};
                """)
                at_bottom = scroll_state.get('atBottom', True)
                content_growing = scroll_state.get('growing', False)
            except Exception:
                at_bottom = True
                content_growing = False

            # Reset timer if content is growing (new reviews loading), and
            # refresh elapsed so the timeout check below sees the reset.
            if content_growing:
                last_new_time = time.time()
                elapsed = 0.0

            # Dynamic timeout based on state and recovery attempts
            # - Try hard refresh before giving up if we still have refreshes left
            # - 5s if at bottom AND content stopped growing AND multiple recovery attempts failed
            # - timeout_no_new seconds otherwise (keep trying)
            recovery_failed = recovery_count[0] >= 5 and elapsed >= 5
            truly_done = at_bottom and not content_growing and recovery_failed
            timeout_hit = elapsed >= timeout_no_new
            if truly_done or timeout_hit:
                # Last chance: try hard refresh before giving up
                if hard_refresh_count[0] < max_hard_refreshes and current_count < (total_reviews[0] or max_reviews):
                    log.info(" 🔄 Timeout reached, trying hard refresh before giving up...")
                    if do_hard_refresh():
                        last_new_time = time.time()
                        continue  # Keep trying
                log.info(f"✅ All reviews loaded: {current_count}")
                break
    finally:
        # Always stop the background scroll worker, including when the loop
        # exits through an exception (e.g. raised by flush_callback).
        stop_scrolling.set()

    # Flush any remaining reviews (sorted by DOM order)
    if flush_callback and reviews:
        log.info(f" 💾 Final flush: {len(reviews)} reviews...")
        sorted_reviews = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf')))
        flush_callback([r for _, r in sorted_reviews])
        total_flushed[0] += len(reviews)
        reviews.clear()

    # Reviews already parsed during scrolling (real-time parsing)
    log.info("📝 Finalizing review data...")

    # Final results (sorted by DOM order)
    sorted_items = sorted(reviews.items(), key=lambda x: review_order.get(x[0], float('inf')))
    review_list = [r for _, r in sorted_items]
    grand_total = total_flushed[0] + len(review_list)
    dom_count = sum(1 for r in review_list if r.get("source") == "dom")
    api_count = sum(1 for r in review_list if r.get("source") == "api")
    if total_flushed[0] > 0:
        log.info(f"📋 Total: {grand_total} unique reviews (flushed: {total_flushed[0]}, in memory: {len(review_list)})")
    else:
        log.info(f"📋 Total: {len(review_list)} unique reviews (DOM: {dom_count}, API: {api_count})")

    return {
        "reviews": review_list,  # Only unflushed reviews (flushed already sent to callback)
        "total": grand_total,
        "total_flushed": total_flushed[0],
        "checks": check_num,
        "url": url,
        "logs": log.get_logs()
    }
def fast_scrape_reviews(url: str, headless: bool = False, max_scrolls: int = 999999,
                        progress_callback=None, driver=None, return_driver: bool = False,
                        log_capture: LogCapture = None):
    """
    Production-compatible wrapper for scrape_reviews.
    Matches the API expected by job_manager.py.

    Args:
        url: Google Maps URL to scrape
        headless: Run Chrome in headless mode
        max_scrolls: Not used (kept for API compatibility)
        progress_callback: Optional callback(current_count, total_count) for progress
        driver: Existing driver instance to reuse
        return_driver: If True, return driver in result
        log_capture: Optional LogCapture instance for real-time log access

    Returns:
        Dictionary with: reviews, count, total_reviews, time, success, error, driver, logs.
        ``reviews`` holds every collected review, including batches that were
        flushed for progress reporting. ``success`` is False (and ``error`` is
        set) both when an exception occurs and when scrape_reviews reports an
        error in its result.
    """
    from seleniumbase import Driver

    start_time = time.time()
    driver_provided = driver is not None
    # Only a driver created here, and not handed back to the caller, is ours to close.
    should_close_driver = not return_driver and not driver_provided

    # Use provided log_capture or create new one
    log_capture = log_capture or LogCapture()

    try:
        # Create driver if not provided
        if not driver:
            driver = Driver(
                uc=True,
                headless=headless,
                page_load_strategy="normal",
                agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            )
            driver.set_window_size(1200, 900)  # Proper viewport for Google Maps

            # Set Chrome geolocation to US (Boston, MA) using CDP
            # This ensures Google Maps shows US results regardless of server location
            try:
                driver.execute_cdp_cmd('Emulation.setGeolocationOverride', {
                    'latitude': 42.3601,
                    'longitude': -71.0589,
                    'accuracy': 100
                })
                log_capture.info("Set geolocation to US (Boston, MA)")
            except Exception as e:
                log_capture.warning(f"Could not set geolocation: {e}")

        # Add URL parameters for consistent results. The separator is chosen
        # per parameter so a URL without any query string never gets a
        # dangling "&gl=us".
        if 'hl=' not in url:
            separator = '&' if '?' in url else '?'
            url = f"{url}{separator}hl=en"
        if 'gl=' not in url:
            separator = '&' if '?' in url else '?'
            url = f"{url}{separator}gl=us"

        # Create progress wrapper if callback provided. scrape_reviews drops
        # flushed batches from its own result, so they are kept here; otherwise
        # the returned "reviews" would be empty whenever progress is reported.
        flush_callback = None
        flushed_reviews = []
        if progress_callback:
            def flush_with_progress(reviews_batch):
                flushed_reviews.extend(reviews_batch)
                progress_callback(len(flushed_reviews), None)
            flush_callback = flush_with_progress

        # Run the scraper with progress callback for real-time updates
        result = scrape_reviews(
            driver=driver,
            url=url,
            max_reviews=999999,  # Effectively unlimited
            timeout_no_new=15,
            flush_callback=flush_callback,
            flush_batch_size=100,  # Smaller batches for more frequent progress
            log_capture=log_capture,
            progress_callback=progress_callback  # Pass through for real-time log updates
        )
        elapsed = time.time() - start_time

        # scrape_reviews reports setup failures through an "error" key rather
        # than by raising; surface that instead of claiming success.
        scrape_error = result.get("error")

        # Return in expected format
        response = {
            "reviews": flushed_reviews + result.get("reviews", []),
            "count": result.get("total", 0),
            "total_reviews": result.get("total", 0),
            "time": elapsed,
            "success": scrape_error is None,
            "error": scrape_error,
            "logs": result.get("logs") or log_capture.get_logs()
        }
        if return_driver:
            response["driver"] = driver
        elif should_close_driver:
            try:
                driver.quit()
            except Exception:
                pass
        return response
    except Exception as e:
        elapsed = time.time() - start_time
        if should_close_driver and driver:
            try:
                driver.quit()
            except Exception:
                pass
        # Log error to the existing log_capture
        log_capture.error(f"Scraper failed: {str(e)}")
        return {
            "reviews": [],
            "count": 0,
            "total_reviews": 0,
            "time": elapsed,
            "success": False,
            "error": str(e),
            "driver": driver if return_driver else None,
            "logs": log_capture.get_logs()
        }
# Manual smoke test: run this module directly to scrape a sample place.
if __name__ == "__main__":
    from seleniumbase import Driver

    # Sample place with 79 reviews
    TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8"

    print("🚀 Starting clean scraper test...")

    # Visible (non-headless) browser with the viewport the scraper expects
    driver = Driver(uc=True, headless=False)
    driver.set_window_size(1200, 900)
    try:
        result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15)
        print(f"\n✅ Got {result['total']} reviews in {result['checks']} checks")

        # Print the first collected review as a sample, if there is one
        collected = result["reviews"]
        if collected:
            first = collected[0]
            print("\n📝 Sample review:")
            print(f" Author: {first['author']}")
            print(f" Rating: {first['rating']}")
            if first['text']:
                print(f" Text: {first['text'][:100]}...")
            else:
                print(" Text: (none)")
    finally:
        driver.quit()
        print("\n🏁 Done")
def get_business_card_info(url: str, headless: bool = True, driver=None, return_driver: bool = False) -> dict:
    """
    Extract business card info from Google Maps.

    Navigates to the URL, accepts the Google consent page if one is shown,
    then polls the page DOM (no fixed waits) until the business name and the
    review count are both available or the 5 second polling window expires.

    Args:
        url: Google Maps place URL to open.
        headless: Headless flag used only when a driver is created here.
        driver: Existing driver to reuse. A caller-provided driver is never
            closed by this function. When omitted, a seleniumbase driver is
            created.
        return_driver: Keep the driver open and hand it back under the
            "driver" key of the result so the caller can reuse/close it.

    Returns:
        dict with: name, address, rating, total_reviews, success, error, time
        (plus "driver" when return_driver is True). "success" is True only
        when a business name was found; "error" carries the exception text
        when navigation or extraction failed.
    """
    # Import lazily so that callers who supply their own driver do not need
    # seleniumbase to be importable. Kept outside the try block so a missing
    # package still raises instead of being reported as a scrape error.
    if driver is None:
        from seleniumbase import Driver

    start_time = time.monotonic()
    driver_provided = driver is not None
    should_close_driver = not return_driver and not driver_provided

    # JavaScript run in the page to read the business card fields.
    extract_js = """
        var result = {name: null, rating: null, total_reviews: null, address: null};
        // Business name from h1
        var h1 = document.querySelector('h1');
        if (h1) result.name = h1.textContent.trim();
        // Rating and reviews from span[role="img"] aria-labels
        // Handles multiple languages: stars/estrellas/étoiles, reviews/reseñas/avis
        var spans = document.querySelectorAll('span[role="img"]');
        for (var i = 0; i < spans.length; i++) {
            var label = spans[i].getAttribute('aria-label') || '';
            // Rating: "4.8 stars" or "4,8 estrellas" or "4,8 étoiles"
            var rMatch = label.match(/^([\\d,.]+)\\s*(stars?|estrellas?|étoiles?|sterne?|stelle)/i);
            if (rMatch && !result.rating) {
                result.rating = parseFloat(rMatch[1].replace(',', '.'));
            }
            // Reviews: "79 reviews" or "79 reseñas" or "79 avis"
            var revMatch = label.match(/^([\\d,\\.]+)\\s*(reviews?|reseñas?|avis|bewertungen|recensioni)/i);
            if (revMatch && !result.total_reviews) {
                result.total_reviews = parseInt(revMatch[1].replace(/[,\\.]/g, ''));
            }
        }
        // Address from button
        var addrBtn = document.querySelector('button[data-item-id="address"]');
        if (addrBtn) {
            var label = addrBtn.getAttribute('aria-label');
            if (label) result.address = label.replace(/^(Address|Dirección|Adresse):\\s*/i, '');
        }
        return result;
    """

    try:
        # Create driver if not provided
        if driver is None:
            driver = Driver(uc=True, headless=headless)

        # Set geolocation to US (best-effort: a driver without CDP support
        # simply keeps its default location)
        try:
            driver.execute_cdp_cmd('Emulation.setGeolocationOverride', {
                'latitude': 42.3601, 'longitude': -71.0589, 'accuracy': 100
            })
        except Exception:
            pass

        # Navigate to URL
        driver.get(url)

        # Handle consent popup - poll with 10ms sleep, give up after 5s
        consent_deadline = time.monotonic() + 5
        while time.monotonic() < consent_deadline:
            current_url = driver.current_url
            if "consent.google" in current_url:
                try:
                    for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
                        txt = btn.text.lower()
                        if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
                            btn.click()
                            # Reload the target page once consent is given
                            driver.get(url)
                            break
                except Exception:
                    # Best-effort: fall through and try to extract anyway
                    pass
                break
            if "maps/place" in current_url or ("maps" in current_url and "consent" not in current_url):
                break
            time.sleep(0.01)  # 10ms - responsive but low CPU

        # Poll for business info every 100ms, for at most 5s
        info = {"name": None, "rating": None, "total_reviews": None, "address": None}
        poll_deadline = time.monotonic() + 5
        while time.monotonic() < poll_deadline:
            try:
                scraped = driver.execute_script(extract_js)
            except Exception:
                # Page may still be loading/navigating; retry on next poll
                scraped = None
            # Only adopt well-formed results so a bad poll cannot wipe out
            # data gathered by an earlier one.
            if isinstance(scraped, dict):
                info = scraped
                # Exit early if we have the essentials
                if info.get("name") and info.get("total_reviews") is not None:
                    break
            time.sleep(0.1)  # 100ms between polls

        card = {
            "name": info.get("name"),
            "address": info.get("address"),
            "rating": info.get("rating"),
            "total_reviews": info.get("total_reviews"),
            "success": bool(info.get("name")),
            "error": None,
            "time": time.monotonic() - start_time
        }
    except Exception as e:
        card = {
            "name": None,
            "address": None,
            "rating": None,
            "total_reviews": None,
            "success": False,
            "error": str(e),
            "time": time.monotonic() - start_time
        }
    finally:
        # Only close drivers created here that the caller did not ask to keep
        if should_close_driver and driver:
            try:
                driver.quit()
            except Exception:
                pass

    # Hand the still-open driver back; otherwise a driver created here with
    # return_driver=True would be unreachable and leak a browser process.
    if return_driver:
        card["driver"] = driver
    return card