"""
Clean Google Maps Reviews Scraper
- Simple down scrolling
- DOM scraping + API interception
"""

import json
import re
import threading
import time
from typing import Optional

try:
    from selenium.webdriver.common.by import By
except ImportError:
    # Keep the pure API-parsing helpers importable when Selenium is absent.
    class By:
        """Minimal stand-in exposing the only locator strategy used here."""

        # Same value Selenium's ``By.CSS_SELECTOR`` carries.
        CSS_SELECTOR = "css selector"


# Anti-XSSI prefix that precedes the JSON payload in intercepted responses.
_XSSI_PREFIX = ")]}'"

# Positions in a raw API review array where an owner response may appear.
_OWNER_RESPONSE_INDEXES = (9, 18)

# Maximum nesting depth explored when searching a payload for reviews.
_MAX_API_DEPTH = 12

_AUTHOR_SELECTORS = ('div[class*="d4r55"]', '.d4r55', 'button[data-review-id] + div')
_TEXT_SELECTORS = (
    'span[jsname="bN97Pc"]',
    'span[jsname="fbQN7e"]',
    'div.MyEned span.wiI7pd',
    '.wiI7pd',
)
_EXPANDED_TEXT_SELECTORS = ('span[jsname="bN97Pc"]', 'span[jsname="fbQN7e"]', '.wiI7pd')

_REVIEW_TAB_KEYWORDS = ("review", "reseña", "avis", "bewertung", "recensione", "opiniones")

_SCROLL_CONTAINER_SELECTORS = (
    "div.m6QErb.DxyBCb.kA9KIf.dS8AEf",
    "div.m6QErb.DxyBCb.kA9KIf",
    "div.m6QErb.DxyBCb",
    "div.m6QErb[aria-label]",
    "div.DxyBCb.kA9KIf.dS8AEf",
    "div[role='main'] div.m6QErb",
)

# The look-behind stops a match from starting in the middle of a longer
# number (e.g. reading "12345 reviews" as 345, or "4.5 reviews" as 5).
_COUNT = r'(?<![\d.,])(\d{1,3}(?:,\d{3})+|\d+)'
_TOTAL_REVIEW_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        _COUNT + r'\s+reviews?',
        r'(?<![\d.,])(\d+\.?\d*K)\s+reviews?',
        _COUNT + r'\s+reseñas?',
    )
)

# Wraps window.fetch and XMLHttpRequest so that responses whose URL contains
# 'listugcposts' or 'review' are copied into window.__interceptedResponses.
_INTERCEPTOR_JS = """
    if (window.__reviewInterceptorInjected) return;
    window.__reviewInterceptorInjected = true;
    window.__interceptedResponses = [];

    // Intercept fetch
    const originalFetch = window.fetch;
    window.fetch = async function(...args) {
        const url = args[0].toString();
        const response = await originalFetch.apply(this, args);
        if (url.includes('listugcposts') || url.includes('review')) {
            try {
                const clone = response.clone();
                const text = await clone.text();
                window.__interceptedResponses.push({url: url, body: text});
            } catch(e) {}
        }
        return response;
    };

    // Intercept XHR
    const originalXHR = window.XMLHttpRequest;
    window.XMLHttpRequest = function() {
        const xhr = new originalXHR();
        const originalOpen = xhr.open;
        let reqUrl = '';
        xhr.open = function(method, url, ...rest) {
            reqUrl = url;
            return originalOpen.apply(this, [method, url, ...rest]);
        };
        xhr.addEventListener('load', function() {
            if (reqUrl.includes('listugcposts') || reqUrl.includes('review')) {
                try {
                    window.__interceptedResponses.push({url: reqUrl, body: xhr.responseText});
                } catch(e) {}
            }
        });
        return xhr;
    };
    for (let prop of Object.getOwnPropertyNames(originalXHR)) {
        try { window.XMLHttpRequest[prop] = originalXHR[prop]; } catch(e) {}
    }
"""

