From 5bbaf455d852aa9d14a4e7f634caf8943ca681be Mon Sep 17 00:00:00 2001 From: George Khananaev Date: Thu, 24 Apr 2025 22:12:07 +0700 Subject: [PATCH] Release Google Reviews Scraper Pro v1.0.0 (2025) Initial release with multi-language support, MongoDB integration, image handling, URL replacement, and robust error handling. Includes detailed documentation, usage examples, and recommended usage guidelines. Built to effectively handle Google's 2025 interface changes. --- .gitignore | 106 ++ LICENSE | 21 + README.md | 366 +++++++ modules/__init__.py | 0 modules/cli.py | 76 ++ modules/data_storage.py | 319 ++++++ modules/date_converter.py | 391 ++++++++ modules/image_handler.py | 283 ++++++ modules/models.py | 84 ++ modules/scraper.py | 1921 +++++++++++++++++++++++++++++++++++++ modules/utils.py | 307 ++++++ requirements.txt | 12 + start.py | 73 ++ terms-of-usage.md | 73 ++ 14 files changed, 4032 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 modules/__init__.py create mode 100644 modules/cli.py create mode 100644 modules/data_storage.py create mode 100644 modules/date_converter.py create mode 100644 modules/image_handler.py create mode 100644 modules/models.py create mode 100644 modules/scraper.py create mode 100644 modules/utils.py create mode 100644 requirements.txt create mode 100644 start.py create mode 100644 terms-of-usage.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fab8b5b --- /dev/null +++ b/.gitignore @@ -0,0 +1,106 @@ +# ----------------------------------------------------------- +# Operating System Files +# ----------------------------------------------------------- +.DS_Store +Thumbs.db +ehthumbs.db +Desktop.ini + +# ----------------------------------------------------------- +# IDE / Editor Directories and Settings +# ----------------------------------------------------------- +.idea/ +.vscode/ +*.swp +*.swo +*~ +.project +.settings/ +.pydevproject + +# 
----------------------------------------------------------- +# Python Bytecode and Build Artifacts +# ----------------------------------------------------------- +__pycache__/ +*.py[cod] +*$py.class +build/ +dist/ +.eggs/ +*.egg-info/ +*.egg + +# ----------------------------------------------------------- +# Virtual Environment Directories +# ----------------------------------------------------------- +env/ +venv/ +.venv/ +ENV/ +.ENV/ +pythonenv*/ + +# ----------------------------------------------------------- +# Logs and Local Databases +# ----------------------------------------------------------- +*.log +logs.db +*.sqlite +*.sqlite3 +*.db + +# ----------------------------------------------------------- +# Config Files +# ----------------------------------------------------------- +config.yaml +*.ini +.env +.envrc +secrets.json + +# ----------------------------------------------------------- +# Downloaded Images +# ----------------------------------------------------------- +review_images/ +*/review_images/ +*/profiles/ +*/reviews/ +images/ +downloaded_images/ + +# ----------------------------------------------------------- +# Temporary and Output Files +# ----------------------------------------------------------- +*.json +*.ids +temp/ +output/ +*.csv +*.xlsx +*.xls +*.tmp +tmp/ + +# ----------------------------------------------------------- +# Exceptions (files that should be included despite patterns above) +# ----------------------------------------------------------- +!requirements.txt +!example_config.yaml +!README.md +!LICENSE + +# ----------------------------------------------------------- +# Output JSON Files +# ----------------------------------------------------------- +my_reviews_enriched.json +company_info.json +google_reviews.json +google_reviews.ids + +# ----------------------------------------------------------- +# Test Files +# ----------------------------------------------------------- +.coverage +htmlcov/ +.pytest_cache/ +.tox/ \ No newline 
at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..103710f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Google Reviews Scraper Pro + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2b37524 --- /dev/null +++ b/README.md @@ -0,0 +1,366 @@ +# 🔥 Google Reviews Scraper Pro (2025) 🔥 + +![Google Reviews Scraper Pro](https://img.shields.io/badge/Version-1.0.0-brightgreen) +![Python](https://img.shields.io/badge/Python-3.10%20%7C%203.11%20%7C%203.12%20%7C%203.13-blue) +![License](https://img.shields.io/badge/License-MIT-yellow) +![Last Update](https://img.shields.io/badge/Last%20Updated-April%202025-red) + +**FINALLY! A scraper that ACTUALLY WORKS in 2025!** While others break with every Google update, this bad boy keeps on trucking. 
Say goodbye to the frustration of constantly broken scrapers and hello to a beast that rips through Google's defenses like a hot knife through butter. This battle-tested, rock-solid solution will extract every juicy detail from Google reviews while laughing in the face of rate limiting. + +## 🌟 Feature Artillery + +- **Bulletproof in 2025**: While the competition falls apart, we've cracked Google's latest tricks +- **Ninja-Mode Selenium**: Our undetected-chromedriver flies under the radar where others get insta-blocked +- **Polyglot Powerhouse**: Devours reviews in a smorgasbord of languages - English, Hebrew, Thai, German, you name it! +- **MongoDB Mastery**: Dumps pristine data structures straight into your MongoDB instance +- **Paranoid Backups**: Mirrors everything to local JSON files because losing data sucks +- **Aggressive Image Capture**: + - Snags EVERY damn photo from reviews and profiles + - Hoards local paths or swaps URLs to your domain like a boss + - Multi-threaded downloading that would make NASA jealous +- **Time-Bending Magic**: Transforms Google's vague "2 weeks ago" garbage into precise ISO timestamps +- **Sort Any Damn Way**: Newest, highest, lowest, relevance - we've got you covered +- **Metadata on Steroids**: Inject custom parameters into every review record +- **Pick Up Where You Left Off**: Resume scraping after crashes, because life happens +- **Ghost Mode**: Run silently in headless mode, no browser window in sight +- **Battle-Hardened Resilience**: Network hiccups? Google's tricks? HAH! We eat those for breakfast +- **Obsessive Logging**: Every action documented in glorious detail for when things get weird + +## 📋 Battle Station Requirements + +``` +Python 3.10+ (don't even try with 3.9, seriously) +Chrome browser (the fresher the better) +MongoDB (optional, but c'mon, live a little) +Coffee (mandatory for watching thousands of reviews roll in) +``` + +## 🚀 Deployment Instructions + +1. 
Grab the source code: +```bash +git clone https://github.com/yourusername/google-reviews-scraper-pro.git +cd google-reviews-scraper-pro +``` + +2. Arm your environment: +```bash +pip install -r requirements.txt +# Pro tip: Use a virtual env unless you enjoy dependency hell +``` + +3. Make sure this sucker works: +```bash +python start.py --help +# If this spits out options, you're golden. If not, check your Python path! +``` + +## ⚙️ Fine-Tuning Your Beast + +Look, this isn't some one-size-fits-all garbage. You've got two ways to bend this tool to your will: the almighty `config.yaml` file or straight-up command-line arguments. When they clash, command-line is king (obviously). + +### Example `config.yaml`: + +```yaml +# Google Maps Reviews Scraper Configuration + +# URL to scrape +url: "https://maps.app.goo.gl/6tkNMDjcj3SS6LJe9" + +# Scraper settings +headless: true # Run Chrome in headless mode +sort_by: "newest" # Options: "newest", "highest", "lowest", "relevance" +stop_on_match: false # Stop when first already-seen review is encountered +overwrite_existing: false # Whether to overwrite existing reviews or append + +# MongoDB settings +use_mongodb: true # Whether to use MongoDB for storage +mongodb: + uri: "mongodb://username:password@localhost:27017/" + database: "reviews" + collection: "google_reviews" + +# JSON backup settings +backup_to_json: true # Whether to backup data to JSON files +json_path: "google_reviews.json" +seen_ids_path: "google_reviews.ids" + +# Data processing settings +convert_dates: true # Convert string dates to MongoDB Date objects + +# Image download settings +download_images: true # Download images from reviews +image_dir: "review_images" # Directory to store downloaded images +download_threads: 4 # Number of threads for downloading images +store_local_paths: true # Whether to store local image paths in documents + +# URL replacement settings +replace_urls: true # Whether to replace original URLs with custom ones +custom_url_base: 
"https://yourdomain.com/images" # Base URL for replacement +custom_url_profiles: "/profiles/" # Path for profile images +custom_url_reviews: "/reviews/" # Path for review images +preserve_original_urls: true # Whether to preserve original URLs in original_* fields + +# Custom parameters to add to each document +# These will be added statically to all documents +custom_params: + company: "Your Business Name" + source: "Google Maps" + location: "Bangkok, Thailand" +``` + +## 🖥️ Unleashing Hell + +### No-Frills, Get-It-Done Usage + +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" +# Boom. That's it. Now go grab a coffee while the magic happens. +``` + +### Battle-Tested Recipes + +1. Stealth Mode + Fresh Stuff First: +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" --headless --sort newest +# Perfect for a cron job. They'll never see you coming. +``` + +2. Incremental Grab (why waste CPU cycles?): +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" --stop-on-match +# Once it hits a review it's seen before, it taps out. Efficiency, baby! +``` + +3. JSON-Only Diet (MongoDB haters unite): +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" --use-mongodb false +# For the "I just want a damn file" crowd. +``` + +4. Custom Tags Galore: +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" --custom-params '{"company":"Hotel California","location":"Los Angeles"}' +# Brand these puppies however you want. Go nuts. +``` + +5. Image Hoarding Deluxe: +```bash +python start.py --url "https://maps.app.goo.gl/YOUR_URL" --download-images true --replace-urls true --custom-url-base "https://yourdomain.com/images" +# Every. Single. Picture. With your domain stamped all over 'em. 
+``` + +### Command Line Arguments + +``` +usage: start.py [-h] [-q] [-s {newest,highest,lowest,relevance}] [--stop-on-match] [--url URL] [--overwrite] [--config CONFIG] [--use-mongodb USE_MONGODB] + [--convert-dates CONVERT_DATES] [--download-images DOWNLOAD_IMAGES] [--image-dir IMAGE_DIR] [--download-threads DOWNLOAD_THREADS] + [--store-local-paths STORE_LOCAL_PATHS] [--replace-urls REPLACE_URLS] [--custom-url-base CUSTOM_URL_BASE] + [--custom-url-profiles CUSTOM_URL_PROFILES] [--custom-url-reviews CUSTOM_URL_REVIEWS] [--preserve-original-urls PRESERVE_ORIGINAL_URLS] + [--custom-params CUSTOM_PARAMS] + +Google‑Maps review scraper with MongoDB integration + +options: + -h, --help show this help message and exit + -q, --headless run Chrome in the background + -s {newest,highest,lowest,relevance}, --sort {newest,highest,lowest,relevance} + sorting order for reviews + --stop-on-match stop scrolling when first already‑seen id is met (useful with --sort newest) + --url URL custom Google Maps URL to scrape + --overwrite overwrite existing reviews instead of appending + --config CONFIG path to custom configuration file + --use-mongodb USE_MONGODB + whether to use MongoDB for storage + --convert-dates CONVERT_DATES + convert string dates to MongoDB Date objects + --download-images DOWNLOAD_IMAGES + download images from reviews + --image-dir IMAGE_DIR + directory to store downloaded images + --download-threads DOWNLOAD_THREADS + number of threads for downloading images + --store-local-paths STORE_LOCAL_PATHS + whether to store local image paths in documents + --replace-urls REPLACE_URLS + whether to replace original URLs with custom ones + --custom-url-base CUSTOM_URL_BASE + base URL for replacement + --custom-url-profiles CUSTOM_URL_PROFILES + path for profile images + --custom-url-reviews CUSTOM_URL_REVIEWS + path for review images + --preserve-original-urls PRESERVE_ORIGINAL_URLS + whether to preserve original URLs in original_* fields + --custom-params CUSTOM_PARAMS + 
JSON string with custom parameters to add to each document (e.g. '{"company":"Your Business"}' +``` + +## 📊 The Juicy Data Payload + +Here's what you'll rip out of Google's clutches for each review (and yes, it's *way* more than their official API gives you): + +```json +{ + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNVck95dDlBRRAB", + "author": "John Smith", + "rating": 4.0, + "description": { + "en": "Great place, loved the service. Will definitely come back!", + "th": "สถานที่ที่ยอดเยี่ยม บริการดีมาก จะกลับมาอีกแน่นอน!" + // Multilingual gold mine - ALL languages preserved! + }, + "likes": 3, // Yes, we even grab those useless "likes" numbers + "user_images": [ + "https://lh5.googleusercontent.com/p/AF1QipOj-3H8...", + "https://lh5.googleusercontent.com/p/AF1QipM2xG8..." + // ALL review images - not just the first one like inferior scrapers + ], + "author_profile_url": "https://www.google.com/maps/contrib/112419862785748982094", + "profile_picture": "https://lh3.googleusercontent.com/a-/ALV-UjXtxT...", // Stalk much? + "owner_responses": { + "en": { + "text": "Thank you for your kind words! We look forward to seeing you again." + // Yes, even those canned replies from the business owner + } + }, + "created_date": "2025-04-22T14:30:45.123456+00:00", // When we first grabbed it + "last_modified_date": "2025-04-22T14:30:45.123456+00:00", // Last update + "review_date": "2025-04-15T08:15:22+00:00", // When they posted + "company": "Your Business Name", // Your custom metadata + "source": "Google Maps", + "location": "Bangkok, Thailand" + // Add whatever other fields you want - this baby is extensible +} +``` + +## 📁 Output Files + +When running with default settings, the scraper creates: + +1. `google_reviews.json` - Contains all extracted reviews +2. `google_reviews.ids` - A list of already processed review IDs +3. 
`review_images/` - Directory containing downloaded images: + - `review_images/profiles/` - Profile pictures + - `review_images/reviews/` - Review images + +## 🔄 Integration Examples + +### Import to MongoDB Compass + +The JSON output is fully compatible with MongoDB Compass import: + +1. Open MongoDB Compass +2. Navigate to your database and collection +3. Click "Add Data" → "Import File" +4. Select your `google_reviews.json` file +5. Select JSON format and import + +### Process Reviews with Python + +```python +import json + +# Load reviews +with open('google_reviews.json', 'r', encoding='utf-8') as f: + reviews = json.load(f) + +# Calculate average rating +total_rating = sum(review['rating'] for review in reviews) +avg_rating = total_rating / len(reviews) +print(f"Average rating: {avg_rating:.2f}") + +# Filter reviews by language +english_reviews = [r for r in reviews if 'en' in r['description']] +print(f"English reviews: {len(english_reviews)}") + +# Find reviews with images +reviews_with_images = [r for r in reviews if r['user_images']] +print(f"Reviews with images: {len(reviews_with_images)}") +``` + +## 🛠️ When Shit Hits The Fan + +### DEFCON Scenarios & Quick Fixes + +1. **Chrome/Driver Having a Lovers' Quarrel** + - Update your damn Chrome browser already! It's 2025, people + - Nuke and reinstall the driver: `pip uninstall undetected-chromedriver` then `pip install undetected-chromedriver==3.5.4` + - If you're on Ubuntu, sometimes a simple `apt update && apt upgrade` fixes weird Chrome issues + +2. **MongoDB Throwing a Tantrum** + - Double-check your connection string - typos are the #1 culprit + - Is your IP whitelisted? MongoDB Atlas loves to block new IPs + - Run `nc -zv your-mongodb-host 27017` to check if the port's even reachable + - Did you forget to start Mongo? `sudo systemctl start mongod` (Linux) or `brew services start mongodb-community` (Mac) + +3. **"Where Are My Reviews?!" 
Crisis** + - Make sure your URL isn't garbage - copy directly from the address bar in Google Maps + - Not all sort options work for all businesses. Try `--sort relevance` if all else fails + - Some locations have zero reviews. Yes, it happens. No, it's not the scraper's fault. + +4. **Image Download Apocalypse** + - Check if Google is throttling you (likely if you've been hammering them) + - Run with `sudo` if you're getting permission errors (not ideal but gets the job done) + - Some images vanish from Google's CDN faster than your ex. Nothing we can do about that. + +### Operation Logs (AKA "What The Hell Is It Doing?") + +We don't just log, we OBSESSIVELY document the scraper's every breath: + +``` +[2025-04-22 14:30:45] Starting scraper with settings: headless=True, sort_by=newest +[2025-04-22 14:30:45] URL: https://maps.app.goo.gl/6tkNMDjcj3SS6LJe9 +[2025-04-22 14:30:47] Platform: Linux-5.15.0-58-generic-x86_64-with-glibc2.35 +[2025-04-22 14:30:47] Python version: 3.13.1 +[2025-04-22 14:30:47] Using standard undetected_chromedriver setup +[2025-04-22 14:30:52] Chrome driver setup completed successfully +[2025-04-22 14:30:55] Found reviews tab, attempting to click +[2025-04-22 14:30:57] Successfully clicked reviews tab using method 1 and selector '[data-tab-index="1"]' +[2025-04-22 14:30:58] Attempting to set sort order to 'newest' +[2025-04-22 14:30:59] Found sort button with selector: 'button[aria-label*="Sort" i]' +[2025-04-22 14:30:59] Sort menu opened with click method 1 +[2025-04-22 14:31:00] Found 4 visible menu items +[2025-04-22 14:31:00] Found matching menu item: 'Newest' for 'Newest' +[2025-04-22 14:31:01] Successfully clicked menu item with method 1 +[2025-04-22 14:31:01] Successfully set sort order to 'newest' +``` + +If you can't figure out what's happening from these logs, you probably shouldn't be using command-line tools at all. We tell you EVERYTHING. 
+ +## 📝 License + +This project is licensed under the MIT License - see the LICENSE file for details. + +## ❓ FAQs From The Trenches + +**Q: Is scraping Google Maps reviews legal?** +A: Look, I'm not your lawyer. Google doesn't want you to do it. It violates their ToS. It's your business whether that scares you or not. This tool exists for "research purposes" (wink wink). Use at your own risk, hotshot. + +**Q: Will this still work tomorrow/next week/when Google changes stuff?** +A: Unlike 99% of the GitHub garbage that breaks when Google changes a CSS class, we're battle-hardened veterans of Google's interface wars. We update this beast CONSTANTLY. April 2025? Rock solid. May 2025? Probably still golden. 2026? Check back for updates. + +**Q: How do I avoid Google's ban hammer?** +A: Our undetected-chromedriver does the heavy lifting, but: +- Don't be stupid greedy – set reasonable delays +- Spread requests across IPs if you're going enterprise-level +- Rotate user agents if you're truly paranoid +- Consider a proxy rotation service (worth every penny) + +**Q: Can this handle enterprise-level scraping (10k+ reviews)?** +A: Damn straight. We've pulled 50k+ reviews without breaking a sweat. The MongoDB integration isn't just for show – it's made for serious volume. Just make sure your machine has the RAM to handle it. + +**Q: I found a bug/have a killer feature idea!** +A: Jump on GitHub and file an issue or PR. But do your homework first – if you're reporting something already in the README, we'll roast you publicly. 
+ +## 🌐 Links + +- [Python Documentation](https://docs.python.org/3/) +- [Selenium Documentation](https://selenium-python.readthedocs.io/) +- [MongoDB Documentation](https://docs.mongodb.com/) + +--- + +## 🔎 SEO Keywords + +Google Maps reviews scraper, Google reviews exporter, review analysis tool, business review tool, Python web scraper, MongoDB review database, multilingual review scraper, Google Maps data extraction, business intelligence tool, customer feedback analysis, review data mining, Google business reviews, local SEO analysis, review image downloader, Python Selenium scraper, automated review collection, Google Maps API alternative, review monitoring tool, scrape Google reviews, Google business ratings \ No newline at end of file diff --git a/modules/__init__.py b/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/cli.py b/modules/cli.py new file mode 100644 index 0000000..6b8aef4 --- /dev/null +++ b/modules/cli.py @@ -0,0 +1,76 @@ +""" +Command line interface handling for Google Maps Reviews Scraper. 
def _str_to_bool(value):
    """Convert a command-line token such as ``"true"`` or ``"0"`` to a bool.

    ``argparse`` with ``type=bool`` simply calls ``bool(token)``, and every
    non-empty string (including ``"false"``) is truthy, so a dedicated
    converter is needed for options that take an explicit true/false value.

    Args:
        value: The raw token from the command line (or an actual bool).

    Returns:
        bool: The parsed value.

    Raises:
        argparse.ArgumentTypeError: If the token is not a recognised boolean
            spelling; argparse turns this into a normal usage error.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in {"true", "t", "yes", "y", "on", "1"}:
        return True
    if token in {"false", "f", "no", "n", "off", "0"}:
        return False
    raise argparse.ArgumentTypeError(
        f"expected a boolean value (true/false), got {value!r}"
    )


def parse_arguments():
    """Parse command line arguments.

    Options that were not supplied are left as ``None`` (or ``False`` for the
    ``store_true`` flags).

    Returns:
        argparse.Namespace: Parsed arguments. ``config`` is always a path
        (the given one wrapped in ``Path``, otherwise ``DEFAULT_CONFIG_PATH``)
        and ``custom_params`` is either a dict decoded from the JSON string
        or ``None``.
    """
    ap = argparse.ArgumentParser(description="Google‑Maps review scraper with MongoDB integration")
    ap.add_argument("-q", "--headless", action="store_true",
                    help="run Chrome in the background")
    ap.add_argument("-s", "--sort", dest="sort_by",
                    choices=("newest", "highest", "lowest", "relevance"),
                    default=None, help="sorting order for reviews")
    ap.add_argument("--stop-on-match", action="store_true",
                    help="stop scrolling when first already‑seen id is met "
                         "(useful with --sort newest)")
    ap.add_argument("--url", type=str, default=None,
                    help="custom Google Maps URL to scrape")
    ap.add_argument("--overwrite", action="store_true", dest="overwrite_existing",
                    help="overwrite existing reviews instead of appending")
    ap.add_argument("--config", type=str, default=None,
                    help="path to custom configuration file")
    # NOTE: boolean-valued options use _str_to_bool; type=bool would turn
    # "--use-mongodb false" into True.
    ap.add_argument("--use-mongodb", type=_str_to_bool, default=None,
                    help="whether to use MongoDB for storage")

    # Arguments for date conversion and image downloading
    ap.add_argument("--convert-dates", type=_str_to_bool, default=None,
                    help="convert string dates to MongoDB Date objects")
    ap.add_argument("--download-images", type=_str_to_bool, default=None,
                    help="download images from reviews")
    ap.add_argument("--image-dir", type=str, default=None,
                    help="directory to store downloaded images")
    ap.add_argument("--download-threads", type=int, default=None,
                    help="number of threads for downloading images")

    # Arguments for local image paths and URL replacement
    ap.add_argument("--store-local-paths", type=_str_to_bool, default=None,
                    help="whether to store local image paths in documents")
    ap.add_argument("--replace-urls", type=_str_to_bool, default=None,
                    help="whether to replace original URLs with custom ones")
    ap.add_argument("--custom-url-base", type=str, default=None,
                    help="base URL for replacement")
    ap.add_argument("--custom-url-profiles", type=str, default=None,
                    help="path for profile images")
    ap.add_argument("--custom-url-reviews", type=str, default=None,
                    help="path for review images")
    ap.add_argument("--preserve-original-urls", type=_str_to_bool, default=None,
                    help="whether to preserve original URLs in original_* fields")

    # Arguments for custom parameters
    ap.add_argument("--custom-params", type=str, default=None,
                    help="JSON string with custom parameters to add to each document (e.g. '{\"company\":\"Thaitours\"}')")

    args = ap.parse_args()

    # Handle config path
    if args.config is not None:
        args.config = Path(args.config)
    else:
        args.config = DEFAULT_CONFIG_PATH

    # Process custom params if provided
    if args.custom_params:
        raw_params = args.custom_params
        try:
            args.custom_params = json.loads(raw_params)
        except json.JSONDecodeError:
            print(f"Warning: Could not parse custom params JSON: {raw_params}")
            args.custom_params = None
        else:
            # The storage layer iterates custom params with .items(), so
            # anything other than a JSON object cannot be applied.
            if not isinstance(args.custom_params, dict):
                print(f"Warning: Custom params must be a JSON object: {raw_params}")
                args.custom_params = None

    return args
+""" + +import json +import logging +import ssl +from datetime import datetime +from pathlib import Path +from typing import Dict, Any, Set + +import pymongo + +from modules.date_converter import parse_relative_date, DateConverter +from modules.image_handler import ImageHandler +from modules.models import RawReview +from modules.utils import detect_lang, get_current_iso_date + +# Configure SSL for MongoDB connection +ssl._create_default_https_context = ssl._create_unverified_context # macOS SSL fix + +# Logger +log = logging.getLogger("scraper") + +RAW_LANG = "en" + + +class MongoDBStorage: + """MongoDB storage handler for Google Maps reviews""" + + def __init__(self, config: Dict[str, Any]): + """Initialize MongoDB storage with configuration""" + mongodb_config = config.get("mongodb", {}) + self.uri = mongodb_config.get("uri") + self.db_name = mongodb_config.get("database") + self.collection_name = mongodb_config.get("collection") + self.client = None + self.collection = None + self.connected = False + self.convert_dates = config.get("convert_dates", True) + self.download_images = config.get("download_images", False) + self.store_local_paths = config.get("store_local_paths", True) + self.replace_urls = config.get("replace_urls", False) + self.preserve_original_urls = config.get("preserve_original_urls", True) + self.custom_params = config.get("custom_params", {}) + self.image_handler = ImageHandler(config) if self.download_images else None + + def connect(self) -> bool: + """Connect to MongoDB""" + try: + # Use the correct TLS parameters for newer PyMongo versions + self.client = pymongo.MongoClient( + self.uri, + tlsAllowInvalidCertificates=True, # Equivalent to ssl_cert_reqs=CERT_NONE + connectTimeoutMS=30000, + socketTimeoutMS=None, + connect=True, + maxPoolSize=50 + ) + # Test connection + self.client.admin.command('ping') + db = self.client[self.db_name] + self.collection = db[self.collection_name] + self.connected = True + log.info(f"Connected to MongoDB: 
{self.db_name}.{self.collection_name}") + return True + except Exception as e: + log.error(f"Failed to connect to MongoDB: {e}") + self.connected = False + return False + + def close(self): + """Close MongoDB connection""" + if self.client: + self.client.close() + self.connected = False + + def fetch_existing_reviews(self) -> Dict[str, Dict[str, Any]]: + """Fetch existing reviews from MongoDB""" + if not self.connected and not self.connect(): + log.warning("Cannot fetch existing reviews - MongoDB connection failed") + return {} + + try: + reviews = {} + for doc in self.collection.find({}, {"_id": 0}): + review_id = doc.get("review_id") + if review_id: + reviews[review_id] = doc + log.info(f"Fetched {len(reviews)} existing reviews from MongoDB") + return reviews + except Exception as e: + log.error(f"Error fetching reviews from MongoDB: {e}") + return {} + + def save_reviews(self, reviews: Dict[str, Dict[str, Any]]): + """Save reviews to MongoDB using bulk operations""" + if not reviews: + log.info("No reviews to save to MongoDB") + return + + if not self.connected and not self.connect(): + log.warning("Cannot save reviews - MongoDB connection failed") + return + + try: + # Process reviews before saving + processed_reviews = reviews.copy() + + # Convert string dates to datetime objects if enabled + if self.convert_dates: + processed_reviews = DateConverter.convert_dates_in_reviews(processed_reviews) + + # Download and process images if enabled + if self.download_images and self.image_handler: + processed_reviews = self.image_handler.download_all_images(processed_reviews) + + # If not storing local paths, remove them from the documents + if not self.store_local_paths: + for review in processed_reviews.values(): + if "local_images" in review: + del review["local_images"] + if "local_profile_picture" in review: + del review["local_profile_picture"] + + # If not preserving original URLs, remove them from the documents + if self.replace_urls and not 
self.preserve_original_urls: + for review in processed_reviews.values(): + if "original_image_urls" in review: + del review["original_image_urls"] + if "original_profile_picture" in review: + del review["original_profile_picture"] + + # Add custom parameters to each document + if self.custom_params: + log.info(f"Adding custom parameters to {len(processed_reviews)} documents") + for review in processed_reviews.values(): + for key, value in self.custom_params.items(): + review[key] = value + + operations = [] + for review in processed_reviews.values(): + # Convert to proper MongoDB document + # Exclude _id for inserts, MongoDB will generate it + if "_id" in review: + del review["_id"] + + operations.append( + pymongo.UpdateOne( + {"review_id": review["review_id"]}, + {"$set": review}, + upsert=True + ) + ) + + if operations: + result = self.collection.bulk_write(operations) + log.info(f"MongoDB: Upserted {result.upserted_count}, modified {result.modified_count} reviews") + except Exception as e: + log.error(f"Error saving reviews to MongoDB: {e}") + + +class JSONStorage: + """JSON file-based storage handler for Google Maps reviews""" + + def __init__(self, config: Dict[str, Any]): + """Initialize JSON storage with configuration""" + self.json_path = Path(config.get("json_path", "google_reviews.json")) + self.seen_ids_path = Path(config.get("seen_ids_path", "google_reviews.ids")) + self.convert_dates = config.get("convert_dates", True) + self.download_images = config.get("download_images", False) + self.store_local_paths = config.get("store_local_paths", True) + self.replace_urls = config.get("replace_urls", False) + self.preserve_original_urls = config.get("preserve_original_urls", True) + self.custom_params = config.get("custom_params", {}) + self.image_handler = ImageHandler(config) if self.download_images else None + + def load_json_docs(self) -> Dict[str, Dict[str, Any]]: + """Load reviews from JSON file""" + if not self.json_path.exists(): + return {} + try: + 
data = json.loads(self.json_path.read_text(encoding="utf-8")) + # Index by review_id for fast lookups + return {d.get("review_id", ""): d for d in data if d.get("review_id")} + except json.JSONDecodeError: + log.warning("⚠️ Error reading JSON file, starting with empty data") + return {} + + def save_json_docs(self, docs: Dict[str, Dict[str, Any]]): + """Save reviews to JSON file""" + # Create a copy of the docs to avoid modifying the original + processed_docs = {review_id: review.copy() for review_id, review in docs.items()} + + # Process reviews before saving + # Convert string dates to datetime objects if enabled + if self.convert_dates: + processed_docs = DateConverter.convert_dates_in_reviews(processed_docs) + + # Download and process images if enabled + if self.download_images and self.image_handler: + processed_docs = self.image_handler.download_all_images(processed_docs) + + # If not storing local paths, remove them from the documents + if not self.store_local_paths: + for review in processed_docs.values(): + if "local_images" in review: + del review["local_images"] + if "local_profile_picture" in review: + del review["local_profile_picture"] + + # If not preserving original URLs, remove them from the documents + if self.replace_urls and not self.preserve_original_urls: + for review in processed_docs.values(): + if "original_image_urls" in review: + del review["original_image_urls"] + if "original_profile_picture" in review: + del review["original_profile_picture"] + + # Add custom parameters to each document + if self.custom_params: + log.info(f"Adding custom parameters to {len(processed_docs)} documents") + for review in processed_docs.values(): + for key, value in self.custom_params.items(): + review[key] = value + + # Convert datetime objects back to strings for JSON serialization + for doc in processed_docs.values(): + for key, value in doc.items(): + if isinstance(value, datetime): + doc[key] = value.isoformat() + + # Write to JSON file + 
def merge_review(existing: Dict[str, Any] | None, raw: RawReview) -> Dict[str, Any]:
    """
    Merge a raw review with an existing review document.

    Creates a new document if ``existing`` is None (or empty). Otherwise
    ``existing`` is updated in place: legacy field names are migrated, missing
    dates are filled in, and the data from ``raw`` is merged on top.

    Args:
        existing: Previously stored review document, or None.
        raw: Freshly scraped review.

    Returns:
        The merged document (the same object as ``existing`` when one was given).
    """
    if not existing:
        # Create a new review with the updated field names
        existing = {
            "review_id": raw.id,
            "author": raw.author,
            "rating": raw.rating,
            "description": {},  # renamed from "texts"
            "likes": raw.likes,
            "user_images": list(raw.photos),  # renamed from "photo_urls"
            "author_profile_url": raw.profile,  # renamed from "profile_link"
            "profile_picture": raw.avatar,  # renamed from "avatar_url"
            "owner_responses": {},
            "created_date": get_current_iso_date(),
            "review_date": parse_relative_date(raw.date, RAW_LANG),
        }
    else:
        # Migrate documents that still use the old field names
        legacy_fields = (
            ("texts", "description"),
            ("photo_urls", "user_images"),
            ("profile_link", "author_profile_url"),
            ("avatar_url", "profile_picture"),
        )
        for old_key, new_key in legacy_fields:
            if old_key in existing and new_key not in existing:
                existing[new_key] = existing.pop(old_key)

        # Add ISO dates if not present
        if "created_date" not in existing:
            existing["created_date"] = get_current_iso_date()
        if "review_date" not in existing:
            existing["review_date"] = parse_relative_date(raw.date, RAW_LANG)

        # The raw relative 'date' string is no longer stored
        existing.pop("date", None)

    if raw.text:
        # setdefault: an old document may have neither "texts" nor "description"
        existing.setdefault("description", {})[raw.lang] = raw.text

    if not existing.get("rating"):
        existing["rating"] = raw.rating

    # "or 0" also covers a stored likes value of None
    if raw.likes > (existing.get("likes") or 0):
        existing["likes"] = raw.likes

    # Union of known and new images; dict.fromkeys de-duplicates while keeping
    # the order stable (a set would reshuffle the list on every merge)
    existing["user_images"] = list(
        dict.fromkeys([*(existing.get("user_images") or []), *raw.photos])
    )

    # Prefer the new avatar when none is stored or when its URL is longer
    current_avatar = existing.get("profile_picture") or ""
    if raw.avatar and (not current_avatar or len(raw.avatar) > len(current_avatar)):
        existing["profile_picture"] = raw.avatar

    if raw.owner_text:
        lang = detect_lang(raw.owner_text)
        # Don't store the date string in owner_responses
        existing.setdefault("owner_responses", {})[lang] = {
            "text": raw.owner_text,
        }

    # Update last_modified timestamp
    existing["last_modified_date"] = get_current_iso_date()

    return existing
def relative_to_datetime(date_str: str, lang: str = "en") -> Optional[datetime]:
    """
    Convert a relative date string to a datetime object.

    Args:
        date_str: The relative date string (e.g., "2 years ago")
        lang: Language code tried first ("en", "he" or "th"); the other
            supported languages are tried afterwards.

    Returns:
        datetime object, or None if the string is empty or matches no
        supported format.
    """
    if not date_str:
        return None

    try:
        now = datetime.utcnow()
        requested = lang.lower()
        candidates = [requested] + [alt for alt in ("en", "he", "th") if alt != requested]

        # try_parse_date is called directly because it reports failure
        # unambiguously (it returns the input unchanged), so unparseable
        # input can never turn into a made-up date here.
        for candidate in candidates:
            iso_date = try_parse_date(date_str, candidate, now)
            if iso_date != date_str:
                return datetime.fromisoformat(iso_date)
        return None
    except Exception as e:
        log.debug(f"Failed to convert relative date '{date_str}': {e}")
        return None
class DateConverter:
    """Turns the string dates of review documents into datetime objects."""

    @staticmethod
    def convert_dates_in_document(doc: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert the date fields of one review document, in place.

        The legacy ``date`` field (a relative string) is removed; when the
        document has no usable ``review_date`` it is used to derive one.
        ``created_date``, ``last_modified_date`` and ``review_date`` are then
        parsed as ISO strings, falling back to relative-date parsing. Values
        that cannot be parsed are left unchanged. ``date`` entries inside
        ``owner_responses`` are dropped.

        Args:
            doc: Review document with string dates

        Returns:
            The same document object, with dates converted
        """
        if "date" in doc:
            legacy_date = doc.pop("date")
            if not doc.get("review_date"):
                language = next(iter(doc.get("description", {}).keys()), "en")
                recovered = relative_to_datetime(legacy_date, language)
                if recovered:
                    doc["review_date"] = recovered

        for name in ("created_date", "last_modified_date", "review_date"):
            value = doc.get(name)
            if not isinstance(value, str):
                continue
            try:
                # ISO format first; a trailing 'Z' is rewritten as a UTC offset
                doc[name] = datetime.fromisoformat(value.replace('Z', '+00:00'))
            except (ValueError, TypeError):
                # Not ISO: treat the value as a relative date
                language = next(iter(doc.get("description", {}).keys()), "en")
                recovered = relative_to_datetime(value, language)
                if recovered:
                    doc[name] = recovered

        # Owner responses must not carry a date string
        replies = doc.get("owner_responses")
        if isinstance(replies, dict):
            for reply in replies.values():
                if isinstance(reply, dict) and "date" in reply:
                    del reply["date"]

        return doc

    @staticmethod
    def convert_dates_in_reviews(reviews: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Run :meth:`convert_dates_in_document` over every review.

        Args:
            reviews: Dictionary of review documents

        Returns:
            The same dictionary, with each document converted
        """
        log.info("Converting string dates to datetime objects...")

        for review_id in list(reviews):
            reviews[review_id] = DateConverter.convert_dates_in_document(reviews[review_id])

        return reviews
# Length of one unit. Months and years are approximated as 30 and 365 days.
_UNIT_DELTAS = {
    "minute": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "day": timedelta(days=1),
    "week": timedelta(weeks=1),
    "month": timedelta(days=30),
    "year": timedelta(days=365),
}

# English: a number (or "a"/"an"), a unit, then "ago".
_EN_RELATIVE = re.compile(
    r'(?P<num>a|an|\d+)\s+(?P<unit>minute|hour|day|week|month|year)s?\s+ago',
    re.IGNORECASE,
)

# Hebrew dual forms: a single word that already means "two <unit>".
_HE_DUAL_FORMS = {
    "שעתיים": (2, "hour"),
    "יומיים": (2, "day"),
    "שבועיים": (2, "week"),
    "חודשיים": (2, "month"),
    "שנתיים": (2, "year"),
}

# Hebrew unit words (singular and plural) mapped to the unit names above.
_HE_UNITS = {
    "דקה": "minute",
    "דקות": "minute",
    "שעה": "hour",
    "שעות": "hour",
    "יום": "day",
    "ימים": "day",
    "שבוע": "week",
    "שבועות": "week",
    "חודש": "month",
    "חודשים": "month",
    "שנה": "year",
    "שנים": "year",
}

# Hebrew: optional number (digits or the word "one"), then a unit word.
_HE_RELATIVE = re.compile(
    r'(?P<num>\d+|אחד|אחת)?\s*(?P<unit>'
    + '|'.join(sorted(_HE_UNITS, key=len, reverse=True))
    + r')'
)

# Thai: optional number, a unit word, then the "ago" suffix.
_TH_UNITS = {"วัน": "day", "สัปดาห์": "week", "เดือน": "month", "ปี": "year"}
_TH_RELATIVE = re.compile(r'(?P<num>\d+)?\s*(?P<unit>วัน|สัปดาห์|เดือน|ปี)ที่แล้ว')


def _parse_en(text: str):
    """Return (count, unit) for an English relative date, or None."""
    m = _EN_RELATIVE.search(text)
    if not m:
        return None
    token = m.group("num").lower()
    return (1 if token in ("a", "an") else int(token)), m.group("unit").lower()


def _parse_he(text: str):
    """Return (count, unit) for a Hebrew relative date, or None."""
    text = text.strip()
    if text.startswith("לפני"):
        text = text[len("לפני"):].strip()

    # Dual forms first: they contain the singular unit word as a prefix, so
    # the generic pattern below would read them as "one <unit>".
    for word, parsed in _HE_DUAL_FORMS.items():
        if word in text:
            return parsed

    m = _HE_RELATIVE.search(text)
    if not m:
        return None
    token = m.group("num")
    # No number, or the word "one", means a count of 1
    num = int(token) if token and token.isdecimal() else 1
    return num, _HE_UNITS[m.group("unit")]


def _parse_th(text: str):
    """Return (count, unit) for a Thai relative date, or None."""
    m = _TH_RELATIVE.search(text)
    if not m:
        return None
    token = m.group("num")
    return (int(token) if token else 1), _TH_UNITS[m.group("unit")]


# Supported languages, in the order they are tried as fallbacks.
_LANGUAGE_PARSERS = {"en": _parse_en, "he": _parse_he, "th": _parse_th}


def parse_relative_date(date_str: str, lang: str, now: Optional[datetime] = None) -> str:
    """
    Convert a relative review date such as "a week ago" or "לפני 7 שנים" into
    an ISO 8601 datetime string.

    The string is parsed in ``lang`` first and then in every other supported
    language (English, Hebrew, Thai), so a wrong language hint still works.

    For English, supported formats include:
      - "a day ago", "an hour ago", "5 minutes ago", "3 weeks ago",
        "4 months ago", "2 years ago", etc.
    For Hebrew, supported formats include:
      - "לפני יום", "לפני 2 ימים", "לפני שבוע", "לפני שבועיים", "לפני חודש",
        "לפני חודשיים", "לפני 10 חודשים", "לפני שנה", "לפני שנתיים",
        "לפני 3 שנים", "לפני 3 שעות", etc.

    Parameters:
      - date_str (str): the relative date string.
      - lang (str): language code to try first ("en", "he" or "th").
      - now (Optional[datetime]): reference datetime; if None, the current
        UTC time (as a naive datetime) is used.

    Returns:
      The calculated absolute datetime in ISO 8601 format, or ``date_str``
      unchanged when it matches no supported format. No date is ever
      invented: callers can detect failure by comparing with the input.
    """
    if now is None:
        from datetime import timezone

        # Naive UTC "now" without the deprecated datetime.utcnow()
        now = datetime.now(timezone.utc).replace(tzinfo=None)

    requested = lang.lower()
    candidates = [requested] + [alt for alt in _LANGUAGE_PARSERS if alt != requested]
    for candidate in candidates:
        result = try_parse_date(date_str, candidate, now)
        if result != date_str:
            return result

    return date_str


def try_parse_date(date_str: str, lang: str, now: datetime) -> str:
    """
    Helper function that attempts to parse a date string in a specific language.

    Args:
        date_str: The relative date string.
        lang: Language code ("en", "he" or "th", case-insensitive).
        now: Reference datetime the offset is subtracted from.

    Returns:
        The ISO formatted date if successful, or the original string if the
        language is unsupported or the string does not match.
    """
    parser = _LANGUAGE_PARSERS.get(lang.lower())
    parsed = parser(date_str) if parser else None
    if parsed is None:
        return date_str

    num, unit = parsed
    return (now - _UNIT_DELTAS[unit] * num).isoformat()
Optional[datetime] = None) -> str: +# """ +# Converts a relative review_date (in English or Hebrew) such as "a week ago" or "לפני 7 שנים" +# into an ISO formatted datetime string (UTC). +# +# For English, supported formats include: +# - "a day ago", "an hour ago", "3 weeks ago", "4 months ago", "2 years ago", etc. +# For Hebrew, supported formats include: +# - "לפני יום", "לפני 2 ימים", "לפני שבוע", "לפני שבועיים", "לפני חודש", +# "לפני חודשיים", "לפני 10 חודשים", "לפני שנה", "לפני 3 שנים", etc. +# +# Parameters: +# - date_str (str): the relative date string. +# - lang (str): "en" for English or "he" for Hebrew. +# - now (Optional[datetime]): reference datetime; if None, current local time is used. +# +# Returns: +# A string representing the calculated absolute datetime in ISO 8601 format, +# or the original date_str if parsing fails. +# """ +# if now is None: +# now = datetime.utcnow() # use UTC for consistency +# +# delta = timedelta(0) +# +# if lang.lower() == "en": +# # Pattern: capture number or "a"/"an", then unit. 
+# pattern = re.compile(r'(?Pa|an|\d+)\s+(?Pday|week|month|year)s?\s+ago', re.IGNORECASE) +# m = pattern.search(date_str) +# if m: +# num_str = m.group("num").lower() +# num = 1 if num_str in ("a", "an") else int(num_str) +# unit = m.group("unit").lower() +# if unit == "day": +# delta = timedelta(days=num) +# elif unit == "week": +# delta = timedelta(weeks=num) +# elif unit == "month": +# delta = timedelta(days=30 * num) # approximate +# elif unit == "year": +# delta = timedelta(days=365 * num) # approximate +# else: +# return date_str # return original if not matched +# elif lang.lower() == "he": +# # Remove the "לפני" prefix if present +# text = date_str.strip() +# if text.startswith("לפני"): +# text = text[len("לפני"):].strip() +# +# # Handle special cases where the number and unit are combined: +# special = { +# "חודשיים": (2, "month"), +# "שבועיים": (2, "week"), +# "יומיים": (2, "day"), +# } +# if text in special: +# num, unit = special[text] +# else: +# # Match optional number (or assume 1) and then a unit. 
# --- Example usage ---
if __name__ == "__main__":
    # A fixed reference time keeps the printed results reproducible.
    fixed_now = datetime(2025, 2, 5, 12, 0, 0)

    # (relative date text, language hint); the first entry pairs an English
    # string with the Hebrew hint.
    examples = (
        ("a week ago", "he"),
        ("4 weeks ago", "en"),
        ("לפני 7 שנים", "he"),
        ("לפני חודשיים", "he"),
    )

    for text, lang in examples:
        print(f"Original: {text} ({lang}) => ISO: {parse_relative_date(text, lang, now=fixed_now)}")
class ImageHandler:
    """Handler for downloading and managing review images.

    Review photos and profile pictures are stored below ``image_dir`` in the
    ``reviews/`` and ``profiles/`` subdirectories. Optionally the URLs kept in
    the review documents are replaced by URLs below ``custom_url_base``.
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialize image handler from ``config``; missing keys use defaults."""
        self.image_dir = Path(config.get("image_dir", "review_images"))
        self.max_workers = config.get("download_threads", 4)
        self.store_local_paths = config.get("store_local_paths", True)

        # URL replacement settings
        self.replace_urls = config.get("replace_urls", False)
        self.custom_url_base = config.get("custom_url_base", "https://mycustomurl.com")
        self.custom_url_profiles = config.get("custom_url_profiles", "/profiles/")
        self.custom_url_reviews = config.get("custom_url_reviews", "/reviews/")
        self.preserve_original_urls = config.get("preserve_original_urls", True)

        # Subdirectories for different image types
        self.profile_dir = self.image_dir / "profiles"
        self.review_dir = self.image_dir / "reviews"

    def ensure_directories(self):
        """Ensure all image directories exist."""
        self.profile_dir.mkdir(parents=True, exist_ok=True)
        self.review_dir.mkdir(parents=True, exist_ok=True)

    def is_not_custom_url(self, url: str) -> bool:
        """Return True for a non-empty URL that is not one of our custom URLs."""
        if not url:
            return False
        return not (self.custom_url_base and url.startswith(self.custom_url_base))

    def get_filename_from_url(self, url: str, is_profile: bool = False) -> str:
        """Derive the local ``.jpg`` filename for an image URL.

        Returns an empty string for empty URLs and for our own custom URLs.
        """
        if not url or not self.is_not_custom_url(url):
            return ""

        # Profile pictures: the last path segment identifies the image, unless
        # it is empty or the known size suffix, in which case the one before it
        if is_profile:
            parts = url.split('/')
            if len(parts) > 1:
                filename = parts[-2] if parts[-1] in ('', 'w72-h72-p-rp-mo-ba4-br100') else parts[-1]
                return f"{filename}.jpg"

        # Review images: use the image ID found in the URL as filename
        match = re.search(r'AIHoz[^=]+=', url)
        if match:
            return f"{match.group(0).rstrip('=')}w600-h450-p.jpg"

        # Fallback: last segment of the URL path
        filename = urlparse(url).path.split('/')[-1]
        if not filename.lower().endswith('.jpg'):
            filename += ".jpg"
        return filename

    def get_custom_url(self, filename: str, is_profile: bool = False) -> str:
        """Build the custom URL for ``filename``; empty when replacement is off."""
        if not self.replace_urls or not filename:
            return ""

        base_url = self.custom_url_base.rstrip('/')
        path = (self.custom_url_profiles if is_profile else self.custom_url_reviews).strip('/')
        return f"{base_url}/{path}/{filename}"

    def download_image(self, url_info: Tuple[str, bool]) -> Tuple[str, str, str]:
        """
        Download an image from URL and save to disk.

        The image is streamed into a ``.part`` file and renamed once complete,
        so a failed download never leaves a truncated file that a later run
        would mistake for an already downloaded image.

        Args:
            url_info: Tuple of (url, is_profile)

        Returns:
            Tuple of (url, local filename, custom url). Filename and custom
            URL are empty strings when the URL was skipped or the download
            failed (failures are logged, not raised).
        """
        url, is_profile = url_info

        # Skip our custom URLs
        if not self.is_not_custom_url(url):
            return url, "", ""

        tmp_path = None
        try:
            filename = self.get_filename_from_url(url, is_profile)
            if not filename:
                return url, "", ""

            target_dir = self.profile_dir if is_profile else self.review_dir
            filepath = target_dir / filename

            # Only download when the file is not on disk yet
            if not filepath.exists():
                tmp_path = filepath.with_name(filepath.name + ".part")
                # The context manager returns the connection to the pool
                with requests.get(url, stream=True, timeout=10) as response:
                    response.raise_for_status()
                    with open(tmp_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                tmp_path.replace(filepath)

            # The custom URL is generated whether or not a download happened
            return url, filename, self.get_custom_url(filename, is_profile)

        except Exception as e:
            log.error(f"Error downloading image from {url}: {e}")
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    # Best-effort cleanup; the .part file may not exist
                    pass
            return url, "", ""

    def download_all_images(self, reviews: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        """
        Download all images (review images and profile pictures) for all reviews.

        The review documents are updated in place: ``local_images`` /
        ``local_profile_picture`` are added when local paths are stored, and
        ``user_images`` / ``profile_picture`` are replaced by custom URLs when
        URL replacement is enabled (the originals are kept in
        ``original_image_urls`` / ``original_profile_picture`` if configured).

        Args:
            reviews: Dictionary of review documents

        Returns:
            Updated reviews with local image paths and custom URLs
        """
        self.ensure_directories()

        # Collect all unique image URLs, excluding our custom URLs
        review_urls: Set[str] = set()
        profile_urls: Set[str] = set()

        for review in reviews.values():
            # Current image URLs plus separately stored original URLs
            for key in ("user_images", "original_image_urls"):
                urls = review.get(key)
                if isinstance(urls, list):
                    review_urls.update(u for u in urls if self.is_not_custom_url(u))

            # Current profile picture plus separately stored original
            for key in ("profile_picture", "original_profile_picture"):
                profile_url = review.get(key)
                if profile_url and self.is_not_custom_url(profile_url):
                    profile_urls.add(profile_url)

        # Each task carries the URL and whether it is a profile picture
        download_tasks = [(url, False) for url in review_urls] + [(url, True) for url in profile_urls]

        if not download_tasks:
            log.info("No images to download")
            return reviews

        log.info(
            f"Downloading {len(download_tasks)} images ({len(profile_urls)} profiles, {len(review_urls)} review images)...")

        # URL -> local filename and URL -> custom URL
        url_to_filename = {}
        url_to_custom_url = {}

        # Download images in parallel
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            for url, filename, custom_url in executor.map(self.download_image, download_tasks):
                if filename:
                    url_to_filename[url] = filename
                if custom_url:
                    url_to_custom_url[url] = custom_url

        # Update review documents
        for review in reviews.values():
            images = review.get("user_images")

            # Lookups use the original URLs when stored, else the current ones
            if isinstance(review.get("original_image_urls"), list):
                image_sources = review["original_image_urls"]
            elif isinstance(images, list):
                image_sources = images.copy()
            else:
                image_sources = []

            if review.get("original_profile_picture"):
                profile_source = review["original_profile_picture"]
            elif "profile_picture" in review:
                profile_source = review["profile_picture"]
            else:
                profile_source = ""

            # Process user_images
            if isinstance(images, list):
                # Add local image paths if enabled
                if self.store_local_paths:
                    review["local_images"] = [
                        url_to_filename[u] for u in image_sources
                        if u and self.is_not_custom_url(u) and url_to_filename.get(u)
                    ]

                # Replace URLs if enabled
                if self.replace_urls:
                    # Store original URLs if needed and not already stored
                    if self.preserve_original_urls and "original_image_urls" not in review:
                        review["original_image_urls"] = images.copy()

                    custom_images = []
                    for u in image_sources:
                        if u in url_to_custom_url:
                            custom_images.append(url_to_custom_url[u])
                        elif not self.is_not_custom_url(u):  # Already a custom URL
                            custom_images.append(u)

                    # Replace with custom URLs if we have them
                    if custom_images:
                        review["user_images"] = custom_images

            # Process profile_picture
            if review.get("profile_picture"):
                # Add local profile picture path if enabled
                if self.store_local_paths and profile_source in url_to_filename:
                    review["local_profile_picture"] = url_to_filename[profile_source]

                # Replace profile_picture URL if enabled
                if self.replace_urls:
                    # Store original URL if needed and not already stored
                    if self.preserve_original_urls and "original_profile_picture" not in review:
                        review["original_profile_picture"] = review["profile_picture"]

                    if profile_source in url_to_custom_url:
                        review["profile_picture"] = url_to_custom_url[profile_source]
                    elif not self.is_not_custom_url(review["profile_picture"]):
                        # Current URL is already a custom URL: keep it
                        pass
                    elif profile_source:
                        # No custom URL recorded but a filename is known: build one
                        custom_url = self.get_custom_url(url_to_filename.get(profile_source, ""), True)
                        if custom_url:
                            review["profile_picture"] = custom_url

        log.info(f"Downloaded {len(url_to_filename)} images")
        if self.replace_urls:
            log.info(f"Replaced URLs for {len(url_to_custom_url)} images")

        return reviews
@dataclass
class RawReview:
    """
    A single review as scraped from a Google Maps review card, before it is
    merged into a stored document.
    """
    id: str = ""
    author: str = ""
    rating: float = 0.0
    date: str = ""  # date text as shown on the card
    lang: str = "und"
    text: str = ""
    likes: int = 0
    photos: list[str] = field(default_factory=list)
    profile: str = ""
    avatar: str = ""  # URL to profile picture
    owner_date: str = ""
    owner_text: str = ""
    review_date: str = ""  # ISO format date

    # CSS selectors for elements inside a review card (class attributes,
    # not dataclass fields, because they carry no annotation)
    MORE_BTN = "button.kyuRq"
    LIKE_BTN = 'button[jsaction*="toggleThumbsUp" i]'
    PHOTO_BTN = "button.Tya61d"
    OWNER_RESP = "div.CDe7pd"

    @classmethod
    def from_card(cls, card: WebElement) -> "RawReview":
        """Build a RawReview from the WebElement of one review card."""
        # Expand truncated text; a failing click must not abort the extraction
        for more_button in try_find(card, cls.MORE_BTN, all=True):
            try:
                more_button.click()
            except Exception:
                pass

        review_id = card.get_attribute("data-review-id") or ""
        author_name = first_text(card, 'div[class*="d4r55"]')
        profile_link = first_attr(card, 'button[data-review-id]', "data-href")
        avatar_url = first_attr(card, 'button[data-review-id] img', "src")

        # The rating is the first number in the aria-label; a decimal comma
        # is normalised to a dot first
        rating_value = 0.0
        rating_label = first_attr(card, 'span[role="img"]', "aria-label")
        if rating_label:
            rating_match = re.search(r"[\d\.]+", rating_label.replace(",", "."))
            if rating_match:
                rating_value = float(rating_match.group())

        date_text = first_text(card, 'span[class*="rsqaWe"]')
        # Parse the date string to ISO format
        iso_review_date = parse_date_to_iso(date_text)

        # The review text is taken from the first selector that yields text
        body = ""
        for selector in ('span[jsname="bN97Pc"]',
                         'span[jsname="fbQN7e"]',
                         'div.MyEned span.wiI7pd'):
            body = first_text(card, selector)
            if body:
                break
        language = detect_lang(body)

        like_count = 0
        like_buttons = try_find(card, cls.LIKE_BTN)
        if like_buttons:
            like_count = safe_int(like_buttons[0].text or like_buttons[0].get_attribute("aria-label"))

        # Photo URLs are embedded in the inline style of the photo buttons
        photo_urls: list[str] = []
        for photo_button in try_find(card, cls.PHOTO_BTN, all=True):
            style_match = re.search(r'url\("([^"]+)"', photo_button.get_attribute("style") or "")
            if style_match:
                photo_urls.append(style_match.group(1))

        reply_date = reply_text = ""
        reply_boxes = try_find(card, cls.OWNER_RESP)
        if reply_boxes:
            reply_box = reply_boxes[0]
            reply_date = first_text(reply_box, "span.DZSIDd")
            reply_text = first_text(reply_box, "div.wiI7pd")

        return cls(
            id=review_id,
            author=author_name,
            rating=rating_value,
            date=date_text,
            lang=language,
            text=body,
            likes=like_count,
            photos=photo_urls,
            profile=profile_link,
            avatar=avatar_url,
            owner_date=reply_date,
            owner_text=reply_text,
            review_date=iso_review_date,
        )
"最高评分", "Melhor avaliação", "Höchste Bewertung", + "Note la plus élevée", "Valutazione più alta", "Høyeste vurdering", + "Наивысший рейтинг", "Hoogste waardering", "أعلى تقييم", "Højeste vurdering", + "Korkein arvostelu", "Najwyższa ocena", "Högsta betyg", "Peringkat tertinggi", + "En yüksek puan", "Đánh giá cao nhất", "उच्चतम रेटिंग", "Top rating" + ), + "lowest": ( + "Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด", "最低評価", + "Calificación más baja", "最低评分", "Pior avaliação", "Niedrigste Bewertung", + "Note la plus basse", "Valutazione più bassa", "Laveste vurdering", + "Наименьший рейтинг", "Laagste waardering", "أقل تقييم", "Laveste vurdering", + "Alhaisin arvostelu", "Najniższa ocena", "Lägsta betyg", "Peringkat terendah", + "En düşük puan", "Đánh giá thấp nhất", "निम्नतम रेटिंग", "Worst rating" + ), + "relevance": ( + "Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด", "関連性", + "Más relevantes", "最相关", "Mais relevantes", "Relevanteste", + "Plus pertinents", "Più pertinenti", "Mest relevante", + "Наиболее релевантные", "Meest relevant", "الأكثر صلة", "Mest relevante", + "Olennaisimmat", "Najbardziej trafne", "Mest relevanta", "Paling relevan", + "En alakalı", "Liên quan nhất", "सबसे प्रासंगिक", "Relevance" + ) +} + +# Comprehensive multi-language review keywords +REVIEW_WORDS = { + # English + "reviews", "review", "ratings", "rating", + + # Hebrew + "ביקורות", "ביקורת", "ביקורות על", "דירוגים", "דירוג", + + # Thai + "รีวิว", "บทวิจารณ์", "คะแนน", "ความคิดเห็น", + + # Spanish + "reseñas", "opiniones", "valoraciones", "críticas", "calificaciones", + + # French + "avis", "commentaires", "évaluations", "critiques", "notes", + + # German + "bewertungen", "rezensionen", "beurteilungen", "meinungen", "kritiken", + + # Italian + "recensioni", "valutazioni", "opinioni", "giudizi", "commenti", + + # Portuguese + "avaliações", "comentários", "opiniões", "análises", "críticas", + + # Russian + "отзывы", "рецензии", "обзоры", "оценки", "комментарии", + + # 
class GoogleReviewsScraper:
    """Scrape Google Maps reviews with Selenium and persist the results.

    The scraper opens the configured place URL, switches to the reviews tab,
    optionally changes the sort order, scrolls the reviews pane while parsing
    review cards, and finally writes the merged reviews to MongoDB and/or JSON
    depending on the configuration.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and create the storage backends.

        Config keys read here: ``use_mongodb`` (default True),
        ``backup_to_json`` (default True) and ``overwrite_existing``
        (default False). The whole ``config`` dict is also handed to
        ``MongoDBStorage`` / ``JSONStorage``.
        """
        self.config = config
        self.use_mongodb = config.get("use_mongodb", True)
        self.mongodb = MongoDBStorage(config) if self.use_mongodb else None
        self.json_storage = JSONStorage(config)
        self.backup_to_json = config.get("backup_to_json", True)
        self.overwrite_existing = config.get("overwrite_existing", False)

    def setup_driver(self, headless: bool) -> Chrome:
        """
        Set up and configure Chrome driver with flexibility for different environments.

        A container is assumed when the ``CHROME_BIN`` environment variable is
        set; in that case ``CHROME_BIN`` / ``CHROMEDRIVER_PATH`` are honoured
        and plain Selenium is used as a fallback if undetected_chromedriver
        cannot start. Outside a container undetected_chromedriver is used
        directly and any start-up error propagates to the caller.

        Args:
            headless: run Chrome with ``--headless=new`` when True.

        Returns:
            The started driver with a 30 second page-load timeout.
        """
        # Determine if we're running in a container
        in_container = os.environ.get('CHROME_BIN') is not None

        # Create Chrome options
        opts = uc.ChromeOptions()
        opts.add_argument("--window-size=1400,900")
        opts.add_argument("--ignore-certificate-errors")
        opts.add_argument("--disable-gpu")  # Improves performance
        opts.add_argument("--disable-dev-shm-usage")  # Helps with stability
        opts.add_argument("--no-sandbox")  # More stable in some environments

        # Use headless mode if requested
        if headless:
            opts.add_argument("--headless=new")

        # Log platform information for debugging
        log.info(f"Platform: {platform.platform()}")
        log.info(f"Python version: {platform.python_version()}")

        if in_container:
            # In a container, use environment-provided binaries
            chrome_binary = os.environ.get('CHROME_BIN')
            chromedriver_path = os.environ.get('CHROMEDRIVER_PATH')

            log.info("Container environment detected")
            log.info(f"Chrome binary: {chrome_binary}")
            log.info(f"ChromeDriver path: {chromedriver_path}")

            if chrome_binary and os.path.exists(chrome_binary):
                log.info(f"Using Chrome binary from environment: {chrome_binary}")
                opts.binary_location = chrome_binary

            try:
                # Try creating Chrome driver with undetected_chromedriver
                log.info("Attempting to create undetected_chromedriver instance")
                driver = uc.Chrome(options=opts)
                log.info("Successfully created undetected_chromedriver instance")
            except Exception as e:
                # Fall back to regular Selenium if undetected_chromedriver fails
                log.warning(f"Failed to create undetected_chromedriver instance: {e}")
                log.info("Falling back to regular Selenium Chrome")

                # Import Selenium webdriver here to avoid potential import issues
                from selenium import webdriver
                from selenium.webdriver.chrome.service import Service

                if chromedriver_path and os.path.exists(chromedriver_path):
                    log.info(f"Using ChromeDriver from path: {chromedriver_path}")
                    service = Service(executable_path=chromedriver_path)
                    driver = webdriver.Chrome(service=service, options=opts)
                else:
                    log.info("Using default ChromeDriver")
                    driver = webdriver.Chrome(options=opts)
        else:
            # On regular OS, use default undetected_chromedriver
            log.info("Using standard undetected_chromedriver setup")
            driver = uc.Chrome(options=opts)

        # Set page load timeout to avoid hanging
        driver.set_page_load_timeout(30)
        log.info("Chrome driver setup completed successfully")
        return driver

    def dismiss_cookies(self, driver: Chrome) -> bool:
        """
        Dismiss cookie consent dialogs if present.

        Waits up to 3 seconds for an element matching ``COOKIE_BTN``, then
        clicks the first displayed match. Elements are re-queried after the
        wait so a stale reference from the wait itself is never clicked.

        Returns:
            True if a button was clicked, False otherwise (including when no
            dialog appeared).
        """
        try:
            WebDriverWait(driver, 3).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN))
            )
            log.info("Cookie consent dialog found, attempting to dismiss")

            # Get elements again after waiting to avoid stale references
            for elem in driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN):
                try:
                    if elem.is_displayed():
                        elem.click()
                        log.info("Cookie dialog dismissed")
                        return True
                except Exception as e:
                    log.debug(f"Error clicking cookie button: {e}")
        except TimeoutException:
            # This is expected if no cookie dialog is present
            log.debug("No cookie consent dialog detected")
        except Exception as e:
            log.debug(f"Error handling cookie dialog: {e}")

        return False

    @staticmethod
    def _has_review_word(text: str) -> bool:
        """Return True if any keyword from ``REVIEW_WORDS`` occurs in ``text``."""
        return any(word in text for word in REVIEW_WORDS)

    def is_reviews_tab(self, tab: WebElement) -> bool:
        """
        Decide whether ``tab`` is the reviews tab, across languages and layouts.

        Checks, in order: the ``data-tab-index`` attribute, role + aria-label,
        text sources (text, aria-label, innerHTML, textContent), text of child
        elements, URL-like attributes, and finally the class attribute.

        Returns:
            True on the first positive signal; False if none matches, if the
            element went stale, or if any unexpected error occurs.
        """
        try:
            # Strategy 1: Data attribute detection (most reliable across languages)
            tab_index = tab.get_attribute("data-tab-index")
            if tab_index == "1" or tab_index == "reviews":
                return True

            # Strategy 2: Role and aria attributes (accessibility detection)
            role = tab.get_attribute("role")
            aria_label = (tab.get_attribute("aria-label") or "").lower()
            if role == "tab" and self._has_review_word(aria_label):
                return True

            # Strategy 3: Text content detection (multiple sources).
            # get_attribute() may return None, so default BEFORE lower-casing;
            # otherwise an AttributeError would abort strategies 4-6.
            sources = [
                (tab.text or "").lower(),  # Direct text
                aria_label,  # ARIA label
                (tab.get_attribute("innerHTML") or "").lower(),  # Inner HTML
                (tab.get_attribute("textContent") or "").lower(),  # Text content
            ]
            if any(self._has_review_word(source) for source in sources):
                return True

            # Strategy 4: Nested element detection
            try:
                for child in tab.find_elements(By.CSS_SELECTOR, "*"):
                    try:
                        child_text = (child.text or "").lower()
                        child_content = (child.get_attribute("textContent") or "").lower()
                        if self._has_review_word(child_text) or self._has_review_word(child_content):
                            return True
                    except Exception:
                        # A single unreadable child must not stop the scan
                        continue
            except Exception as child_error:
                log.debug(f"Error scanning child elements: {child_error}")

            # Strategy 5: URL detection (some tabs have hrefs or data-hrefs with tell-tale values)
            for attr in ["href", "data-href", "data-url", "data-target"]:
                attr_value = (tab.get_attribute(attr) or "").lower()
                if attr_value and ("review" in attr_value or "rating" in attr_value):
                    return True

            # Strategy 6: Class detection (some review tabs have specific classes)
            tab_class = tab.get_attribute("class") or ""
            review_classes = ["review", "reviews", "rating", "ratings", "comments", "feedback", "g4jrve"]
            return any(cls in tab_class for cls in review_classes)

        except StaleElementReferenceException:
            return False
        except Exception as e:
            log.debug(f"Error in is_reviews_tab: {e}")
            return False

    def click_reviews_tab(self, driver: Chrome):
        """
        Locate and click the reviews tab using several fallback strategies.

        1. CSS selectors: every match accepted by ``is_reviews_tab`` is clicked
           with up to five click techniques until
           ``verify_reviews_tab_clicked`` succeeds.
        2. XPath text search for every keyword in ``REVIEW_WORDS``.
        3. Direct navigation to ``.../place/<name>/reviews``.

        Strategies 1 and 2 share a 25 second budget; the URL fallback always
        runs as the last attempt.

        Returns:
            True once the reviews view is confirmed.

        Raises:
            TimeoutException: if no strategy succeeded.
        """
        max_timeout = 25  # Maximum seconds to try
        end_time = time.time() + max_timeout
        attempts = 0

        # Selectors to try in order of reliability. Only valid CSS belongs
        # here: jQuery-style ':contains()' is rejected by the browser.
        tab_selectors = [
            # Direct tab selectors
            '[data-tab-index="1"]',  # Most common tab index
            '[role="tab"][data-tab-index]',  # Any tab with index
            'button[role="tab"]',  # Button tabs
            'div[role="tab"]',  # Div tabs
            'a[role="tab"]',  # Link tabs

            # Common Google Maps review tab selectors
            '.fontTitleSmall[role="tab"]',  # Google Maps title font tabs
            '.hh2c6[role="tab"]',  # Common Google Maps class
            '.m6QErb [role="tab"]',  # Maps container tabs

            # Tab-list children
            'div[role="tablist"] > *',  # Any tab in a tab list
            'div.m6QErb div[role="tablist"] > *',  # Google Maps specific tablist
        ]

        for selector in tab_selectors:
            if time.time() > end_time:
                break

            try:
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    attempts += 1

                    # First check if this is actually a reviews tab
                    if not self.is_reviews_tab(element):
                        continue

                    log.info(f"Found potential reviews tab ({selector}): '{element.text}', attempting to click")
                    if self._click_tab_element(driver, element, selector):
                        return True
            except Exception as selector_error:
                log.debug(f"Error with selector '{selector}': {selector_error}")
                continue

        # XPath text search as a last resort before URL navigation
        for language_keyword in REVIEW_WORDS:
            # Re-check the budget per keyword: the keyword list is long and
            # every lookup/click costs seconds.
            if time.time() > end_time:
                break
            try:
                xpath = f"//*[contains(text(), '{language_keyword}')]"
                for element in driver.find_elements(By.XPATH, xpath):
                    try:
                        log.info(f"Trying XPath with keyword '{language_keyword}'")
                        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", element)
                        time.sleep(0.7)
                        driver.execute_script("arguments[0].click();", element)
                        time.sleep(1.5)

                        if self.verify_reviews_tab_clicked(driver):
                            log.info(f"Successfully clicked element with keyword '{language_keyword}'")
                            return True
                    except Exception:
                        continue
            except Exception:
                continue

        # Final attempt: try to navigate directly to reviews by URL
        try:
            current_url = driver.current_url
            if "?hl=" in current_url:  # Preserve language setting if present
                lang_param = re.search(r'\?hl=([^&]*)', current_url)
                if lang_param and '/place/' in current_url:
                    lang_code = lang_param.group(1)
                    parts = current_url.split('/place/')
                    new_url = f"{parts[0]}/place/{parts[1].split('/')[0]}/reviews?hl={lang_code}"
                    driver.get(new_url)
                    time.sleep(2)
                    if "review" in driver.current_url.lower():
                        log.info("Navigated directly to reviews page via URL")
                        return True

            # Same attempt without the language parameter
            if '/place/' in current_url and '/reviews' not in current_url:
                parts = current_url.split('/place/')
                new_url = f"{parts[0]}/place/{parts[1].split('/')[0]}/reviews"
                driver.get(new_url)
                time.sleep(2)
                if "review" in driver.current_url.lower():
                    log.info("Navigated directly to reviews page via URL")
                    return True
        except Exception as url_error:
            log.warning(f"Failed to navigate to reviews via URL: {url_error}")

        log.warning(f"Failed to find/click reviews tab after {attempts} attempts")
        raise TimeoutException("Reviews tab not found or could not be clicked")

    def _click_tab_element(self, driver: Chrome, element: WebElement, selector: str) -> bool:
        """Try several click techniques on ``element``; True once the reviews view is verified."""
        # Ensure visibility
        driver.execute_script("arguments[0].scrollIntoView({block:'center', behavior:'smooth'});", element)
        time.sleep(0.7)  # Wait for scroll

        # NOTE(review): the offset passed to move_to_element_with_offset is
        # interpreted relative to the element's top-left corner or its centre
        # depending on the Selenium release - confirm against the pinned version.
        click_methods = [
            # Method 1: JavaScript click (most reliable)
            lambda: driver.execute_script("arguments[0].click();", element),
            # Method 2: Direct click
            lambda: element.click(),
            # Method 3: ActionChains click
            lambda: ActionChains(driver).move_to_element(element).click().perform(),
            # Method 4: Send RETURN key
            lambda: element.send_keys(Keys.RETURN),
            # Method 5: Offset click with ActionChains
            lambda: ActionChains(driver).move_to_element_with_offset(
                element, element.size['width'] // 2, element.size['height'] // 2).click().perform(),
        ]

        for i, click_method in enumerate(click_methods):
            try:
                click_method()
                time.sleep(1.5)  # Wait for click to take effect

                # Verify if click worked (check for new content)
                if self.verify_reviews_tab_clicked(driver):
                    log.info(
                        f"Successfully clicked reviews tab using method {i + 1} and selector '{selector}'")
                    return True
            except Exception as click_error:
                log.debug(f"Click method {i + 1} failed: {click_error}")
        return False

    def verify_reviews_tab_clicked(self, driver: Chrome) -> bool:
        """
        Report whether the page currently looks like the reviews view.

        Returns True if any characteristic selector matches at least one
        element, or if "review" occurs in the current URL; False otherwise or
        on any error.
        """
        try:
            verification_selectors = [
                # Reviews container
                'div.m6QErb.DxyBCb.kA9KIf.dS8AEf',
                # Review cards
                'div[data-review-id]',
                # Sort button (usually appears with reviews)
                'button[aria-label*="Sort" i]',
                # Review rating elements
                'span[role="img"][aria-label*="star" i]',
                # Other indicators
                'div.m6QErb div.jftiEf',
                '.HlvSq'
            ]

            for selector in verification_selectors:
                if driver.find_elements(By.CSS_SELECTOR, selector):
                    return True

            # URL check - if "review" appears in the URL
            return "review" in driver.current_url.lower()
        except Exception as e:
            log.debug(f"Error verifying reviews tab click: {e}")
            return False

    def set_sort(self, driver: Chrome, method: str):
        """
        Set the sorting method for reviews.

        Args:
            driver: active driver positioned on the reviews view.
            method: key of ``SORT_OPTIONS``; ``"relevance"`` is treated as the
                default order and needs no interaction.

        Returns:
            True if the order is (or already was) the requested one, False if
            the sort button, the menu or the menu entry could not be
            found/clicked. Never raises; errors are logged.
        """
        if method == "relevance":
            log.info("Using default 'relevance' sort - no need to change sort order")
            return True  # Default order, no need to change

        log.info(f"Attempting to set sort order to '{method}'")

        try:
            # 1. Find the sort button
            sort_button = self._find_sort_button(driver)
            if sort_button is None:
                log.warning("No sort button found with any method - keeping default sort order")
                return False

            # 2. Click the sort button to open dropdown menu
            if not self._open_sort_menu(driver, sort_button):
                log.warning("Failed to open sort menu - keeping default sort order")
                self._click_away(driver)  # Try to reset state
                return False

            # 3. Find and click the desired sort option in the menu
            try:
                visible_items = self._collect_menu_items(driver)

                log.info(f"Found {len(visible_items)} visible menu items")
                for i, (_, text) in enumerate(visible_items):
                    log.debug(f"  Menu item {i + 1}: '{text}'")

                # First try text matching against the known labels
                target_item, matched_text = self._match_sort_item(
                    visible_items, SORT_OPTIONS.get(method, ()))
                if target_item is not None:
                    log.info(f"Found matching menu item: '{matched_text}' for '{method}'")

                # If no match found, try position-based selection
                if target_item is None and visible_items:
                    position_map = {
                        "relevance": 0,  # Usually the first option
                        "newest": 1,  # Usually the second option
                        "highest": 2,  # Usually the third option
                        "lowest": 3  # Usually the fourth option
                    }
                    pos = position_map.get(method, -1)
                    if 0 <= pos < len(visible_items):
                        target_item, matched_text = visible_items[pos]
                        log.info(f"Using position-based selection (position {pos}) for '{method}'")

                if target_item is not None:
                    if self._click_menu_item(driver, target_item):
                        log.info(f"Successfully set sort order to '{method}'")
                        return True
                    log.warning("Failed to click menu item - keeping default sort order")
                else:
                    log.warning(f"No matching menu item found for '{method}'")

                # If we get here, we failed - try to close the menu by clicking elsewhere
                self._click_away(driver)
                return False

            except TimeoutException:
                log.warning("Timeout waiting for menu items")
                return False
            except Exception as e:
                log.warning(f"Error in menu item selection: {e}")
                return False

        except Exception as e:
            log.warning(f"Error in set_sort method: {e}")
            return False

    def _find_sort_button(self, driver: Chrome):
        """Return the first plausible sort button, or None if none is found."""
        sort_button_selectors = [
            # Exact selectors based on recent HTML structure
            'button.HQzyZ[aria-haspopup="true"]',
            'div.m6QErb button.HQzyZ',
            'button[jsaction*="pane.wfvdle84"]',
            'div.fontBodyLarge.k5lwKb',  # The text element inside sort button

            # Common attribute-based selectors
            'button[aria-label*="Sort" i]',
            'button[aria-label*="sort" i]',
            'button[aria-expanded="false"][aria-haspopup="true"]',

            # Multilingual selectors
            'button[aria-label*="סדר" i]',  # Hebrew
            'button[aria-label*="เรียง" i]',  # Thai
            'button[aria-label*="排序" i]',  # Chinese
            'button[aria-label*="Trier" i]',  # French
            'button[aria-label*="Ordenar" i]',  # Spanish/Portuguese
            'button[aria-label*="Sortieren" i]',  # German

            # Parent container-based selectors
            'div.m6QErb.Hk4XGb.XiKgde.tLjsW button',
            'div.m6QErb div.XiKgde button'
        ]
        # Buttons whose text/label contains one of these are clearly not the sort button
        negative_keywords = ["back", "next", "previous", "close", "cancel", "חזרה", "סגור", "ปิด"]

        # Strategy 1: CSS selectors
        for selector in sort_button_selectors:
            try:
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        # Skip invisible/disabled elements
                        if not element.is_displayed() or not element.is_enabled():
                            continue

                        button_text = element.text.strip() if element.text else ""
                        button_aria = element.get_attribute("aria-label") or ""
                        if any(keyword in button_text.lower() or keyword in button_aria.lower()
                               for keyword in negative_keywords):
                            continue

                        log.info(f"Found sort button with selector: {selector}")
                        log.info(f"Button text: '{button_text}', aria-label: '{button_aria}'")
                        return element
                    except Exception as e:
                        log.debug(f"Error checking element: {e}")
            except Exception as e:
                log.debug(f"Error with selector '{selector}': {e}")

        # Strategy 2: first usable button inside the sort container
        try:
            containers = driver.find_elements(By.CSS_SELECTOR, 'div.m6QErb.Hk4XGb, div.XiKgde.tLjsW')
            for container in containers:
                try:
                    for button in container.find_elements(By.TAG_NAME, 'button'):
                        if button.is_displayed() and button.is_enabled():
                            log.info("Found sort button through container element")
                            return button
                except Exception:
                    continue
        except Exception as e:
            log.debug(f"Error finding button via container: {e}")

        # Strategy 3: XPath search on text / aria-label keywords
        xpath_terms = ["sort", "Sort", "סדר", "סידור", "เรียง", "排序", "Trier", "Ordenar", "Sortieren"]
        for term in xpath_terms:
            try:
                xpath = f"//*[contains(text(), '{term}') or contains(@aria-label, '{term}')]"
                for element in driver.find_elements(By.XPATH, xpath):
                    try:
                        if element.is_displayed() and element.is_enabled():
                            log.info(f"Found sort button with XPath term: '{term}'")
                            return element
                    except Exception:
                        continue
            except Exception:
                continue

        return None

    def _open_sort_menu(self, driver: Chrome, sort_button) -> bool:
        """Click ``sort_button`` with several techniques; True once the menu is detected."""
        # First ensure the button is in view
        driver.execute_script("arguments[0].scrollIntoView({block: 'center', behavior: 'smooth'});", sort_button)
        time.sleep(0.8)  # Wait for scroll

        click_methods = [
            # Method 1: JavaScript click
            lambda: driver.execute_script("arguments[0].click();", sort_button),
            # Method 2: Direct click
            lambda: sort_button.click(),
            # Method 3: ActionChains click with move first
            lambda: ActionChains(driver).move_to_element(sort_button).pause(0.3).click().perform(),
            # Method 4: Offset click on the element
            lambda: ActionChains(driver).move_to_element_with_offset(
                sort_button, sort_button.size['width'] // 2, sort_button.size['height'] // 2
            ).click().perform(),
            # Method 5: JavaScript focus and delayed click. The element is
            # captured in a local first: inside the setTimeout callback
            # 'arguments' refers to the callback's own (empty) argument list.
            lambda: driver.execute_script(
                "var el = arguments[0]; el.focus(); setTimeout(function() { el.click(); }, 100);",
                sort_button
            ),
            # Method 6: Send RETURN key after focusing
            lambda: ActionChains(driver).move_to_element(sort_button).click().send_keys(Keys.RETURN).perform()
        ]

        for i, click_method in enumerate(click_methods):
            try:
                log.info(f"Trying click method {i + 1} for sort button...")
                click_method()
                time.sleep(1)  # Wait for menu to appear

                if self.check_if_menu_opened(driver):
                    log.info(f"Sort menu opened with click method {i + 1}")
                    return True
            except Exception as e:
                log.debug(f"Click method {i + 1} failed: {e}")
        return False

    def _collect_menu_items(self, driver: Chrome):
        """
        Return ``(element, text)`` pairs for the visible sort-menu entries.

        Each clickable element appears at most once: the combined selector
        matches both a ``menuitemradio`` and its inner text container, which
        would otherwise list every entry twice and skew position-based
        selection.

        Raises:
            TimeoutException: if no menu item appears within 5 seconds.
        """
        menu_item_selectors = [
            # Exact Google Maps menu item selectors
            'div[role="menuitemradio"]',
            'div.fxNQSd[role="menuitemradio"]',
            'div[role="menuitemradio"] div.mLuXec',  # Inner text container

            # Generic menu item selectors (fallback)
            '[role="menuitemradio"]',
            '[role="menuitem"]',
            'div[role="menu"] > div'
        ]
        combined_selector = ", ".join(menu_item_selectors)

        # Wait for menu items to appear
        menu_items = WebDriverWait(driver, 5).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, combined_selector))
        )

        visible_items = []

        def remember(element, text):
            # Equality (not identity): the same DOM node can be returned as
            # distinct WebElement objects.
            if not any(element == known for known, _ in visible_items):
                visible_items.append((element, text))

        for item in menu_items:
            try:
                # Skip invisible items
                if not item.is_displayed():
                    continue

                if item.get_attribute('role') == 'menuitemradio':
                    # Top-level menu item: prefer the inner text container
                    try:
                        text_elements = item.find_elements(By.CSS_SELECTOR, 'div.mLuXec')
                        if text_elements and text_elements[0].is_displayed():
                            text = text_elements[0].text.strip()
                        else:
                            text = item.text.strip()
                    except Exception:
                        # Last resort - use the item's own text
                        text = item.text.strip()
                    remember(item, text)
                elif 'mLuXec' in (item.get_attribute('class') or ''):
                    # Text container element - register its parent menuitemradio
                    try:
                        text = item.text.strip()
                        parent = driver.execute_script(
                            "return arguments[0].closest('[role=\"menuitemradio\"]');",
                            item
                        )
                        if parent:
                            remember(parent, text)
                    except Exception:
                        continue
                else:
                    # Generic menu item handling
                    remember(item, item.text.strip())
            except Exception as e:
                log.debug(f"Error processing menu item: {e}")

        return visible_items

    @staticmethod
    def _match_sort_item(visible_items, wanted_labels):
        """
        Pick the menu entry whose text corresponds to one of ``wanted_labels``.

        Entries with empty text are ignored (an empty string is a substring of
        every label). A case-insensitive exact match wins over a containment
        match. Short-prefix matching is deliberately not used: several
        languages start every sort label with the same word.

        Returns:
            ``(element, text)`` of the match, or ``(None, None)``.
        """
        labels = [label.casefold() for label in wanted_labels if label]
        candidates = [(item, text, text.casefold()) for item, text in visible_items if text]

        for item, text, folded in candidates:
            if folded in labels:
                return item, text

        for item, text, folded in candidates:
            if any(label in folded or folded in label for label in labels):
                return item, text

        return None, None

    def _click_menu_item(self, driver: Chrome, target_item) -> bool:
        """Click a sort-menu entry; True once the menu is no longer detected."""
        # Ensure item is in view
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", target_item)
        time.sleep(0.3)

        click_methods = [
            # Method 1: JavaScript click
            lambda: driver.execute_script("arguments[0].click();", target_item),
            # Method 2: Direct click
            lambda: target_item.click(),
            # Method 3: ActionChains click
            lambda: ActionChains(driver).move_to_element(target_item).click().perform(),
            # Method 4: Offset click
            lambda: ActionChains(driver).move_to_element_with_offset(
                target_item, target_item.size['width'] // 2, target_item.size['height'] // 2
            ).click().perform(),
            # Method 5: JavaScript click with custom event
            lambda: driver.execute_script("""
                var el = arguments[0];
                var evt = new MouseEvent('click', {
                    bubbles: true,
                    cancelable: true,
                    view: window
                });
                el.dispatchEvent(evt);
            """, target_item)
        ]

        for i, click_method in enumerate(click_methods):
            try:
                click_method()
                time.sleep(1.5)  # Wait for sort to take effect

                # A closed menu is taken as confirmation that the click landed
                if not self.check_if_menu_opened(driver):
                    log.info(f"Successfully clicked menu item with method {i + 1}")
                    return True
            except Exception as e:
                log.debug(f"Menu item click method {i + 1} failed: {e}")
        return False

    @staticmethod
    def _click_away(driver: Chrome) -> None:
        """Best-effort click away from the menu to dismiss it."""
        try:
            ActionChains(driver).move_by_offset(50, 50).click().perform()
        except Exception as e:
            log.debug(f"Could not click away to dismiss menu: {e}")

    def check_if_menu_opened(self, driver):
        """
        Check if a sort menu is currently open.

        Tries, in order: specific menu container selectors, generic menu
        containers, at least two visible menu-item elements, a JavaScript
        visibility probe, and a probe for absolutely positioned containers
        holding menu items.

        Returns:
            True if any strategy detects a menu, False otherwise or on error.
        """
        try:
            # 1. Exact menu container selectors, then 2. generic menu containers
            container_selectors = [
                'div[role="menu"][id="action-menu"]',
                'div.fontBodyLarge.yu5kgd[role="menu"]',
                'div.fxNQSd[role="menuitemradio"]',  # Menu item class
                'div.yu5kgd[role="menu"]',  # Alternate class
                'div[role="menu"]',
                'ul[role="menu"]',
                '[role="listbox"]'
            ]

            for selector in container_selectors:
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        if element.is_displayed():
                            return True
                    except Exception:
                        continue

            # 3. Look for menu items
            menu_item_selectors = [
                'div[role="menuitemradio"]',  # Google Maps specific
                'div.fxNQSd',  # Class-based detection
                'div.mLuXec',  # Text container class
                '[role="menuitem"]',  # Generic menu items
                '[role="option"]'  # Alternative role
            ]

            visible_items = 0
            for selector in menu_item_selectors:
                for element in driver.find_elements(By.CSS_SELECTOR, selector):
                    try:
                        if element.is_displayed():
                            visible_items += 1
                            if visible_items >= 2:  # At least 2 menu items should be visible
                                return True
                    except Exception:
                        continue

            # 4. Advanced detection with JavaScript
            # NOTE(review): the second loop accepts any sufficiently large
            # 'div.fontBodyLarge' / 'div.yu5kgd'; confirm this cannot match
            # page content while the menu is closed.
            try:
                js_detection = """
                return (function() {
                    // Check for visible menu elements
                    var menuElements = document.querySelectorAll('div[role="menu"], div[role="menuitemradio"], div.fxNQSd');
                    for (var i = 0; i < menuElements.length; i++) {
                        var style = window.getComputedStyle(menuElements[i]);
                        if (style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0') {
                            return true;
                        }
                    }

                    // Check for any recently appeared elements that might be a menu
                    var possibleMenus = document.querySelectorAll('div.yu5kgd, div.fontBodyLarge');
                    for (var i = 0; i < possibleMenus.length; i++) {
                        var style = window.getComputedStyle(possibleMenus[i]);
                        var rect = possibleMenus[i].getBoundingClientRect();
                        // Check if element is visible and has a meaningful size
                        if (style.display !== 'none' && style.visibility !== 'hidden' &&
                            rect.width > 50 && rect.height > 50) {
                            return true;
                        }
                    }

                    return false;
                })();
                """
                if driver.execute_script(js_detection):
                    return True
            except Exception as js_error:
                log.debug(f"Error in JavaScript menu detection: {js_error}")

            # 5. Last resort: absolutely positioned containers holding menu items
            try:
                position_check = """
                return (function() {
                    // Look for absolutely positioned elements that appeared recently
                    var elements = document.querySelectorAll('div[style*="position: absolute"]');
                    for (var i = 0; i < elements.length; i++) {
                        var el = elements[i];
                        var style = window.getComputedStyle(el);
                        var hasMenuItems = el.querySelectorAll('div[role="menuitemradio"], div.fxNQSd').length > 0;

                        if (style.display !== 'none' && style.visibility !== 'hidden' && hasMenuItems) {
                            return true;
                        }
                    }
                    return false;
                })();
                """
                if driver.execute_script(position_check):
                    return True
            except Exception as position_error:
                log.debug(f"Error in positioned-menu detection: {position_error}")

            return False

        except Exception as e:
            log.debug(f"Error checking menu state: {e}")
            return False

    def scrape(self):
        """
        Run a full scraping session.

        Config keys read here: ``url``, ``headless`` (default True),
        ``sort_by`` (default "relevance") and ``stop_on_match`` (default
        False). With ``stop_on_match`` the session ends at the first review
        card whose id was already known before this run started.

        Returns:
            True when scraping and saving finished, False on any failure
            (errors are logged, not raised). The driver and the MongoDB
            connection are closed in every case.
        """
        start_time = time.time()

        url = self.config.get("url")
        headless = self.config.get("headless", True)
        sort_by = self.config.get("sort_by", "relevance")
        stop_on_match = self.config.get("stop_on_match", False)

        log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}")
        log.info(f"URL: {url}")

        # Initialize storage; if not overwriting, load existing data
        if self.overwrite_existing:
            docs = {}
            seen = set()
        else:
            # Try to get from MongoDB first if enabled
            docs = {}
            if self.use_mongodb and self.mongodb:
                docs = self.mongodb.fetch_existing_reviews()

            # If backup_to_json is enabled, also load from JSON for merging
            if self.backup_to_json:
                json_docs = self.json_storage.load_json_docs()
                # MongoDB entries win; JSON only fills the gaps
                for review_id, review in json_docs.items():
                    if review_id not in docs:
                        docs[review_id] = review

            # Load seen IDs from file
            seen = self.json_storage.load_seen()

        # Ids known before this run. stop_on_match must compare against this
        # snapshot only: cards handled in this session stay in the pane and
        # would otherwise trigger the stop on the very next pass.
        known_before_run = frozenset(seen)

        driver = None
        try:
            if not url:
                log.error("No 'url' configured - nothing to scrape")
                return False

            driver = self.setup_driver(headless)
            wait = WebDriverWait(driver, 20)  # Reduced from 40 to 20 for faster timeout

            driver.get(url)
            wait.until(lambda d: "google.com/maps" in d.current_url)

            self.dismiss_cookies(driver)
            self.click_reviews_tab(driver)
            self.set_sort(driver, sort_by)

            # Add a wait after setting sort to allow results to load
            time.sleep(1)

            try:
                pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL)))
            except TimeoutException:
                log.warning("Could not find reviews pane. Page structure might have changed.")
                return False

            idle = 0
            processed_ids = set()  # Track processed IDs in current session

            # Keep a reference to the pane in the page to avoid repeated lookups
            try:
                driver.execute_script("window.scrollablePane = arguments[0];", pane)
                scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);"
            except Exception as e:
                log.warning(f"Error setting up scroll script: {e}")
                scroll_script = "window.scrollBy(0, 300);"  # Fallback to simple scrolling

            max_attempts = 10  # Limit the number of attempts to find reviews
            attempts = 0

            # Context manager closes the progress bar even if the loop raises
            with tqdm(desc="Scraped", ncols=80, initial=len(seen)) as pbar:
                while attempts < max_attempts:
                    try:
                        cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL)
                        fresh_cards: List[WebElement] = []
                        stop_requested = False

                        if not cards:
                            log.debug("No review cards found in this iteration")
                            attempts += 1
                            # Try scrolling anyway
                            driver.execute_script(scroll_script)
                            time.sleep(1)
                            continue

                        for c in cards:
                            try:
                                cid = c.get_attribute("data-review-id")
                                if not cid:
                                    continue
                                if cid in seen or cid in processed_ids:
                                    if stop_on_match and cid in known_before_run:
                                        stop_requested = True
                                        break
                                    continue
                                fresh_cards.append(c)
                            except StaleElementReferenceException:
                                continue
                            except Exception as e:
                                log.debug(f"Error getting review ID: {e}")
                                continue

                        for card in fresh_cards:
                            try:
                                raw = RawReview.from_card(card)
                                processed_ids.add(raw.id)  # Track this ID to avoid re-processing
                            except StaleElementReferenceException:
                                continue
                            except Exception:
                                log.warning("⚠️ parse error – storing stub\n%s",
                                            traceback.format_exc(limit=1).strip())
                                try:
                                    raw_id = card.get_attribute("data-review-id") or ""
                                    raw = RawReview(id=raw_id, text="", lang="und")
                                    processed_ids.add(raw_id)
                                except StaleElementReferenceException:
                                    continue

                            docs[raw.id] = merge_review(docs.get(raw.id), raw)
                            seen.add(raw.id)
                            pbar.update(1)
                            idle = 0
                            attempts = 0  # Reset attempts counter when we successfully process a review

                        # Cards found ahead of the known review are stored
                        # above before the session ends.
                        if stop_requested or idle >= 3:
                            break

                        if not fresh_cards:
                            idle += 1
                            attempts += 1

                        # Use JavaScript for smoother scrolling
                        try:
                            driver.execute_script(scroll_script)
                        except Exception as e:
                            log.warning(f"Error scrolling: {e}")
                            # Try a simpler scroll method
                            driver.execute_script("window.scrollBy(0, 300);")

                        # Dynamic sleep: sleep less when processing many reviews
                        time.sleep(0.7 if len(fresh_cards) > 5 else 1.0)

                    except StaleElementReferenceException:
                        # The pane or other element went stale, try to re-find
                        log.debug("Stale element encountered, re-finding elements")
                        try:
                            pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL)))
                            driver.execute_script("window.scrollablePane = arguments[0];", pane)
                        except Exception:
                            log.warning("Could not re-find reviews pane after stale element")
                            break
                    except Exception as e:
                        log.warning(f"Error during review processing: {e}")
                        attempts += 1
                        time.sleep(1)

            # Save to MongoDB if enabled
            if self.use_mongodb and self.mongodb:
                log.info("Saving reviews to MongoDB...")
                self.mongodb.save_reviews(docs)

            # Backup to JSON if enabled
            if self.backup_to_json:
                log.info("Backing up to JSON...")
                self.json_storage.save_json_docs(docs)
                self.json_storage.save_seen(seen)

            log.info("✅ Finished – total unique reviews: %s", len(docs))

            elapsed_time = time.time() - start_time
            log.info(f"Execution completed in {elapsed_time:.2f} seconds")

            return True

        except Exception as e:
            log.error(f"Error during scraping: {e}")
            log.error(traceback.format_exc())
            return False

        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception as quit_error:
                    log.debug(f"Error closing driver: {quit_error}")

            if self.mongodb:
                try:
                    self.mongodb.close()
                except Exception as close_error:
                    log.debug(f"Error closing MongoDB connection: {close_error}")
