Files
whyrating-engine-legacy/packages/reviewiq-pipeline
Alejandro Gutiérrez 824634aa76 feat: Add extensible multi-pipeline integration system
This commit implements a plugin-like pipeline architecture with:

Pipeline Core Package (packages/pipeline-core/):
- BasePipeline abstract class all pipelines implement
- PipelineRegistry for database-backed discovery/management
- PipelineRunner for execution with status tracking
- DashboardConfig contracts for dynamic widget definitions

Database Migration (006_pipeline_registry.sql):
- pipeline.registry table for registered pipelines
- pipeline.executions table for execution history
- Views for execution stats and monitoring

ReviewIQ Pipeline Refactor:
- Implements BasePipeline interface
- Adds get_dashboard_config() with widget definitions
- Adds get_widget_data() methods for all dashboard widgets
- Maintains backward compatibility with Pipeline alias

Generic Pipeline API (api/routes/pipelines.py):
- GET /api/pipelines - List all registered pipelines
- GET /api/pipelines/{id} - Pipeline details
- POST /api/pipelines/{id}/execute - Execute pipeline
- GET /api/pipelines/{id}/dashboard - Dashboard config
- GET /api/pipelines/{id}/widgets/{w} - Widget data
- GET /api/pipelines/{id}/executions - Execution history

Frontend Dynamic Dashboard System:
- DynamicDashboard component renders from config
- WidgetRegistry maps types to components
- Widget components: StatCard, LineChart, BarChart,
  PieChart, DataTable, Heatmap
- Pipeline API client library

Frontend Pipeline Pages:
- /pipelines - List all registered pipelines
- /pipelines/[id] - Dynamic dashboard for pipeline
- /pipelines/[id]/executions - Execution history
- Pipelines nav item in Sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 19:05:38 +00:00
..

ReviewIQ Pipeline

LLM-powered review classification and analysis pipeline using URT (Universal Review Taxonomy) v5.1.

Features

  • Stage 1: Normalization - Text cleaning, language detection, deduplication
  • Stage 2: LLM Classification - Span extraction with URT codes using OpenAI/Anthropic
  • Stage 3: Issue Routing - Route negative spans to issues for tracking
  • Stage 4: Fact Aggregation - Pre-aggregate metrics for dashboard queries

Installation

pip install reviewiq-pipeline

Or install from source:

pip install -e packages/reviewiq-pipeline

Quick Start

Python API

from reviewiq_pipeline import Pipeline, Config

# Initialize
config = Config(
    database_url="postgresql://...",
    llm_provider="openai",
    llm_api_key="sk-...",
    taxonomy_version="v5.1"
)
pipeline = Pipeline(config)

# Run full pipeline
result = await pipeline.process(scraper_output)

# Or run individual stages
stage1_result = await pipeline.normalize(scraper_output)
stage2_result = await pipeline.classify(stage1_result)
stage3_result = await pipeline.route(stage2_result)
stage4_result = await pipeline.aggregate(business_id, date)

# Validate
validation = await pipeline.validate(job_id)

CLI

# Run migrations
reviewiq-pipeline migrate --database-url $DATABASE_URL

# Process a job
reviewiq-pipeline run --job-id <UUID> --stages 1,2,3,4

# Validate pipeline output
reviewiq-pipeline validate --job-id <UUID>

Configuration

Environment variables:

  • DATABASE_URL - PostgreSQL connection string
  • LLM_PROVIDER - openai or anthropic
  • OPENAI_API_KEY - OpenAI API key (if using OpenAI)
  • ANTHROPIC_API_KEY - Anthropic API key (if using Anthropic)
  • TAXONOMY_VERSION - URT taxonomy version (default: v5.1)

Development

# Install with dev dependencies
pip install -e "packages/reviewiq-pipeline[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=reviewiq_pipeline

# Type checking
mypy src/reviewiq_pipeline

# Linting
ruff check src/reviewiq_pipeline

License

MIT