# Returns the buffered responses and empties the buffer in the page.
_DRAIN_RESPONSES_JS = """
    var r = window.__interceptedResponses || [];
    window.__interceptedResponses = [];
    return r;
"""

_SCROLL_TO_BOTTOM_JS = """
    var p = window.scrollablePane;
    if (p) p.scrollTop = p.scrollHeight;
"""

_COLLECT_REVIEW_IDS_JS = """
    var ids = [];
    document.querySelectorAll('[data-review-id]').forEach(function(el) {
        ids.push(el.getAttribute('data-review-id'));
    });
    return ids;
"""


def _is_rating(value) -> bool:
    """Return True for an integer star rating in 1..5.

    ``bool`` is excluded explicitly: it is a subclass of ``int``, so a JSON
    ``true`` would otherwise be accepted as a 1-star rating.
    """
    return isinstance(value, int) and not isinstance(value, bool) and 1 <= value <= 5


def parse_api_review(raw: list) -> Optional[dict]:
    """Parse a review from API response array.

    Args:
        raw: Positional review array. Index 0 is read as the author, 1 as the
            timestamp, 3 as the text and 4 as the rating; indexes 9 and 18
            are checked for an owner response of the form
            ``[timestamp, text, ...]``.

    Returns:
        A dict with keys ``author``, ``text``, ``rating``, ``timestamp``,
        ``owner_response`` and ``source`` (always ``"api"``), or ``None``
        when ``raw`` is not a list of at least five items or its rating is
        not an integer between 1 and 5.
    """
    try:
        if not isinstance(raw, list) or len(raw) < 5:
            return None
        if not _is_rating(raw[4]):
            return None

        # Non-string author/text values are normalised to empty strings.
        author = raw[0] if isinstance(raw[0], str) else ""
        text = raw[3] if isinstance(raw[3], str) else ""

        owner_response = None
        for idx in _OWNER_RESPONSE_INDEXES:
            if len(raw) > idx and raw[idx] and isinstance(raw[idx], list):
                resp = raw[idx]
                if len(resp) > 1:
                    owner_response = {"text": resp[1], "timestamp": resp[0] if resp[0] else ""}
                    break

        return {
            "author": author,
            "text": text,
            "rating": raw[4],
            "timestamp": raw[1],
            "owner_response": owner_response,
            "source": "api",
        }
    except Exception:
        # Best effort: an unexpectedly shaped entry is skipped, not fatal.
        return None


def extract_reviews_from_api_body(body: str) -> list:
    """Extract reviews from API response body.

    Args:
        body: Raw response text, optionally starting with the ``)]}'``
            prefix, followed by JSON.

    Returns:
        A list of review dicts (see ``parse_api_review``) that have a
        non-empty author. Empty when ``body`` is not a string or is not
        valid JSON.
    """
    reviews = []
    if not isinstance(body, str):
        return reviews

    payload = body.lstrip()
    if payload.startswith(_XSSI_PREFIX):
        payload = payload[len(_XSSI_PREFIX):]

    try:
        data = json.loads(payload)
    except (ValueError, RecursionError):
        return reviews

    def find_reviews(obj, depth=0):
        """Walk nested lists/dicts and collect every review-shaped list."""
        if depth > _MAX_API_DEPTH:
            return
        if isinstance(obj, list):
            # A review is recognised by a string at [0] and a rating at [4].
            if len(obj) > 4 and isinstance(obj[0], str) and _is_rating(obj[4]):
                rev = parse_api_review(obj)
                if rev and rev["author"]:
                    reviews.append(rev)
                # Do not descend into a review's own fields.
                return
            for item in obj:
                find_reviews(item, depth + 1)
        elif isinstance(obj, dict):
            for value in obj.values():
                find_reviews(value, depth + 1)

    find_reviews(data)
    return reviews


def _first_text(root, selectors) -> str:
    """Return the stripped text of the first selector yielding non-empty text."""
    for sel in selectors:
        try:
            value = root.find_element(By.CSS_SELECTOR, sel).text.strip()
        except Exception:
            continue
        if value:
            return value
    return ""