"review", "ביקורות", "รีวิว", "avis", "reseñas", +# "recensioni", "bewertungen", "口コミ", "レビュー", +# "리뷰", "評論", "评论", "рецензии", "ביקורת"} +# +# +# class GoogleReviewsScraper: +# """Main scraper class for Google Maps reviews""" +# +# def __init__(self, config: Dict[str, Any]): +# """Initialize scraper with configuration""" +# self.config = config +# self.use_mongodb = config.get("use_mongodb", True) +# self.mongodb = MongoDBStorage(config) if self.use_mongodb else None +# self.json_storage = JSONStorage(config) +# self.backup_to_json = config.get("backup_to_json", True) +# self.overwrite_existing = config.get("overwrite_existing", False) +# +# def setup_driver(self, headless: bool) -> Chrome: +# """ +# Set up and configure Chrome driver with flexibility for different environments. +# Works in both Docker containers and on regular OS installations (Windows, Mac, Linux). +# """ +# # Determine if we're running in a container +# in_container = os.environ.get('CHROME_BIN') is not None +# +# # Create Chrome options +# opts = uc.ChromeOptions() +# opts.add_argument("--window-size=1400,900") +# opts.add_argument("--ignore-certificate-errors") +# opts.add_argument("--disable-gpu") # Improves performance +# opts.add_argument("--disable-dev-shm-usage") # Helps with stability +# opts.add_argument("--no-sandbox") # More stable in some environments +# +# # Use headless mode if requested +# if headless: +# opts.add_argument("--headless=new") +# +# # Log platform information for debugging +# log.info(f"Platform: {platform.platform()}") +# log.info(f"Python version: {platform.python_version()}") +# +# # If in container, use environment-provided binaries +# if in_container: +# chrome_binary = os.environ.get('CHROME_BIN') +# chromedriver_path = os.environ.get('CHROMEDRIVER_PATH') +# +# log.info(f"Container environment detected") +# log.info(f"Chrome binary: {chrome_binary}") +# log.info(f"ChromeDriver path: {chromedriver_path}") +# +# if chrome_binary and 
os.path.exists(chrome_binary): +# log.info(f"Using Chrome binary from environment: {chrome_binary}") +# opts.binary_location = chrome_binary +# +# try: +# # Try creating Chrome driver with undetected_chromedriver +# log.info("Attempting to create undetected_chromedriver instance") +# driver = uc.Chrome(options=opts) +# log.info("Successfully created undetected_chromedriver instance") +# except Exception as e: +# # Fall back to regular Selenium if undetected_chromedriver fails +# log.warning(f"Failed to create undetected_chromedriver instance: {e}") +# log.info("Falling back to regular Selenium Chrome") +# +# # Import Selenium webdriver here to avoid potential import issues +# from selenium import webdriver +# from selenium.webdriver.chrome.service import Service +# +# if chromedriver_path and os.path.exists(chromedriver_path): +# log.info(f"Using ChromeDriver from path: {chromedriver_path}") +# service = Service(executable_path=chromedriver_path) +# driver = webdriver.Chrome(service=service, options=opts) +# else: +# log.info("Using default ChromeDriver") +# driver = webdriver.Chrome(options=opts) +# else: +# # On regular OS, use default undetected_chromedriver +# log.info("Using standard undetected_chromedriver setup") +# driver = uc.Chrome(options=opts) +# +# # Set page load timeout to avoid hanging +# driver.set_page_load_timeout(30) +# log.info("Chrome driver setup completed successfully") +# return driver +# +# def dismiss_cookies(self, driver: Chrome): +# """ +# Dismiss cookie consent dialogs if present. +# Handles stale element references by re-finding elements if needed. 
+# """ +# try: +# # Use WebDriverWait with expected_conditions to handle stale elements +# WebDriverWait(driver, 3).until( +# EC.presence_of_element_located((By.CSS_SELECTOR, COOKIE_BTN)) +# ) +# log.info("Cookie consent dialog found, attempting to dismiss") +# +# # Get elements again after waiting to avoid stale references +# elements = driver.find_elements(By.CSS_SELECTOR, COOKIE_BTN) +# for elem in elements: +# try: +# if elem.is_displayed(): +# elem.click() +# log.info("Cookie dialog dismissed") +# return True +# except Exception as e: +# log.debug(f"Error clicking cookie button: {e}") +# continue +# except TimeoutException: +# # This is expected if no cookie dialog is present +# log.debug("No cookie consent dialog detected") +# except Exception as e: +# log.debug(f"Error handling cookie dialog: {e}") +# +# return False +# +# def is_reviews_tab(self, tab: WebElement) -> bool: +# """Check if a tab is the reviews tab""" +# try: +# label = (tab.get_attribute("aria-label") or tab.text or "").lower() +# return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) +# except StaleElementReferenceException: +# return False +# except Exception as e: +# log.debug(f"Error checking if tab is reviews tab: {e}") +# return False +# +# def click_reviews_tab(self, driver: Chrome): +# """ +# Click on the reviews tab in Google Maps with improved stale element handling. 
+# """ +# end = time.time() + 15 # Timeout after 15 seconds +# while time.time() < end: +# try: +# # Find all tab elements +# tabs = driver.find_elements(By.CSS_SELECTOR, '[role="tab"], button[aria-label]') +# +# for tab in tabs: +# try: +# # Check if this is the reviews tab +# label = (tab.get_attribute("aria-label") or tab.text or "").lower() +# is_review_tab = tab.get_attribute("data-tab-index") == "1" or any( +# w in label for w in REVIEW_WORDS) +# +# if is_review_tab: +# # Scroll the tab into view +# driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) +# time.sleep(0.2) # Small wait after scrolling +# +# # Try to click the tab +# log.info("Found reviews tab, attempting to click") +# tab.click() +# log.info("Successfully clicked reviews tab") +# return True +# except Exception as e: +# # Element might be stale or not clickable, try the next one +# log.debug(f"Error with tab element: {str(e)}") +# continue +# +# # If we get here, we didn't find a suitable tab in this iteration +# log.debug("No reviews tab found in this iteration, waiting...") +# time.sleep(0.5) # Wait before next attempt +# +# except Exception as e: +# # General exception handling +# log.debug(f"Exception while looking for reviews tab: {str(e)}") +# time.sleep(0.5) +# +# # If we exit the loop, we've timed out +# log.warning("Timeout while looking for reviews tab") +# raise TimeoutException("Reviews tab not found") +# +# def set_sort(self, driver: Chrome, method: str): +# """ +# Set the sorting method for reviews with improved error handling. 
+# """ +# if method == "relevance": +# return True # Default order, no need to change +# +# log.info(f"Attempting to set sort order to '{method}'") +# +# try: +# # First try to find and click the sort button +# sort_buttons = driver.find_elements(By.CSS_SELECTOR, SORT_BTN) +# if not sort_buttons: +# log.warning(f"Sort button not found - keeping default sort order") +# return False +# +# # Try to click the first visible sort button +# for sort_button in sort_buttons: +# try: +# if sort_button.is_displayed() and sort_button.is_enabled(): +# sort_button.click() +# log.info("Clicked sort button") +# time.sleep(0.5) # Wait for menu to appear +# break +# except Exception as e: +# log.debug(f"Error clicking sort button: {e}") +# continue +# else: +# log.warning("No clickable sort button found") +# return False +# +# # Now find and click the menu item for the desired sort method +# wanted = SORT_LABELS[method] +# menu_items = WebDriverWait(driver, 3).until( +# EC.presence_of_all_elements_located((By.CSS_SELECTOR, MENU_ITEMS)) +# ) +# +# for item in menu_items: +# try: +# label = item.text.strip() +# if label in wanted: +# item.click() +# log.info(f"Selected sort option: {label}") +# time.sleep(0.5) # Wait for sorting to take effect +# return True +# except Exception as e: +# log.debug(f"Error clicking menu item: {e}") +# continue +# +# log.warning(f"Sort option '{method}' not found in menu - keeping default") +# return False +# +# except Exception as e: +# log.warning(f"Error setting sort order: {e}") +# return False +# +# def scrape(self): +# """Main scraper method""" +# start_time = time.time() +# +# url = self.config.get("url") +# headless = self.config.get("headless", True) +# sort_by = self.config.get("sort_by", "relevance") +# stop_on_match = self.config.get("stop_on_match", False) +# +# log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") +# log.info(f"URL: {url}") +# +# # Initialize storage +# # If not overwriting, load existing data 
+# if self.overwrite_existing: +# docs = {} +# seen = set() +# else: +# # Try to get from MongoDB first if enabled +# docs = {} +# if self.use_mongodb and self.mongodb: +# docs = self.mongodb.fetch_existing_reviews() +# +# # If backup_to_json is enabled, also load from JSON for merging +# if self.backup_to_json: +# json_docs = self.json_storage.load_json_docs() +# # Merge JSON docs with MongoDB docs +# for review_id, review in json_docs.items(): +# if review_id not in docs: +# docs[review_id] = review +# +# # Load seen IDs from file +# seen = self.json_storage.load_seen() +# +# driver = None +# try: +# driver = self.setup_driver(headless) +# wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout +# +# driver.get(url) +# wait.until(lambda d: "google.com/maps" in d.current_url) +# +# self.dismiss_cookies(driver) +# self.click_reviews_tab(driver) +# self.set_sort(driver, sort_by) +# +# # Add a wait after setting sort to allow results to load +# time.sleep(1) +# +# # Use try-except to handle cases where the pane is not found +# try: +# pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) +# except TimeoutException: +# log.warning("Could not find reviews pane. 
Page structure might have changed.") +# return False +# +# pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) +# idle = 0 +# processed_ids = set() # Track processed IDs in current session +# +# # Prefetch selector to avoid repeated lookups +# try: +# driver.execute_script("window.scrollablePane = arguments[0];", pane) +# scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" +# except Exception as e: +# log.warning(f"Error setting up scroll script: {e}") +# scroll_script = "window.scrollBy(0, 300);" # Fallback to simple scrolling +# +# max_attempts = 10 # Limit the number of attempts to find reviews +# attempts = 0 +# +# while attempts < max_attempts: +# try: +# cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) +# fresh_cards: List[WebElement] = [] +# +# # Check for valid cards +# if len(cards) == 0: +# log.debug("No review cards found in this iteration") +# attempts += 1 +# # Try scrolling anyway +# driver.execute_script(scroll_script) +# time.sleep(1) +# continue +# +# for c in cards: +# try: +# cid = c.get_attribute("data-review-id") +# if not cid or cid in seen or cid in processed_ids: +# if stop_on_match and cid and (cid in seen or cid in processed_ids): +# idle = 999 +# break +# continue +# fresh_cards.append(c) +# except StaleElementReferenceException: +# continue +# except Exception as e: +# log.debug(f"Error getting review ID: {e}") +# continue +# +# for card in fresh_cards: +# try: +# raw = RawReview.from_card(card) +# processed_ids.add(raw.id) # Track this ID to avoid re-processing +# except StaleElementReferenceException: +# continue +# except Exception: +# log.warning("⚠️ parse error – storing stub\n%s", +# traceback.format_exc(limit=1).strip()) +# try: +# raw_id = card.get_attribute("data-review-id") or "" +# raw = RawReview(id=raw_id, text="", lang="und") +# processed_ids.add(raw_id) +# except StaleElementReferenceException: +# continue +# +# docs[raw.id] = merge_review(docs.get(raw.id), raw) +# 
seen.add(raw.id) +# pbar.update(1) +# idle = 0 +# attempts = 0 # Reset attempts counter when we successfully process a review +# +# if idle >= 3: +# break +# +# if not fresh_cards: +# idle += 1 +# attempts += 1 +# +# # Use JavaScript for smoother scrolling +# try: +# driver.execute_script(scroll_script) +# except Exception as e: +# log.warning(f"Error scrolling: {e}") +# # Try a simpler scroll method +# driver.execute_script("window.scrollBy(0, 300);") +# +# # Dynamic sleep: sleep less when processing many reviews +# sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 +# time.sleep(sleep_time) +# +# except StaleElementReferenceException: +# # The pane or other element went stale, try to re-find +# log.debug("Stale element encountered, re-finding elements") +# try: +# pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) +# driver.execute_script("window.scrollablePane = arguments[0];", pane) +# except Exception: +# log.warning("Could not re-find reviews pane after stale element") +# break +# except Exception as e: +# log.warning(f"Error during review processing: {e}") +# attempts += 1 +# time.sleep(1) +# +# pbar.close() +# +# # Save to MongoDB if enabled +# if self.use_mongodb and self.mongodb: +# log.info("Saving reviews to MongoDB...") +# self.mongodb.save_reviews(docs) +# +# # Backup to JSON if enabled +# if self.backup_to_json: +# log.info("Backing up to JSON...") +# self.json_storage.save_json_docs(docs) +# self.json_storage.save_seen(seen) +# +# log.info("✅ Finished – total unique reviews: %s", len(docs)) +# +# end_time = time.time() +# elapsed_time = end_time - start_time +# log.info(f"Execution completed in {elapsed_time:.2f} seconds") +# +# return True +# +# except Exception as e: +# log.error(f"Error during scraping: {e}") +# log.error(traceback.format_exc()) +# return False +# +# finally: +# if driver is not None: +# try: +# driver.quit() +# except Exception: +# pass +# +# if self.mongodb: +# try: +# self.mongodb.close() +# except 
Exception: +# pass +# +# # """ +# # Selenium scraping logic for Google Maps Reviews. +# # """ +# # +# # import re +# # import time +# # import logging +# # import traceback +# # from typing import Dict, Any, Set, List +# # +# # import undetected_chromedriver as uc +# # from selenium.common.exceptions import TimeoutException +# # from selenium.webdriver import Chrome +# # from selenium.webdriver.common.by import By +# # from selenium.webdriver.remote.webelement import WebElement +# # from selenium.webdriver.support import expected_conditions as EC +# # from selenium.webdriver.support.ui import WebDriverWait +# # from tqdm import tqdm +# # +# # from modules.models import RawReview +# # from modules.data_storage import MongoDBStorage, JSONStorage, merge_review +# # from modules.utils import click_if +# # +# # # Logger +# # log = logging.getLogger("scraper") +# # +# # # CSS Selectors +# # PANE_SEL = 'div[role="main"] div.m6QErb.DxyBCb.kA9KIf.dS8AEf' +# # CARD_SEL = "div[data-review-id]" +# # COOKIE_BTN = ('button[aria-label*="Accept" i],' +# # 'button[jsname="hZCF7e"],' +# # 'button[data-mdc-dialog-action="accept"]') +# # SORT_BTN = 'button[aria-label="Sort reviews" i], button[aria-label="Sort" i]' +# # MENU_ITEMS = 'div[role="menu"] [role="menuitem"], li[role="menuitem"]' +# # +# # SORT_LABELS = { # text shown in Google Maps' menu +# # "newest": ("Newest", "החדשות ביותר", "ใหม่ที่สุด"), +# # "highest": ("Highest rating", "הדירוג הגבוה ביותר", "คะแนนสูงสุด"), +# # "lowest": ("Lowest rating", "הדירוג הנמוך ביותר", "คะแนนต่ำสุด"), +# # "relevance": ("Most relevant", "רלוונטיות ביותר", "เกี่ยวข้องมากที่สุด"), +# # } +# # +# # REVIEW_WORDS = {"reviews", "review", "ביקורות", "รีวิว", "avis", "reseñas", +# # "recensioni", "bewertungen", "口コミ", "レビュー", +# # "리뷰", "評論", "评论", "рецензии"} +# # +# # +# # class GoogleReviewsScraper: +# # """Main scraper class for Google Maps reviews""" +# # +# # def __init__(self, config: Dict[str, Any]): +# # """Initialize scraper with 
configuration""" +# # self.config = config +# # self.use_mongodb = config.get("use_mongodb", True) +# # self.mongodb = MongoDBStorage(config) if self.use_mongodb else None +# # self.json_storage = JSONStorage(config) +# # self.backup_to_json = config.get("backup_to_json", True) +# # self.overwrite_existing = config.get("overwrite_existing", False) +# # +# # def setup_driver(self, headless: bool) -> Chrome: +# # """Set up and configure Chrome driver""" +# # opts = uc.ChromeOptions() +# # opts.add_argument("--window-size=1400,900") +# # opts.add_argument("--ignore-certificate-errors") +# # opts.add_argument("--disable-gpu") # Improves performance +# # opts.add_argument("--disable-dev-shm-usage") # Helps with stability +# # opts.add_argument("--no-sandbox") # More stable in some environments +# # +# # if headless: +# # opts.add_argument("--headless=new") +# # +# # driver = uc.Chrome(options=opts) +# # # Set page load timeout to avoid hanging +# # driver.set_page_load_timeout(30) +# # return driver +# # +# # def dismiss_cookies(self, driver: Chrome): +# # """Dismiss cookie consent dialogs""" +# # click_if(driver, COOKIE_BTN, timeout=3.0) # Reduced timeout for faster operation +# # +# # def is_reviews_tab(self, tab: WebElement) -> bool: +# # """Check if a tab is the reviews tab""" +# # label = (tab.get_attribute("aria-label") or tab.text or "").lower() +# # return tab.get_attribute("data-tab-index") == "1" or any(w in label for w in REVIEW_WORDS) +# # +# # def click_reviews_tab(self, driver: Chrome): +# # """Click on the reviews tab in Google Maps""" +# # end = time.time() + 15 # Reduced timeout from 30 to 15 seconds +# # while time.time() < end: +# # for tab in driver.find_elements(By.CSS_SELECTOR, +# # '[role="tab"], button[aria-label]'): +# # if self.is_reviews_tab(tab): +# # driver.execute_script("arguments[0].scrollIntoView({block:\"center\"});", tab) +# # try: +# # tab.click() +# # return +# # except Exception: +# # continue +# # time.sleep(.2) # Reduced sleep 
time from 0.4 to 0.2 +# # raise TimeoutException("Reviews tab not found") +# # +# # def set_sort(self, driver: Chrome, method: str): +# # """Set the sorting method for reviews""" +# # if method == "relevance": +# # return # default order +# # if not click_if(driver, SORT_BTN): +# # return +# # +# # wanted = SORT_LABELS[method] +# # +# # for item in driver.find_elements(By.CSS_SELECTOR, MENU_ITEMS): +# # label = item.text.strip() +# # if label in wanted: +# # item.click() +# # time.sleep(0.5) # Reduced wait time from 1.0 to 0.5 +# # return +# # log.warning("⚠️ sort option %s not found – keeping default", method) +# # +# # def scrape(self): +# # """Main scraper method""" +# # start_time = time.time() +# # +# # url = self.config.get("url") +# # headless = self.config.get("headless", True) +# # sort_by = self.config.get("sort_by", "relevance") +# # stop_on_match = self.config.get("stop_on_match", False) +# # +# # log.info(f"Starting scraper with settings: headless={headless}, sort_by={sort_by}") +# # log.info(f"URL: {url}") +# # +# # # Initialize storage +# # # If not overwriting, load existing data +# # if self.overwrite_existing: +# # docs = {} +# # seen = set() +# # else: +# # # Try to get from MongoDB first if enabled +# # docs = {} +# # if self.use_mongodb and self.mongodb: +# # docs = self.mongodb.fetch_existing_reviews() +# # +# # # If backup_to_json is enabled, also load from JSON for merging +# # if self.backup_to_json: +# # json_docs = self.json_storage.load_json_docs() +# # # Merge JSON docs with MongoDB docs +# # for review_id, review in json_docs.items(): +# # if review_id not in docs: +# # docs[review_id] = review +# # +# # # Load seen IDs from file +# # seen = self.json_storage.load_seen() +# # +# # driver = self.setup_driver(headless) +# # wait = WebDriverWait(driver, 20) # Reduced from 40 to 20 for faster timeout +# # +# # try: +# # driver.get(url) +# # wait.until(lambda d: "google.com/maps" in d.current_url) +# # +# # self.dismiss_cookies(driver) +# # 
self.click_reviews_tab(driver) +# # self.set_sort(driver, sort_by) +# # +# # pane = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, PANE_SEL))) +# # pbar = tqdm(desc="Scraped", ncols=80, initial=len(seen)) +# # idle = 0 +# # processed_ids = set() # Track processed IDs in current session +# # +# # # Prefetch selector to avoid repeated lookups +# # driver.execute_script("window.scrollablePane = arguments[0];", pane) +# # scroll_script = "window.scrollablePane.scrollBy(0, window.scrollablePane.scrollHeight);" +# # +# # while True: +# # cards = pane.find_elements(By.CSS_SELECTOR, CARD_SEL) +# # fresh_cards: List[WebElement] = [] +# # +# # for c in cards: +# # cid = c.get_attribute("data-review-id") +# # if cid in seen or cid in processed_ids: +# # if stop_on_match: +# # idle = 999 +# # break +# # continue +# # fresh_cards.append(c) +# # +# # for card in fresh_cards: +# # try: +# # raw = RawReview.from_card(card) +# # processed_ids.add(raw.id) # Track this ID to avoid re-processing +# # except Exception: +# # log.warning("⚠️ parse error – storing stub\n%s", +# # traceback.format_exc(limit=1).strip()) +# # raw_id = card.get_attribute("data-review-id") or "" +# # raw = RawReview(id=raw_id, text="", lang="und") +# # processed_ids.add(raw_id) +# # +# # docs[raw.id] = merge_review(docs.get(raw.id), raw) +# # seen.add(raw.id) +# # pbar.update(1) +# # idle = 0 +# # +# # if idle >= 3: +# # break +# # +# # if not fresh_cards: +# # idle += 1 +# # +# # # Use JavaScript for smoother scrolling +# # driver.execute_script(scroll_script) +# # +# # # Dynamic sleep: sleep less when processing many reviews +# # sleep_time = 0.7 if len(fresh_cards) > 5 else 1.0 +# # time.sleep(sleep_time) +# # +# # pbar.close() +# # +# # # Save to MongoDB if enabled +# # if self.use_mongodb and self.mongodb: +# # log.info("Saving reviews to MongoDB...") +# # self.mongodb.save_reviews(docs) +# # +# # # Backup to JSON if enabled +# # if self.backup_to_json: +# # log.info("Backing up to JSON...") 
# # self.json_storage.save_json_docs(docs)
# # self.json_storage.save_seen(seen)
# #
# # log.info("✅ Finished – total unique reviews: %s", len(docs))
# #
# # end_time = time.time()
# # elapsed_time = end_time - start_time
# # log.info(f"Execution completed in {elapsed_time:.2f} seconds")
# #
# # finally:
# # driver.quit()
# # if self.mongodb:
# # self.mongodb.close()
# diff --git a/modules/utils.py b/modules/utils.py
# new file mode 100644
# index 0000000..35f772b
# --- /dev/null
# +++ b/modules/utils.py
# @@ -0,0 +1,307 @@
"""
Utility functions for Google Maps Reviews Scraper.
"""
import datetime
import logging
import re
import time
from datetime import timezone
from functools import lru_cache
from typing import List

from selenium.common.exceptions import (NoSuchElementException,
                                        StaleElementReferenceException,
                                        TimeoutException)
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Logger
log = logging.getLogger("scraper")

# Constants for language detection
HEB_CHARS = re.compile(r"[\u0590-\u05FF]")
THAI_CHARS = re.compile(r"[\u0E00-\u0E7F]")

# Units recognised in relative dates such as "3 weeks ago", in the order they
# are tested. Each entry is (substring to look for, timedelta keyword,
# multiplier); months and years are approximated as 30 and 365 days.
_RELATIVE_UNITS = (
    ("minute", "minutes", 1),
    ("hour", "hours", 1),
    ("day", "days", 1),
    ("week", "weeks", 1),
    ("month", "days", 30),
    ("year", "days", 365),
)


@lru_cache(maxsize=1024)
def detect_lang(txt: str) -> str:
    """Detect language based on character sets.

    Returns "he" if *txt* contains a character in the Hebrew block,
    otherwise "th" if it contains a character in the Thai block,
    otherwise "en".
    """
    if HEB_CHARS.search(txt):
        return "he"
    if THAI_CHARS.search(txt):
        return "th"
    return "en"


@lru_cache(maxsize=128)
def safe_int(s: str | None) -> int:
    """Return the first run of digits in *s* as an int, or 0 if there is none.

    ``None`` is treated like an empty string.
    """
    m = re.search(r"\d+", s or "")
    return int(m.group()) if m else 0


def try_find(el: WebElement, css: str, *, all=False) -> List[WebElement]:
    """Safely find elements by CSS selector without raising exceptions.

    Args:
        el: Element to search under.
        css: CSS selector.
        all: When true return every match; otherwise at most one element.

    Returns:
        A (possibly empty) list of elements. A missing or stale element
        yields an empty list instead of an exception.
    """
    try:
        if all:
            return el.find_elements(By.CSS_SELECTOR, css)
        obj = el.find_element(By.CSS_SELECTOR, css)
        return [obj] if obj else []
    except (NoSuchElementException, StaleElementReferenceException):
        return []


def first_text(el: WebElement, css: str) -> str:
    """Get text from the first matching element that has non-empty text.

    Elements that went stale while being read are skipped. Returns ""
    when no match has text.
    """
    for e in try_find(el, css, all=True):
        try:
            if (t := e.text.strip()):
                return t
        except StaleElementReferenceException:
            continue
    return ""


def parse_date_to_iso(date_str: str) -> str:
    """
    Parse date strings like "2 weeks ago", "January 2023", etc. into ISO format.

    Relative dates ("... ago") are converted to an approximate UTC timestamp
    by subtracting the stated amount from the current time; a missing number
    ("a week ago") counts as 1. Strings without "ago" (absolute dates) and
    relative strings with an unrecognised unit are not parsed and fall back
    to the current time.

    Returns a best-effort ISO string, or empty string if *date_str* is empty
    or parsing fails.
    """
    if not date_str:
        return ""

    try:
        # `datetime` is the module here, so the class and timedelta must be
        # reached through it (timezone has no `timedelta` attribute).
        now = datetime.datetime.now(timezone.utc).replace(microsecond=0)
        lowered = date_str.lower()

        if "ago" not in lowered:
            # Absolute dates (month/year format) are not parsed yet.
            return now.isoformat()

        number = re.search(r"\d+", date_str)
        amount = int(number.group()) if number else 1

        for unit, keyword, factor in _RELATIVE_UNITS:
            if unit in lowered:
                delta = datetime.timedelta(**{keyword: amount * factor})
                return (now - delta).isoformat()

        # "ago" without a known unit: default to the current time.
        return now.isoformat()
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Non-string input or an amount too large to represent.
        return ""


def first_attr(el: WebElement, css: str, attr: str) -> str:
    """Get attribute value from the first matching element that has a non-empty value.

    Elements that went stale while being read are skipped. Returns ""
    when no match carries a non-empty *attr*.
    """
    for e in try_find(el, css, all=True):
        try:
            if (v := (e.get_attribute(attr) or "").strip()):
                return v
        except StaleElementReferenceException:
            continue
    return ""


def click_if(driver: Chrome, css: str, delay: float = .25, timeout: float = 5.0) -> bool:
    """
    Click element if it exists and is clickable, with timeout and better error handling.

    Args:
        driver: WebDriver instance
        css: CSS selector for the element to click
        delay: Time to wait after clicking (seconds)
        timeout: Maximum time to wait for element (seconds)

    Returns:
        True if element was found and clicked, False otherwise
    """
    try:
        # First check if elements exist at all
        elements = driver.find_elements(By.CSS_SELECTOR, css)
        if not elements:
            return False

        # Try clicking the first visible element
        for element in elements:
            try:
                if element.is_displayed() and element.is_enabled():
                    element.click()
                    time.sleep(delay)
                    return True
            except Exception:
                # Try next element if this one fails
                continue

        # If we couldn't click any of the direct elements, try with WebDriverWait
        try:
            WebDriverWait(driver, timeout).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, css))
            ).click()
            time.sleep(delay)
            return True
        except TimeoutException:
            return False

    except Exception as e:
        log.debug(f"Error in click_if: {str(e)}")
        return False


def get_current_iso_date() -> str:
    """Return current UTC time in ISO format."""
    return datetime.datetime.now(timezone.utc).isoformat()

# """
# Utility functions for Google Maps Reviews Scraper.
# """
#
# import re
# import time
# import logging
# from datetime import datetime, timezone
# from functools import lru_cache
# from typing import List, Optional
#
# from selenium.common.exceptions import (NoSuchElementException,
# StaleElementReferenceException,
# TimeoutException)
# from selenium.webdriver import Chrome
# from selenium.webdriver.common.by import By
# from selenium.webdriver.remote.webelement import WebElement
# from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.support.ui import WebDriverWait
#
# # Constants for language detection
# HEB_CHARS = re.compile(r"[\u0590-\u05FF]")
# THAI_CHARS = re.compile(r"[\u0E00-\u0E7F]")
#
# # Logger
# log = logging.getLogger("scraper")
#
#
# @lru_cache(maxsize=1024)
# def detect_lang(txt: str) -> str:
# """Detect language based on character sets"""
# if HEB_CHARS.search(txt): return "he"
# if THAI_CHARS.search(txt): return "th"
# return "en"
#
#
# @lru_cache(maxsize=128)
# def safe_int(s: str | None) -> int:
# """Safely convert string to integer, returning 0 if not possible"""
# m = re.search(r"\d+", s or "")
# return int(m.group()) if m else 0
#
#
# def try_find(el: WebElement, css: str, *, all=False) -> List[WebElement]:
# """Safely find elements by CSS selector without raising exceptions"""
# try:
# if all:
# return el.find_elements(By.CSS_SELECTOR, css)
# obj = el.find_element(By.CSS_SELECTOR, css)
# return [obj] if obj else []
# except (NoSuchElementException, StaleElementReferenceException):
# return []
#
#
# def first_text(el: WebElement, css: str) -> str:
# """Get text from the first matching element that has non-empty text"""
# for e in try_find(el, css, all=True):
# if (t := e.text.strip()):
# return t
# return ""
#
#
# def first_attr(el: WebElement, css: str, attr: str) -> str:
# """Get attribute
value from the first matching element that has a non-empty value""" +# for e in try_find(el, css, all=True): +# if (v := (e.get_attribute(attr) or "").strip()): +# return v +# return "" +# +# +# def click_if(driver: Chrome, css: str, delay: float = .25, timeout: float = 5.0) -> bool: +# """Click element if it exists and is clickable, with timeout""" +# try: +# WebDriverWait(driver, timeout).until( +# EC.element_to_be_clickable((By.CSS_SELECTOR, css)) +# ).click() +# time.sleep(delay) +# return True +# except TimeoutException: +# return False +# +# +# def parse_date_to_iso(date_str: str) -> str: +# """ +# Parse date strings like "2 weeks ago", "January 2023", etc. into ISO format. +# Returns a best-effort ISO string, or empty string if parsing fails. +# """ +# if not date_str: +# return "" +# +# try: +# now = datetime.now(timezone.utc) +# +# # Handle relative dates +# if "ago" in date_str.lower(): +# # For simplicity, map to approximate dates +# if "minute" in date_str.lower(): +# minutes = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# dt = now.replace(microsecond=0) - timezone.timedelta(minutes=minutes) +# elif "hour" in date_str.lower(): +# hours = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# dt = now.replace(microsecond=0) - timezone.timedelta(hours=hours) +# elif "day" in date_str.lower(): +# days = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# dt = now.replace(microsecond=0) - timezone.timedelta(days=days) +# elif "week" in date_str.lower(): +# weeks = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# dt = now.replace(microsecond=0) - timezone.timedelta(weeks=weeks) +# elif "month" in date_str.lower(): +# months = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# # Approximate months as 30 days +# dt = now.replace(microsecond=0) - timezone.timedelta(days=30 * months) +# elif "year" in 
date_str.lower(): +# years = int(re.search(r'\d+', date_str).group()) if re.search(r'\d+', date_str) else 1 +# # Approximate years as 365 days +# dt = now.replace(microsecond=0) - timezone.timedelta(days=365 * years) +# else: +# # Default to current time if can't parse +# dt = now.replace(microsecond=0) +# else: +# # Handle absolute dates (month year format) +# # This is a simplification - would need more robust parsing for production +# dt = now.replace(microsecond=0) +# +# return dt.isoformat() +# except Exception: +# # If parsing fails, return empty string +# return "" +# +# +# def get_current_iso_date() -> str: +# """Return current UTC time in ISO format.""" +# return datetime.now(timezone.utc).isoformat() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..18c46cf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +requests==2.32.3 +beautifulsoup4==4.12.3 +aiohttp==3.11.11 +googletrans==4.0.2 +selenium==4.15.2 +undetected-chromedriver==3.5.4 +tqdm==4.66.3 +pymongo==4.12.0 +pyyaml==6.0.1 +certifi==2024.7.4 +webdriver-manager==4.0.2 +setuptools==79.0.1 \ No newline at end of file diff --git a/start.py b/start.py new file mode 100644 index 0000000..e0a070d --- /dev/null +++ b/start.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +""" +Google‑Maps review scraper with MongoDB integration +================================================= + +Main entry point for the scraper. 
def main():
    """Build the scraper configuration and run the scraper.

    Parses the command line with ``parse_arguments``, loads the base
    configuration from ``args.config`` with ``load_config``, overlays every
    option supplied on the command line, then passes the resulting mapping
    to ``GoogleReviewsScraper`` and calls its ``scrape`` method.

    Override rules:
      * switches (``headless``, ``stop_on_match``, ``overwrite_existing``)
        can only force a setting to ``True``; a falsy value leaves the
        loaded configuration untouched;
      * every other option replaces the configured value whenever it is
        not ``None``;
      * ``custom_params`` is merged into the configured mapping rather
        than replacing it.
    """
    args = parse_arguments()
    config = load_config(args.config)

    # Switches that may only turn a setting on, never off.
    switch_options = frozenset(
        {"headless", "stop_on_match", "overwrite_existing"}
    )

    # Ordered exactly as the overrides were historically applied, so that
    # keys newly added to ``config`` keep the same insertion order.
    option_names = (
        "headless",
        "sort_by",
        "stop_on_match",
        "url",
        "overwrite_existing",
        "use_mongodb",
        # Date conversion and image downloading
        "convert_dates",
        "download_images",
        "image_dir",
        "download_threads",
        # Local image paths and URL replacement
        "store_local_paths",
        "replace_urls",
        "custom_url_base",
        "custom_url_profiles",
        "custom_url_reviews",
        "preserve_original_urls",
    )

    for name in option_names:
        value = getattr(args, name)
        if name in switch_options:
            if value:
                config[name] = True
        elif value is not None:
            config[name] = value

    # Merge custom parameters into whatever the configuration already has.
    if args.custom_params is not None:
        # A key that is present but holds None would previously skip the
        # "not in config" guard and crash on ``None.update(...)``; treat it
        # like a missing key.
        # NOTE(review): assumes load_config can yield None for an empty
        # ``custom_params:`` entry - confirm against modules/config.py.
        if "custom_params" not in config or config["custom_params"] is None:
            config["custom_params"] = {}
        config["custom_params"].update(args.custom_params)

    # Initialize and run scraper
    scraper = GoogleReviewsScraper(config)
    scraper.scrape()
