#!/usr/bin/env python3
"""
Quick CLI tool to test scrapers without the web frontend.

Usage:
    # Test with a business name (will search Google Maps)
    python tools/test_scraper.py "ClickRent Gran Canaria"

    # Test with max reviews limit
    python tools/test_scraper.py "Starbucks NYC" --max 100

    # Test with full URL
    python tools/test_scraper.py --url "https://www.google.com/maps/place/..."

    # Headless mode (no browser window)
    python tools/test_scraper.py "ClickRent" --headless

    # Verbose logging
    python tools/test_scraper.py "ClickRent" -v
"""

import sys
import os
import argparse
import time
import json
from datetime import datetime

# Add project root to path so the `scrapers` package resolves when this file
# is run directly as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Horizontal rule used to frame every section of the console report.
_RULE = '=' * 60


def _build_parser():
    """Create the CLI parser; the module docstring doubles as the help epilog."""
    parser = argparse.ArgumentParser(
        description='Test Google Reviews scraper from CLI',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument('query', nargs='?',
                        help='Business name to search (e.g., "Starbucks NYC")')
    parser.add_argument('--url', '-u',
                        help='Direct Google Maps URL (overrides query)')
    parser.add_argument('--max', '-m', type=int, default=500,
                        help='Max reviews to scrape (default: 500)')
    parser.add_argument('--timeout', '-t', type=int, default=15,
                        help='Timeout for no new reviews (default: 15s)')
    parser.add_argument('--headless', action='store_true',
                        help='Run in headless mode (no browser window)')
    parser.add_argument('--output', '-o',
                        help='Output JSON file (default: prints summary)')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose logging')
    return parser


def _resolve_url(args):
    """Return the explicit --url if given, else a Google Maps search URL for the query."""
    if args.url:
        return args.url
    from urllib.parse import quote
    return f"https://www.google.com/maps/search/?api=1&query={quote(args.query)}&hl=en"


def _print_summary(result, all_reviews, elapsed):
    """Print totals, optional business details and one sample review."""
    print(f"\n{_RULE}")
    print("✅ SCRAPE COMPLETE")
    print(_RULE)
    print(f"Total reviews: {len(all_reviews)}")
    print(f"Time: {elapsed:.1f}s")
    if all_reviews and elapsed > 0:
        print(f"Speed: {len(all_reviews)/elapsed:.1f} reviews/sec")
    print(f"Scrolls: {result.get('scrolls', 0)}")

    if result.get('error'):
        print(f"⚠️ Error: {result['error']}")

    # Show business info if available
    if result.get('business_name'):
        print(f"\n📍 Business: {result['business_name']}")
    if result.get('business_address'):
        print(f" Address: {result['business_address']}")
    if result.get('total_reviews'):
        print(f" Total on Google: {result['total_reviews']}")

    # Show sample review
    if not all_reviews:
        return
    sample = all_reviews[0]
    print("\n📝 Sample review:")
    print(f" Author: {sample.get('author', 'N/A')}")
    # The rating may be missing, None, or non-integer; never let a cosmetic
    # line abort an otherwise successful run.
    try:
        stars = '⭐' * int(sample.get('rating') or 0)
    except (TypeError, ValueError):
        stars = ''
    print(f" Rating: {stars}")
    text = sample.get('text')
    if text:
        if len(text) > 100:
            text = text[:100] + '...'
        print(f" Text: {text}")


def _save_output(path, url, query, result, all_reviews, elapsed):
    """Write the run metadata and every collected review to *path* as JSON."""
    output_data = {
        'timestamp': datetime.now().isoformat(),
        'url': url,
        'query': query,
        'total_reviews': len(all_reviews),
        'elapsed_seconds': elapsed,
        'speed': len(all_reviews) / elapsed if elapsed > 0 else 0,
        'business_name': result.get('business_name'),
        'error': result.get('error'),
        'reviews': all_reviews,
    }
    # Explicit UTF-8 so the file does not depend on the platform's default
    # encoding; ensure_ascii=False keeps review text human-readable.
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    print(f"\n💾 Saved to: {path}")


def main():
    """Run one scrape from the command line and report the outcome.

    Parses the CLI arguments, starts a browser, calls ``scrape_reviews`` and
    prints a summary; optionally dumps everything to a JSON file.

    Returns:
        int: 0 when the scrape finished and the result carries no ``error``
        value, 1 when it reported an error or an exception was raised.

    Raises:
        SystemExit: from argparse, when neither a query nor ``--url`` is given
        or an argument is malformed.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if not args.query and not args.url:
        parser.error('Either query or --url is required')

    url = _resolve_url(args)

    print(f"\n{_RULE}")
    print("🔍 SCRAPER TEST")
    print(_RULE)
    print(f"URL: {url}")
    print(f"Max reviews: {args.max}")
    print(f"Timeout: {args.timeout}s")
    print(f"Headless: {args.headless}")
    print(f"{_RULE}\n")

    # Imported late so that --help and argument errors work without the
    # browser stack installed.
    from seleniumbase import Driver
    from scrapers.google_reviews.v1_0_0 import scrape_reviews, LogCapture

    # Set up log capture
    log_capture = LogCapture()

    # Reviews handed over through flush_callback while the scrape is running.
    reviews_collected = []

    def progress_callback(current, total):
        if args.verbose:
            print(f" Progress: {current}/{total or '?'}")

    def flush_callback(reviews):
        reviews_collected.extend(reviews)
        print(f" 📄 Flushed {len(reviews)} reviews (total: {len(reviews_collected)})")

    # Set up driver
    print("🚀 Starting browser...")
    driver = Driver(uc=True, headless=args.headless)

    try:
        # Inside the try block so a failure here still closes the browser.
        driver.set_window_size(1200, 900)

        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        start_time = time.monotonic()
        result = scrape_reviews(
            driver=driver,
            url=url,
            max_reviews=args.max,
            timeout_no_new=args.timeout,
            log_capture=log_capture,
            flush_callback=flush_callback,
            progress_callback=progress_callback,
            flush_batch_size=100,
        )
        elapsed = time.monotonic() - start_time

        # Combine flushed + remaining reviews; tolerate a missing or None list.
        all_reviews = reviews_collected + list(result.get('reviews') or [])

        _print_summary(result, all_reviews, elapsed)

        # Save output if requested
        if args.output:
            _save_output(args.output, url, args.query, result, all_reviews, elapsed)

        print(f"{_RULE}\n")

        return 1 if result.get('error') else 0

    except Exception as e:
        # Top-level boundary of a CLI tool: report, show the traceback, and
        # signal failure through the exit code.
        print(f"\n❌ SCRAPE FAILED: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        print("🛑 Closing browser...")
        driver.quit()


if __name__ == '__main__':
    sys.exit(main())