def parse_dom_review(card) -> Optional[dict]:
    """Parse a review from DOM element.

    Args:
        card: Element exposing ``get_attribute`` and ``find_element``
            (a Selenium WebElement) for one review card.

    Returns:
        A dict with keys ``id``, ``author``, ``text``, ``rating``,
        ``timestamp``, ``owner_response`` and ``source`` (always ``"dom"``),
        or ``None`` when neither a review id nor an author could be read.
        Fields that cannot be located keep their empty defaults.
    """
    try:
        # Review id: on the card itself, otherwise on a descendant.
        review_id = card.get_attribute("data-review-id") or ""
        if not review_id:
            try:
                id_el = card.find_element(By.CSS_SELECTOR, "[data-review-id]")
                review_id = id_el.get_attribute("data-review-id") or ""
            except Exception:
                pass

        author = _first_text(card, _AUTHOR_SELECTORS)

        # Rating from aria-label on span[role="img"]
        rating = 0
        try:
            stars_el = card.find_element(By.CSS_SELECTOR, 'span[role="img"]')
            aria = stars_el.get_attribute("aria-label") or ""
            # Decimal commas are normalised so "4,5 estrellas" parses too.
            num = re.search(r'\d+(?:\.\d+)?', aria.replace(',', '.'))
            if num:
                rating = int(float(num.group()))
        except Exception:
            pass

        text = _first_text(card, _TEXT_SELECTORS)

        # Click "More" to expand truncated text, then keep the longer version.
        try:
            more_btn = card.find_element(By.CSS_SELECTOR, "button.kyuRq")
            if more_btn.is_displayed():
                more_btn.click()
                for sel in _EXPANDED_TEXT_SELECTORS:
                    try:
                        expanded = card.find_element(By.CSS_SELECTOR, sel).text.strip()
                    except Exception:
                        continue
                    if expanded and len(expanded) > len(text):
                        text = expanded
                        break
        except Exception:
            pass

        timestamp = ""
        try:
            timestamp = card.find_element(By.CSS_SELECTOR, 'span[class*="rsqaWe"]').text.strip()
        except Exception:
            pass

        # Owner response: only reported when its text is present.
        owner_response = None
        try:
            resp_box = card.find_element(By.CSS_SELECTOR, "div.CDe7pd")
            resp_text = ""
            resp_date = ""
            try:
                resp_text = resp_box.find_element(By.CSS_SELECTOR, "div.wiI7pd").text.strip()
            except Exception:
                pass
            try:
                resp_date = resp_box.find_element(By.CSS_SELECTOR, "span.DZSIDd").text.strip()
            except Exception:
                pass
            if resp_text:
                owner_response = {"text": resp_text, "timestamp": resp_date}
        except Exception:
            pass

        if not review_id and not author:
            return None

        return {
            "id": review_id,
            "author": author,
            "text": text,
            "rating": rating,
            "timestamp": timestamp,
            "owner_response": owner_response,
            "source": "dom",
        }
    except Exception:
        return None


def _dismiss_consent(driver) -> None:
    """Click an accept button when the browser landed on a consent page."""
    if "consent.google" not in driver.current_url:
        return
    print("   Handling consent popup...")
    try:
        for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
            txt = btn.text.lower()
            if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
                btn.click()
                time.sleep(2)
                break
    except Exception:
        pass


def _open_reviews_tab(driver) -> None:
    """Click the first tab whose label contains a known 'reviews' keyword."""
    try:
        for tab in driver.find_elements(By.CSS_SELECTOR, "button[role='tab']"):
            tab_text = tab.text.lower()
            if any(kw in tab_text for kw in _REVIEW_TAB_KEYWORDS):
                print(f"   Clicking reviews tab: '{tab.text}'")
                tab.click()
                time.sleep(2)
                break
    except Exception:
        pass


def _find_scroll_container(driver):
    """Return the first displayed candidate pane taller than 100px, or None."""
    for sel in _SCROLL_CONTAINER_SELECTORS:
        try:
            for el in driver.find_elements(By.CSS_SELECTOR, sel):
                if el.is_displayed() and el.size['height'] > 100:
                    return el
        except Exception:
            pass
    return None


