#!/usr/bin/env python3
"""
Test concurrent job handling in production API.
Verifies that multiple simultaneous requests work correctly.
"""

import asyncio
import time
import traceback
from datetime import datetime

import httpx

API_BASE_URL = "http://localhost:8000"

# Test URLs (using the same URL is fine for testing)
TEST_URLS = [
    "https://www.google.com/maps/place/Soho+Factory/@54.6738155,25.2595844,17z/",
] * 5  # 5 concurrent jobs

# Upper bound on how long a single job is polled before it is reported as
# failed, so a stuck job cannot hang the whole test run.
MAX_JOB_WAIT_SECONDS = 600.0


def _log(job_num: int, message: str) -> None:
    """Print *message* prefixed with the current local time and the job number."""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Job {job_num}: {message}")


async def submit_job(client: httpx.AsyncClient, url: str, job_num: int):
    """Submit a single scraping job.

    Args:
        client: Shared HTTP client used to POST to ``/scrape``.
        url: URL sent as the ``url`` field of the JSON payload.
        job_num: 1-based label used only in log output.

    Returns:
        ``(job_id, job_num)``; ``job_id`` is ``None`` when the request fails,
        returns a non-200 status, or the response cannot be parsed.
    """
    _log(job_num, "Submitting...")

    try:
        response = await client.post(
            f"{API_BASE_URL}/scrape",
            json={"url": url},
            timeout=10.0,
        )

        if response.status_code != 200:
            _log(job_num, f"Failed - {response.status_code}")
            return None, job_num

        job_id = response.json()['job_id']
        _log(job_num, f"Started (ID: {job_id[:8]}...)")
        return job_id, job_num

    except Exception as e:
        # Deliberately broad: any transport or parsing problem counts as a
        # failed submission for this job only, not for the whole run.
        _log(job_num, f"Error - {e}")
        return None, job_num


async def monitor_job(
    client: httpx.AsyncClient,
    job_id: str,
    job_num: int,
    max_wait: float = MAX_JOB_WAIT_SECONDS,
):
    """Monitor a job until completion, failure, or timeout.

    Polls ``/jobs/{job_id}`` and sleeps between polls: 2 seconds while the
    job reports ``running`` or after an error / non-200 response, 1 second
    for any other status (e.g. still pending).

    Args:
        client: Shared HTTP client used for polling.
        job_id: Identifier returned by the submit endpoint.
        job_num: 1-based label used only in log output.
        max_wait: Maximum number of seconds to keep polling before the job
            is reported as failed.

    Returns:
        ``(success, elapsed_seconds, reviews_count)``. ``reviews_count`` is 0
        for failed or timed-out jobs.
    """
    # monotonic() is immune to wall-clock adjustments while we measure.
    start_time = time.monotonic()

    while True:
        elapsed = time.monotonic() - start_time
        if elapsed >= max_wait:
            _log(job_num, f"❌ FAILED - Timed out after {elapsed:.1f}s")
            return False, elapsed, 0

        # Back-off used after errors and unexpected responses; overridden
        # below once a valid status has been read.
        delay = 2

        try:
            response = await client.get(
                f"{API_BASE_URL}/jobs/{job_id}",
                timeout=5.0,
            )

            if response.status_code == 200:
                job = response.json()
                status = job['status']

                if status == 'completed':
                    elapsed = time.monotonic() - start_time
                    # "or 0" also covers explicit nulls in the payload, which
                    # would otherwise break the float format / later sum().
                    reviews = job.get('reviews_count') or 0
                    scrape_time = job.get('scrape_time') or 0
                    _log(
                        job_num,
                        f"✅ COMPLETED - {reviews} reviews in "
                        f"{scrape_time:.1f}s (total: {elapsed:.1f}s)",
                    )
                    return True, elapsed, reviews

                if status == 'failed':
                    elapsed = time.monotonic() - start_time
                    error = job.get('error_message') or 'Unknown error'
                    _log(job_num, f"❌ FAILED - {error}")
                    return False, elapsed, 0

                # Running jobs are polled every 2s; anything else (e.g.
                # pending) is re-checked after 1s.
                delay = 2 if status == 'running' else 1
            else:
                # Without a pause here a non-200 answer would spin the loop
                # and hammer the API.
                _log(job_num, f"Monitor error - HTTP {response.status_code}")

        except Exception as e:
            # Best-effort polling: report the problem and retry until
            # max_wait is exhausted.
            _log(job_num, f"Monitor error - {e}")

        await asyncio.sleep(delay)