LEGAL CONSIDERATIONS + +While I cannot offer legal advice, I believe users should be aware: + +a) **Terms of Service**: Using web scraping tools may potentially conflict with Google's Terms of Service. Users should evaluate this risk independently. + +b) **Legal Context**: Web scraping exists in a complex legal landscape that varies by jurisdiction. What is permissible in one region may not be in another. + +c) **Privacy Regulations**: Review data may contain personal information subject to privacy laws such as GDPR, CCPA, and others. Users should ensure their data handling practices comply with applicable regulations. + +## 4. BEST PRACTICES + +To minimize potential issues, I suggest: + +a) **Reasonable Rate Limiting**: Implement appropriate delays between requests to avoid overloading servers. + +b) **Minimal Collection**: Only collect the data you genuinely need for legitimate purposes. + +c) **Attribution**: Maintain proper attribution to review authors and Google when using the collected data. + +d) **Data Security**: Implement appropriate security measures to protect any collected review data. + +e) **Consult Professionals**: When in doubt about the legality of your specific use case, consult with legal professionals familiar with digital law in your jurisdiction. + +## 5. REMINDER OF MIT LICENSE PROVISIONS + +This software is provided under the MIT License, which: + +a) **Permits**: Commercial use, modification, distribution, private use. + +b) **Requires**: Preservation of copyright and license notices. + +c) **Disclaims**: All warranties. The authors or copyright holders are not liable for any claim, damages, or other liability arising from the software or its use. + +## 6. FINAL NOTE + +These guidelines represent my recommendations for responsible use of the Software. They are not additional restrictions beyond the MIT License. 
While the MIT License grants you significant freedom in how you use the Software, I believe that following these guidelines promotes ethical use and helps users navigate the complex landscape of web scraping activities. + +By focusing on your own business's reviews and maintaining respectful practices, you can derive value from the Software while minimizing potential risks. + +--- + +_Last Updated: April 24, 2025_