def _extract_total_reviews(page_text: str) -> Optional[int]:
    """Return the first 'N reviews' count found in the page text, or None."""
    for pattern in _TOTAL_REVIEW_PATTERNS:
        match = pattern.search(page_text)
        if match is None:
            continue
        count_str = match.group(1).upper()
        if "K" in count_str:
            # round() guards against float artefacts when scaling by 1000.
            return int(round(float(count_str.replace("K", "")) * 1000))
        return int(count_str.replace(",", ""))
    return None


def _drain_api_reviews(driver) -> list:
    """Fetch and clear intercepted responses, returning the reviews in them."""
    api_revs = []
    try:
        responses = driver.execute_script(_DRAIN_RESPONSES_JS)
        for resp in (responses or []):
            api_revs.extend(extract_reviews_from_api_body(resp.get("body", "")))
    except Exception:
        pass
    return api_revs


def scrape_reviews(driver, url: str, max_reviews: int = 5000, timeout_no_new: int = 15) -> dict:
    """
    Scrape Google Maps reviews.

    Loads the page, opens the reviews tab, injects a fetch/XHR interceptor
    and keeps the reviews pane scrolled to the bottom from a background
    thread while the main loop polls once per second.

    Args:
        driver: Selenium WebDriver instance
        url: Google Maps place URL (``hl=en`` is appended when no ``hl=``
            parameter is present)
        max_reviews: Stop once this many entries have been collected
        timeout_no_new: Seconds to wait with no new reviews before stopping

    Returns:
        dict with keys ``reviews`` (list), ``total``, ``checks`` and ``url``.
        API entries are full review dicts; DOM entries are ID-only stubs of
        the form ``{"id": ..., "source": "dom"}``. When the reviews pane
        cannot be found the dict additionally carries ``error`` and
        ``scrolls`` and ``reviews`` is empty.
    """
    reviews = {}  # review key -> review
    seen_api_authors = set()  # API reviews are de-duplicated by author

    # Force English language
    if "hl=" not in url:
        url = url + ("&" if "?" in url else "?") + "hl=en"

    print(f"🌐 Loading: {url[:80]}...")
    driver.get(url)
    time.sleep(3)

    _dismiss_consent(driver)
    time.sleep(2)  # settle time before looking for the tabs

    _open_reviews_tab(driver)

    # Wait and retry for scroll container
    scroll_container = None
    for attempt in range(10):
        scroll_container = _find_scroll_container(driver)
        if scroll_container is not None:
            break
        print(f"   Waiting for reviews panel... ({attempt+1}/10)")
        time.sleep(1)

    if scroll_container is None:
        print("❌ Could not find reviews scroll container")
        try:
            print("Page title:", driver.title)
            print("Current URL:", driver.current_url[:100])
        except Exception:
            pass
        # Same keys as the success result, plus the legacy "scrolls" key.
        return {
            "reviews": [],
            "total": 0,
            "scrolls": 0,
            "checks": 0,
            "url": url,
            "error": "No scroll container found",
        }

    print("✅ Found scroll container")

    # Extract total review count from page
    total_reviews = None
    try:
        total_reviews = _extract_total_reviews(driver.page_source)
        if total_reviews is not None:
            print(f"📊 Total reviews on page: {total_reviews}")
    except Exception:
        pass

    # Wait for reviews to fully load after tab click
    time.sleep(2)

    print("🔌 Injecting API interceptor...")
    driver.execute_script(_INTERCEPTOR_JS)

    # Store pane in window for scroll thread
    driver.execute_script("window.scrollablePane = arguments[0];", scroll_container)

    stop_scrolling = threading.Event()

    def scroll_worker():
        """Keep the pane scrolled to the bottom until asked to stop."""
        # NOTE(review): this shares `driver` with the main thread; confirm
        # the driver in use tolerates concurrent commands.
        while not stop_scrolling.is_set():
            try:
                driver.execute_script(_SCROLL_TO_BOTTOM_JS)
            except Exception:
                pass
            stop_scrolling.wait(0.1)  # ~10x per second, wakes early on stop

    scroll_thread = threading.Thread(target=scroll_worker, daemon=True)
    scroll_thread.start()

    last_new_time = time.time()
    last_count = len(reviews)
    check_num = 0

    print(f"🔄 Scrolling... (timeout: {timeout_no_new}s with no new)", flush=True)

    try:
        while True:
            check_num += 1
            time.sleep(1.0)  # Check every second

            # Collect from API (doesn't affect scroll)
            for rev in _drain_api_reviews(driver):
                author = rev["author"]
                if author not in seen_api_authors:
                    seen_api_authors.add(author)
                    # Keyed by the full author so distinct authors sharing a
                    # prefix cannot overwrite each other.
                    reviews[f"api_{author}"] = rev

            # Collect review IDs via JavaScript (doesn't affect scroll position!)
            try:
                review_ids = driver.execute_script(_COLLECT_REVIEW_IDS_JS)
                for rid in (review_ids or []):
                    if rid and rid not in reviews:
                        reviews[rid] = {"id": rid, "source": "dom"}
            except Exception:
                pass

            current_count = len(reviews)

            if current_count > last_count:
                last_new_time = time.time()
                last_count = current_count

            # Progress update
            elapsed = time.time() - last_new_time
            if total_reviews:
                pct = (current_count / total_reviews) * 100
                print(f"   📊 {current_count}/{total_reviews} ({pct:.0f}%) | idle: {elapsed:.1f}s", flush=True)
            else:
                print(f"   📊 {current_count} reviews | idle: {elapsed:.1f}s/{timeout_no_new}s", flush=True)

            # Stop conditions
            if current_count >= max_reviews:
                print(f"✅ Reached max: {current_count}")
                break

            if total_reviews and current_count >= total_reviews:
                print(f"✅ Got all {total_reviews} reviews!")
                break

            if time.time() - last_new_time >= timeout_no_new:
                print(f"⏱️ Timeout: no new reviews for {timeout_no_new}s")
                break
    finally:
        # Always stop the scroller, including on errors and Ctrl-C, so it
        # does not keep issuing commands after this function returns.
        stop_scrolling.set()
        scroll_thread.join(timeout=5)

    # Final results
    review_list = list(reviews.values())
    dom_count = sum(1 for r in review_list if r.get("source") == "dom")
    api_count = sum(1 for r in review_list if r.get("source") == "api")
    print(f"\n📋 Total: {len(review_list)} reviews (DOM: {dom_count}, API: {api_count})")

    return {
        "reviews": review_list,
        "total": len(review_list),
        "checks": check_num,
        "url": url,
    }