def _print_summary(job_count: int, completion_results, total_elapsed: float) -> None:
    """Print aggregate statistics and the parallel-vs-sequential verdict.

    Args:
        job_count: Number of jobs that were submitted successfully.
        completion_results: Non-empty list of ``(success, elapsed, reviews)``
            tuples as returned by ``monitor_job``.
        total_elapsed: Wall-clock seconds for the whole test run.
    """
    print("\n" + "=" * 70)
    print("Results Summary")
    print("=" * 70)

    successful = sum(1 for success, _, _ in completion_results if success)
    failed = len(completion_results) - successful
    avg_time = sum(elapsed for _, elapsed, _ in completion_results) / len(completion_results)
    total_reviews = sum(reviews for _, _, reviews in completion_results)

    print(f"Total jobs: {job_count}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Total reviews: {total_reviews}")
    print(f"Average job time: {avg_time:.1f}s")
    print(f"Total wall time: {total_elapsed:.1f}s")
    print()

    # Heuristic: if the wall time is clearly below the sum of the individual
    # job times, the jobs must have overlapped.
    sequential_estimate = avg_time * job_count
    if total_elapsed < sequential_estimate * 0.8:
        print("✅ Jobs ran IN PARALLEL! (wall time < sum of job times)")
        speedup = sequential_estimate / total_elapsed
        print(f" Speedup: {speedup:.1f}x faster than sequential")
    else:
        print("⚠️ Jobs may have run SEQUENTIALLY")
        print(f" Expected parallel time: ~{avg_time:.1f}s")
        print(f" Actual time: {total_elapsed:.1f}s")

    print("\n" + "=" * 70)


async def test_concurrent_jobs():
    """Test multiple concurrent jobs.

    Checks that the API answers, submits every URL in ``TEST_URLS`` at once,
    monitors all accepted jobs concurrently, and prints a summary. Returns
    early (after printing a message) when the API is unreachable or no job
    could be submitted.
    """
    print("=" * 70)
    print("Testing Concurrent Job Handling")
    print("=" * 70)
    print(f"Submitting {len(TEST_URLS)} jobs simultaneously...\n")

    overall_start = time.monotonic()

    async with httpx.AsyncClient() as client:
        # Test 1: Check API is available
        try:
            response = await client.get(f"{API_BASE_URL}/", timeout=5.0)
            if response.status_code != 200:
                print("❌ API not available!")
                return
            print("✅ API is available\n")
        except Exception as e:
            print(f"❌ Cannot connect to API: {e}")
            print("\nPlease start the API server first:")
            print(" python api_server_production.py")
            return

        # Test 2: Submit all jobs concurrently
        print(f"Step 1: Submitting {len(TEST_URLS)} jobs in parallel...")
        print("-" * 70)

        submit_tasks = [
            submit_job(client, url, job_num)
            for job_num, url in enumerate(TEST_URLS, start=1)
        ]
        results = await asyncio.gather(*submit_tasks)
        job_ids = [(job_id, num) for job_id, num in results if job_id]

        print(f"\n✅ Submitted {len(job_ids)}/{len(TEST_URLS)} jobs successfully\n")

        if not job_ids:
            print("❌ No jobs were submitted successfully!")
            return

        # Test 3: Monitor all jobs concurrently
        print("Step 2: Monitoring jobs until completion...")
        print("-" * 70)

        monitor_tasks = [
            monitor_job(client, job_id, num)
            for job_id, num in job_ids
        ]
        completion_results = await asyncio.gather(*monitor_tasks)

        # Test 4: Analyze results
        total_elapsed = time.monotonic() - overall_start
        _print_summary(len(job_ids), completion_results, total_elapsed)

        # Check memory/resource usage
        print("\n💡 Notes:")
        print(" - Each job runs a headless Chrome instance")
        print(" - Memory usage: ~500MB per concurrent job")
        print(f" - Current test: {len(job_ids)} jobs = ~{len(job_ids) * 500}MB RAM")
        print(" - For production: Consider limiting concurrent jobs")
        print(" (Phase 2 adds Redis queue + worker pool for this)")


if __name__ == "__main__":
    try:
        asyncio.run(test_concurrent_jobs())
    except KeyboardInterrupt:
        print("\n\nTest interrupted by user")
    except Exception as e:
        print(f"\n❌ Test failed: {e}")
        traceback.print_exc()