Add browser fingerprint support and analytics metadata display
- Transfer user's browser fingerprint (user-agent, viewport, timezone, language, geolocation) to Chrome for more authentic scraping - Display review topics from Google Maps in analytics dashboard - Show business category badge in analytics header - Fix date_text null handling in analytics (handle undefined/timestamp fields) - Add review_topics and business_category to JobStatus interface Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -133,12 +133,36 @@ app.add_middleware(
|
||||
|
||||
# ==================== Request/Response Models ====================
|
||||
|
||||
class GeolocationModel(BaseModel):
|
||||
"""Geolocation coordinates"""
|
||||
lat: float = Field(..., description="Latitude")
|
||||
lng: float = Field(..., description="Longitude")
|
||||
|
||||
|
||||
class ViewportModel(BaseModel):
|
||||
"""Browser viewport size"""
|
||||
width: int = Field(..., description="Viewport width")
|
||||
height: int = Field(..., description="Viewport height")
|
||||
|
||||
|
||||
class BrowserFingerprintModel(BaseModel):
|
||||
"""Browser fingerprint to replicate user's browser"""
|
||||
geolocation: Optional[GeolocationModel] = None
|
||||
userAgent: Optional[str] = Field(None, description="User agent string")
|
||||
viewport: Optional[ViewportModel] = Field(None, description="Screen resolution")
|
||||
timezone: Optional[str] = Field(None, description="Timezone (e.g., Europe/Madrid)")
|
||||
language: Optional[str] = Field(None, description="Browser language (e.g., en-US)")
|
||||
platform: Optional[str] = Field(None, description="Platform (e.g., MacIntel, Win32)")
|
||||
|
||||
|
||||
class ScrapeRequest(BaseModel):
|
||||
"""Request model for starting a scrape job"""
|
||||
url: HttpUrl = Field(..., description="Google Maps URL to scrape")
|
||||
webhook_url: Optional[HttpUrl] = Field(None, description="Webhook URL for async notifications")
|
||||
webhook_secret: Optional[str] = Field(None, description="Secret for webhook HMAC signature")
|
||||
metadata: Optional[Dict[str, Any]] = Field(None, description="Optional custom metadata")
|
||||
geolocation: Optional[GeolocationModel] = Field(None, description="User's geolocation for Chrome")
|
||||
browser_fingerprint: Optional[BrowserFingerprintModel] = Field(None, description="User's browser fingerprint")
|
||||
|
||||
|
||||
class JobResponse(BaseModel):
|
||||
@@ -149,6 +173,7 @@ class JobResponse(BaseModel):
|
||||
created_at: str
|
||||
started_at: Optional[str] = None
|
||||
completed_at: Optional[str] = None
|
||||
updated_at: Optional[str] = None # Last update time for progress tracking
|
||||
reviews_count: Optional[int] = None
|
||||
total_reviews: Optional[int] = None # Total reviews available for this place
|
||||
scrape_time: Optional[float] = None
|
||||
@@ -157,6 +182,8 @@ class JobResponse(BaseModel):
|
||||
# Business metadata
|
||||
business_name: Optional[str] = None
|
||||
business_address: Optional[str] = None
|
||||
business_category: Optional[str] = None # Category (e.g., "Barber shop")
|
||||
review_topics: Optional[List[Dict[str, Any]]] = None # Topic filters with mention counts
|
||||
|
||||
|
||||
class ReviewsResponse(BaseModel):
|
||||
@@ -206,12 +233,32 @@ async def start_scrape(request: ScrapeRequest):
|
||||
raise HTTPException(status_code=500, detail="Database not initialized")
|
||||
|
||||
try:
|
||||
# Merge browser fingerprint into metadata if provided
|
||||
metadata = request.metadata or {}
|
||||
if request.browser_fingerprint:
|
||||
fp = request.browser_fingerprint
|
||||
metadata['browser_fingerprint'] = {
|
||||
"userAgent": fp.userAgent,
|
||||
"timezone": fp.timezone,
|
||||
"language": fp.language,
|
||||
"platform": fp.platform,
|
||||
}
|
||||
if fp.viewport:
|
||||
metadata['browser_fingerprint']['viewport'] = {"width": fp.viewport.width, "height": fp.viewport.height}
|
||||
if fp.geolocation:
|
||||
metadata['browser_fingerprint']['geolocation'] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
|
||||
elif request.geolocation:
|
||||
metadata['geolocation'] = {
|
||||
'lat': request.geolocation.lat,
|
||||
'lng': request.geolocation.lng
|
||||
}
|
||||
|
||||
# Create job in database
|
||||
job_id = await db.create_job(
|
||||
url=str(request.url),
|
||||
webhook_url=str(request.webhook_url) if request.webhook_url else None,
|
||||
webhook_secret=request.webhook_secret,
|
||||
metadata=request.metadata
|
||||
metadata=metadata
|
||||
)
|
||||
|
||||
# Start scraping job in background
|
||||
@@ -240,6 +287,25 @@ async def get_job(job_id: UUID):
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
|
||||
# Parse review_topics if it's a string (JSONB might be returned as string)
|
||||
review_topics = job.get('review_topics')
|
||||
if isinstance(review_topics, str):
|
||||
try:
|
||||
review_topics = json.loads(review_topics)
|
||||
except:
|
||||
review_topics = None
|
||||
|
||||
# Extract business info from metadata if available
|
||||
metadata = job.get('metadata')
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except:
|
||||
metadata = None
|
||||
|
||||
business_name = metadata.get('business_name') if metadata else None
|
||||
business_category = metadata.get('business_category') if metadata else None
|
||||
|
||||
return JobResponse(
|
||||
job_id=str(job['job_id']),
|
||||
status=job['status'],
|
||||
@@ -247,11 +313,15 @@ async def get_job(job_id: UUID):
|
||||
created_at=job['created_at'].isoformat(),
|
||||
started_at=job['started_at'].isoformat() if job['started_at'] else None,
|
||||
completed_at=job['completed_at'].isoformat() if job['completed_at'] else None,
|
||||
updated_at=job['updated_at'].isoformat() if job.get('updated_at') else None,
|
||||
reviews_count=job['reviews_count'],
|
||||
total_reviews=job.get('total_reviews'),
|
||||
scrape_time=job['scrape_time'],
|
||||
error_message=job['error_message'],
|
||||
webhook_url=job.get('webhook_url')
|
||||
webhook_url=job.get('webhook_url'),
|
||||
business_name=business_name,
|
||||
business_category=business_category,
|
||||
review_topics=review_topics
|
||||
)
|
||||
|
||||
|
||||
@@ -541,25 +611,32 @@ async def stream_all_jobs():
|
||||
@app.get("/jobs/{job_id}/reviews", response_model=ReviewsResponse, summary="Get Job Reviews")
|
||||
async def get_job_reviews(job_id: UUID):
|
||||
"""
|
||||
Get the actual reviews data for a completed job.
|
||||
Get reviews data for a job.
|
||||
|
||||
Returns 404 if job not found or not completed yet.
|
||||
Returns reviews for completed, partial, or running jobs (if reviews have been collected).
|
||||
Returns 404 if job not found or no reviews available yet.
|
||||
"""
|
||||
if not db:
|
||||
raise HTTPException(status_code=500, detail="Database not initialized")
|
||||
|
||||
reviews = await db.get_job_reviews(job_id)
|
||||
# Get reviews (includes completed, running, and partial jobs)
|
||||
reviews = await db.get_job_reviews(job_id, include_partial=True)
|
||||
if reviews is None:
|
||||
job = await db.get_job(job_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
elif job['status'] != 'completed':
|
||||
elif job['status'] == 'pending':
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Job not completed yet (current status: {job['status']})"
|
||||
detail="Job has not started yet"
|
||||
)
|
||||
elif job['status'] == 'failed':
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Job failed without saving any reviews: {job.get('error_message', 'Unknown error')}"
|
||||
)
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Reviews data not available")
|
||||
raise HTTPException(status_code=404, detail="No reviews data available yet")
|
||||
|
||||
return ReviewsResponse(
|
||||
job_id=str(job_id),
|
||||
@@ -603,6 +680,15 @@ async def list_jobs(
|
||||
|
||||
business_name = metadata.get('business_name') if metadata else None
|
||||
business_address = metadata.get('business_address') if metadata else None
|
||||
business_category = metadata.get('business_category') if metadata else None
|
||||
|
||||
# Parse review_topics if it's a string
|
||||
review_topics = job.get('review_topics')
|
||||
if isinstance(review_topics, str):
|
||||
try:
|
||||
review_topics = json.loads(review_topics)
|
||||
except:
|
||||
review_topics = None
|
||||
|
||||
result.append(JobResponse(
|
||||
job_id=str(job['job_id']),
|
||||
@@ -615,7 +701,9 @@ async def list_jobs(
|
||||
scrape_time=job.get('scrape_time'),
|
||||
error_message=job.get('error_message'),
|
||||
business_name=business_name,
|
||||
business_address=business_address
|
||||
business_address=business_address,
|
||||
business_category=business_category,
|
||||
review_topics=review_topics
|
||||
))
|
||||
|
||||
return result
|
||||
@@ -640,63 +728,69 @@ async def check_reviews(request: ScrapeRequest):
|
||||
Get business card information from Google Maps.
|
||||
Returns business name, address, rating, and review count.
|
||||
|
||||
Uses pre-warmed Chrome worker from pool for instant response.
|
||||
Creates a fresh Chrome instance for reliable results (same as full scraper).
|
||||
This is used to show the business confirmation card in the UI.
|
||||
"""
|
||||
worker = None
|
||||
recycle_worker = False
|
||||
|
||||
try:
|
||||
url = str(request.url)
|
||||
|
||||
# Get pre-warmed worker from validation pool
|
||||
worker = await asyncio.to_thread(get_validation_worker, timeout=10)
|
||||
# Use the SAME scraper algorithm with validation_only=True for early return
|
||||
# Creates a fresh Chrome instance (same as full scraper) to avoid stale browser state
|
||||
# Pooled browsers can have cookies/state that cause Google to render pages differently
|
||||
|
||||
if worker:
|
||||
log.info(f"Using worker {worker.worker_id} for business card extraction")
|
||||
# Use the pooled worker (don't close it)
|
||||
result = await asyncio.to_thread(
|
||||
get_business_card_info,
|
||||
url=url,
|
||||
driver=worker.driver,
|
||||
return_driver=True
|
||||
)
|
||||
|
||||
# Check if the result indicates a session error
|
||||
if not result['success'] and result.get('error'):
|
||||
error_msg = result.get('error', '').lower()
|
||||
if 'invalid session' in error_msg or 'session' in error_msg:
|
||||
log.warning(f"Worker {worker.worker_id} has invalid session, will recycle")
|
||||
recycle_worker = True
|
||||
# Build fingerprint dict from request
|
||||
fingerprint = None
|
||||
if request.browser_fingerprint:
|
||||
fp = request.browser_fingerprint
|
||||
fingerprint = {
|
||||
"userAgent": fp.userAgent,
|
||||
"timezone": fp.timezone,
|
||||
"language": fp.language,
|
||||
"platform": fp.platform,
|
||||
}
|
||||
if fp.viewport:
|
||||
fingerprint["viewport"] = {"width": fp.viewport.width, "height": fp.viewport.height}
|
||||
if fp.geolocation:
|
||||
fingerprint["geolocation"] = {"lat": fp.geolocation.lat, "lng": fp.geolocation.lng}
|
||||
log.info(f"Creating Chrome with user fingerprint: {fp.platform}, {fp.timezone}")
|
||||
elif request.geolocation:
|
||||
fingerprint = {"geolocation": {"lat": request.geolocation.lat, "lng": request.geolocation.lng}}
|
||||
log.info(f"Creating Chrome with geolocation only")
|
||||
else:
|
||||
# Fallback: create temporary worker
|
||||
log.warning("No pooled worker available, creating temporary instance")
|
||||
result = await asyncio.to_thread(
|
||||
get_business_card_info,
|
||||
url=url
|
||||
)
|
||||
log.info(f"Creating Chrome with default settings")
|
||||
|
||||
# SIMPLIFIED VALIDATION: If we found a business (name + rating), assume it has reviews
|
||||
# Let the actual scraper determine if reviews exist
|
||||
has_business = bool(result.get('name') and result.get('rating'))
|
||||
result = await asyncio.to_thread(
|
||||
fast_scrape_reviews,
|
||||
url=url,
|
||||
headless=False, # Use Xvfb display
|
||||
validation_only=True, # Return early after getting total_reviews
|
||||
browser_fingerprint=fingerprint # Pass user's browser fingerprint
|
||||
)
|
||||
|
||||
# Extract validation info from the result
|
||||
validation_info = result.get('validation_info', {})
|
||||
total_reviews = validation_info.get('total_reviews') or result.get('total_reviews') or 0
|
||||
name = validation_info.get('name')
|
||||
rating = validation_info.get('rating')
|
||||
category = validation_info.get('category')
|
||||
address = validation_info.get('address')
|
||||
|
||||
# Has reviews if we found a business with the Reviews tab (indicated by total_reviews > 0)
|
||||
has_reviews = bool(name and total_reviews > 0)
|
||||
|
||||
return {
|
||||
"has_reviews": has_business, # Boolean: true if business exists
|
||||
"total_reviews": result.get('total_reviews') or 0, # Show 0 if unknown
|
||||
"name": result.get('name'),
|
||||
"address": result.get('address'),
|
||||
"rating": result.get('rating'),
|
||||
"success": result['success'],
|
||||
"has_reviews": has_reviews, # True if business has reviews
|
||||
"total_reviews": total_reviews,
|
||||
"name": name,
|
||||
"address": address,
|
||||
"rating": rating,
|
||||
"category": category,
|
||||
"success": result.get('success', True),
|
||||
"error": result.get('error')
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error checking reviews: {e}")
|
||||
# If it's a session error, recycle the worker
|
||||
if worker:
|
||||
error_msg = str(e).lower()
|
||||
if 'invalid session' in error_msg or 'session' in error_msg:
|
||||
recycle_worker = True
|
||||
|
||||
return {
|
||||
"has_reviews": False,
|
||||
@@ -704,10 +798,6 @@ async def check_reviews(request: ScrapeRequest):
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
finally:
|
||||
# Release worker back to pool (or recycle if broken)
|
||||
if worker:
|
||||
await asyncio.to_thread(release_validation_worker, worker, recycle=recycle_worker)
|
||||
|
||||
|
||||
@app.get("/stats", response_model=StatsResponse, summary="Get Statistics")
|
||||
@@ -808,6 +898,21 @@ async def run_scraping_job(job_id: UUID):
|
||||
job = await db.get_job(job_id)
|
||||
url = job['url']
|
||||
|
||||
# Extract browser fingerprint from metadata if available
|
||||
browser_fingerprint = None
|
||||
metadata = job.get('metadata')
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except:
|
||||
metadata = None
|
||||
if metadata and 'browser_fingerprint' in metadata:
|
||||
browser_fingerprint = metadata['browser_fingerprint']
|
||||
log.info(f"Using user fingerprint: {browser_fingerprint.get('platform')}, {browser_fingerprint.get('timezone')}")
|
||||
elif metadata and 'geolocation' in metadata:
|
||||
browser_fingerprint = {'geolocation': metadata['geolocation']}
|
||||
log.info(f"Using user geolocation only")
|
||||
|
||||
# Broadcast job started via SSE
|
||||
await broadcast_job_update(job_id_str, "job_started", {
|
||||
"job_id": job_id_str,
|
||||
@@ -821,9 +926,17 @@ async def run_scraping_job(job_id: UUID):
|
||||
# Create log capture instance that we can access for real-time logs
|
||||
log_capture = LogCapture()
|
||||
|
||||
# Track total reviews for incremental saves
|
||||
total_reviews_seen = [None]
|
||||
# Accumulate all reviews for incremental saves (flush_callback receives batches)
|
||||
all_reviews_collected = []
|
||||
|
||||
# Progress callback to update job status with current/total counts AND logs
|
||||
def progress_callback(current_count: int, total_count: int):
|
||||
"""Update job progress and logs from worker thread"""
|
||||
if total_count:
|
||||
total_reviews_seen[0] = total_count
|
||||
|
||||
async def update():
|
||||
# Get current logs from the shared log_capture
|
||||
current_logs = log_capture.get_logs()
|
||||
@@ -847,6 +960,22 @@ async def run_scraping_job(job_id: UUID):
|
||||
# Schedule the coroutine on the event loop
|
||||
asyncio.run_coroutine_threadsafe(update(), loop)
|
||||
|
||||
# Flush callback to save reviews incrementally (crash recovery)
|
||||
# Note: flush_callback receives batches, so we accumulate them
|
||||
def flush_callback(reviews_batch: list):
|
||||
"""Accumulate and save reviews to DB incrementally from worker thread"""
|
||||
# Extend our collection with the new batch
|
||||
all_reviews_collected.extend(reviews_batch)
|
||||
|
||||
async def save():
|
||||
await db.save_reviews_incremental(
|
||||
job_id=job_id,
|
||||
reviews=all_reviews_collected, # Save ALL reviews so far
|
||||
total_reviews=total_reviews_seen[0]
|
||||
)
|
||||
# Schedule the coroutine on the event loop
|
||||
asyncio.run_coroutine_threadsafe(save(), loop)
|
||||
|
||||
# Run scraping with progress callback and shared log capture
|
||||
# headless=False because Docker uses Xvfb virtual display
|
||||
result = await asyncio.to_thread(
|
||||
@@ -854,17 +983,20 @@ async def run_scraping_job(job_id: UUID):
|
||||
url=url,
|
||||
headless=False,
|
||||
progress_callback=progress_callback,
|
||||
log_capture=log_capture
|
||||
log_capture=log_capture,
|
||||
flush_callback=flush_callback,
|
||||
browser_fingerprint=browser_fingerprint # Pass user's browser fingerprint
|
||||
)
|
||||
|
||||
if result['success']:
|
||||
# Save results to database (including scraper logs)
|
||||
# Save results to database (including scraper logs and review topics)
|
||||
await db.save_job_result(
|
||||
job_id=job_id,
|
||||
reviews=result['reviews'],
|
||||
scrape_time=result['time'],
|
||||
total_reviews=result.get('total_reviews'),
|
||||
scrape_logs=result.get('logs')
|
||||
scrape_logs=result.get('logs'),
|
||||
review_topics=result.get('review_topics')
|
||||
)
|
||||
|
||||
log.info(
|
||||
@@ -898,68 +1030,142 @@ async def run_scraping_job(job_id: UUID):
|
||||
)
|
||||
|
||||
else:
|
||||
# Job failed - save logs for debugging
|
||||
await db.update_job_status(
|
||||
job_id,
|
||||
JobStatus.FAILED,
|
||||
error_message=result.get('error', 'Unknown error'),
|
||||
scrape_logs=result.get('logs')
|
||||
)
|
||||
# Job failed - check if we have partial reviews saved
|
||||
current_job = await db.get_job(job_id)
|
||||
partial_count = current_job.get('reviews_count', 0) if current_job else 0
|
||||
|
||||
log.error(f"Failed job {job_id}: {result.get('error')}")
|
||||
|
||||
# Broadcast job failed via SSE
|
||||
await broadcast_job_update(job_id_str, "job_failed", {
|
||||
"job_id": job_id_str,
|
||||
"status": "failed",
|
||||
"error_message": result.get('error'),
|
||||
"logs": result.get('logs', [])
|
||||
})
|
||||
|
||||
# Send failure webhook if configured
|
||||
if job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='failed',
|
||||
error_message=result.get('error'),
|
||||
secret=job.get('webhook_secret'),
|
||||
db=db
|
||||
if partial_count > 0:
|
||||
# Mark as partial - we have some reviews saved
|
||||
await db.mark_job_partial(
|
||||
job_id,
|
||||
error_message=result.get('error', 'Unknown error'),
|
||||
scrape_logs=result.get('logs')
|
||||
)
|
||||
|
||||
log.warning(f"Partial job {job_id}: {partial_count} reviews saved before error: {result.get('error')}")
|
||||
|
||||
# Broadcast job partial via SSE
|
||||
await broadcast_job_update(job_id_str, "job_partial", {
|
||||
"job_id": job_id_str,
|
||||
"status": "partial",
|
||||
"reviews_count": partial_count,
|
||||
"total_reviews": current_job.get('total_reviews'),
|
||||
"error_message": result.get('error'),
|
||||
"logs": result.get('logs', [])
|
||||
})
|
||||
|
||||
# Send partial webhook if configured
|
||||
if job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='partial',
|
||||
reviews_count=partial_count,
|
||||
error_message=result.get('error'),
|
||||
secret=job.get('webhook_secret'),
|
||||
db=db
|
||||
)
|
||||
else:
|
||||
# No reviews saved - mark as failed
|
||||
await db.update_job_status(
|
||||
job_id,
|
||||
JobStatus.FAILED,
|
||||
error_message=result.get('error', 'Unknown error'),
|
||||
scrape_logs=result.get('logs')
|
||||
)
|
||||
|
||||
log.error(f"Failed job {job_id}: {result.get('error')}")
|
||||
|
||||
# Broadcast job failed via SSE
|
||||
await broadcast_job_update(job_id_str, "job_failed", {
|
||||
"job_id": job_id_str,
|
||||
"status": "failed",
|
||||
"error_message": result.get('error'),
|
||||
"logs": result.get('logs', [])
|
||||
})
|
||||
|
||||
# Send failure webhook if configured
|
||||
if job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='failed',
|
||||
error_message=result.get('error'),
|
||||
secret=job.get('webhook_secret'),
|
||||
db=db
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Error in scraping job {job_id}: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
await db.update_job_status(
|
||||
job_id,
|
||||
JobStatus.FAILED,
|
||||
error_message=str(e)
|
||||
)
|
||||
# Check if we have partial reviews saved
|
||||
current_job = await db.get_job(job_id)
|
||||
partial_count = current_job.get('reviews_count', 0) if current_job else 0
|
||||
|
||||
# Broadcast job failed via SSE
|
||||
await broadcast_job_update(job_id_str, "job_failed", {
|
||||
"job_id": job_id_str,
|
||||
"status": "failed",
|
||||
"error_message": str(e),
|
||||
"logs": []
|
||||
})
|
||||
|
||||
# Send failure webhook
|
||||
job = await db.get_job(job_id)
|
||||
if job and job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='failed',
|
||||
if partial_count > 0:
|
||||
# Mark as partial - we have some reviews saved
|
||||
await db.mark_job_partial(
|
||||
job_id,
|
||||
error_message=str(e),
|
||||
secret=job.get('webhook_secret'),
|
||||
db=db
|
||||
scrape_logs=log_capture.get_logs() if log_capture else None
|
||||
)
|
||||
|
||||
log.warning(f"Partial job {job_id}: {partial_count} reviews saved before exception: {e}")
|
||||
|
||||
# Broadcast job partial via SSE
|
||||
await broadcast_job_update(job_id_str, "job_partial", {
|
||||
"job_id": job_id_str,
|
||||
"status": "partial",
|
||||
"reviews_count": partial_count,
|
||||
"total_reviews": current_job.get('total_reviews'),
|
||||
"error_message": str(e),
|
||||
"logs": log_capture.get_logs() if log_capture else []
|
||||
})
|
||||
|
||||
# Send partial webhook
|
||||
if current_job and current_job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=current_job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='partial',
|
||||
reviews_count=partial_count,
|
||||
error_message=str(e),
|
||||
secret=current_job.get('webhook_secret'),
|
||||
db=db
|
||||
)
|
||||
else:
|
||||
# No reviews saved - mark as failed
|
||||
await db.update_job_status(
|
||||
job_id,
|
||||
JobStatus.FAILED,
|
||||
error_message=str(e)
|
||||
)
|
||||
|
||||
# Broadcast job failed via SSE
|
||||
await broadcast_job_update(job_id_str, "job_failed", {
|
||||
"job_id": job_id_str,
|
||||
"status": "failed",
|
||||
"error_message": str(e),
|
||||
"logs": []
|
||||
})
|
||||
|
||||
# Send failure webhook
|
||||
if current_job and current_job.get('webhook_url'):
|
||||
webhook_manager = WebhookManager()
|
||||
await webhook_manager.send_job_completed_webhook(
|
||||
webhook_url=current_job['webhook_url'],
|
||||
job_id=job_id,
|
||||
status='failed',
|
||||
error_message=str(e),
|
||||
secret=current_job.get('webhook_secret'),
|
||||
db=db
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
Reference in New Issue
Block a user