# Test function
if __name__ == "__main__":
    from seleniumbase import Driver

    # Test URL - 79 reviews
    TEST_URL = "https://www.google.com/maps/place/R.+Fleitas+Peluqueros/@28.1302986,-15.4448111,821m/data=!3m1!1e3!4m6!3m5!1s0xc40951a43c21f19:0x85f89601b9909c72!8m2!3d28.1299805!4d-15.4436854!16s%2Fg%2F11gbwtk8c8"

    print("🚀 Starting clean scraper test...")

    # Set up driver
    driver = Driver(uc=True, headless=False)
    driver.set_window_size(1200, 900)

    try:
        result = scrape_reviews(driver, TEST_URL, max_reviews=100, timeout_no_new=15)
        print(f"\n✅ Got {result['total']} reviews in {result['checks']} checks")

        # Show sample
        if result["reviews"]:
            print("\n📝 Sample review:")
            sample = result["reviews"][0]
            # DOM entries are ID-only stubs, so read fields defensively.
            sample_text = sample.get("text") or ""
            print(f"   Author: {sample.get('author', '(unknown)')}")
            print(f"   Rating: {sample.get('rating', 0)}⭐")
            print(f"   Text: {sample_text[:100]}..." if sample_text else "   Text: (none)")

    finally:
        driver.quit()
        print("\n🏁 Done")