#!/usr/bin/env python
"""
Run classification pipeline for a scraping job.

Usage:
    python run_classification.py 22c747a6-b913-4ae4-82bc-14b4195008b6

Environment:
    ANTHROPIC_API_KEY  Required. API key for the Anthropic LLM provider.
    DATABASE_URL       Optional. PostgreSQL URL; defaults to the local
                       development database below.
    OPENAI_API_KEY     Optional. Forwarded unchanged to the pipeline config.
"""

import asyncio
import logging
import os
import sys
import uuid
from collections.abc import Mapping
from datetime import datetime
from typing import Any, Optional, Sequence

logger = logging.getLogger("run_classification")

# Local development default only; set DATABASE_URL for any other environment.
DEFAULT_DATABASE_URL = "postgresql://scraper:scraper123@localhost:5437/scraper"
LLM_PROVIDER = "anthropic"
LLM_MODEL = "claude-sonnet-4-5-20250929"
PIPELINE_STAGES = ("normalize", "classify", "route", "aggregate")


def _stage_field(stage_result: Any, name: str, default: Any = None) -> Any:
    """Return field *name* from a stage result that may be a mapping or an object.

    Mappings are read with ``.get``; anything else with ``getattr``. A missing
    field or a ``None`` value yields *default*; other falsy values (``False``,
    ``0``, ``{}``) are returned unchanged rather than triggering a fallback.
    """
    if isinstance(stage_result, Mapping):
        value = stage_result.get(name, default)
    else:
        value = getattr(stage_result, name, default)
    return default if value is None else value


def _log_stage_summary(stage_name: str, stage_result: Any) -> None:
    """Log a one-line summary and the duration for a single pipeline stage."""
    if not _stage_field(stage_result, "success", False):
        error = _stage_field(stage_result, "error")
        logger.error(f" {stage_name}: FAILED - {error}")
        return

    data = _stage_field(stage_result, "data", {})
    stats = _stage_field(data, "stats", {}) if data else {}
    duration_ms = _stage_field(stage_result, "duration_ms", 0)

    if stage_name == "normalize":
        logger.info(f" Stage 1 (Normalize): {_stage_field(stats, 'output_count', '?')} reviews")
    elif stage_name == "classify":
        logger.info(
            f" Stage 2 (Classify): {_stage_field(stats, 'success_count', '?')} reviews, "
            f"{_stage_field(stats, 'total_spans', '?')} spans, "
            f"${_stage_field(stats, 'llm_cost_usd', 0):.4f} LLM cost"
        )
    elif stage_name == "route":
        logger.info(
            f" Stage 3 (Route): {_stage_field(stats, 'spans_routed', '?')} spans, "
            f"{_stage_field(stats, 'issues_created', '?')} issues"
        )
    elif stage_name == "aggregate":
        logger.info(f" Stage 4 (Aggregate): {_stage_field(stats, 'facts_upserted', '?')} facts")
    logger.info(f" Duration: {duration_ms}ms")


async def run_pipeline(job_id: str):
    """Run the normalize/classify/route/aggregate stages for one scraping job.

    Args:
        job_id: UUID string of the scraping job to process.

    Returns:
        The result object returned by ``Pipeline.process``.

    Raises:
        RuntimeError: If ``ANTHROPIC_API_KEY`` is not set in the environment.
        Exception: Any error raised by the pipeline is logged, then re-raised.
    """
    # Secrets come from the environment only; never embed API keys in source.
    anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        raise RuntimeError(
            "ANTHROPIC_API_KEY environment variable is not set; "
            "it is required for the anthropic LLM provider"
        )

    # Deferred import: the pipeline package is only needed once a job runs.
    from reviewiq_pipeline import Pipeline
    from reviewiq_pipeline.config import Config

    database_url = os.environ.get("DATABASE_URL", DEFAULT_DATABASE_URL)
    logger.info(f"Processing job {job_id}")

    config = Config(
        database_url=database_url,
        llm_provider=LLM_PROVIDER,
        llm_model=LLM_MODEL,
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        anthropic_api_key=anthropic_api_key,
        classification_batch_size=25,
        classification_max_concurrent=5,
        classification_target_utilization=0.70,
    )
    pipeline = Pipeline(config)

    try:
        await pipeline.initialize()
        logger.info("Pipeline initialized")

        # Only the job_id is passed; the pipeline fetches and transforms the
        # job's reviews from the database itself.
        logger.info("Starting pipeline execution...")
        start_time = datetime.now()
        result = await pipeline.process(
            {"job_id": job_id},
            stages=list(PIPELINE_STAGES),
        )
        elapsed = (datetime.now() - start_time).total_seconds()

        if result.success:
            logger.info(f"Pipeline completed successfully in {elapsed:.1f}s")
        else:
            logger.warning(f"Pipeline completed with errors in {elapsed:.1f}s")
            if result.error:
                logger.error(f"Error: {result.error}")

        stage_results = getattr(result, "stage_results", None) or {}
        for stage_name, stage_result in stage_results.items():
            _log_stage_summary(stage_name, stage_result)

        return result
    except Exception as e:
        logger.exception(f"Pipeline failed: {e}")
        raise
    finally:
        await pipeline.close()


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Arguments excluding the program name; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit code: 1 on usage/validation errors or when the pipeline
        reports ``success`` as false, otherwise 0.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        print("Usage: python run_classification.py <job_id>", file=sys.stderr)
        return 1

    job_id = args[0]
    try:
        uuid.UUID(job_id)
    except ValueError:
        print(f"Invalid job ID format: {job_id}", file=sys.stderr)
        return 1

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    result = asyncio.run(run_pipeline(job_id))
    if result and not result.success:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())