Files
2026-02-02 18:19:00 +00:00

133 lines
4.4 KiB
Python

#!/usr/bin/env python
"""
Run classification pipeline for a scraping job.
Usage:
python run_classification.py 22c747a6-b913-4ae4-82bc-14b4195008b6
"""
import asyncio
import logging
import os
import sys
from datetime import datetime
# Logging setup: create this script's named logger, then configure the root
# logger to emit INFO and above with timestamp, logger name and level.
logger = logging.getLogger("run_classification")
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
def _stage_field(stage_result, name, default=None):
    """Read field ``name`` from a stage result given as an object or a mapping.

    Attribute access is tried first. Only when the attribute does not exist
    is ``stage_result.get(name, default)`` used (if such a method exists).
    Falsy attribute values (``False``, ``None``, ``0``, ``{}``) are returned
    as-is, so a plain object is never asked for a ``.get`` it may not have.
    """
    try:
        return getattr(stage_result, name)
    except AttributeError:
        getter = getattr(stage_result, "get", None)
        return getter(name, default) if callable(getter) else default


def _log_stage_summary(stage_name, stage_result):
    """Log the summary line(s) for one stage, or its failure reason."""
    if not _stage_field(stage_result, "success", False):
        logger.error(" %s: FAILED - %s", stage_name, _stage_field(stage_result, "error"))
        return

    data = _stage_field(stage_result, "data")
    # A missing or None "stats" entry is treated as "no statistics".
    stats = (_stage_field(data, "stats") or {}) if data else {}

    if stage_name == "normalize":
        logger.info(" Stage 1 (Normalize): %s reviews", stats.get("output_count", "?"))
    elif stage_name == "classify":
        logger.info(
            " Stage 2 (Classify): %s reviews, %s spans, $%.4f LLM cost",
            stats.get("success_count", "?"),
            stats.get("total_spans", "?"),
            # Guard against an explicit None, which "%.4f" cannot format.
            stats.get("llm_cost_usd") or 0,
        )
    elif stage_name == "route":
        logger.info(
            " Stage 3 (Route): %s spans, %s issues",
            stats.get("spans_routed", "?"),
            stats.get("issues_created", "?"),
        )
    elif stage_name == "aggregate":
        logger.info(" Stage 4 (Aggregate): %s facts", stats.get("facts_upserted", "?"))

    logger.info(" Duration: %sms", _stage_field(stage_result, "duration_ms", 0))


async def run_pipeline(job_id: str):
    """Run the classification pipeline for a job.

    Builds a ``Config`` from environment variables, runs the normalize,
    classify, route and aggregate stages for ``job_id``, logs a summary for
    each stage, and always closes the pipeline afterwards.

    Args:
        job_id: Identifier of the scraping job; passed to the pipeline as
            ``{"job_id": job_id}``.

    Returns:
        The result object returned by ``pipeline.process``.

    Raises:
        Exception: Any error raised while initializing or running the
            pipeline is logged with its traceback and re-raised.
    """
    import time

    from reviewiq_pipeline import Pipeline
    from reviewiq_pipeline.config import Config

    # DATABASE_URL wins; the fallback points at a localhost database.
    database_url = os.environ.get(
        "DATABASE_URL",
        "postgresql://scraper:scraper123@localhost:5437/scraper",
    )

    # Secrets come from the environment only and are never embedded in source.
    # NOTE(review): a literal Anthropic key used to be hard-coded here; treat
    # that key as compromised and rotate it.
    anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not anthropic_api_key:
        logger.warning(
            "ANTHROPIC_API_KEY is not set; the anthropic provider may fail to authenticate"
        )

    logger.info("Processing job %s", job_id)

    # Initialize pipeline
    config = Config(
        database_url=database_url,
        llm_provider="anthropic",
        llm_model="claude-sonnet-4-5-20250929",
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        anthropic_api_key=anthropic_api_key,
        classification_batch_size=25,
        classification_max_concurrent=5,
        classification_target_utilization=0.70,
    )
    pipeline = Pipeline(config)

    try:
        await pipeline.initialize()
        logger.info("Pipeline initialized")

        # Run all stages (normalize, classify, route, aggregate).
        # Only the job_id is passed in; the pipeline receives nothing else.
        logger.info("Starting pipeline execution...")
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        started = time.monotonic()
        result = await pipeline.process(
            {"job_id": job_id},
            stages=["normalize", "classify", "route", "aggregate"],
        )
        elapsed = time.monotonic() - started

        # Overall outcome
        if result.success:
            logger.info("Pipeline completed successfully in %.1fs", elapsed)
        else:
            logger.warning("Pipeline completed with errors in %.1fs", elapsed)
        if result.error:
            logger.error("Error: %s", result.error)

        # Stage summaries
        for stage_name, stage_result in result.stage_results.items():
            _log_stage_summary(stage_name, stage_result)

        return result
    except Exception as e:
        logger.exception("Pipeline failed: %s", e)
        raise
    finally:
        await pipeline.close()
if __name__ == "__main__":
    import uuid

    # Exactly one positional argument is expected: the job id.
    if len(sys.argv) <= 1:
        print("Usage: python run_classification.py <job_id>")
        sys.exit(1)

    job_id = sys.argv[1]

    # Reject anything that does not parse as a UUID before starting the run.
    try:
        uuid.UUID(job_id)
    except ValueError:
        print(f"Invalid job ID format: {job_id}")
        sys.exit(1)

    result = asyncio.run(run_pipeline(job_id))

    # Exit non-zero when a result came back and it reports failure.
    pipeline_failed = bool(result) and not result.success
    if pipeline_failed:
        sys.exit(1)