Files
whyrating-engine-legacy/test_concurrent_jobs.py
Alejandro Gutiérrez faa0704737 Optimize scraper performance and add fallback selectors for robustness
Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-18 19:49:24 +00:00

186 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
Test concurrent job handling in production API.
Verifies that multiple simultaneous requests work correctly.
"""
import asyncio
import os
import time
from datetime import datetime

import httpx
# Base URL of the API under test. Defaults to a local server; set the
# API_BASE_URL environment variable to target another deployment. A trailing
# slash is stripped so f"{API_BASE_URL}/scrape" never yields a double slash.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000").rstrip("/")

# Number of scrape jobs submitted simultaneously by test_concurrent_jobs().
NUM_CONCURRENT_JOBS = 5

# Test URLs (using the same URL is fine for testing)
TEST_URLS = [
    "https://www.google.com/maps/place/Soho+Factory/@54.6738155,25.2595844,17z/",
] * NUM_CONCURRENT_JOBS
async def submit_job(client: httpx.AsyncClient, url: str, job_num: int):
    """POST a single scrape request to ``/scrape`` and log the outcome.

    Args:
        client: Shared async HTTP client.
        url: Google Maps place URL to scrape.
        job_num: 1-based job number, used only in log lines.

    Returns:
        ``(job_id, job_num)`` when the API answers 200 with a ``job_id``;
        ``(None, job_num)`` on a non-200 status or any exception.
    """
    def stamp():
        # Evaluated at each print so every log line carries its own time.
        return datetime.now().strftime('%H:%M:%S')

    print(f"[{stamp()}] Job {job_num}: Submitting...")
    try:
        resp = await client.post(
            f"{API_BASE_URL}/scrape",
            json={"url": url},
            timeout=10.0,
        )
        if resp.status_code != 200:
            print(f"[{stamp()}] Job {job_num}: Failed - {resp.status_code}")
            return None, job_num

        new_id = resp.json()['job_id']
        print(f"[{stamp()}] Job {job_num}: Started (ID: {new_id[:8]}...)")
        return new_id, job_num
    except Exception as exc:
        # Any transport/JSON/key error counts as a failed submission.
        print(f"[{stamp()}] Job {job_num}: Error - {exc}")
        return None, job_num
async def monitor_job(client: httpx.AsyncClient, job_id: str, job_num: int,
                      max_wait: "float | None" = None):
    """Poll ``/jobs/{job_id}`` until the job completes, fails, or times out.

    Args:
        client: Shared async HTTP client.
        job_id: Identifier returned by ``POST /scrape``.
        job_num: 1-based job number, used only in log lines.
        max_wait: Optional upper bound, in seconds, on how long to keep
            polling. ``None`` (the default) polls until a terminal status.

    Returns:
        ``(success, elapsed_seconds, reviews_count)``. ``reviews_count`` is 0
        for failed or timed-out jobs.
    """
    start_time = time.time()
    while True:
        elapsed = time.time() - start_time
        if max_wait is not None and elapsed > max_wait:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: ⏱️ TIMED OUT after {elapsed:.1f}s")
            return False, elapsed, 0
        try:
            response = await client.get(
                f"{API_BASE_URL}/jobs/{job_id}",
                timeout=5.0
            )
            if response.status_code != 200:
                # Back off before retrying; without this sleep a non-200 reply
                # turned the loop into a tight busy-spin against the API.
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: Monitor error - HTTP {response.status_code}")
                await asyncio.sleep(2)
                continue
            job = response.json()
            status = job['status']
            if status == 'completed':
                elapsed = time.time() - start_time
                # `or 0` rather than a get() default: an explicit JSON null
                # arrives as None, which would make the `:.1f` format raise;
                # the except below would then re-poll this finished job forever.
                reviews = job.get('reviews_count') or 0
                scrape_time = job.get('scrape_time') or 0
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: ✅ COMPLETED - {reviews} reviews in {scrape_time:.1f}s (total: {elapsed:.1f}s)")
                return True, elapsed, reviews
            elif status == 'failed':
                elapsed = time.time() - start_time
                error = job.get('error_message', 'Unknown error')
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: ❌ FAILED - {error}")
                return False, elapsed, 0
            elif status == 'running':
                # Still running, wait and check again
                await asyncio.sleep(2)
            else:
                # Any other status (e.g. pending): poll again sooner
                await asyncio.sleep(1)
        except Exception as e:
            # Transient network/JSON errors: log and keep polling.
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: Monitor error - {e}")
            await asyncio.sleep(2)
async def _check_api_available(client) -> bool:
    """Probe ``GET /`` and print whether the API server is reachable.

    Returns True only when the root endpoint answers HTTP 200.
    """
    try:
        response = await client.get(f"{API_BASE_URL}/", timeout=5.0)
    except Exception as e:
        print(f"❌ Cannot connect to API: {e}")
        print("\nPlease start the API server first:")
        print(" python api_server_production.py")
        return False
    if response.status_code != 200:
        print("❌ API not available!")
        return False
    print("✅ API is available\n")
    return True


def _print_summary(job_ids, completion_results, overall_start):
    """Print aggregate job statistics and a parallel-vs-sequential verdict.

    Args:
        job_ids: ``(job_id, job_num)`` pairs for the jobs that were accepted.
        completion_results: ``(success, elapsed, reviews)`` tuples, one per
            accepted job (non-empty).
        overall_start: ``time.time()`` captured before the test started.
    """
    print("\n" + "=" * 70)
    print("Results Summary")
    print("=" * 70)
    total_elapsed = time.time() - overall_start
    n_jobs = len(job_ids)
    successful = sum(1 for success, _, _ in completion_results if success)
    failed = len(completion_results) - successful
    avg_time = sum(elapsed for _, elapsed, _ in completion_results) / len(completion_results)
    total_reviews = sum(reviews for _, _, reviews in completion_results)
    print(f"Total jobs: {n_jobs}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Total reviews: {total_reviews}")
    print(f"Average job time: {avg_time:.1f}s")
    print(f"Total wall time: {total_elapsed:.1f}s")
    print()
    # Run sequentially, the jobs would take roughly the sum of their individual
    # times. Call it parallel only if wall time beats that by at least 20%.
    sequential_estimate = avg_time * n_jobs
    if n_jobs < 2:
        # With one job, wall time can never undercut that job's own time, so
        # the comparison below would always (misleadingly) say "sequential".
        print("ℹ️ Only one job was submitted; parallelism cannot be assessed")
    elif total_elapsed < sequential_estimate * 0.8:
        print("✅ Jobs ran IN PARALLEL! (wall time < sum of job times)")
        speedup = sequential_estimate / total_elapsed
        print(f" Speedup: {speedup:.1f}x faster than sequential")
    else:
        print("⚠️ Jobs may have run SEQUENTIALLY")
        print(f" Expected parallel time: ~{avg_time:.1f}s")
        print(f" Actual time: {total_elapsed:.1f}s")
    print("\n" + "=" * 70)
    # Check memory/resource usage
    print("\n💡 Notes:")
    print(" - Each job runs a headless Chrome instance")
    print(" - Memory usage: ~500MB per concurrent job")
    print(f" - Current test: {n_jobs} jobs = ~{n_jobs * 500}MB RAM")
    print(" - For production: Consider limiting concurrent jobs")
    print(" (Phase 2 adds Redis queue + worker pool for this)")


async def test_concurrent_jobs():
    """Submit every URL in TEST_URLS at once and check the jobs overlap.

    Steps: verify the API is reachable, submit all jobs concurrently, poll
    every accepted job until it finishes, then print a summary. Returns early
    (after printing why) if the API is unreachable or no job was accepted.
    """
    print("=" * 70)
    print("Testing Concurrent Job Handling")
    print("=" * 70)
    print(f"Submitting {len(TEST_URLS)} jobs simultaneously...\n")
    overall_start = time.time()
    async with httpx.AsyncClient() as client:
        # Test 1: Check API is available
        if not await _check_api_available(client):
            return
        # Test 2: Submit all jobs concurrently
        print(f"Step 1: Submitting {len(TEST_URLS)} jobs in parallel...")
        print("-" * 70)
        results = await asyncio.gather(*(
            submit_job(client, url, i + 1)
            for i, url in enumerate(TEST_URLS)
        ))
        job_ids = [(job_id, num) for job_id, num in results if job_id]
        print(f"\n✅ Submitted {len(job_ids)}/{len(TEST_URLS)} jobs successfully\n")
        if not job_ids:
            print("❌ No jobs were submitted successfully!")
            return
        # Test 3: Monitor all jobs concurrently
        print("Step 2: Monitoring jobs until completion...")
        print("-" * 70)
        completion_results = await asyncio.gather(*(
            monitor_job(client, job_id, num)
            for job_id, num in job_ids
        ))
        # Test 4: Analyze results
        _print_summary(job_ids, completion_results, overall_start)
if __name__ == "__main__":
    import sys

    # Exit non-zero on interruption or crash so shells/CI can detect failure;
    # previously every path fell through to exit status 0.
    try:
        asyncio.run(test_concurrent_jobs())
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
        sys.exit(130)  # conventional status for SIGINT (128 + signal 2)
    except Exception as e:
        print(f"\n❌ Test failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)