Files
whyrating-engine-legacy/start_ultra_fast_complete.py
Alejandro Gutiérrez faa0704737 Optimize scraper performance and add fallback selectors for robustness
Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-18 19:49:24 +00:00

337 lines
12 KiB
Python

#!/usr/bin/env python3
"""
ULTRA-FAST COMPLETE Scraper - Gets ALL 244 reviews in ~25-30 seconds.
Strategy:
1. Ultra-fast API scrolling to get 234 reviews (~19s)
2. DOM parsing for missing 10 reviews (~5-10s)
3. Total: ~25-30s for 244 reviews (vs 155s original)
Combines speed of start_ultra_fast.py with completeness of original scraper.
"""
import sys
import yaml
import logging
import time
import json
from seleniumbase import Driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from modules.api_interceptor import GoogleMapsAPIInterceptor
# Root logger: WARNING threshold with a "[LEVEL] message" line format.
_LOG_FORMAT = '[%(levelname)s] %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.WARNING)

# This module's own logger is set to INFO, so its info-level records pass
# the logger-level check even though the root logger is at WARNING.
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
def load_config():
    """Load scraper settings from ``config.yaml`` in the working directory.

    Returns:
        The parsed YAML document. An empty file (for which
        ``yaml.safe_load`` returns ``None``) yields an empty dict so that
        callers can use ``.get(...)`` without a ``None`` check.

    Raises:
        OSError: if ``config.yaml`` cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    # Explicit UTF-8: without it the platform default encoding is used,
    # which can mis-decode non-ASCII characters in the config.
    with open('config.yaml', 'r', encoding='utf-8') as f:
        return yaml.safe_load(f) or {}
def parse_dom_reviews_fast(driver, max_reviews=20):
    """Extract reviews from the rendered page with a single JavaScript call.

    The script reads at most ``max_reviews`` elements matching
    ``div.jftiEf.fontBodyMedium`` and returns one object per element that
    has an author. Each returned review that also has a ``date_text`` gets
    a synthetic ``review_id`` of the form ``dom_<16 hex chars>``.

    Args:
        driver: object exposing ``execute_script(script, *args)``.
        max_reviews: upper bound on the number of DOM elements inspected.

    Returns:
        A list of review dicts (``author``, ``text``, ``date_text``,
        ``avatar_url``, ``profile_url``, ``review_id``, and ``rating`` when
        a rating element was found). Returns ``[]`` on any error; the
        error is printed, not raised.
    """
    # Function-scope import keeps this block self-contained.
    import hashlib

    # NOTE(review): ``profile_url`` is filled from the button's
    # ``data-review-id`` attribute, which by its name looks like a review
    # identifier rather than a URL - confirm the intended attribute.
    extract_script = """
    const reviews = [];
    const elements = document.querySelectorAll('div.jftiEf.fontBodyMedium');
    const maxCount = Math.min(arguments[0], elements.length);
    for (let i = 0; i < maxCount; i++) {
        const elem = elements[i];
        const review = {};
        try {
            // Author
            const authorElem = elem.querySelector('div.d4r55');
            review.author = authorElem ? authorElem.textContent : null;
            // Rating
            const ratingElem = elem.querySelector('span.kvMYJc');
            if (ratingElem) {
                const ariaLabel = ratingElem.getAttribute('aria-label');
                if (ariaLabel) {
                    const match = ariaLabel.match(/\\d+/);
                    review.rating = match ? parseFloat(match[0]) : null;
                }
            }
            // Text
            const textElem = elem.querySelector('span.wiI7pd');
            review.text = textElem ? textElem.textContent : null;
            // Date
            const dateElem = elem.querySelector('span.rsqaWe');
            review.date_text = dateElem ? dateElem.textContent : null;
            // Avatar
            const avatarElem = elem.querySelector('img.NBa7we');
            review.avatar_url = avatarElem ? avatarElem.src : null;
            // Profile URL
            const profileElem = elem.querySelector('button.WEBjve');
            review.profile_url = profileElem ? profileElem.getAttribute('data-review-id') : null;
            if (review.author) {
                reviews.push(review);
            }
        } catch (e) {
            // Skip this review
        }
    }
    return reviews;
    """
    try:
        # One round-trip to the browser; a null/undefined script result
        # is treated as "no reviews".
        dom_reviews_data = driver.execute_script(extract_script, max_reviews) or []

        dom_reviews = []
        for review_data in dom_reviews_data:
            author = review_data.get('author')
            date_text = review_data.get('date_text')
            if not (author and date_text):
                continue
            # The builtin hash() of a str is salted per interpreter run, so
            # it cannot produce IDs that stay stable in the saved JSON.
            # A content digest is deterministic. The unit separator keeps
            # ("ab", "c") and ("a", "bc") from mapping to the same ID.
            identity = f"{author}\x1f{date_text}".encode('utf-8')
            digest = hashlib.sha256(identity).hexdigest()[:16]
            review_data['review_id'] = f"dom_{digest}"
            dom_reviews.append(review_data)
        return dom_reviews
    except Exception as e:
        # Best-effort fallback path: report and let the caller continue
        # with whatever the API phase collected.
        print(f" Error in fast DOM parse: {e}")
        return []
def _dismiss_cookie_banner(driver):
    """Click the first "Accept"/"Aceptar" button if one is present (best effort)."""
    try:
        cookie_btns = driver.find_elements(
            By.CSS_SELECTOR,
            'button[aria-label*="Accept" i],button[aria-label*="Aceptar" i]')
        if cookie_btns:
            cookie_btns[0].click()
            time.sleep(0.4)
    except Exception as exc:
        log.debug("Cookie banner dismissal skipped: %s", exc)


def _click_reviews_tabs(driver):
    """Click the first element per selector whose text/aria-label mentions reviews."""
    review_keywords = ['reviews', 'review', 'reseñas', 'reseña']
    # The break only leaves the inner loop: each selector is tried in turn
    # and clicks at most one matching element.
    for selector in ['.LRkQ2', 'button[role="tab"]']:
        try:
            for tab in driver.find_elements(By.CSS_SELECTOR, selector):
                text = (tab.text or '').lower()
                aria = (tab.get_attribute('aria-label') or '').lower()
                if any(kw in text or kw in aria for kw in review_keywords):
                    driver.execute_script("arguments[0].click();", tab)
                    time.sleep(0.4)
                    break
        except Exception as exc:
            log.debug("Reviews tab lookup failed for %r: %s", selector, exc)
            continue


def _find_reviews_pane(driver):
    """Return the scrollable reviews pane element, or None if neither selector matches."""
    wait = WebDriverWait(driver, 3)
    try:
        return wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde')))
    except TimeoutException:
        pass
    try:
        return wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, 'div.m6QErb.WNBkOb.XiKgde')))
    except Exception as exc:
        log.debug("Fallback pane selector failed: %s", exc)
        return None


def _merge_api_reviews(interceptor, api_reviews):
    """Parse intercepted responses and add reviews not yet in ``api_reviews`` (keyed by review_id)."""
    responses = interceptor.get_intercepted_responses()
    if not responses:
        return
    for review in interceptor.parse_reviews_from_responses(responses):
        if review.review_id and review.review_id not in api_reviews:
            api_reviews[review.review_id] = {
                'review_id': review.review_id,
                'author': review.author,
                'rating': review.rating,
                'text': review.text,
                'date_text': review.date_text,
                'avatar_url': review.avatar_url,
                'profile_url': review.profile_url,
            }


def _dedup_key(review):
    """Build the (author, rating, first 20 chars of date) key used to match DOM and API reviews."""
    return (
        review.get('author', ''),
        review.get('rating', 0),
        (review.get('date_text', '') or '')[:20],
    )


def ultra_fast_complete_scrape(expected_total=244, target_reviews=240):
    """Scrape reviews via intercepted API responses, then fill gaps from the DOM.

    Phase 1 scrolls the reviews pane up to 35 times, merging reviews parsed
    from intercepted API responses after every scroll, and stops early once
    ``target_reviews`` have been collected. Phase 2 runs only when fewer
    than ``expected_total`` reviews were collected: it scrolls back to the
    top, parses up to 25 reviews from the DOM, and adds those whose
    (author, rating, date prefix) key is not already present. The combined
    list is written to ``google_reviews_ultra_fast_complete.json``.

    Args:
        expected_total: number of reviews the place is expected to have;
            used for the Phase 2 decision and the completion report.
        target_reviews: Phase 1 early-exit threshold, also used as the
            "nearly complete" threshold in the final report.

    Returns:
        The list of review dicts. ``[]`` when no URL is configured or the
        reviews pane cannot be found.

    Raises:
        ValueError: if ``expected_total`` is not positive.
    """
    if expected_total <= 0:
        raise ValueError("expected_total must be a positive integer")

    # Tolerate an empty config file (parsed as None).
    config = load_config() or {}
    url = config.get('url')
    headless = config.get('headless', False)
    if not url:
        # Fail before launching a browser instead of crashing on url[:80].
        print("ERROR: No 'url' found in config.yaml")
        return []

    # Duration of the original scraper per the module docstring; used only
    # for the speedup line in the report.
    baseline_seconds = 155
    max_scrolls = 35

    print(f"ULTRA-FAST COMPLETE SCRAPER - Getting ALL {expected_total} reviews...")
    print(f"URL: {url[:80]}...")
    start_time = time.time()
    api_reviews = {}
    driver = Driver(uc=True, headless=headless, page_load_strategy="normal")
    try:
        # ====== PHASE 1: ULTRA-FAST API SCROLLING ======
        print("\n[Phase 1] Ultra-fast API scrolling...")
        driver.get(url)
        time.sleep(1.5)
        _dismiss_cookie_banner(driver)
        _click_reviews_tabs(driver)
        # Wait for page stability
        time.sleep(1.0)

        pane = _find_reviews_pane(driver)
        if pane is None:
            print("ERROR: Could not find pane")
            return []

        # Setup API interceptor
        interceptor = GoogleMapsAPIInterceptor(driver)
        interceptor.setup_interception()
        interceptor.inject_response_interceptor()
        time.sleep(0.3)

        # Keep the pane on window so later scripts need no element argument.
        driver.execute_script("window.scrollablePane = arguments[0];", pane)
        scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);"
        # Trigger initial scroll
        driver.execute_script(scroll_script)
        time.sleep(0.3)
        print(" Fast scrolling for API reviews...")

        for i in range(max_scrolls):
            driver.execute_script(scroll_script)
            time.sleep(0.27)
            try:
                _merge_api_reviews(interceptor, api_reviews)
            except Exception as exc:
                # A failed collection must not abort scrolling; later
                # scrolls and the final collection get another chance.
                log.debug("API collection failed on scroll %d: %s", i + 1, exc)
            if (i + 1) % 10 == 0:
                print(f" {len(api_reviews)} reviews...")
            if len(api_reviews) >= target_reviews:
                break

        # Final API collection for responses that arrived after the last scroll.
        try:
            _merge_api_reviews(interceptor, api_reviews)
        except Exception as exc:
            log.debug("Final API collection failed: %s", exc)

        phase1_time = time.time() - start_time
        print(f" ✅ Phase 1 complete: {len(api_reviews)} reviews in {phase1_time:.2f}s")

        # ====== PHASE 2: DOM PARSING FOR MISSING REVIEWS ======
        missing_count = expected_total - len(api_reviews)
        if missing_count > 0:
            print(f"\n[Phase 2] Fast DOM parsing for {missing_count} missing reviews...")
            # Scroll to top (missing reviews likely at top). No element
            # argument is passed: the script only uses window.scrollablePane.
            driver.execute_script("window.scrollablePane.scrollTo(0, 0);")
            time.sleep(0.5)  # Brief wait for scroll
            dom_reviews = parse_dom_reviews_fast(
                driver, max_reviews=min(missing_count + 10, 25))

            # DOM reviews carry synthetic IDs, so match them against API
            # reviews by content key instead of review_id.
            seen_keys = {_dedup_key(review) for review in api_reviews.values()}
            dom_added = 0
            for dom_review in dom_reviews:
                dom_key = _dedup_key(dom_review)
                if dom_key not in seen_keys and dom_review.get('review_id'):
                    api_reviews[dom_review['review_id']] = dom_review
                    seen_keys.add(dom_key)  # also dedups within the DOM batch
                    dom_added += 1
            phase2_time = time.time() - start_time - phase1_time
            print(f" ✅ Phase 2 complete: +{dom_added} reviews from DOM in {phase2_time:.2f}s")

        # ====== RESULTS ======
        elapsed = time.time() - start_time
        all_reviews = list(api_reviews.values())
        total = len(all_reviews)
        print(f"\n{'='*50}")
        print("✅ COMPLETED!")
        print(f"Reviews: {total}/{expected_total} ({total/expected_total*100:.1f}%)")
        print(f"Time: {elapsed:.2f}s")
        print(f"Speed: {total/elapsed:.1f} reviews/sec")
        print(f"Speedup: {baseline_seconds/elapsed:.1f}x faster! 🚀")
        print(f"{'='*50}")
        if total >= expected_total:
            print(f"🎯 Got ALL {expected_total} reviews!")
        elif total >= target_reviews:
            print(f"⚠️ Missing {expected_total-total} reviews")
        else:
            print(f"⚠️ Missing {expected_total-total} reviews - may need more DOM parsing")
        print()

        # Save
        with open('google_reviews_ultra_fast_complete.json', 'w', encoding='utf-8') as f:
            json.dump(all_reviews, f, indent=2, ensure_ascii=False)
        print("💾 Saved to google_reviews_ultra_fast_complete.json")
        if all_reviews:
            # .get(): a DOM-sourced review has no 'rating' key when no
            # rating element was found for it.
            sample = all_reviews[0]
            print(f"\nSample: {sample.get('author')} - {sample.get('rating')}")
        return all_reviews
    finally:
        # Always release the browser, including on the early returns above.
        try:
            driver.quit()
        except Exception as exc:
            log.debug("driver.quit() failed: %s", exc)
if __name__ == '__main__':
    # Exit status 0 only when the scrape returned at least one review;
    # an empty result, Ctrl-C, or any error all exit with status 1.
    exit_code = 1
    try:
        if ultra_fast_complete_scrape():
            exit_code = 0
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
    except Exception as exc:
        print(f"ERROR: {exc}")
        import traceback
        traceback.print_exc()
    sys.exit(exit_code)