Performance improvements:
- Validation speed: 59.71s → 10.96s (5.5x improvement)
- Removed 50+ console.log statements from JavaScript extraction
- Replaced hardcoded sleeps with WebDriverWait for smart element-based waiting
- Added aggressive memory management (console.clear, GC, image unloading every 20 scrolls)

Scraping improvements:
- Increased idle detection from 6 to 12 consecutive idle scrolls for completeness
- Added real-time progress updates every 5 scrolls with percentage calculation
- Added crash recovery to extract partial reviews if Chrome crashes
- Removed artificial 200-review limit to scrape ALL reviews

Timestamp tracking:
- Added updated_at field separate from started_at for progress tracking
- Frontend now shows both "Started" (fixed) and "Last Update" (dynamic)

Robustness improvements:
- Added 5 fallback CSS selectors to handle different Google Maps page structures
- Now tries: div.jftiEf.fontBodyMedium, div.jftiEf, div[data-review-id], etc.
- Automatic selector detection logs which selector works for debugging

Test results:
- Successfully scraped 550 reviews in 150.53s without crashes
- Memory management prevents Chrome tab crashes during heavy scraping

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
186 lines
6.5 KiB
Python
186 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test concurrent job handling in production API.
|
|
Verifies that multiple simultaneous requests work correctly.
|
|
"""
|
|
import asyncio
import os
import time
from datetime import datetime

import httpx
|
|
|
|
# Base URL of the API under test. Set the API_BASE_URL environment variable to
# target another host. A trailing slash is stripped because request URLs are
# built as f"{API_BASE_URL}/<path>".
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000").rstrip("/")

# Test URLs (using the same URL is fine for testing)
TEST_URLS = [
    "https://www.google.com/maps/place/Soho+Factory/@54.6738155,25.2595844,17z/",
] * 5  # 5 concurrent jobs
|
|
|
|
|
|
async def submit_job(client: httpx.AsyncClient, url: str, job_num: int):
    """Submit a single scraping job via ``POST {API_BASE_URL}/scrape``.

    Args:
        client: Shared HTTP client.
        url: Place URL to scrape, sent as ``{"url": url}``.
        job_num: 1-based job number, used only for log output.

    Returns:
        ``(job_id, job_num)`` on success. Returns ``(None, job_num)`` when the
        request errors, the status is not 2xx, or the body carries no
        ``job_id``. Errors are printed rather than raised, so one bad
        submission does not abort the others gathered alongside it.
    """
    def log(msg: str) -> None:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: {msg}")

    log("Submitting...")

    try:
        response = await client.post(
            f"{API_BASE_URL}/scrape",
            json={"url": url},
            timeout=10.0,
        )

        # Accept any 2xx so a 201/202 job-creation reply is not misreported
        # as a failure.
        if not 200 <= response.status_code < 300:
            # A short body excerpt usually explains the rejection.
            log(f"Failed - {response.status_code} {response.text[:200]}")
            return None, job_num

        data = response.json()
        job_id = data.get('job_id') if isinstance(data, dict) else None
        if not job_id:
            log(f"Failed - response has no job_id: {str(data)[:200]}")
            return None, job_num

    except Exception as e:
        log(f"Error - {type(e).__name__}: {e}")
        return None, job_num

    # str() so that non-string IDs (e.g. integers) can still be abbreviated.
    log(f"Started (ID: {str(job_id)[:8]}...)")
    return job_id, job_num
|
|
|
|
|
|
async def monitor_job(
    client: httpx.AsyncClient,
    job_id: str,
    job_num: int,
    max_wait: float = 1800.0,
):
    """Poll ``GET {API_BASE_URL}/jobs/{job_id}`` until the job finishes.

    Args:
        client: Shared HTTP client used for polling.
        job_id: Job ID returned by the ``/scrape`` endpoint.
        job_num: 1-based job number, used only for log output.
        max_wait: Seconds to keep polling before giving up and reporting the
            job as failed. Pass ``float("inf")`` to poll indefinitely.

    Returns:
        ``(success, elapsed_seconds, reviews_count)``. ``reviews_count`` is 0
        for failed or timed-out jobs. Transport errors and non-200 replies are
        logged and retried, never raised.
    """
    def log(msg: str) -> None:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: {msg}")

    # monotonic() is unaffected by wall-clock adjustments, unlike time.time().
    start_time = time.monotonic()

    while True:
        elapsed = time.monotonic() - start_time
        if elapsed >= max_wait:
            log(f"⏱️ TIMED OUT after {elapsed:.1f}s")
            return False, elapsed, 0

        try:
            response = await client.get(f"{API_BASE_URL}/jobs/{job_id}", timeout=5.0)
            job = response.json() if response.status_code == 200 else None
        except Exception as e:
            log(f"Monitor error - {e}")
            await asyncio.sleep(2)
            continue

        if not isinstance(job, dict):
            # Back off on a non-200 reply (or non-object body) instead of
            # re-polling immediately, which would busy-loop against the API.
            log(f"Monitor error - HTTP {response.status_code}")
            await asyncio.sleep(2)
            continue

        status = job.get('status')

        if status == 'completed':
            elapsed = time.monotonic() - start_time
            # `or 0` also covers keys present with a null value; None would
            # make the `:.1f` format below raise.
            reviews = job.get('reviews_count') or 0
            scrape_time = job.get('scrape_time') or 0
            log(f"✅ COMPLETED - {reviews} reviews in {scrape_time:.1f}s (total: {elapsed:.1f}s)")
            return True, elapsed, reviews

        if status == 'failed':
            elapsed = time.monotonic() - start_time
            error = job.get('error_message') or 'Unknown error'
            log(f"❌ FAILED - {error}")
            return False, elapsed, 0

        # 'running' jobs are polled every 2s; any other status (e.g. pending)
        # is re-checked after 1s.
        await asyncio.sleep(2 if status == 'running' else 1)
|
|
|
|
|
|
async def _api_is_available(client: httpx.AsyncClient) -> bool:
    """Return True if ``GET {API_BASE_URL}/`` answers 200; print why not otherwise."""
    try:
        response = await client.get(f"{API_BASE_URL}/", timeout=5.0)
    except Exception as e:
        print(f"❌ Cannot connect to API: {e}")
        print("\nPlease start the API server first:")
        print(" python api_server_production.py")
        return False

    if response.status_code != 200:
        print(f"❌ API not available! (HTTP {response.status_code})")
        return False

    print("✅ API is available\n")
    return True


def _print_summary(completion_results, job_count: int, total_elapsed: float) -> None:
    """Print aggregate job stats and a heuristic parallel-vs-sequential verdict.

    Args:
        completion_results: ``(success, elapsed_seconds, reviews)`` tuples as
            returned by ``monitor_job``.
        job_count: Number of jobs that were monitored.
        total_elapsed: Wall-clock seconds since the test started.
    """
    print("\n" + "=" * 70)
    print("Results Summary")
    print("=" * 70)

    if not completion_results:
        # Nothing to average; avoids a ZeroDivisionError below.
        return

    successful = sum(1 for success, _, _ in completion_results if success)
    failed = len(completion_results) - successful
    # Averages over every monitored job, failed ones included.
    avg_time = sum(elapsed for _, elapsed, _ in completion_results) / len(completion_results)
    # `or 0` tolerates a reviews count reported as None.
    total_reviews = sum(reviews or 0 for _, _, reviews in completion_results)

    print(f"Total jobs: {job_count}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Total reviews: {total_reviews}")
    print(f"Average job time: {avg_time:.1f}s")
    print(f"Total wall time: {total_elapsed:.1f}s")
    print()

    sequential_estimate = avg_time * job_count
    if job_count < 2:
        # A single job has nothing to overlap with. The comparison below would
        # always report "sequential", which is misleading.
        print("ℹ️ Only one job was monitored - parallelism cannot be assessed")
    elif total_elapsed < sequential_estimate * 0.8:
        print("✅ Jobs ran IN PARALLEL! (wall time < sum of job times)")
        print(f" Speedup: {sequential_estimate / total_elapsed:.1f}x faster than sequential")
    else:
        print("⚠️ Jobs may have run SEQUENTIALLY")
        print(f" Expected parallel time: ~{avg_time:.1f}s")
        print(f" Actual time: {total_elapsed:.1f}s")

    print("\n" + "=" * 70)


async def test_concurrent_jobs():
    """Submit every URL in TEST_URLS concurrently and report how the jobs ran.

    Steps: check that the API root responds, submit all jobs in parallel,
    poll every accepted job in parallel until it finishes, then print a
    summary. Returns None; results are only printed.
    """
    print("=" * 70)
    print("Testing Concurrent Job Handling")
    print("=" * 70)
    print(f"Submitting {len(TEST_URLS)} jobs simultaneously...\n")

    overall_start = time.time()

    async with httpx.AsyncClient() as client:
        # Pre-flight: bail out early if the server is not reachable.
        if not await _api_is_available(client):
            return

        # Step 1: submit all jobs concurrently.
        print(f"Step 1: Submitting {len(TEST_URLS)} jobs in parallel...")
        print("-" * 70)

        results = await asyncio.gather(
            *(submit_job(client, url, num) for num, url in enumerate(TEST_URLS, start=1))
        )
        job_ids = [(job_id, num) for job_id, num in results if job_id]

        print(f"\n✅ Submitted {len(job_ids)}/{len(TEST_URLS)} jobs successfully\n")

        if not job_ids:
            print("❌ No jobs were submitted successfully!")
            return

        # Step 2: monitor all accepted jobs concurrently.
        print("Step 2: Monitoring jobs until completion...")
        print("-" * 70)

        completion_results = await asyncio.gather(
            *(monitor_job(client, job_id, num) for job_id, num in job_ids)
        )

        # Step 3: summarize.
        total_elapsed = time.time() - overall_start
        _print_summary(completion_results, len(job_ids), total_elapsed)

        # Resource-usage reminder for whoever runs this test.
        print("\n💡 Notes:")
        print(" - Each job runs a headless Chrome instance")
        print(" - Memory usage: ~500MB per concurrent job")
        print(f" - Current test: {len(job_ids)} jobs = ~{len(job_ids) * 500}MB RAM")
        print(" - For production: Consider limiting concurrent jobs")
        print(" (Phase 2 adds Redis queue + worker pool for this)")
|
|
|
|
|
|
if __name__ == "__main__":
    import sys
    import traceback

    try:
        asyncio.run(test_concurrent_jobs())
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
        # 130 = 128 + SIGINT, the conventional exit status for Ctrl-C.
        sys.exit(130)
    except Exception as e:
        print(f"\n❌ Test failed: {e}")
        traceback.print_exc()
        # Exit non-zero so CI and shell callers can see the failure.
        sys.exit(1)
|