diff --git a/api/routes/__init__.py b/api/routes/__init__.py index 9ea528a..ed4b4ee 100644 --- a/api/routes/__init__.py +++ b/api/routes/__init__.py @@ -6,6 +6,7 @@ This module exports all route modules for easy import into the main server. from api.routes.batches import router as batches_router, set_database as set_batches_db from api.routes.dashboard import router as dashboard_router, set_database as set_dashboard_db from api.routes.admin import router as admin_router, set_database as set_admin_db +from api.routes.pipelines import router as pipelines_router, set_database as set_pipelines_db __all__ = [ 'batches_router', @@ -14,4 +15,6 @@ __all__ = [ 'set_dashboard_db', 'admin_router', 'set_admin_db', + 'pipelines_router', + 'set_pipelines_db', ] diff --git a/api/routes/pipelines.py b/api/routes/pipelines.py new file mode 100644 index 0000000..4fc7ef1 --- /dev/null +++ b/api/routes/pipelines.py @@ -0,0 +1,560 @@ +#!/usr/bin/env python3 +""" +Generic Pipeline API endpoints. + +Provides a unified API for all registered pipelines: +- List available pipelines +- Get pipeline details and metadata +- Execute pipelines +- Get dashboard configuration +- Get widget data +- List execution history +""" + +import logging +from typing import Any + +import asyncpg +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel, Field + +log = logging.getLogger(__name__) + +# Create router +router = APIRouter(prefix="/api/pipelines", tags=["pipelines"]) + +# Database pool (set by main server) +_pool: asyncpg.Pool | None = None + +# Pipeline instances cache +_pipeline_instances: dict[str, Any] = {} + + +def set_database(pool: asyncpg.Pool) -> None: + """Set the database pool for pipeline operations.""" + global _pool + _pool = pool + + +# ==================== Pydantic Models ==================== + + +class PipelineInfo(BaseModel): + """Summary information about a pipeline.""" + + id: str = Field(..., description="Pipeline identifier") + name: str = Field(..., 
description="Display name") + description: str = Field(..., description="Human-readable description") + version: str = Field(..., description="Semantic version") + is_enabled: bool = Field(..., description="Whether pipeline is enabled") + stages: list[str] = Field(..., description="Available stages") + input_type: str = Field(..., description="Expected input type") + + +class PipelineDetail(PipelineInfo): + """Detailed pipeline information.""" + + module_path: str = Field(..., description="Python module path") + config: dict[str, Any] | None = Field(None, description="Pipeline configuration") + created_at: str | None = Field(None, description="Registration timestamp") + updated_at: str | None = Field(None, description="Last update timestamp") + + +class ExecuteRequest(BaseModel): + """Request to execute a pipeline.""" + + job_id: str | None = Field(None, description="Job ID to process") + business_id: str | None = Field(None, description="Business identifier") + input_data: dict[str, Any] | None = Field(None, description="Direct input data") + stages: list[str] | None = Field(None, description="Stages to run (default: all)") + options: dict[str, Any] | None = Field(None, description="Pipeline-specific options") + + +class ExecuteResponse(BaseModel): + """Response from pipeline execution.""" + + execution_id: str = Field(..., description="Execution identifier") + pipeline_id: str = Field(..., description="Pipeline that was executed") + success: bool = Field(..., description="Whether execution succeeded") + stages_run: list[str] = Field(..., description="Stages that were run") + error: str | None = Field(None, description="Error message if failed") + + +class ExecutionSummary(BaseModel): + """Summary of a pipeline execution.""" + + id: str = Field(..., description="Execution identifier") + pipeline_id: str = Field(..., description="Pipeline identifier") + job_id: str | None = Field(None, description="Associated job ID") + business_id: str | None = Field(None, 
async def _get_pipeline_instance(pipeline_id: str) -> Any:
    """
    Get or create a pipeline instance.

    Instances are cached per ``pipeline_id`` in ``_pipeline_instances``. On a
    cache miss the pipeline is looked up in ``pipeline.registry``, its
    ``module_path`` ("package.module:ClassName") is imported, the class is
    instantiated with no arguments and ``initialize()`` is awaited.

    Args:
        pipeline_id: Registry identifier of the pipeline to load.

    Returns:
        The cached (or freshly initialized) pipeline instance.

    Raises:
        HTTPException: 503 if the database pool has not been set, 404 if the
            pipeline is not registered, 400 if it is disabled, 500 if the
            class cannot be imported, instantiated or initialized.
    """
    import importlib

    if pipeline_id in _pipeline_instances:
        return _pipeline_instances[pipeline_id]

    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    # Look up pipeline in registry
    async with _pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT pipeline_id, module_path, is_enabled
            FROM pipeline.registry
            WHERE pipeline_id = $1
            """,
            pipeline_id,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    if not row["is_enabled"]:
        raise HTTPException(status_code=400, detail=f"Pipeline is disabled: {pipeline_id}")

    # Import and instantiate.
    # NOTE(review): module_path is imported verbatim, so write access to
    # pipeline.registry implies the ability to load arbitrary modules.
    try:
        module_name, class_name = row["module_path"].rsplit(":", 1)
        module = importlib.import_module(module_name)
        instance = getattr(module, class_name)()

        # Initialize the pipeline
        await instance.initialize()
    except Exception as e:
        log.exception("Failed to load pipeline %s", pipeline_id)
        raise HTTPException(
            status_code=500, detail=f"Failed to load pipeline: {e}"
        ) from e

    # We awaited between the cache check and here, so a concurrent request may
    # already have published an instance. Keep the first one and release ours
    # instead of silently overwriting (and leaking) an initialized pipeline.
    cached = _pipeline_instances.setdefault(pipeline_id, instance)
    if cached is not instance:
        try:
            await instance.close()
        except Exception:
            # Best effort: the surviving instance is still perfectly usable.
            log.warning(
                "Failed to close duplicate instance of pipeline %s",
                pipeline_id,
                exc_info=True,
            )
    return cached
@router.post("/{pipeline_id}/execute", response_model=ExecuteResponse)
async def execute_pipeline(
    pipeline_id: str,
    request: ExecuteRequest,
) -> ExecuteResponse:
    """
    Execute a pipeline.

    The pipeline can be executed with:
    - A job_id to process an existing scraper job
    - Direct input_data for testing
    - Specific stages to run (default: all)

    An execution row is inserted with status ``running`` before the pipeline
    is invoked and updated to ``completed``/``failed`` afterwards.

    Args:
        pipeline_id: Registry identifier of the pipeline to run.
        request: Execution parameters (job/business association, input data,
            stage selection).

    Returns:
        ExecuteResponse describing the execution id, success flag, the stages
        that ran and the error message, if any.

    Raises:
        HTTPException: 422 if ``job_id`` is not a valid UUID, 500 if the
            pipeline raises during processing, plus any error raised while
            loading the pipeline instance.
    """
    import uuid

    pipeline = await _get_pipeline_instance(pipeline_id)

    # job_id is stored in a UUID column: reject malformed values as a client
    # error instead of letting uuid.UUID() blow up as an unhandled 500.
    job_uuid = None
    if request.job_id:
        try:
            job_uuid = uuid.UUID(request.job_id)
        except ValueError as e:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid job_id (expected a UUID): {request.job_id}",
            ) from e

    # Prepare input data (copy so the request model itself is never mutated)
    input_data = dict(request.input_data or {})
    if request.job_id:
        input_data["job_id"] = request.job_id
    if request.business_id:
        input_data["business_id"] = request.business_id

    # Create execution record
    execution_uuid = uuid.uuid4()
    execution_id = str(execution_uuid)
    stages = request.stages or pipeline.get_stage_names()

    if _pool:
        async with _pool.acquire() as conn:
            # started_at is set together with the 'running' status; otherwise
            # any duration derived from started_at stays NULL forever.
            await conn.execute(
                """
                INSERT INTO pipeline.executions (
                    id, pipeline_id, job_id, business_id,
                    status, stages_requested, started_at, created_at
                )
                VALUES ($1, $2, $3, $4, 'running', $5, NOW(), NOW())
                """,
                execution_uuid,
                pipeline_id,
                job_uuid,
                request.business_id,
                stages,
            )

    try:
        # Execute pipeline
        result = await pipeline.process(input_data, stages=stages)

        # Update execution status
        if _pool:
            async with _pool.acquire() as conn:
                await conn.execute(
                    """
                    UPDATE pipeline.executions
                    SET status = $2, stages_completed = $3, error_message = $4,
                        completed_at = NOW()
                    WHERE id = $1
                    """,
                    execution_uuid,
                    "completed" if result.success else "failed",
                    result.stages_run,
                    result.error,
                )

        return ExecuteResponse(
            execution_id=execution_id,
            pipeline_id=pipeline_id,
            success=result.success,
            stages_run=result.stages_run,
            error=result.error,
        )

    except Exception as e:
        log.exception("Pipeline execution failed: %s", e)

        # Record the failure, but never let a bookkeeping error hide the
        # original exception from the caller.
        if _pool:
            try:
                async with _pool.acquire() as conn:
                    await conn.execute(
                        """
                        UPDATE pipeline.executions
                        SET status = 'failed', error_message = $2, completed_at = NOW()
                        WHERE id = $1
                        """,
                        execution_uuid,
                        str(e),
                    )
            except Exception:
                log.exception(
                    "Failed to record failure for execution %s", execution_id
                )

        raise HTTPException(status_code=500, detail=f"Execution failed: {e}") from e
@router.get("/{pipeline_id}/executions", response_model=list[ExecutionSummary])
async def list_executions(
    pipeline_id: str,
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=200, description="Max results"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> list[ExecutionSummary]:
    """
    List execution history for a pipeline.

    Can filter by status and paginate results. Rows are returned newest first
    (ordered by ``created_at`` descending).

    Args:
        pipeline_id: Pipeline whose executions are listed.
        status: Optional status filter; must be one of the
            ``pipeline.execution_status`` enum labels.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        One ExecutionSummary per matching execution row.

    Raises:
        HTTPException: 503 if the database pool has not been set, 422 if
            ``status`` is not a known execution status.
    """
    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    # Labels of the pipeline.execution_status enum. The column is an enum, so
    # an unknown label would be rejected by PostgreSQL and surface as a 500;
    # validate here and answer with a client error instead.
    valid_statuses = ("pending", "running", "completed", "failed", "cancelled")
    if status and status not in valid_statuses:
        raise HTTPException(
            status_code=422,
            detail=(
                f"Invalid status '{status}'. "
                f"Expected one of: {', '.join(valid_statuses)}"
            ),
        )

    # Only placeholder numbers are interpolated into the SQL text; every value
    # is passed as a bound parameter.
    params: list[Any] = [pipeline_id]
    where_clause = "pipeline_id = $1"
    if status:
        params.append(status)
        where_clause += f" AND status = ${len(params)}"

    limit_idx = len(params) + 1
    offset_idx = limit_idx + 1

    async with _pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT id, pipeline_id, job_id, business_id, status,
                   stages_requested, stages_completed, error_message,
                   started_at, completed_at, created_at
            FROM pipeline.executions
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${limit_idx} OFFSET ${offset_idx}
            """,
            *params,
            limit,
            offset,
        )

    return [
        ExecutionSummary(
            id=str(row["id"]),
            pipeline_id=row["pipeline_id"],
            job_id=str(row["job_id"]) if row["job_id"] else None,
            business_id=row["business_id"],
            status=row["status"],
            stages_requested=row["stages_requested"] or [],
            stages_completed=row["stages_completed"] or [],
            error_message=row["error_message"],
            started_at=row["started_at"].isoformat() if row["started_at"] else None,
            completed_at=row["completed_at"].isoformat() if row["completed_at"] else None,
            created_at=row["created_at"].isoformat() if row["created_at"] else None,
        )
        for row in rows
    ]
+ + Returns the widget configuration that the frontend uses to render + the pipeline's dynamic dashboard. + """ + pipeline = await _get_pipeline_instance(pipeline_id) + + try: + config = pipeline.get_dashboard_config() + + return DashboardConfigModel( + pipeline_id=config["pipeline_id"], + title=config["title"], + description=config.get("description"), + sections=[ + DashboardSectionModel( + id=s["id"], + title=s["title"], + description=s.get("description"), + widgets=s["widgets"], + collapsed=s.get("collapsed"), + ) + for s in config["sections"] + ], + default_time_range=config.get("default_time_range"), + refresh_interval=config.get("refresh_interval"), + ) + + except Exception as e: + log.exception(f"Failed to get dashboard config: {e}") + raise HTTPException( + status_code=500, detail=f"Failed to get dashboard config: {e}" + ) + + +@router.get("/{pipeline_id}/widgets/{widget_id}") +async def get_widget_data( + pipeline_id: str, + widget_id: str, + business_id: str | None = Query(None, description="Filter by business"), + time_range: str = Query("30d", description="Time range (e.g., 7d, 30d, 90d)"), + page: int = Query(1, ge=1, description="Page number for paginated widgets"), + page_size: int = Query(10, ge=1, le=100, description="Items per page"), +) -> dict[str, Any]: + """ + Get data for a specific dashboard widget. + + The response format depends on the widget type. 
@router.get("/{pipeline_id}/health")
async def pipeline_health(pipeline_id: str) -> dict[str, Any]:
    """
    Check pipeline health.

    Returns health status and any issues detected. A failing health check is
    reported in the response body (``healthy: False`` plus the error text)
    rather than as an HTTP error, and is logged with its traceback.

    Args:
        pipeline_id: Registry identifier of the pipeline to check.

    Returns:
        The pipeline's own health payload merged with ``pipeline_id``, or an
        unhealthy payload if the check raised.

    Raises:
        HTTPException: Propagated from loading the pipeline instance (the
            lookup happens outside the try block on purpose).
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        health = await pipeline.health_check()
        return {
            "pipeline_id": pipeline_id,
            **health,
        }
    except Exception as e:
        # Still answer with an "unhealthy" payload, but leave a trace in the
        # logs so the failure can be diagnosed.
        log.exception("Health check failed for pipeline %s", pipeline_id)
        return {
            "pipeline_id": pipeline_id,
            "healthy": False,
            "error": str(e),
        }
+-- +-- Date: 2026-01-24 +-- ============================================================================= + +-- Ensure pipeline schema exists (should already exist from 005) +CREATE SCHEMA IF NOT EXISTS pipeline; + + +-- ============================================================================= +-- SECTION 1: PIPELINE REGISTRY +-- ============================================================================= + +-- Pipeline registry table +-- Stores registered pipelines and their metadata for dynamic discovery +CREATE TABLE IF NOT EXISTS pipeline.registry ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- Pipeline identity + pipeline_id VARCHAR(50) NOT NULL UNIQUE, + name VARCHAR(255) NOT NULL, + description TEXT, + version VARCHAR(20) NOT NULL, + + -- Module information for dynamic loading + module_path VARCHAR(255) NOT NULL, -- e.g., "reviewiq_pipeline.pipeline:ReviewIQPipeline" + + -- Pipeline configuration + stages TEXT[] NOT NULL DEFAULT '{}', + input_type VARCHAR(100) NOT NULL DEFAULT 'dict', + config JSONB, + + -- Status + is_enabled BOOLEAN NOT NULL DEFAULT TRUE, + + -- Timestamps + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +COMMENT ON TABLE pipeline.registry IS 'Registered pipelines available for execution'; +COMMENT ON COLUMN pipeline.registry.pipeline_id IS 'Unique pipeline identifier (e.g., "reviewiq")'; +COMMENT ON COLUMN pipeline.registry.module_path IS 'Python module path for dynamic import (package.module:ClassName)'; +COMMENT ON COLUMN pipeline.registry.stages IS 'Ordered list of stage names'; +COMMENT ON COLUMN pipeline.registry.input_type IS 'Expected input type (for documentation/validation)'; +COMMENT ON COLUMN pipeline.registry.config IS 'Pipeline-specific configuration as JSON'; + +-- Indexes for registry +CREATE INDEX IF NOT EXISTS idx_registry_enabled + ON pipeline.registry (is_enabled) + WHERE is_enabled = TRUE; + + +-- 
============================================================================= +-- SECTION 2: EXECUTION HISTORY +-- ============================================================================= + +-- Execution status enum +DO $$ BEGIN + CREATE TYPE pipeline.execution_status AS ENUM ( + 'pending', + 'running', + 'completed', + 'failed', + 'cancelled' + ); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Pipeline execution history +-- Tracks each pipeline execution for monitoring and debugging +CREATE TABLE IF NOT EXISTS pipeline.executions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- Pipeline reference + pipeline_id VARCHAR(50) NOT NULL, + + -- Optional associations + job_id UUID, -- Link to scraper job (soft FK to public.jobs) + business_id VARCHAR(255), -- Business being processed + + -- Execution status + status pipeline.execution_status NOT NULL DEFAULT 'pending', + + -- Stage tracking + stages_requested TEXT[] NOT NULL DEFAULT '{}', + stages_completed TEXT[] NOT NULL DEFAULT '{}', + current_stage VARCHAR(100), + + -- Input/output summaries (for quick reference without loading full data) + input_summary JSONB, + result_summary JSONB, + + -- Error tracking + error_message TEXT, + + -- Timestamps + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +COMMENT ON TABLE pipeline.executions IS 'Pipeline execution history for monitoring and debugging'; +COMMENT ON COLUMN pipeline.executions.pipeline_id IS 'Reference to pipeline.registry.pipeline_id'; +COMMENT ON COLUMN pipeline.executions.job_id IS 'Optional link to scraper job (soft FK)'; +COMMENT ON COLUMN pipeline.executions.stages_requested IS 'Stages requested to run'; +COMMENT ON COLUMN pipeline.executions.stages_completed IS 'Stages that completed successfully'; +COMMENT ON COLUMN pipeline.executions.input_summary IS 'Summary of input data (for display)'; +COMMENT ON COLUMN pipeline.executions.result_summary IS 'Summary of results (for 
display)'; + +-- Indexes for execution queries +CREATE INDEX IF NOT EXISTS idx_executions_pipeline_id + ON pipeline.executions (pipeline_id); + +CREATE INDEX IF NOT EXISTS idx_executions_job_id + ON pipeline.executions (job_id) + WHERE job_id IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_executions_business_id + ON pipeline.executions (business_id) + WHERE business_id IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_executions_status + ON pipeline.executions (status); + +CREATE INDEX IF NOT EXISTS idx_executions_created_at + ON pipeline.executions (created_at DESC); + +-- Composite index for common query patterns +CREATE INDEX IF NOT EXISTS idx_executions_pipeline_status + ON pipeline.executions (pipeline_id, status, created_at DESC); + + +-- ============================================================================= +-- SECTION 3: TRIGGER FOR UPDATED_AT +-- ============================================================================= + +-- Function to update updated_at timestamp +CREATE OR REPLACE FUNCTION pipeline.update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Trigger for registry table +DROP TRIGGER IF EXISTS tr_registry_updated_at ON pipeline.registry; +CREATE TRIGGER tr_registry_updated_at + BEFORE UPDATE ON pipeline.registry + FOR EACH ROW + EXECUTE FUNCTION pipeline.update_updated_at_column(); + + +-- ============================================================================= +-- SECTION 4: INITIAL DATA +-- ============================================================================= + +-- Register the ReviewIQ pipeline (can be updated by the application on startup) +INSERT INTO pipeline.registry ( + pipeline_id, + name, + description, + version, + module_path, + stages, + input_type, + is_enabled +) +VALUES ( + 'reviewiq', + 'ReviewIQ Classification Pipeline', + 'Classifies reviews using URT taxonomy, detects issues, and aggregates metrics', + '1.0.0', + 
-- View for recent executions with pipeline info.
--
-- duration_seconds is measured from started_at. started_at is nullable, so
-- rows that never recorded it fall back to created_at instead of yielding a
-- NULL duration:
--   * running rows:  NOW() - start
--   * finished rows: completed_at - start
--   * anything else: NULL
CREATE OR REPLACE VIEW pipeline.executions_with_pipeline AS
SELECT
    e.id,
    e.pipeline_id,
    r.name AS pipeline_name,
    e.job_id,
    e.business_id,
    e.status,
    e.stages_requested,
    e.stages_completed,
    e.current_stage,
    e.error_message,
    e.started_at,
    e.completed_at,
    e.created_at,
    CASE
        WHEN e.status = 'running' THEN
            EXTRACT(EPOCH FROM (NOW() - COALESCE(e.started_at, e.created_at)))::INTEGER
        WHEN e.completed_at IS NOT NULL THEN
            EXTRACT(EPOCH FROM (e.completed_at - COALESCE(e.started_at, e.created_at)))::INTEGER
        ELSE NULL
    END AS duration_seconds
FROM pipeline.executions e
-- LEFT JOIN keeps executions whose pipeline is no longer in the registry.
LEFT JOIN pipeline.registry r ON e.pipeline_id = r.pipeline_id;

COMMENT ON VIEW pipeline.executions_with_pipeline IS 'Executions joined with pipeline metadata and duration';
VIEW pipeline.execution_stats IS 'Aggregated execution statistics per pipeline'; + + +-- ============================================================================= +-- DONE +-- ============================================================================= diff --git a/packages/pipeline-core/README.md b/packages/pipeline-core/README.md new file mode 100644 index 0000000..76bf7bc --- /dev/null +++ b/packages/pipeline-core/README.md @@ -0,0 +1,119 @@ +# Pipeline Core + +Extensible multi-pipeline framework with dynamic dashboards. + +## Overview + +Pipeline Core provides the base abstractions for building pipelines that can be: +- Discovered and registered dynamically +- Executed with status tracking +- Rendered with auto-generated dashboards + +## Features + +- **BasePipeline** - Abstract base class all pipelines implement +- **PipelineRegistry** - Database-backed pipeline discovery and management +- **PipelineRunner** - Execution with status tracking +- **Dashboard Contracts** - TypedDicts for widget configuration + +## Installation + +```bash +pip install -e packages/pipeline-core +``` + +## Usage + +### Implementing a Pipeline + +```python +from pipeline_core import BasePipeline, PipelineMetadata, DashboardConfig + +class MyPipeline(BasePipeline): + @property + def metadata(self) -> PipelineMetadata: + return { + "id": "my-pipeline", + "name": "My Pipeline", + "description": "Does something useful", + "version": "1.0.0", + "stages": ["stage1", "stage2"], + "input_type": "MyInputType", + } + + async def initialize(self) -> None: + # Set up connections + pass + + async def close(self) -> None: + # Clean up + pass + + async def process(self, input_data, stages=None): + # Run the pipeline + pass + + def get_dashboard_config(self) -> DashboardConfig: + return { + "pipeline_id": "my-pipeline", + "title": "My Dashboard", + "sections": [...] 
+ } + + async def get_widget_data(self, widget_id, params): + # Return widget data + pass +``` + +### Registering a Pipeline + +```python +from pipeline_core import PipelineRegistry +import asyncpg + +pool = await asyncpg.create_pool(database_url) +registry = PipelineRegistry(pool) + +await registry.register( + pipeline_id="my-pipeline", + name="My Pipeline", + description="Does something useful", + version="1.0.0", + module_path="my_package.pipeline:MyPipeline", + stages=["stage1", "stage2"], + input_type="MyInputType", +) +``` + +### Executing a Pipeline + +```python +from pipeline_core import PipelineRunner + +runner = PipelineRunner(pool, registry) + +execution_id, result = await runner.execute( + pipeline_id="my-pipeline", + request={ + "input_data": {"key": "value"}, + "stages": ["stage1"], + } +) +``` + +## Dashboard Widgets + +Pipelines declare dashboard widgets via `get_dashboard_config()`. Available widget types: + +- `stat_card` - KPI stat card with value and trend +- `line_chart` - Time series line chart +- `bar_chart` - Bar chart (horizontal or vertical) +- `pie_chart` - Pie/donut chart +- `table` - Data table with columns +- `heatmap` - Heatmap grid visualization +- `area_chart` - Stacked area chart +- `gauge` - Gauge/meter visualization + +## License + +MIT diff --git a/packages/pipeline-core/pyproject.toml b/packages/pipeline-core/pyproject.toml new file mode 100644 index 0000000..e65b875 --- /dev/null +++ b/packages/pipeline-core/pyproject.toml @@ -0,0 +1,65 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "pipeline-core" +version = "0.1.0" +description = "Pipeline Core - Extensible multi-pipeline framework with dynamic dashboards" +readme = "README.md" +license = "MIT" +requires-python = ">=3.11" +authors = [ + { name = "ReviewIQ Team" } +] +keywords = ["pipeline", "framework", "dashboard", "registry"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + 
"License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +dependencies = [ + "asyncpg>=0.28.0", + "pydantic>=2.0", + "pydantic-settings>=2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "pytest-asyncio>=0.21.0", + "pytest-cov>=4.0", + "ruff>=0.1.0", + "mypy>=1.0", +] + +[project.urls] +Homepage = "https://github.com/reviewiq/pipeline-core" +Documentation = "https://github.com/reviewiq/pipeline-core#readme" +Repository = "https://github.com/reviewiq/pipeline-core" + +[tool.hatch.build.targets.wheel] +packages = ["src/pipeline_core"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +pythonpath = ["src"] + +[tool.ruff] +line-length = 100 +target-version = "py311" + +[tool.ruff.lint] +select = ["E", "F", "I", "N", "W", "UP"] +ignore = ["E501"] + +[tool.mypy] +python_version = "3.11" +strict = true +warn_return_any = true +warn_unused_ignores = true diff --git a/packages/pipeline-core/src/pipeline_core/__init__.py b/packages/pipeline-core/src/pipeline_core/__init__.py new file mode 100644 index 0000000..4b228a6 --- /dev/null +++ b/packages/pipeline-core/src/pipeline_core/__init__.py @@ -0,0 +1,34 @@ +""" +Pipeline Core - Extensible multi-pipeline framework with dynamic dashboards. + +This package provides the base abstractions for building pipelines that can be +discovered, registered, and rendered with dynamic dashboards. 
+""" + +from pipeline_core.base import BasePipeline, PipelineMetadata, PipelineResult +from pipeline_core.contracts import ( + DashboardConfig, + DashboardSection, + WidgetConfig, + WidgetType, +) +from pipeline_core.registry import PipelineRegistry +from pipeline_core.runner import PipelineRunner + +__version__ = "0.1.0" + +__all__ = [ + # Base classes + "BasePipeline", + "PipelineMetadata", + "PipelineResult", + # Contracts + "DashboardConfig", + "DashboardSection", + "WidgetConfig", + "WidgetType", + # Registry + "PipelineRegistry", + # Runner + "PipelineRunner", +] diff --git a/packages/pipeline-core/src/pipeline_core/base.py b/packages/pipeline-core/src/pipeline_core/base.py new file mode 100644 index 0000000..13ea02c --- /dev/null +++ b/packages/pipeline-core/src/pipeline_core/base.py @@ -0,0 +1,263 @@ +""" +Base Pipeline abstract class and related types. + +All pipelines must implement this interface to be compatible with the +pipeline registry, runner, and dynamic dashboard system. 
class PipelineResult:
    """
    Result from running a pipeline.

    Holds the overall success flag, the names of the stages that ran, the
    per-stage results keyed by stage name and an optional error message.
    """

    def __init__(
        self,
        pipeline_id: str,
        stages_run: list[str] | None = None,
        stage_results: dict[str, StageResult] | None = None,
        success: bool = True,
        error: str | None = None,
    ):
        """
        Initialize pipeline result.

        Args:
            pipeline_id: Pipeline identifier
            stages_run: List of stages that were run (defaults to an empty list)
            stage_results: Results from each stage (defaults to an empty dict)
            success: Overall success status
            error: Error message if failed
        """
        self.pipeline_id = pipeline_id
        self.stages_run = stages_run or []
        self.stage_results = stage_results or {}
        self.success = success
        self.error = error

    def __repr__(self) -> str:
        """Return a compact, log-friendly representation."""
        return (
            f"{type(self).__name__}("
            f"pipeline_id={self.pipeline_id!r}, "
            f"stages_run={self.stages_run!r}, "
            f"success={self.success!r}, "
            f"error={self.error!r})"
        )

    def get_stage_result(self, stage: str) -> StageResult | None:
        """Get result for a specific stage, or None if that stage has no result."""
        return self.stage_results.get(stage)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dictionary.

        The containers are shallow copies, so editing the returned payload
        (e.g. before serializing it) cannot alter this result object.
        """
        return {
            "pipeline_id": self.pipeline_id,
            "stages_run": list(self.stages_run),
            "stage_results": dict(self.stage_results),
            "success": self.success,
            "error": self.error,
        }

    @classmethod
    def from_error(cls, pipeline_id: str, error: str) -> PipelineResult:
        """Create a failed result (no stages run) from an error message."""
        return cls(
            pipeline_id=pipeline_id,
            success=False,
            error=error,
        )
+ } + + async def get_widget_data(self, widget_id, params) -> dict: + # Return data for a specific widget + pass + """ + + @property + @abstractmethod + def metadata(self) -> PipelineMetadata: + """ + Get pipeline metadata. + + Returns: + PipelineMetadata with id, name, description, version, stages, input_type + """ + ... + + @abstractmethod + async def initialize(self) -> None: + """ + Initialize the pipeline. + + This is called before any processing. Use it to: + - Establish database connections + - Load configuration + - Initialize services + + This method may be called multiple times but should be idempotent. + """ + ... + + @abstractmethod + async def close(self) -> None: + """ + Close and cleanup pipeline resources. + + This is called when the pipeline is no longer needed. Use it to: + - Close database connections + - Release resources + - Cleanup temporary files + """ + ... + + @abstractmethod + async def process( + self, + input_data: dict[str, Any], + stages: list[str] | None = None, + ) -> PipelineResult: + """ + Process input data through the pipeline. + + Args: + input_data: Input data dictionary (format depends on input_type) + stages: List of stages to run (default: all stages) + + Returns: + PipelineResult with stage outputs and validation results + """ + ... + + @abstractmethod + def get_dashboard_config(self) -> DashboardConfig: + """ + Get the dashboard configuration for this pipeline. + + Returns: + DashboardConfig with sections and widget definitions + + The frontend uses this configuration to dynamically render + the pipeline's dashboard with appropriate widgets. + """ + ... + + @abstractmethod + async def get_widget_data( + self, + widget_id: str, + params: dict[str, Any], + ) -> dict[str, Any]: + """ + Get data for a specific dashboard widget. 
+ + Args: + widget_id: Widget identifier (from dashboard config) + params: Query parameters (e.g., time range, filters) + + Returns: + Dictionary with widget data in the format expected by the widget type + + Common params: + - business_id: Filter by business + - time_range: Time range (e.g., "7d", "30d", "custom") + - start_date: Start date for custom range + - end_date: End date for custom range + """ + ... + + # Optional methods with default implementations + + async def validate_input(self, input_data: dict[str, Any]) -> list[str]: + """ + Validate input data before processing. + + Args: + input_data: Input data to validate + + Returns: + List of validation error messages (empty if valid) + + Override this to add custom input validation. + """ + return [] + + async def health_check(self) -> dict[str, Any]: + """ + Perform a health check on the pipeline. + + Returns: + Dictionary with health status: + - healthy: bool + - checks: dict of individual check results + - message: optional message + + Override this to add custom health checks. + """ + return { + "healthy": True, + "checks": {}, + "message": None, + } + + def get_stage_names(self) -> list[str]: + """Get the list of stage names.""" + return self.metadata["stages"] + + def get_pipeline_id(self) -> str: + """Get the pipeline identifier.""" + return self.metadata["id"] diff --git a/packages/pipeline-core/src/pipeline_core/contracts.py b/packages/pipeline-core/src/pipeline_core/contracts.py new file mode 100644 index 0000000..2808f64 --- /dev/null +++ b/packages/pipeline-core/src/pipeline_core/contracts.py @@ -0,0 +1,252 @@ +""" +Dashboard and Widget contracts for the pipeline system. + +These TypedDicts define the data structures for dynamic dashboard configuration, +allowing pipelines to declare their dashboard widgets which the frontend renders. 
+""" + +from __future__ import annotations + +from typing import Any, Literal, TypedDict + + +# ============================================================================= +# Widget Types +# ============================================================================= + +WidgetType = Literal[ + "stat_card", # KPI stat card with value and optional trend + "line_chart", # Time series line chart + "bar_chart", # Bar chart (horizontal or vertical) + "pie_chart", # Pie/donut chart + "table", # Data table with columns + "heatmap", # Heatmap grid visualization + "area_chart", # Stacked area chart + "gauge", # Gauge/meter visualization +] + + +# ============================================================================= +# Widget Configuration +# ============================================================================= + + +class GridPosition(TypedDict): + """Grid position for a widget in the dashboard layout.""" + + x: int # Column position (0-based) + y: int # Row position (0-based) + w: int # Width in grid units + h: int # Height in grid units + + +class StatCardConfig(TypedDict, total=False): + """Configuration specific to stat card widgets.""" + + value_key: str # Key in data for the main value + label: str # Label to display + format: str # Format string (e.g., "{value:,}", "{value:.1%}") + trend_key: str | None # Key for trend value (optional) + trend_format: str | None # Format for trend (e.g., "+{value:.1%}") + icon: str | None # Icon name (optional) + color: str | None # Color theme (e.g., "blue", "green", "red") + + +class ChartAxisConfig(TypedDict, total=False): + """Configuration for chart axes.""" + + key: str # Data key for this axis + label: str # Axis label + type: Literal["number", "category", "time"] + format: str | None # Format string + + +class ChartSeriesConfig(TypedDict, total=False): + """Configuration for a chart data series.""" + + key: str # Data key + name: str # Display name + color: str | None # Series color + type: Literal["line", 
"bar", "area"] | None + + +class ChartConfig(TypedDict, total=False): + """Configuration for chart widgets (line, bar, area).""" + + x_axis: ChartAxisConfig + y_axis: ChartAxisConfig + series: list[ChartSeriesConfig] + stacked: bool + show_legend: bool + show_grid: bool + + +class PieChartConfig(TypedDict, total=False): + """Configuration for pie/donut chart widgets.""" + + value_key: str # Key for segment value + label_key: str # Key for segment label + colors: list[str] | None # Custom color palette + show_legend: bool + show_labels: bool + inner_radius: int | None # For donut chart (0 = pie) + + +class TableColumnConfig(TypedDict, total=False): + """Configuration for a table column.""" + + key: str # Data key + header: str # Column header + width: int | None # Column width + align: Literal["left", "center", "right"] + format: str | None # Format string + sortable: bool + + +class TableConfig(TypedDict, total=False): + """Configuration for table widgets.""" + + columns: list[TableColumnConfig] + row_key: str # Key for unique row identifier + page_size: int + show_pagination: bool + sortable: bool + filterable: bool + + +class HeatmapConfig(TypedDict, total=False): + """Configuration for heatmap widgets.""" + + x_key: str # Key for x-axis categories + y_key: str # Key for y-axis categories + value_key: str # Key for cell values + color_scale: list[str] # Color gradient + show_values: bool + format: str | None # Format for values + + +class GaugeConfig(TypedDict, total=False): + """Configuration for gauge widgets.""" + + value_key: str # Key for gauge value + min: float # Minimum value + max: float # Maximum value + thresholds: list[dict[str, Any]] # Color thresholds + format: str | None # Format string + + +# Union of all widget-specific configs +WidgetSpecificConfig = ( + StatCardConfig + | ChartConfig + | PieChartConfig + | TableConfig + | HeatmapConfig + | GaugeConfig +) + + +class WidgetConfig(TypedDict, total=False): + """Configuration for a dashboard 
widget.""" + + id: str # Unique widget identifier + type: WidgetType # Widget type + title: str # Widget title + grid: GridPosition # Grid position and size + config: dict[str, Any] # Widget-specific configuration + data_endpoint: str | None # Custom data endpoint (if not default) + refresh_interval: int | None # Auto-refresh interval in seconds + + +# ============================================================================= +# Dashboard Configuration +# ============================================================================= + + +class DashboardSection(TypedDict): + """A section in the dashboard containing widgets.""" + + id: str # Unique section identifier + title: str # Section title + description: str | None # Optional description + widgets: list[WidgetConfig] + collapsed: bool | None # Whether section is collapsed by default + + +class DashboardConfig(TypedDict): + """Full dashboard configuration for a pipeline.""" + + pipeline_id: str # Pipeline identifier + title: str # Dashboard title + description: str | None # Optional description + sections: list[DashboardSection] + default_time_range: str | None # Default time range (e.g., "7d", "30d") + refresh_interval: int | None # Global refresh interval in seconds + + +# ============================================================================= +# Execution Types +# ============================================================================= + + +class ExecutionStatus(TypedDict, total=False): + """Status of a pipeline execution.""" + + id: str # Execution ID + pipeline_id: str # Pipeline identifier + job_id: str | None # Associated job ID + business_id: str | None # Business identifier + status: Literal["pending", "running", "completed", "failed", "cancelled"] + stages_requested: list[str] + stages_completed: list[str] + current_stage: str | None + progress: float # 0.0 to 1.0 + input_summary: dict[str, Any] | None + result_summary: dict[str, Any] | None + error_message: str | None + started_at: str | 
None + completed_at: str | None + created_at: str + + +class ExecutionRequest(TypedDict, total=False): + """Request to execute a pipeline.""" + + job_id: str | None # Job ID to process + business_id: str | None # Business identifier + input_data: dict[str, Any] | None # Direct input data + stages: list[str] | None # Stages to run (default: all) + options: dict[str, Any] | None # Pipeline-specific options + + +# ============================================================================= +# Pipeline Info Types +# ============================================================================= + + +class PipelineInfo(TypedDict): + """Summary information about a pipeline.""" + + id: str # Pipeline ID (e.g., "reviewiq") + name: str # Display name + description: str + version: str + is_enabled: bool + stages: list[str] # Available stages + input_type: str # Expected input type + + +class PipelineDetail(TypedDict): + """Detailed pipeline information including metadata.""" + + id: str + name: str + description: str + version: str + is_enabled: bool + stages: list[str] + input_type: str + module_path: str + config: dict[str, Any] | None + created_at: str + updated_at: str diff --git a/packages/pipeline-core/src/pipeline_core/registry.py b/packages/pipeline-core/src/pipeline_core/registry.py new file mode 100644 index 0000000..0db5441 --- /dev/null +++ b/packages/pipeline-core/src/pipeline_core/registry.py @@ -0,0 +1,455 @@ +""" +Pipeline Registry - Database-backed discovery and management of pipelines. + +The registry maintains a list of registered pipelines and their metadata, +allowing the system to discover available pipelines and instantiate them. 
+""" + +from __future__ import annotations + +import importlib +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from pipeline_core.contracts import PipelineDetail, PipelineInfo + +if TYPE_CHECKING: + import asyncpg + + from pipeline_core.base import BasePipeline + +logger = logging.getLogger(__name__) + + +class PipelineRegistry: + """ + Database-backed registry for pipeline discovery and management. + + The registry stores pipeline metadata in a PostgreSQL table and provides + methods to register, list, and instantiate pipelines. + + Usage: + pool = await asyncpg.create_pool(database_url) + registry = PipelineRegistry(pool) + + # Register a pipeline + await registry.register( + pipeline_id="reviewiq", + name="ReviewIQ Pipeline", + description="Classifies reviews", + version="1.0.0", + module_path="reviewiq_pipeline.pipeline:ReviewIQPipeline", + ) + + # List pipelines + pipelines = await registry.list_pipelines() + + # Get a pipeline instance + pipeline = await registry.get_pipeline("reviewiq") + """ + + def __init__(self, pool: asyncpg.Pool): + """ + Initialize the registry. + + Args: + pool: asyncpg connection pool + """ + self._pool = pool + self._instances: dict[str, BasePipeline] = {} + + async def register( + self, + pipeline_id: str, + name: str, + description: str, + version: str, + module_path: str, + stages: list[str], + input_type: str, + config: dict[str, Any] | None = None, + is_enabled: bool = True, + ) -> None: + """ + Register a pipeline in the database. 
+ + Args: + pipeline_id: Unique pipeline identifier + name: Display name + description: Human-readable description + version: Semantic version + module_path: Python module path (e.g., "package.module:ClassName") + stages: List of stage names + input_type: Expected input type + config: Optional pipeline configuration + is_enabled: Whether the pipeline is enabled + + Raises: + ValueError: If module_path is invalid + """ + # Validate module path format + if ":" not in module_path: + raise ValueError( + f"Invalid module_path: {module_path}. " + "Expected format: 'package.module:ClassName'" + ) + + async with self._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO pipeline.registry ( + pipeline_id, name, description, version, module_path, + stages, input_type, config, is_enabled, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW()) + ON CONFLICT (pipeline_id) DO UPDATE SET + name = EXCLUDED.name, + description = EXCLUDED.description, + version = EXCLUDED.version, + module_path = EXCLUDED.module_path, + stages = EXCLUDED.stages, + input_type = EXCLUDED.input_type, + config = EXCLUDED.config, + is_enabled = EXCLUDED.is_enabled, + updated_at = NOW() + """, + pipeline_id, + name, + description, + version, + module_path, + stages, + input_type, + config, + is_enabled, + ) + + logger.info(f"Registered pipeline: {pipeline_id} v{version}") + + async def register_from_instance( + self, + pipeline: BasePipeline, + module_path: str, + config: dict[str, Any] | None = None, + ) -> None: + """ + Register a pipeline from an instance. 
+ + Args: + pipeline: Pipeline instance + module_path: Python module path + config: Optional configuration + """ + metadata = pipeline.metadata + await self.register( + pipeline_id=metadata["id"], + name=metadata["name"], + description=metadata["description"], + version=metadata["version"], + module_path=module_path, + stages=metadata["stages"], + input_type=metadata["input_type"], + config=config, + ) + # Cache the instance + self._instances[metadata["id"]] = pipeline + + async def unregister(self, pipeline_id: str) -> bool: + """ + Unregister a pipeline from the database. + + Args: + pipeline_id: Pipeline identifier to remove + + Returns: + True if pipeline was removed, False if not found + """ + async with self._pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM pipeline.registry WHERE pipeline_id = $1", + pipeline_id, + ) + + # Remove from cache + self._instances.pop(pipeline_id, None) + + deleted = result.split()[-1] != "0" + if deleted: + logger.info(f"Unregistered pipeline: {pipeline_id}") + return deleted + + async def set_enabled(self, pipeline_id: str, enabled: bool) -> bool: + """ + Enable or disable a pipeline. + + Args: + pipeline_id: Pipeline identifier + enabled: Whether to enable or disable + + Returns: + True if pipeline was updated, False if not found + """ + async with self._pool.acquire() as conn: + result = await conn.execute( + """ + UPDATE pipeline.registry + SET is_enabled = $2, updated_at = NOW() + WHERE pipeline_id = $1 + """, + pipeline_id, + enabled, + ) + + updated = result.split()[-1] != "0" + if updated: + logger.info(f"Set pipeline {pipeline_id} enabled={enabled}") + return updated + + async def list_pipelines( + self, + enabled_only: bool = True, + ) -> list[PipelineInfo]: + """ + List all registered pipelines. 
+ + Args: + enabled_only: Only return enabled pipelines + + Returns: + List of PipelineInfo dictionaries + """ + async with self._pool.acquire() as conn: + if enabled_only: + rows = await conn.fetch( + """ + SELECT pipeline_id, name, description, version, + is_enabled, stages, input_type + FROM pipeline.registry + WHERE is_enabled = TRUE + ORDER BY name + """ + ) + else: + rows = await conn.fetch( + """ + SELECT pipeline_id, name, description, version, + is_enabled, stages, input_type + FROM pipeline.registry + ORDER BY name + """ + ) + + return [ + PipelineInfo( + id=row["pipeline_id"], + name=row["name"], + description=row["description"], + version=row["version"], + is_enabled=row["is_enabled"], + stages=row["stages"], + input_type=row["input_type"], + ) + for row in rows + ] + + async def get_pipeline_detail( + self, + pipeline_id: str, + ) -> PipelineDetail | None: + """ + Get detailed information about a pipeline. + + Args: + pipeline_id: Pipeline identifier + + Returns: + PipelineDetail or None if not found + """ + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT pipeline_id, name, description, version, module_path, + is_enabled, stages, input_type, config, + created_at, updated_at + FROM pipeline.registry + WHERE pipeline_id = $1 + """, + pipeline_id, + ) + + if not row: + return None + + return PipelineDetail( + id=row["pipeline_id"], + name=row["name"], + description=row["description"], + version=row["version"], + is_enabled=row["is_enabled"], + stages=row["stages"], + input_type=row["input_type"], + module_path=row["module_path"], + config=row["config"], + created_at=row["created_at"].isoformat() if row["created_at"] else None, + updated_at=row["updated_at"].isoformat() if row["updated_at"] else None, + ) + + async def get_pipeline( + self, + pipeline_id: str, + initialize: bool = True, + ) -> BasePipeline | None: + """ + Get a pipeline instance. 
+ + Args: + pipeline_id: Pipeline identifier + initialize: Whether to call initialize() on the pipeline + + Returns: + Pipeline instance or None if not found + + This method caches pipeline instances for reuse. + """ + # Check cache first + if pipeline_id in self._instances: + return self._instances[pipeline_id] + + # Get pipeline details from database + detail = await self.get_pipeline_detail(pipeline_id) + if not detail or not detail["is_enabled"]: + return None + + # Import and instantiate the pipeline + try: + pipeline = self._import_pipeline(detail["module_path"]) + except Exception as e: + logger.error(f"Failed to import pipeline {pipeline_id}: {e}") + return None + + # Initialize if requested + if initialize: + try: + await pipeline.initialize() + except Exception as e: + logger.error(f"Failed to initialize pipeline {pipeline_id}: {e}") + return None + + # Cache and return + self._instances[pipeline_id] = pipeline + return pipeline + + def _import_pipeline(self, module_path: str) -> BasePipeline: + """ + Import a pipeline class from a module path. + + Args: + module_path: Path in format "package.module:ClassName" + + Returns: + Pipeline instance + """ + module_name, class_name = module_path.rsplit(":", 1) + module = importlib.import_module(module_name) + cls = getattr(module, class_name) + return cls() + + async def close_all(self) -> None: + """Close all cached pipeline instances.""" + for pipeline_id, pipeline in self._instances.items(): + try: + await pipeline.close() + except Exception as e: + logger.error(f"Error closing pipeline {pipeline_id}: {e}") + + self._instances.clear() + + +class InMemoryPipelineRegistry: + """ + In-memory pipeline registry for testing and simple deployments. + + This registry doesn't persist to a database and stores pipelines in memory. 
+ """ + + def __init__(self): + self._pipelines: dict[str, BasePipeline] = {} + self._enabled: dict[str, bool] = {} + + async def register(self, pipeline: BasePipeline) -> None: + """Register a pipeline instance.""" + pipeline_id = pipeline.metadata["id"] + self._pipelines[pipeline_id] = pipeline + self._enabled[pipeline_id] = True + logger.info(f"Registered pipeline: {pipeline_id}") + + async def unregister(self, pipeline_id: str) -> bool: + """Unregister a pipeline.""" + if pipeline_id in self._pipelines: + del self._pipelines[pipeline_id] + del self._enabled[pipeline_id] + return True + return False + + async def set_enabled(self, pipeline_id: str, enabled: bool) -> bool: + """Enable or disable a pipeline.""" + if pipeline_id in self._enabled: + self._enabled[pipeline_id] = enabled + return True + return False + + async def list_pipelines( + self, + enabled_only: bool = True, + ) -> list[PipelineInfo]: + """List all registered pipelines.""" + result = [] + for pipeline_id, pipeline in self._pipelines.items(): + is_enabled = self._enabled.get(pipeline_id, True) + if enabled_only and not is_enabled: + continue + + metadata = pipeline.metadata + result.append( + PipelineInfo( + id=pipeline_id, + name=metadata["name"], + description=metadata["description"], + version=metadata["version"], + is_enabled=is_enabled, + stages=metadata["stages"], + input_type=metadata["input_type"], + ) + ) + return result + + async def get_pipeline( + self, + pipeline_id: str, + initialize: bool = True, + ) -> BasePipeline | None: + """Get a pipeline instance.""" + if pipeline_id not in self._pipelines: + return None + + if not self._enabled.get(pipeline_id, True): + return None + + pipeline = self._pipelines[pipeline_id] + + if initialize: + await pipeline.initialize() + + return pipeline + + async def close_all(self) -> None: + """Close all pipeline instances.""" + for pipeline in self._pipelines.values(): + try: + await pipeline.close() + except Exception as e: + logger.error(f"Error 
closing pipeline: {e}") + + self._pipelines.clear() + self._enabled.clear() diff --git a/packages/pipeline-core/src/pipeline_core/runner.py b/packages/pipeline-core/src/pipeline_core/runner.py new file mode 100644 index 0000000..9fdafda --- /dev/null +++ b/packages/pipeline-core/src/pipeline_core/runner.py @@ -0,0 +1,467 @@ +""" +Pipeline Runner - Executes pipelines and tracks execution history. + +The runner handles pipeline execution, tracking execution status in the database, +and providing execution history for monitoring and debugging. +""" + +from __future__ import annotations + +import logging +import time +import uuid +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from pipeline_core.base import PipelineResult +from pipeline_core.contracts import ExecutionRequest, ExecutionStatus + +if TYPE_CHECKING: + import asyncpg + + from pipeline_core.base import BasePipeline + from pipeline_core.registry import PipelineRegistry + +logger = logging.getLogger(__name__) + + +class PipelineRunner: + """ + Executes pipelines and tracks execution history. + + The runner: + - Gets pipeline instances from the registry + - Tracks execution status in the database + - Handles errors and updates status + - Provides execution history queries + + Usage: + registry = PipelineRegistry(pool) + runner = PipelineRunner(pool, registry) + + # Execute a pipeline + result = await runner.execute( + pipeline_id="reviewiq", + request=ExecutionRequest( + job_id="job-123", + stages=["normalize", "classify"], + ) + ) + + # Get execution status + status = await runner.get_execution("exec-123") + + # List executions + executions = await runner.list_executions(pipeline_id="reviewiq") + """ + + def __init__( + self, + pool: asyncpg.Pool, + registry: PipelineRegistry, + ): + """ + Initialize the runner. 
+ + Args: + pool: asyncpg connection pool + registry: Pipeline registry for getting pipeline instances + """ + self._pool = pool + self._registry = registry + + async def execute( + self, + pipeline_id: str, + request: ExecutionRequest, + ) -> tuple[str, PipelineResult]: + """ + Execute a pipeline. + + Args: + pipeline_id: Pipeline identifier + request: Execution request with input data and options + + Returns: + Tuple of (execution_id, PipelineResult) + + Raises: + ValueError: If pipeline not found or disabled + """ + # Get pipeline instance + pipeline = await self._registry.get_pipeline(pipeline_id) + if not pipeline: + raise ValueError(f"Pipeline not found or disabled: {pipeline_id}") + + # Create execution record + execution_id = str(uuid.uuid4()) + stages = request.get("stages") or pipeline.get_stage_names() + + await self._create_execution( + execution_id=execution_id, + pipeline_id=pipeline_id, + job_id=request.get("job_id"), + business_id=request.get("business_id"), + stages_requested=stages, + ) + + # Update status to running + await self._update_execution_status( + execution_id=execution_id, + status="running", + started_at=datetime.utcnow(), + ) + + try: + # Prepare input data + input_data = request.get("input_data") or {} + if request.get("job_id"): + input_data["job_id"] = request["job_id"] + if request.get("business_id"): + input_data["business_id"] = request["business_id"] + + # Validate input + validation_errors = await pipeline.validate_input(input_data) + if validation_errors: + error_msg = "; ".join(validation_errors) + await self._update_execution_status( + execution_id=execution_id, + status="failed", + error_message=f"Validation failed: {error_msg}", + completed_at=datetime.utcnow(), + ) + return execution_id, PipelineResult.from_error( + pipeline_id, f"Validation failed: {error_msg}" + ) + + # Execute pipeline + start_time = time.time() + result = await pipeline.process(input_data, stages=stages) + duration_ms = int((time.time() - start_time) 
* 1000) + + # Update execution with result + if result.success: + await self._update_execution_status( + execution_id=execution_id, + status="completed", + stages_completed=result.stages_run, + result_summary=self._summarize_result(result), + completed_at=datetime.utcnow(), + ) + else: + await self._update_execution_status( + execution_id=execution_id, + status="failed", + stages_completed=result.stages_run, + error_message=result.error, + completed_at=datetime.utcnow(), + ) + + logger.info( + f"Pipeline {pipeline_id} execution {execution_id} " + f"completed in {duration_ms}ms: success={result.success}" + ) + + return execution_id, result + + except Exception as e: + logger.exception(f"Pipeline {pipeline_id} execution failed: {e}") + + await self._update_execution_status( + execution_id=execution_id, + status="failed", + error_message=str(e), + completed_at=datetime.utcnow(), + ) + + return execution_id, PipelineResult.from_error(pipeline_id, str(e)) + + async def cancel(self, execution_id: str) -> bool: + """ + Cancel a running execution. + + Args: + execution_id: Execution identifier + + Returns: + True if cancelled, False if not found or already completed + """ + async with self._pool.acquire() as conn: + result = await conn.execute( + """ + UPDATE pipeline.executions + SET status = 'cancelled', completed_at = NOW() + WHERE id = $1 AND status IN ('pending', 'running') + """, + uuid.UUID(execution_id), + ) + + cancelled = result.split()[-1] != "0" + if cancelled: + logger.info(f"Cancelled execution: {execution_id}") + return cancelled + + async def get_execution(self, execution_id: str) -> ExecutionStatus | None: + """ + Get execution status. 
+ + Args: + execution_id: Execution identifier + + Returns: + ExecutionStatus or None if not found + """ + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id, pipeline_id, job_id, business_id, status, + stages_requested, stages_completed, current_stage, + input_summary, result_summary, error_message, + started_at, completed_at, created_at + FROM pipeline.executions + WHERE id = $1 + """, + uuid.UUID(execution_id), + ) + + if not row: + return None + + return self._row_to_execution_status(row) + + async def list_executions( + self, + pipeline_id: str | None = None, + job_id: str | None = None, + business_id: str | None = None, + status: str | None = None, + limit: int = 50, + offset: int = 0, + ) -> list[ExecutionStatus]: + """ + List execution history. + + Args: + pipeline_id: Filter by pipeline + job_id: Filter by job + business_id: Filter by business + status: Filter by status + limit: Maximum results + offset: Result offset + + Returns: + List of ExecutionStatus + """ + conditions = [] + params = [] + param_idx = 1 + + if pipeline_id: + conditions.append(f"pipeline_id = ${param_idx}") + params.append(pipeline_id) + param_idx += 1 + + if job_id: + conditions.append(f"job_id = ${param_idx}") + params.append(uuid.UUID(job_id)) + param_idx += 1 + + if business_id: + conditions.append(f"business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + if status: + conditions.append(f"status = ${param_idx}") + params.append(status) + param_idx += 1 + + where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" + + query = f""" + SELECT id, pipeline_id, job_id, business_id, status, + stages_requested, stages_completed, current_stage, + input_summary, result_summary, error_message, + started_at, completed_at, created_at + FROM pipeline.executions + {where_clause} + ORDER BY created_at DESC + LIMIT ${param_idx} OFFSET ${param_idx + 1} + """ + params.extend([limit, offset]) + + async with self._pool.acquire() 
as conn: + rows = await conn.fetch(query, *params) + + return [self._row_to_execution_status(row) for row in rows] + + async def get_execution_count( + self, + pipeline_id: str | None = None, + status: str | None = None, + ) -> int: + """ + Get execution count. + + Args: + pipeline_id: Filter by pipeline + status: Filter by status + + Returns: + Count of executions matching filters + """ + conditions = [] + params = [] + param_idx = 1 + + if pipeline_id: + conditions.append(f"pipeline_id = ${param_idx}") + params.append(pipeline_id) + param_idx += 1 + + if status: + conditions.append(f"status = ${param_idx}") + params.append(status) + param_idx += 1 + + where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" + + async with self._pool.acquire() as conn: + result = await conn.fetchval( + f"SELECT COUNT(*) FROM pipeline.executions {where_clause}", + *params, + ) + + return result or 0 + + async def _create_execution( + self, + execution_id: str, + pipeline_id: str, + job_id: str | None, + business_id: str | None, + stages_requested: list[str], + ) -> None: + """Create an execution record.""" + async with self._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO pipeline.executions ( + id, pipeline_id, job_id, business_id, + status, stages_requested, created_at + ) + VALUES ($1, $2, $3, $4, 'pending', $5, NOW()) + """, + uuid.UUID(execution_id), + pipeline_id, + uuid.UUID(job_id) if job_id else None, + business_id, + stages_requested, + ) + + async def _update_execution_status( + self, + execution_id: str, + status: str, + current_stage: str | None = None, + stages_completed: list[str] | None = None, + input_summary: dict[str, Any] | None = None, + result_summary: dict[str, Any] | None = None, + error_message: str | None = None, + started_at: datetime | None = None, + completed_at: datetime | None = None, + ) -> None: + """Update execution status.""" + updates = ["status = $2"] + params: list[Any] = [uuid.UUID(execution_id), status] + 
def _row_to_execution_status(self, row: Any) -> ExecutionStatus:
    """
    Build an ExecutionStatus from a ``pipeline.executions`` row.

    Progress is the ratio of completed to requested stages, or 0.0 when no
    stages were requested. Timestamps are emitted as ISO-8601 strings and
    missing values are passed through as None.
    """
    requested = row["stages_requested"] or []
    completed = row["stages_completed"] or []

    if requested:
        fraction_done = len(completed) / len(requested)
    else:
        fraction_done = 0.0

    raw_job_id = row["job_id"]

    # Each timestamp column becomes an ISO string, or None when unset.
    iso_times = {
        column: (row[column].isoformat() if row[column] else None)
        for column in ("started_at", "completed_at", "created_at")
    }

    return ExecutionStatus(
        id=str(row["id"]),
        pipeline_id=row["pipeline_id"],
        job_id=str(raw_job_id) if raw_job_id else None,
        business_id=row["business_id"],
        status=row["status"],
        stages_requested=requested,
        stages_completed=completed,
        current_stage=row["current_stage"],
        progress=fraction_done,
        input_summary=row["input_summary"],
        result_summary=row["result_summary"],
        error_message=row["error_message"],
        started_at=iso_times["started_at"],
        completed_at=iso_times["completed_at"],
        created_at=iso_times["created_at"],
    )
""" from reviewiq_pipeline.config import Config @@ -28,12 +31,14 @@ from reviewiq_pipeline.contracts import ( ValidationError, ValidationResult, ) -from reviewiq_pipeline.pipeline import Pipeline +from reviewiq_pipeline.pipeline import Pipeline, PipelineResult, ReviewIQPipeline __version__ = "0.1.0" __all__ = [ # Main API "Pipeline", + "ReviewIQPipeline", + "PipelineResult", "Config", # Contracts "ScraperOutput", diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py index b386420..d1996dd 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py @@ -1,15 +1,27 @@ """ Pipeline class - main public API for the ReviewIQ pipeline. -Provides a unified interface for running pipeline stages. +Provides a unified interface for running pipeline stages and implements +the BasePipeline interface for the extensible pipeline system. """ from __future__ import annotations import logging +import time from datetime import date from typing import TYPE_CHECKING, Any +from pipeline_core import ( + BasePipeline, + DashboardConfig, + DashboardSection, + PipelineMetadata, + PipelineResult as BasePipelineResult, + StageResult, + WidgetConfig, +) + from reviewiq_pipeline.config import Config from reviewiq_pipeline.contracts import ( ClassificationConfig, @@ -51,9 +63,14 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Stage name to number mapping +STAGE_NAMES = ["normalize", "classify", "route", "aggregate"] +STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4} +STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate"} + class PipelineResult: - """Result from running the full pipeline.""" + """Result from running the full pipeline (legacy format).""" def __init__( self, @@ -86,30 +103,39 @@ class PipelineResult: } -class Pipeline: +class 
ReviewIQPipeline(BasePipeline): """ - Main pipeline class for processing reviews. + ReviewIQ Classification Pipeline. + + Implements the BasePipeline interface for the extensible pipeline system. + Classifies reviews using URT taxonomy, detects issues, and aggregates metrics. + + Stages: + - normalize: Text cleaning, language detection, deduplication + - classify: LLM-powered span extraction with URT codes + - route: Route negative spans to issues + - aggregate: Pre-aggregate metrics for dashboards Usage: config = Config(database_url="...", llm_provider="openai", ...) - pipeline = Pipeline(config) + pipeline = ReviewIQPipeline(config) + await pipeline.initialize() # Run full pipeline result = await pipeline.process(scraper_output) - # Or run individual stages - stage1_result = await pipeline.normalize(scraper_output) - stage2_result = await pipeline.classify(stage1_result) + # Or run specific stages + result = await pipeline.process(scraper_output, stages=["normalize", "classify"]) """ - def __init__(self, config: Config): + def __init__(self, config: Config | None = None): """ Initialize the pipeline. Args: - config: Pipeline configuration + config: Pipeline configuration. If None, loads from environment. 
""" - self.config = config + self._config = config or Config() self._db: DatabasePool | None = None self._review_repo: ReviewRepository | None = None self._span_repo: SpanRepository | None = None @@ -118,15 +144,32 @@ class Pipeline: self._embedding_service: EmbeddingService | None = None self._initialized = False + @property + def config(self) -> Config: + """Get pipeline configuration.""" + return self._config + + @property + def metadata(self) -> PipelineMetadata: + """Get pipeline metadata.""" + return PipelineMetadata( + id="reviewiq", + name="ReviewIQ Classification Pipeline", + description="Classifies reviews using URT taxonomy, detects issues, and aggregates metrics for dashboards", + version="1.0.0", + stages=STAGE_NAMES, + input_type="ScraperV1Output", + ) + async def initialize(self) -> None: """Initialize database connections and services.""" if self._initialized: return - logger.info("Initializing pipeline...") + logger.info("Initializing ReviewIQ pipeline...") # Initialize database - self._db = DatabasePool(self.config) + self._db = DatabasePool(self._config) await self._db.initialize() # Initialize repositories @@ -136,10 +179,10 @@ class Pipeline: self._fact_repo = FactRepository(self._db) # Initialize embedding service - self._embedding_service = EmbeddingService(self.config) + self._embedding_service = EmbeddingService(self._config) self._initialized = True - logger.info("Pipeline initialized") + logger.info("ReviewIQ pipeline initialized") async def close(self) -> None: """Close all connections and cleanup resources.""" @@ -148,7 +191,7 @@ class Pipeline: self._db = None self._initialized = False - logger.info("Pipeline closed") + logger.info("ReviewIQ pipeline closed") async def migrate(self) -> int: """ @@ -158,23 +201,437 @@ class Pipeline: Number of migrations run """ if not self._db: - self._db = DatabasePool(self.config) + self._db = DatabasePool(self._config) await self._db.initialize() return await self._db.run_migrations() async def 
async def process(
    self,
    input_data: dict[str, Any],
    stages: list[str] | None = None,
) -> BasePipelineResult:
    """
    Process input data through the pipeline.

    Stages run in the fixed order normalize, classify, route, aggregate,
    whatever order ``stages`` lists them in. ``classify`` only runs when
    ``normalize`` produced output during this call, and ``route`` only when
    ``classify`` did; ``aggregate`` has no such prerequisite. A requested
    stage that is skipped for that reason, or whose name is unknown, is
    logged as a warning but does not by itself make the result fail.

    Args:
        input_data: ScraperV1Output or compatible dictionary
        stages: List of stage names to run (default: all)

    Returns:
        BasePipelineResult with one StageResult per attempted stage. The
        first failing stage stops the run and yields ``success=False``.
    """
    await self.initialize()

    # Default to all stages
    stages = stages or STAGE_NAMES
    stages_run: list[str] = []
    stage_results: dict[str, StageResult] = {}

    unknown = [name for name in stages if name not in STAGE_NAMES]
    if unknown:
        logger.warning("Ignoring unknown pipeline stages: %s", unknown)

    # Convert input to ScraperOutput if needed
    scraper_output = self._ensure_scraper_output(input_data)

    def failure(message: str) -> BasePipelineResult:
        # Shares the live stages_run / stage_results collected so far.
        return BasePipelineResult(
            pipeline_id="reviewiq",
            stages_run=stages_run,
            stage_results=stage_results,
            success=False,
            error=message,
        )

    # Intermediate outputs feed the next stage.
    stage1_result: Stage1Output | None = None
    stage2_result: Stage2Output | None = None

    try:
        if "normalize" in stages:
            stage1_result, error = await self._run_recorded_stage(
                "normalize",
                1,
                "Normalization",
                lambda: self._run_normalize(scraper_output),
                stages_run,
                stage_results,
            )
            if error is not None:
                return failure(error)

        if "classify" in stages:
            if stage1_result:
                stage2_result, error = await self._run_recorded_stage(
                    "classify",
                    2,
                    "Classification",
                    lambda: self._run_classify(stage1_result),
                    stages_run,
                    stage_results,
                )
                if error is not None:
                    return failure(error)
            else:
                logger.warning("Skipping classify: no normalize output in this run")

        if "route" in stages:
            if stage2_result:
                _, error = await self._run_recorded_stage(
                    "route",
                    3,
                    "Issue Routing",
                    lambda: self._run_route(stage2_result),
                    stages_run,
                    stage_results,
                )
                if error is not None:
                    return failure(error)
            else:
                logger.warning("Skipping route: no classify output in this run")

        if "aggregate" in stages:
            _, error = await self._run_recorded_stage(
                "aggregate",
                4,
                "Aggregation",
                # The business_id lookup happens inside the guarded call so a
                # missing key is reported as an aggregate-stage failure.
                lambda: self._run_aggregate(
                    scraper_output["business_id"],
                    date.today().isoformat(),
                ),
                stages_run,
                stage_results,
            )
            if error is not None:
                return failure(error)

        return BasePipelineResult(
            pipeline_id="reviewiq",
            stages_run=stages_run,
            stage_results=stage_results,
            success=True,
        )

    except Exception as e:
        logger.exception("Pipeline failed with unexpected error")
        return failure(str(e))


async def _run_recorded_stage(
    self,
    name: str,
    number: int,
    label: str,
    run: Any,
    stages_run: list[str],
    stage_results: dict[str, StageResult],
) -> tuple[Any, str | None]:
    """
    Run one stage, timing it and recording its StageResult.

    Args:
        name: Stage name used as the key in ``stage_results``
        number: Stage number used in log messages
        label: Human-readable stage label used in log messages
        run: Zero-argument callable returning the stage coroutine
        stages_run: Mutated in place; ``name`` is appended on success
        stage_results: Mutated in place with the stage's StageResult

    Returns:
        ``(output, None)`` on success, ``(None, "<name> failed: <error>")``
        on failure.
    """
    # Monotonic clock: durations are unaffected by wall-clock adjustments.
    start = time.monotonic()
    logger.info("Running Stage %d: %s", number, label)

    try:
        output = await run()
        duration_ms = int((time.monotonic() - start) * 1000)
        stages_run.append(name)
        stage_results[name] = StageResult(
            stage=name,
            success=True,
            data={"stats": output.get("stats", {})},
            error=None,
            duration_ms=duration_ms,
        )
    except Exception as exc:
        logger.exception("Stage %d failed", number)
        stage_results[name] = StageResult(
            stage=name,
            success=False,
            data={},
            error=str(exc),
            duration_ms=int((time.monotonic() - start) * 1000),
        )
        return None, f"{name} failed: {exc}"

    return output, None
collapsed=False, + ), + DashboardSection( + id="sentiment", + title="Sentiment Analysis", + description="Review sentiment distribution", + widgets=[ + WidgetConfig( + id="sentiment_distribution", + type="pie_chart", + title="Sentiment Distribution", + grid={"x": 0, "y": 0, "w": 4, "h": 2}, + config={ + "value_key": "count", + "label_key": "sentiment", + "colors": ["#22c55e", "#ef4444", "#6b7280", "#eab308"], + "show_legend": True, + }, + ), + WidgetConfig( + id="sentiment_trend", + type="line_chart", + title="Sentiment Over Time", + grid={"x": 4, "y": 0, "w": 8, "h": 2}, + config={ + "x_axis": {"key": "date", "type": "time"}, + "y_axis": {"key": "count", "label": "Reviews"}, + "series": [ + {"key": "positive", "name": "Positive", "color": "#22c55e"}, + {"key": "negative", "name": "Negative", "color": "#ef4444"}, + {"key": "neutral", "name": "Neutral", "color": "#6b7280"}, + ], + "show_legend": True, + }, + ), + ], + collapsed=False, + ), + DashboardSection( + id="classification", + title="URT Classification", + description="Review classification by URT domain", + widgets=[ + WidgetConfig( + id="urt_distribution", + type="bar_chart", + title="URT Domain Distribution", + grid={"x": 0, "y": 0, "w": 6, "h": 2}, + config={ + "x_axis": {"key": "domain", "type": "category"}, + "y_axis": {"key": "count", "label": "Spans"}, + "series": [{"key": "count", "name": "Spans"}], + }, + ), + WidgetConfig( + id="intensity_heatmap", + type="heatmap", + title="Domain x Intensity", + grid={"x": 6, "y": 0, "w": 6, "h": 2}, + config={ + "x_key": "intensity", + "y_key": "domain", + "value_key": "count", + "color_scale": ["#f0fdf4", "#22c55e"], + "show_values": True, + }, + ), + ], + collapsed=False, + ), + DashboardSection( + id="issues", + title="Issues", + description="Identified issues from negative reviews", + widgets=[ + WidgetConfig( + id="issues_table", + type="table", + title="Active Issues", + grid={"x": 0, "y": 0, "w": 8, "h": 2}, + config={ + "columns": [ + {"key": "domain", 
async def get_widget_data(
    self,
    widget_id: str,
    params: dict[str, Any],
) -> dict[str, Any]:
    """
    Get data for a specific dashboard widget.

    Args:
        widget_id: Widget identifier
        params: Query parameters; ``business_id`` and ``time_range``
            (default ``"30d"``) are read here, and the whole dict is
            forwarded to the issues table loader

    Returns:
        Widget data dictionary, or ``{"error": ...}`` for an unknown widget
    """
    await self.initialize()

    business_id = params.get("business_id")
    time_range = params.get("time_range", "30d")

    # Widget id -> zero-argument factory for the coroutine that loads it.
    # Factories keep the lookup lazy: only the selected loader is invoked.
    loaders = {
        # Overview stats
        "total_reviews": lambda: self._get_review_count(business_id),
        "reviews_processed": lambda: self._get_processed_count(business_id, time_range),
        "issues_found": lambda: self._get_issues_count(business_id),
        "avg_rating": lambda: self._get_avg_rating(business_id, time_range),
        # Sentiment
        "sentiment_distribution": lambda: self._get_sentiment_distribution(business_id),
        "sentiment_trend": lambda: self._get_sentiment_trend(business_id, time_range),
        # Classification
        "urt_distribution": lambda: self._get_urt_distribution(business_id),
        "intensity_heatmap": lambda: self._get_intensity_heatmap(business_id),
        # Issues
        "issues_table": lambda: self._get_issues_table(business_id, params),
        "issues_by_domain": lambda: self._get_issues_by_domain(business_id),
    }

    make_loader = loaders.get(widget_id)
    if make_loader is None:
        logger.warning(f"Unknown widget: {widget_id}")
        return {"error": f"Unknown widget: {widget_id}"}

    return await make_loader()
def _ensure_scraper_output(self, input_data: dict[str, Any]) -> ScraperOutput:
    """
    Return ``input_data`` as a ScraperOutput.

    When the four required keys are all present the dictionary is returned
    unchanged (same object). Otherwise a new ScraperOutput is built, filling
    every missing field with a placeholder default.
    """
    required_keys = ("job_id", "business_id", "place_id", "reviews")
    missing = [key for key in required_keys if key not in input_data]
    if not missing:
        # Already complete enough to be used directly.
        return input_data  # type: ignore

    reviews = input_data.get("reviews", [])
    return ScraperOutput(
        job_id=input_data.get("job_id", "unknown"),
        status=input_data.get("status", "completed"),
        business_id=input_data.get("business_id", "unknown"),
        place_id=input_data.get("place_id", "unknown"),
        business_info=input_data.get("business_info", {}),
        reviews=reviews,
        scrape_time_ms=input_data.get("scrape_time_ms", 0),
        # Derived from the payload rather than trusted from the caller.
        reviews_scraped=len(reviews),
        scraper_version=input_data.get("scraper_version", "unknown"),
    )
- - Args: - stage2_output: Output from Stage 2 - - Returns: - Stage3Output with routing results - """ - await self.initialize() - + async def _run_route(self, stage2_output: Stage2Output) -> Stage3Output: + """Run issue routing stage.""" stage3 = Stage3Router( - self.config, + self._config, self._db, self._span_repo, self._issue_repo, ) - # Extract negative/mixed spans for routing spans_to_route = [] for review in stage2_output["reviews_classified"]: for span in review.get("spans", []): @@ -352,27 +826,15 @@ class Pipeline: return await stage3.process(Stage3Input(spans=spans_to_route)) - async def aggregate( + async def _run_aggregate( self, business_id: str, date_str: str, bucket_types: list[str] | None = None, ) -> Stage4Output: - """ - Run Stage 4: Fact Aggregation. - - Args: - business_id: Business identifier - date_str: Date string (YYYY-MM-DD) - bucket_types: List of bucket types (default: ['day']) - - Returns: - Stage4Output with aggregated facts - """ - await self.initialize() - + """Run fact aggregation stage.""" stage4 = Stage4Aggregator( - self.config, + self._config, self._db, self._fact_repo, ) @@ -381,22 +843,484 @@ class Pipeline: business_id=business_id, date=date_str, bucket_types=bucket_types or ["day"], # type: ignore - taxonomy_version=self.config.taxonomy_version, + taxonomy_version=self._config.taxonomy_version, ) return await stage4.process(input_data) - async def validate(self, job_id: str) -> dict[str, ValidationResult]: - """ - Validate pipeline output for a job. 
+ # ========================================================================= + # Widget Data Methods + # ========================================================================= - Args: - job_id: Job identifier + async def _get_review_count(self, business_id: str | None) -> dict[str, Any]: + """Get total review count.""" + if not self._db: + return {"total_reviews": 0} - Returns: - Dictionary of validation results by stage - """ - # This would query the database for the job's output and validate - # For now, return empty results - logger.warning(f"validate() for job {job_id} not fully implemented") - return {} + async with self._db._pool.acquire() as conn: + if business_id: + count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE business_id = $1", + business_id, + ) + else: + count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw" + ) + + return {"total_reviews": count or 0} + + async def _get_processed_count( + self, business_id: str | None, time_range: str + ) -> dict[str, Any]: + """Get processed review count with trend.""" + if not self._db: + return {"reviews_processed": 0, "processed_change": 0} + + # Parse time range + days = self._parse_time_range(time_range) + + async with self._db._pool.acquire() as conn: + if business_id: + current = await conn.fetchval( + """ + SELECT COUNT(*) FROM pipeline.reviews_enriched + WHERE business_id = $1 + AND processed_at >= NOW() - INTERVAL '1 day' * $2 + """, + business_id, + days, + ) + previous = await conn.fetchval( + """ + SELECT COUNT(*) FROM pipeline.reviews_enriched + WHERE business_id = $1 + AND processed_at >= NOW() - INTERVAL '1 day' * $2 + AND processed_at < NOW() - INTERVAL '1 day' * $3 + """, + business_id, + days * 2, + days, + ) + else: + current = await conn.fetchval( + """ + SELECT COUNT(*) FROM pipeline.reviews_enriched + WHERE processed_at >= NOW() - INTERVAL '1 day' * $1 + """, + days, + ) + previous = await conn.fetchval( + """ + SELECT COUNT(*) FROM 
pipeline.reviews_enriched + WHERE processed_at >= NOW() - INTERVAL '1 day' * $1 + AND processed_at < NOW() - INTERVAL '1 day' * $2 + """, + days * 2, + days, + ) + + current = current or 0 + previous = previous or 0 + change = ((current - previous) / previous * 100) if previous > 0 else 0 + + return { + "reviews_processed": current, + "processed_change": round(change, 1), + } + + async def _get_issues_count(self, business_id: str | None) -> dict[str, Any]: + """Get open issues count.""" + if not self._db: + return {"issues_count": 0} + + async with self._db._pool.acquire() as conn: + if business_id: + count = await conn.fetchval( + """ + SELECT COUNT(*) FROM pipeline.issues + WHERE business_id = $1 AND state = 'open' + """, + business_id, + ) + else: + count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issues WHERE state = 'open'" + ) + + return {"issues_count": count or 0} + + async def _get_avg_rating( + self, business_id: str | None, time_range: str + ) -> dict[str, Any]: + """Get average rating with trend.""" + if not self._db: + return {"avg_rating": 0, "rating_change": 0} + + days = self._parse_time_range(time_range) + + async with self._db._pool.acquire() as conn: + if business_id: + current = await conn.fetchval( + """ + SELECT AVG(rating) FROM pipeline.reviews_enriched + WHERE business_id = $1 + AND review_time >= NOW() - INTERVAL '1 day' * $2 + """, + business_id, + days, + ) + previous = await conn.fetchval( + """ + SELECT AVG(rating) FROM pipeline.reviews_enriched + WHERE business_id = $1 + AND review_time >= NOW() - INTERVAL '1 day' * $2 + AND review_time < NOW() - INTERVAL '1 day' * $3 + """, + business_id, + days * 2, + days, + ) + else: + current = await conn.fetchval( + """ + SELECT AVG(rating) FROM pipeline.reviews_enriched + WHERE review_time >= NOW() - INTERVAL '1 day' * $1 + """, + days, + ) + previous = await conn.fetchval( + """ + SELECT AVG(rating) FROM pipeline.reviews_enriched + WHERE review_time >= NOW() - INTERVAL '1 day' * $1 
+ AND review_time < NOW() - INTERVAL '1 day' * $2 + """, + days * 2, + days, + ) + + current = float(current) if current else 0 + previous = float(previous) if previous else 0 + change = current - previous + + return { + "avg_rating": round(current, 2), + "rating_change": round(change, 2), + } + + async def _get_sentiment_distribution( + self, business_id: str | None + ) -> dict[str, Any]: + """Get sentiment distribution for pie chart.""" + if not self._db: + return {"data": []} + + async with self._db._pool.acquire() as conn: + if business_id: + rows = await conn.fetch( + """ + SELECT + valence, + COUNT(*) as count + FROM pipeline.review_spans + WHERE business_id = $1 AND is_active = TRUE + GROUP BY valence + """, + business_id, + ) + else: + rows = await conn.fetch( + """ + SELECT + valence, + COUNT(*) as count + FROM pipeline.review_spans + WHERE is_active = TRUE + GROUP BY valence + """ + ) + + # Map valence codes to labels + valence_labels = { + "V+": "Positive", + "V-": "Negative", + "V0": "Neutral", + "V±": "Mixed", + } + + data = [ + { + "sentiment": valence_labels.get(row["valence"], row["valence"]), + "count": row["count"], + } + for row in rows + ] + + return {"data": data} + + async def _get_sentiment_trend( + self, business_id: str | None, time_range: str + ) -> dict[str, Any]: + """Get sentiment trend over time for line chart.""" + if not self._db: + return {"data": []} + + days = self._parse_time_range(time_range) + + async with self._db._pool.acquire() as conn: + if business_id: + rows = await conn.fetch( + """ + SELECT + DATE(review_time) as date, + COUNT(*) FILTER (WHERE valence = 'V+') as positive, + COUNT(*) FILTER (WHERE valence = 'V-') as negative, + COUNT(*) FILTER (WHERE valence = 'V0') as neutral + FROM pipeline.review_spans + WHERE business_id = $1 + AND review_time >= NOW() - INTERVAL '1 day' * $2 + AND is_active = TRUE + GROUP BY DATE(review_time) + ORDER BY date + """, + business_id, + days, + ) + else: + rows = await conn.fetch( + """ 
async def _get_urt_distribution(self, business_id: str | None) -> dict[str, Any]:
    """Return active-span counts per URT domain, shaped for a bar chart.

    The domain is the first character of ``urt_primary``.

    Args:
        business_id: Restrict the counts to this business. A falsy value
            aggregates across all businesses.

    Returns:
        ``{"data": [{"domain": <name>, "count": <count>}, ...]}`` in the
        order the query returns (count descending). ``data`` is empty when
        no database handle is configured.
    """
    if not self._db:
        return {"data": []}

    # Same aggregation in both cases; only the business filter differs.
    if business_id:
        query = """
            SELECT
                SUBSTRING(urt_primary, 1, 1) as domain,
                COUNT(*) as count
            FROM pipeline.review_spans
            WHERE business_id = $1 AND is_active = TRUE
            GROUP BY SUBSTRING(urt_primary, 1, 1)
            ORDER BY count DESC
            """
        query_args = (business_id,)
    else:
        query = """
            SELECT
                SUBSTRING(urt_primary, 1, 1) as domain,
                COUNT(*) as count
            FROM pipeline.review_spans
            WHERE is_active = TRUE
            GROUP BY SUBSTRING(urt_primary, 1, 1)
            ORDER BY count DESC
            """
        query_args = ()

    async with self._db._pool.acquire() as conn:
        records = await conn.fetch(query, *query_args)

    # Single-letter domain codes mapped to display names; codes without an
    # entry are passed through unchanged.
    code_to_name = {
        "O": "Overall",
        "P": "People",
        "J": "Journey",
        "E": "Environment",
        "A": "Administrative",
        "V": "Value",
        "R": "Reliability",
    }

    bars = []
    for record in records:
        code = record["domain"]
        bars.append(
            {
                "domain": code_to_name.get(code, code),
                "count": record["count"],
            }
        )

    return {"data": bars}
async def _get_issues_table(
    self, business_id: str | None, params: dict[str, Any]
) -> dict[str, Any]:
    """Return one page of issues plus the total row count for a table widget.

    Args:
        business_id: Restrict the listing to this business. A falsy value
            lists issues across all businesses.
        params: Widget parameters. ``page`` (1-based, default 1) and
            ``page_size`` (default 10) are read from it. Values that are
            missing, non-numeric, or smaller than 1 fall back to the
            defaults, so a bad request can no longer produce a negative
            OFFSET or a ``TypeError`` in the offset arithmetic.

    Returns:
        ``{"data": [<row dict>, ...], "total": <count>}``. Both are empty/zero
        when no database handle is configured.
    """
    if not self._db:
        return {"data": [], "total": 0}

    def _positive_int(raw: Any, fallback: int) -> int:
        # Coerce caller-supplied paging values; anything unusable gets the default.
        try:
            number = int(raw)
        except (TypeError, ValueError, OverflowError):
            return fallback
        return number if number >= 1 else fallback

    params = params or {}
    page = _positive_int(params.get("page", 1), 1)
    page_size = _positive_int(params.get("page_size", 10), 10)
    offset = (page - 1) * page_size

    async with self._db._pool.acquire() as conn:
        if business_id:
            rows = await conn.fetch(
                """
                SELECT
                    issue_id,
                    domain,
                    primary_subcode as subcode,
                    span_count,
                    max_intensity,
                    state
                FROM pipeline.issues
                WHERE business_id = $1
                ORDER BY span_count DESC, created_at DESC
                LIMIT $2 OFFSET $3
                """,
                business_id,
                page_size,
                offset,
            )
            total = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1",
                business_id,
            )
        else:
            rows = await conn.fetch(
                """
                SELECT
                    issue_id,
                    domain,
                    primary_subcode as subcode,
                    span_count,
                    max_intensity,
                    state
                FROM pipeline.issues
                ORDER BY span_count DESC, created_at DESC
                LIMIT $1 OFFSET $2
                """,
                page_size,
                offset,
            )
            total = await conn.fetchval("SELECT COUNT(*) FROM pipeline.issues")

    data = [dict(row) for row in rows]

    return {"data": data, "total": total or 0}
def _parse_time_range(self, time_range: str) -> int:
    """Convert a time-range token such as ``"7d"``, ``"2w"`` or ``"3m"`` to days.

    The trailing letter selects the unit: ``d`` = days, ``w`` = 7 days,
    ``m`` = 30 days. The leading part is the count.

    Args:
        time_range: Token to parse.

    Returns:
        The number of days. Falls back to 30 when the token is not a string,
        has an unknown unit, has a missing or non-numeric count, or has a
        negative count. Previously the malformed-count cases raised
        ``ValueError``.
    """
    default_days = 30
    days_per_unit = {"d": 1, "w": 7, "m": 30}

    if not isinstance(time_range, str) or not time_range:
        return default_days

    multiplier = days_per_unit.get(time_range[-1])
    if multiplier is None:
        return default_days

    try:
        count = int(time_range[:-1])
    except ValueError:
        # Covers an empty count ("d") as well as garbage such as "abcd".
        return default_days

    # A negative window would select a range in the future.
    if count < 0:
        return default_days

    return count * multiplier
/**
 * Format an ISO timestamp for display in the user's locale.
 *
 * Returns '-' when the value is missing or cannot be parsed, rather than
 * rendering the literal "Invalid Date".
 */
function formatDate(dateStr: string | undefined): string {
  if (!dateStr) return '-';
  const date = new Date(dateStr);
  if (Number.isNaN(date.getTime())) return '-';
  return date.toLocaleString();
}

/**
 * Format the elapsed time between two timestamps as ms, seconds or minutes.
 *
 * When `end` is omitted the current time is used, so a running execution
 * shows its elapsed time so far. Returns '-' when `start` is missing or
 * either timestamp cannot be parsed. A negative span (end before start)
 * is clamped to zero.
 */
function formatDuration(start?: string, end?: string): string {
  if (!start) return '-';
  const startMs = new Date(start).getTime();
  const endMs = end ? new Date(end).getTime() : Date.now();
  // NaN would fall through every comparison below and render as "NaNm".
  if (Number.isNaN(startMs) || Number.isNaN(endMs)) return '-';

  const ms = Math.max(0, endMs - startMs);

  if (ms < 1000) return `${ms}ms`;
  if (ms < 60000) return `${(ms / 1000).toFixed(1)}s`;
  return `${(ms / 60000).toFixed(1)}m`;
}
e.message : 'Failed to load data'); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + if (pipelineId) { + fetchData(); + } + }, [pipelineId, statusFilter]); + + return ( +
+ {/* Navigation */} +
+ + + Back to Dashboard + + + +
+ + {/* Header */} +
+

+ Execution History +

+

+ {pipeline?.name || pipelineId} +

+
+ + {/* Filters */} +
+
+ + +
+
+ + {/* Content */} + {error ? ( +
+ +

{error}

+ +
+ ) : loading ? ( +
+ {[1, 2, 3, 4, 5].map((i) => ( +
+
+
+
+
+
+
+
+
+ ))} +
+ ) : executions.length === 0 ? ( +
+ +

No executions found

+
+ ) : ( +
+ + + + + + + + + + + + + {executions.map((execution) => ( + + + + + + + + + ))} + +
+ Status + + Execution ID + + Job / Business + + Stages + + Started + + Duration +
+ + + + {execution.id.slice(0, 8)}... + + + {execution.job_id ? ( + + {execution.job_id.slice(0, 8)}... + + ) : execution.business_id ? ( + + {execution.business_id} + + ) : ( + - + )} + + + {execution.stages_completed.length} / {execution.stages_requested.length} + + {execution.error_message && ( + + (error) + + )} + + {formatDate(execution.started_at)} + + {formatDuration(execution.started_at, execution.completed_at)} +
+
+ )} +
+ ); +} diff --git a/web/app/pipelines/[pipelineId]/page.tsx b/web/app/pipelines/[pipelineId]/page.tsx new file mode 100644 index 0000000..e7867cb --- /dev/null +++ b/web/app/pipelines/[pipelineId]/page.tsx @@ -0,0 +1,168 @@ +'use client'; + +import { useState, useEffect } from 'react'; +import { useParams } from 'next/navigation'; +import Link from 'next/link'; +import { AlertCircle, ArrowLeft, History, Settings } from 'lucide-react'; +import type { DashboardConfig, PipelineDetail } from '@/lib/pipeline-types'; +import { getPipeline, getDashboardConfig } from '@/lib/pipeline-api'; +import { DynamicDashboard } from '@/components/dashboard'; + +/** + * Pipeline dashboard page. + * + * Displays the dynamic dashboard for a specific pipeline. + */ +export default function PipelineDashboardPage() { + const params = useParams(); + const pipelineId = params.pipelineId as string; + + const [pipeline, setPipeline] = useState(null); + const [dashboardConfig, setDashboardConfig] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + useEffect(() => { + const fetchData = async () => { + setLoading(true); + setError(null); + + try { + const [pipelineData, configData] = await Promise.all([ + getPipeline(pipelineId), + getDashboardConfig(pipelineId), + ]); + setPipeline(pipelineData); + setDashboardConfig(configData); + } catch (e) { + setError(e instanceof Error ? e.message : 'Failed to load pipeline'); + } finally { + setLoading(false); + } + }; + + if (pipelineId) { + fetchData(); + } + }, [pipelineId]); + + if (loading) { + return ( +
+ {/* Header skeleton */} +
+
+
+
+
+
+
+ + {/* Dashboard skeleton */} +
+
+ {[1, 2, 3, 4].map((i) => ( +
+ ))} +
+
+
+
+ ); + } + + if (error || !pipeline || !dashboardConfig) { + return ( +
+ + + Back to Pipelines + + +
+ +

+ {error || 'Pipeline not found'} +

+ + Back to Pipelines + +
+
+ ); + } + + return ( +
+ {/* Navigation */} +
+ + + Back to Pipelines + + +
+ + + Execution History + +
+
+ + {/* Pipeline Info */} +
+
+
+

+ {pipeline.name} +

+

{pipeline.description}

+
+
+

+ Version: {pipeline.version} +

+

+ Input: {pipeline.input_type} +

+
+
+ +
+

Stages:

+
+ {pipeline.stages.map((stage, index) => ( + + + {index + 1} + + {stage} + + ))} +
+
+
+ + {/* Dynamic Dashboard */} + +
+ ); +} diff --git a/web/app/pipelines/page.tsx b/web/app/pipelines/page.tsx new file mode 100644 index 0000000..1368c87 --- /dev/null +++ b/web/app/pipelines/page.tsx @@ -0,0 +1,194 @@ +'use client'; + +import { useState, useEffect } from 'react'; +import Link from 'next/link'; +import { + Beaker, + Play, + Pause, + CheckCircle, + AlertCircle, + Clock, + ChevronRight, + RefreshCw, +} from 'lucide-react'; +import type { PipelineInfo } from '@/lib/pipeline-types'; +import { listPipelines } from '@/lib/pipeline-api'; + +/** + * Pipeline card component. + */ +function PipelineCard({ pipeline }: { pipeline: PipelineInfo }) { + return ( + +
+
+
+ +
+
+

+ {pipeline.name} +

+

v{pipeline.version}

+
+
+ +
+ +

+ {pipeline.description} +

+ +
+
+ Stages: + {pipeline.stages.slice(0, 3).map((stage, i) => ( + + {stage} + + ))} + {pipeline.stages.length > 3 && ( + + +{pipeline.stages.length - 3} more + + )} +
+ + + {pipeline.is_enabled ? 'Enabled' : 'Disabled'} + +
+ + ); +} + +/** + * Pipelines list page. + */ +export default function PipelinesPage() { + const [pipelines, setPipelines] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [showDisabled, setShowDisabled] = useState(false); + + const fetchPipelines = async () => { + setLoading(true); + setError(null); + try { + const data = await listPipelines(!showDisabled); + setPipelines(data); + } catch (e) { + setError(e instanceof Error ? e.message : 'Failed to load pipelines'); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + fetchPipelines(); + }, [showDisabled]); + + return ( +
+ {/* Header */} +
+
+

+ Pipelines +

+

+ Data processing pipelines for review analysis +

+
+ +
+ {/* Show disabled toggle */} + + + {/* Refresh button */} + +
+
+ + {/* Content */} + {error ? ( +
+ +

{error}

+ +
+ ) : loading ? ( +
+ {[1, 2, 3].map((i) => ( +
+
+
+
+
+
+
+
+
+
+
+ ))} +
+ ) : pipelines.length === 0 ? ( +
+ +

No pipelines registered

+
+ ) : ( +
+ {pipelines.map((pipeline) => ( + + ))} +
+ )} +
+ ); +} diff --git a/web/components/Sidebar.tsx b/web/components/Sidebar.tsx index f1d812b..982a22d 100644 --- a/web/components/Sidebar.tsx +++ b/web/components/Sidebar.tsx @@ -50,6 +50,16 @@ export default function Sidebar() { label: 'Analytics', matchPaths: ['/analytics'], }, + { + href: '/pipelines', + icon: ( + + + + ), + label: 'Pipelines', + matchPaths: ['/pipelines'], + }, { href: '/dashboard/scrapers', icon: ( diff --git a/web/components/dashboard/DashboardSection.tsx b/web/components/dashboard/DashboardSection.tsx new file mode 100644 index 0000000..f501eb0 --- /dev/null +++ b/web/components/dashboard/DashboardSection.tsx @@ -0,0 +1,148 @@ +'use client'; + +import { useState, useEffect, useCallback } from 'react'; +import { ChevronDown, ChevronRight } from 'lucide-react'; +import type { DashboardSection as DashboardSectionType, WidgetData } from '@/lib/pipeline-types'; +import { getWidgetData } from '@/lib/pipeline-api'; +import { renderWidget } from './WidgetRegistry'; + +interface DashboardSectionProps { + section: DashboardSectionType; + pipelineId: string; + businessId?: string; + timeRange?: string; +} + +/** + * Renders a dashboard section with its widgets. + */ +export function DashboardSection({ + section, + pipelineId, + businessId, + timeRange = '30d', +}: DashboardSectionProps) { + const [collapsed, setCollapsed] = useState(section.collapsed ?? 
false); + const [widgetData, setWidgetData] = useState>({}); + const [widgetLoading, setWidgetLoading] = useState>({}); + const [widgetErrors, setWidgetErrors] = useState>({}); + const [tablePagination, setTablePagination] = useState>({}); + + // Fetch data for a single widget + const fetchWidgetData = useCallback( + async (widgetId: string, page?: number) => { + setWidgetLoading((prev) => ({ ...prev, [widgetId]: true })); + setWidgetErrors((prev) => ({ ...prev, [widgetId]: undefined })); + + try { + const data = await getWidgetData(pipelineId, widgetId, { + business_id: businessId, + time_range: timeRange, + page: page || tablePagination[widgetId] || 1, + }); + setWidgetData((prev) => ({ ...prev, [widgetId]: data })); + } catch (error) { + setWidgetErrors((prev) => ({ + ...prev, + [widgetId]: error instanceof Error ? error.message : 'Failed to load', + })); + } finally { + setWidgetLoading((prev) => ({ ...prev, [widgetId]: false })); + } + }, + [pipelineId, businessId, timeRange, tablePagination] + ); + + // Fetch all widget data on mount and when params change + useEffect(() => { + if (!collapsed) { + section.widgets.forEach((widget) => { + fetchWidgetData(widget.id); + }); + } + }, [section.widgets, collapsed, pipelineId, businessId, timeRange]); + + // Handle page change for tables + const handlePageChange = (widgetId: string, page: number) => { + setTablePagination((prev) => ({ ...prev, [widgetId]: page })); + fetchWidgetData(widgetId, page); + }; + + // Calculate grid layout + // Using a 12-column grid + const getGridClass = (widget: typeof section.widgets[0]) => { + const { grid } = widget; + // Map grid units to Tailwind classes + const colSpanClasses: Record = { + 1: 'col-span-1', + 2: 'col-span-2', + 3: 'col-span-3', + 4: 'col-span-4', + 5: 'col-span-5', + 6: 'col-span-6', + 7: 'col-span-7', + 8: 'col-span-8', + 9: 'col-span-9', + 10: 'col-span-10', + 11: 'col-span-11', + 12: 'col-span-12', + }; + const rowSpanClasses: Record = { + 1: 'row-span-1', + 2: 
'row-span-2', + 3: 'row-span-3', + 4: 'row-span-4', + }; + return `${colSpanClasses[grid.w] || 'col-span-4'} ${rowSpanClasses[grid.h] || 'row-span-1'}`; + }; + + return ( +
+ {/* Section Header */} + + + {/* Widgets Grid */} + {!collapsed && ( +
+ {section.widgets.map((widget) => ( +
+ {renderWidget( + widget, + widgetData[widget.id] || null, + widgetLoading[widget.id] ?? true, + widgetErrors[widget.id], + () => fetchWidgetData(widget.id), + widget.type === 'table' + ? (page) => handlePageChange(widget.id, page) + : undefined, + tablePagination[widget.id] || 1 + )} +
+ ))} +
+ )} +
+ ); +} diff --git a/web/components/dashboard/DynamicDashboard.tsx b/web/components/dashboard/DynamicDashboard.tsx new file mode 100644 index 0000000..5ae07a3 --- /dev/null +++ b/web/components/dashboard/DynamicDashboard.tsx @@ -0,0 +1,106 @@ +'use client'; + +import { useState } from 'react'; +import { RefreshCw, Calendar, Building2 } from 'lucide-react'; +import type { DashboardConfig } from '@/lib/pipeline-types'; +import { DashboardSection } from './DashboardSection'; + +interface DynamicDashboardProps { + pipelineId: string; + config: DashboardConfig; + businessId?: string; +} + +// Time range options +const TIME_RANGES = [ + { value: '7d', label: 'Last 7 days' }, + { value: '14d', label: 'Last 14 days' }, + { value: '30d', label: 'Last 30 days' }, + { value: '90d', label: 'Last 90 days' }, +]; + +/** + * Dynamic dashboard that renders from a DashboardConfig. + * + * This component: + * - Renders sections based on the config + * - Provides time range and business filters + * - Handles global refresh + */ +export function DynamicDashboard({ + pipelineId, + config, + businessId: initialBusinessId, +}: DynamicDashboardProps) { + const [timeRange, setTimeRange] = useState(config.default_time_range || '30d'); + const [businessId, setBusinessId] = useState(initialBusinessId); + const [refreshKey, setRefreshKey] = useState(0); + + // Force refresh all widgets + const handleRefresh = () => { + setRefreshKey((prev) => prev + 1); + }; + + return ( +
+ {/* Dashboard Header */} +
+
+

+ {config.title} +

+ {config.description && ( +

{config.description}

+ )} +
+ + {/* Controls */} +
+ {/* Business Filter (placeholder) */} + {businessId && ( +
+ + {businessId} +
+ )} + + {/* Time Range Selector */} +
+ + +
+ + {/* Refresh Button */} + +
+
+ + {/* Sections */} + {config.sections.map((section) => ( + + ))} +
+ ); +} diff --git a/web/components/dashboard/WidgetRegistry.tsx b/web/components/dashboard/WidgetRegistry.tsx new file mode 100644 index 0000000..1437d1d --- /dev/null +++ b/web/components/dashboard/WidgetRegistry.tsx @@ -0,0 +1,98 @@ +'use client'; + +import type { ComponentType } from 'react'; +import type { WidgetConfig, WidgetType, WidgetData } from '@/lib/pipeline-types'; +import { StatCard } from './widgets/StatCard'; +import { LineChartWidget } from './widgets/LineChart'; +import { BarChartWidget } from './widgets/BarChart'; +import { PieChartWidget } from './widgets/PieChart'; +import { DataTableWidget } from './widgets/DataTable'; +import { HeatmapWidget } from './widgets/Heatmap'; + +// Common widget props +export interface WidgetComponentProps { + config: WidgetConfig; + data: WidgetData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; + onPageChange?: (page: number) => void; + currentPage?: number; +} + +// Widget component type +type WidgetComponent = ComponentType; + +/** + * Registry mapping widget types to their React components. + */ +const WIDGET_COMPONENTS: Record = { + stat_card: StatCard as WidgetComponent, + line_chart: LineChartWidget as WidgetComponent, + bar_chart: BarChartWidget as WidgetComponent, + pie_chart: PieChartWidget as WidgetComponent, + table: DataTableWidget as WidgetComponent, + heatmap: HeatmapWidget as WidgetComponent, + // Placeholder for unimplemented types + area_chart: LineChartWidget as WidgetComponent, // Use line chart as fallback + gauge: StatCard as WidgetComponent, // Use stat card as fallback +}; + +/** + * Get the component for a widget type. + */ +export function getWidgetComponent(type: WidgetType): WidgetComponent | null { + return WIDGET_COMPONENTS[type] || null; +} + +/** + * Check if a widget type is supported. + */ +export function isWidgetTypeSupported(type: string): type is WidgetType { + return type in WIDGET_COMPONENTS; +} + +/** + * Render a widget based on its configuration. 
+ */ +export function renderWidget( + config: WidgetConfig, + data: WidgetData | null, + loading: boolean, + error?: string, + onRefresh?: () => void, + onPageChange?: (page: number) => void, + currentPage?: number +): React.ReactNode { + const Component = WIDGET_COMPONENTS[config.type]; + + if (!Component) { + return ( +
+

Unknown widget type: {config.type}

+
+ ); + } + + return ( + + ); +} + +// Export widget components for direct use +export { + StatCard, + LineChartWidget, + BarChartWidget, + PieChartWidget, + DataTableWidget, + HeatmapWidget, +}; diff --git a/web/components/dashboard/index.ts b/web/components/dashboard/index.ts new file mode 100644 index 0000000..2bd1587 --- /dev/null +++ b/web/components/dashboard/index.ts @@ -0,0 +1,26 @@ +/** + * Dashboard component exports. + * + * This module provides the dynamic dashboard system that renders + * pipeline dashboards from configuration. + */ + +// Main components +export { DynamicDashboard } from './DynamicDashboard'; +export { DashboardSection } from './DashboardSection'; + +// Widget registry +export { + getWidgetComponent, + isWidgetTypeSupported, + renderWidget, +} from './WidgetRegistry'; + +// Individual widgets +export { StatCard } from './widgets/StatCard'; +export { LineChartWidget } from './widgets/LineChart'; +export { BarChartWidget } from './widgets/BarChart'; +export { PieChartWidget } from './widgets/PieChart'; +export { DataTableWidget } from './widgets/DataTable'; +export { HeatmapWidget } from './widgets/Heatmap'; +export { WidgetWrapper } from './widgets/WidgetWrapper'; diff --git a/web/components/dashboard/widgets/BarChart.tsx b/web/components/dashboard/widgets/BarChart.tsx new file mode 100644 index 0000000..e369f6a --- /dev/null +++ b/web/components/dashboard/widgets/BarChart.tsx @@ -0,0 +1,106 @@ +'use client'; + +import { + BarChart as RechartsBarChart, + Bar, + XAxis, + YAxis, + CartesianGrid, + Tooltip, + Legend, + ResponsiveContainer, +} from 'recharts'; +import type { WidgetConfig, ChartData, ChartWidgetConfig } from '@/lib/pipeline-types'; +import { WidgetWrapper } from './WidgetWrapper'; + +interface BarChartWidgetProps { + config: WidgetConfig; + data: ChartData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; +} + +// Default colors for series +const DEFAULT_COLORS = [ + '#3b82f6', // blue + '#22c55e', // 
green + '#ef4444', // red + '#eab308', // yellow + '#8b5cf6', // purple + '#ec4899', // pink + '#06b6d4', // cyan +]; + +/** + * Bar chart widget using Recharts. + */ +export function BarChartWidget({ + config, + data, + loading, + error, + onRefresh, +}: BarChartWidgetProps) { + const chartConfig = config.config as ChartWidgetConfig; + const chartData = data?.data || []; + + return ( + + {chartData.length === 0 ? ( +
+ No data available +
+ ) : ( + + + {chartConfig.show_grid !== false && ( + + )} + + + + {chartConfig.show_legend !== false && } + {chartConfig.series?.map((series, index) => ( + + ))} + + + )} +
+ ); +} diff --git a/web/components/dashboard/widgets/DataTable.tsx b/web/components/dashboard/widgets/DataTable.tsx new file mode 100644 index 0000000..98cc956 --- /dev/null +++ b/web/components/dashboard/widgets/DataTable.tsx @@ -0,0 +1,134 @@ +'use client'; + +import { ChevronLeft, ChevronRight } from 'lucide-react'; +import type { WidgetConfig, TableData, TableWidgetConfig } from '@/lib/pipeline-types'; +import { WidgetWrapper } from './WidgetWrapper'; + +interface DataTableWidgetProps { + config: WidgetConfig; + data: TableData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; + onPageChange?: (page: number) => void; + currentPage?: number; +} + +/** + * Data table widget with pagination. + */ +export function DataTableWidget({ + config, + data, + loading, + error, + onRefresh, + onPageChange, + currentPage = 1, +}: DataTableWidgetProps) { + const tableConfig = config.config as TableWidgetConfig; + const rows = data?.data || []; + const total = data?.total || 0; + const pageSize = tableConfig.page_size || 10; + const totalPages = Math.ceil(total / pageSize); + + return ( + + {rows.length === 0 ? ( +
+ No data available +
+ ) : ( +
+ {/* Table */} +
+ + + + {tableConfig.columns.map((col) => ( + + ))} + + + + {rows.map((row, rowIndex) => ( + + {tableConfig.columns.map((col) => ( + + ))} + + ))} + +
+ {col.header} +
+ {formatCellValue(row[col.key], col.format)} +
+
+ + {/* Pagination */} + {tableConfig.show_pagination !== false && totalPages > 1 && onPageChange && ( +
+
+ Showing {(currentPage - 1) * pageSize + 1} to{' '} + {Math.min(currentPage * pageSize, total)} of {total} +
+
+ + + Page {currentPage} of {totalPages} + + +
+
+ )} +
+ )} +
/**
 * Render a table cell value as display text.
 *
 * - null / undefined become '-'
 * - numbers become a one-decimal percentage when `format` contains '%',
 *   otherwise a locale-formatted number
 * - everything else is stringified
 */
function formatCellValue(value: unknown, format?: string): string {
  if (value == null) {
    return '-';
  }
  if (typeof value !== 'number') {
    return String(value);
  }

  const asPercent = format?.includes('%') ?? false;
  return asPercent ? `${value.toFixed(1)}%` : value.toLocaleString();
}
0.5 : (value - minValue) / (maxValue - minValue); + + // Simple interpolation between first and last color + if (colors.length === 2) { + return interpolateColor(colors[0], colors[1], ratio); + } + return colors[Math.min(Math.floor(ratio * colors.length), colors.length - 1)]; + }; + + return ( + + {rawData.length === 0 ? ( +
+ No data available +
+ ) : ( +
+ + + + + ))} + + + + {yValues.map((y) => ( + + + {xValues.map((x) => { + const key = `${y}-${x}`; + const value = valueMap.get(key) || 0; + return ( + + ); + })} + + ))} + +
+ {xValues.map((x) => ( + + {x} +
+ {y} + + {heatmapConfig.show_values && ( + + {value.toLocaleString()} + + )} +
+
+ )} +
/**
 * Interpolate between two hex colors.
 *
 * Accepts `#rrggbb` and shorthand `#rgb`. `ratio` is clamped to [0, 1]
 * (0 = color1, 1 = color2); a non-finite ratio is treated as 0.
 * If either color is not a hex color, `color1` is returned unchanged
 * instead of producing an invalid `#NaNNaNNaN` string.
 */
function interpolateColor(color1: string, color2: string, ratio: number): string {
  const parseHex = (color: string): number[] | null => {
    const match = /^#([0-9a-f]{3}|[0-9a-f]{6})$/i.exec(color.trim());
    if (!match) return null;
    // Expand shorthand "abc" to "aabbcc" so both forms share one code path.
    const digits =
      match[1].length === 3
        ? match[1]
            .split('')
            .map((ch) => ch + ch)
            .join('')
        : match[1];
    return [0, 2, 4].map((i) => parseInt(digits.slice(i, i + 2), 16));
  };

  const from = parseHex(color1);
  const to = parseHex(color2);
  if (!from || !to) return color1;

  const t = Number.isFinite(ratio) ? Math.min(Math.max(ratio, 0), 1) : 0;

  const channels = from.map((start, i) => Math.round(start + (to[i] - start) * t));

  return `#${channels.map((c) => c.toString(16).padStart(2, '0')).join('')}`;
}
+ No data available +
+ ) : ( + + + {chartConfig.show_grid !== false && ( + + )} + + + + {chartConfig.show_legend !== false && } + {chartConfig.series?.map((series, index) => ( + + ))} + + + )} +
+ ); +} diff --git a/web/components/dashboard/widgets/PieChart.tsx b/web/components/dashboard/widgets/PieChart.tsx new file mode 100644 index 0000000..6658de3 --- /dev/null +++ b/web/components/dashboard/widgets/PieChart.tsx @@ -0,0 +1,106 @@ +'use client'; + +import { + PieChart as RechartsPieChart, + Pie, + Cell, + Tooltip, + Legend, + ResponsiveContainer, +} from 'recharts'; +import type { WidgetConfig, ChartData, PieChartConfig } from '@/lib/pipeline-types'; +import { WidgetWrapper } from './WidgetWrapper'; + +interface PieChartWidgetProps { + config: WidgetConfig; + data: ChartData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; +} + +// Default colors for pie slices +const DEFAULT_COLORS = [ + '#3b82f6', // blue + '#22c55e', // green + '#ef4444', // red + '#eab308', // yellow + '#8b5cf6', // purple + '#ec4899', // pink + '#06b6d4', // cyan + '#f97316', // orange +]; + +/** + * Pie/Donut chart widget using Recharts. + */ +export function PieChartWidget({ + config, + data, + loading, + error, + onRefresh, +}: PieChartWidgetProps) { + const chartConfig = config.config as PieChartConfig; + const chartData = data?.data || []; + const colors = chartConfig.colors || DEFAULT_COLORS; + const innerRadius = chartConfig.inner_radius || 0; // 0 = pie, > 0 = donut + + // Transform data to use consistent keys + const transformedData = chartData.map((item) => ({ + name: item[chartConfig.label_key] as string, + value: item[chartConfig.value_key] as number, + })); + + return ( + + {transformedData.length === 0 ? ( +
+ No data available +
+ ) : ( + + + `${name} ${(percent * 100).toFixed(0)}%` + : undefined + } + labelLine={chartConfig.show_labels !== false} + > + {transformedData.map((_, index) => ( + + ))} + + [value.toLocaleString(), 'Count']} + /> + {chartConfig.show_legend !== false && ( + + )} + + + )} +
+ ); +} diff --git a/web/components/dashboard/widgets/StatCard.tsx b/web/components/dashboard/widgets/StatCard.tsx new file mode 100644 index 0000000..d39c861 --- /dev/null +++ b/web/components/dashboard/widgets/StatCard.tsx @@ -0,0 +1,113 @@ +'use client'; + +import { + TrendingUp, + TrendingDown, + MessageSquare, + CheckCircle, + AlertTriangle, + Star, + Activity, +} from 'lucide-react'; +import type { WidgetConfig, StatCardData, StatCardConfig } from '@/lib/pipeline-types'; +import { WidgetWrapper } from './WidgetWrapper'; + +interface StatCardProps { + config: WidgetConfig; + data: StatCardData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; +} + +// Icon mapping +const ICONS: Record> = { + 'message-square': MessageSquare, + 'check-circle': CheckCircle, + 'alert-triangle': AlertTriangle, + star: Star, + activity: Activity, +}; + +// Color mapping +const COLORS: Record = { + blue: 'text-blue-600 bg-blue-100 dark:text-blue-400 dark:bg-blue-900/30', + green: 'text-green-600 bg-green-100 dark:text-green-400 dark:bg-green-900/30', + red: 'text-red-600 bg-red-100 dark:text-red-400 dark:bg-red-900/30', + yellow: 'text-yellow-600 bg-yellow-100 dark:text-yellow-400 dark:bg-yellow-900/30', + purple: 'text-purple-600 bg-purple-100 dark:text-purple-400 dark:bg-purple-900/30', + gray: 'text-gray-600 bg-gray-100 dark:text-gray-400 dark:bg-gray-700', +}; + +/** + * Format a value according to a format string. + * Supports: {value:,} for thousands, {value:.1f} for decimals, {value:.1%} for percentages + */ +function formatValue(value: number | string, format?: string): string { + if (!format) return String(value); + + const num = typeof value === 'string' ? 
parseFloat(value) : value; + if (isNaN(num)) return String(value); + + // Simple format parsing + if (format.includes(':,}')) { + return num.toLocaleString(); + } + if (format.includes(':.1f}')) { + return num.toFixed(1); + } + if (format.includes(':.2f}')) { + return num.toFixed(2); + } + if (format.includes(':.1%}')) { + return (num * 100).toFixed(1) + '%'; + } + + return String(value); +} + +/** + * Stat card widget for displaying KPIs. + */ +export function StatCard({ config, data, loading, error, onRefresh }: StatCardProps) { + const widgetConfig = config.config as StatCardConfig; + const Icon = widgetConfig.icon ? ICONS[widgetConfig.icon] : Activity; + const colorClass = widgetConfig.color ? COLORS[widgetConfig.color] : COLORS.gray; + + // Extract value and trend from data + const value = data?.[widgetConfig.value_key] ?? 0; + const trend = widgetConfig.trend_key ? data?.[widgetConfig.trend_key] : undefined; + + return ( + +
+
+

+ {formatValue(value, widgetConfig.format)} +

+ {trend !== undefined && ( +
+ {Number(trend) >= 0 ? ( + + ) : ( + + )} + = 0 ? 'text-green-600' : 'text-red-600' + }`} + > + {formatValue(trend, widgetConfig.trend_format || '{value:.1f}')} + +
+ )} +
+ {Icon && ( +
+ +
+ )} +
+
+ ); +} diff --git a/web/components/dashboard/widgets/WidgetWrapper.tsx b/web/components/dashboard/widgets/WidgetWrapper.tsx new file mode 100644 index 0000000..c05a606 --- /dev/null +++ b/web/components/dashboard/widgets/WidgetWrapper.tsx @@ -0,0 +1,66 @@ +'use client'; + +import { RefreshCw, AlertCircle } from 'lucide-react'; +import type { WidgetConfig } from '@/lib/pipeline-types'; + +interface WidgetWrapperProps { + config: WidgetConfig; + loading: boolean; + error?: string; + onRefresh?: () => void; + children: React.ReactNode; +} + +/** + * Common wrapper for dashboard widgets. + * Handles loading, error states, and refresh functionality. + */ +export function WidgetWrapper({ + config, + loading, + error, + onRefresh, + children, +}: WidgetWrapperProps) { + return ( +
+ {/* Header */} +
+

+ {config.title} +

+ {onRefresh && ( + + )} +
+ + {/* Content */} +
+ {error ? ( +
+
+ +

{error}

+
+
+ ) : loading ? ( +
+
+
+
+
+
+ ) : ( + children + )} +
+
+ ); +} diff --git a/web/lib/pipeline-api.ts b/web/lib/pipeline-api.ts new file mode 100644 index 0000000..0aa094d --- /dev/null +++ b/web/lib/pipeline-api.ts @@ -0,0 +1,213 @@ +/** + * Pipeline API client functions. + * + * Provides methods for interacting with the pipeline API endpoints. + */ + +import type { + PipelineInfo, + PipelineDetail, + DashboardConfig, + ExecutionStatus, + WidgetData, +} from './pipeline-types'; + +// API base URL - defaults to same origin in production +const API_BASE = process.env.NEXT_PUBLIC_API_URL || ''; + +/** + * Fetch all registered pipelines. + */ +export async function listPipelines(enabledOnly = true): Promise { + const url = `${API_BASE}/api/pipelines?enabled_only=${enabledOnly}`; + const response = await fetch(url); + + if (!response.ok) { + throw new Error(`Failed to fetch pipelines: ${response.statusText}`); + } + + return response.json(); +} + +/** + * Fetch details for a specific pipeline. + */ +export async function getPipeline(pipelineId: string): Promise { + const url = `${API_BASE}/api/pipelines/${pipelineId}`; + const response = await fetch(url); + + if (!response.ok) { + if (response.status === 404) { + throw new Error(`Pipeline not found: ${pipelineId}`); + } + throw new Error(`Failed to fetch pipeline: ${response.statusText}`); + } + + return response.json(); +} + +/** + * Fetch dashboard configuration for a pipeline. + */ +export async function getDashboardConfig(pipelineId: string): Promise { + const url = `${API_BASE}/api/pipelines/${pipelineId}/dashboard`; + const response = await fetch(url); + + if (!response.ok) { + throw new Error(`Failed to fetch dashboard config: ${response.statusText}`); + } + + return response.json(); +} + +/** + * Fetch data for a specific widget. 
+ *
+ * Only params with truthy values are sent as query parameters; the rest are
+ * left out of the request.
+ *
+ * @param pipelineId - Pipeline identifier; URL-encoded into the request path.
+ * @param widgetId - Widget identifier; URL-encoded into the request path.
+ * @param params - Optional filter and pagination values.
+ * @returns The parsed JSON response body.
+ * @throws Error when the response status is not OK.
+ */
+export async function getWidgetData(
+  pipelineId: string,
+  widgetId: string,
+  params: {
+    business_id?: string;
+    time_range?: string;
+    page?: number;
+    page_size?: number;
+  } = {}
+): Promise<WidgetData> {
+  const searchParams = new URLSearchParams();
+
+  if (params.business_id) {
+    searchParams.set('business_id', params.business_id);
+  }
+  if (params.time_range) {
+    searchParams.set('time_range', params.time_range);
+  }
+  if (params.page) {
+    searchParams.set('page', params.page.toString());
+  }
+  if (params.page_size) {
+    searchParams.set('page_size', params.page_size.toString());
+  }
+
+  // Encode the path segments so an ID containing '/', '?' or '#' cannot
+  // change which endpoint the request hits.
+  const pipelineSegment = encodeURIComponent(pipelineId);
+  const widgetSegment = encodeURIComponent(widgetId);
+  const url = `${API_BASE}/api/pipelines/${pipelineSegment}/widgets/${widgetSegment}?${searchParams}`;
+  const response = await fetch(url);
+
+  if (!response.ok) {
+    throw new Error(`Failed to fetch widget data: ${response.statusText}`);
+  }
+
+  return response.json();
+}
+
+/**
+ * Execute a pipeline.
+ *
+ * Sends `request` as a JSON body in a POST to the pipeline's execute endpoint.
+ *
+ * @param pipelineId - Pipeline identifier; URL-encoded into the request path.
+ * @param request - Execution request payload, serialized with JSON.stringify.
+ * @returns The parsed JSON response body.
+ * @throws Error when the response status is not OK.
+ */
+export async function executePipeline(
+  pipelineId: string,
+  request: {
+    job_id?: string;
+    business_id?: string;
+    input_data?: Record<string, unknown>;
+    stages?: string[];
+    options?: Record<string, unknown>;
+  }
+): Promise<{
+  execution_id: string;
+  pipeline_id: string;
+  success: boolean;
+  stages_run: string[];
+  error?: string;
+}> {
+  // Encode the path segment so the ID cannot change which endpoint is hit.
+  const url = `${API_BASE}/api/pipelines/${encodeURIComponent(pipelineId)}/execute`;
+  const response = await fetch(url, {
+    method: 'POST',
+    headers: {
+      'Content-Type': 'application/json',
+    },
+    body: JSON.stringify(request),
+  });
+
+  if (!response.ok) {
+    throw new Error(`Failed to execute pipeline: ${response.statusText}`);
+  }
+
+  return response.json();
+}
+
+/**
+ * List execution history for a pipeline.
+ *
+ * Only params with truthy values are sent as query parameters; the rest are
+ * left out of the request.
+ *
+ * NOTE(review): the return type is written as ExecutionStatus[] on the
+ * assumption that the endpoint returns a JSON array; confirm against the API.
+ *
+ * @param pipelineId - Pipeline identifier; URL-encoded into the request path.
+ * @param params - Optional status filter and limit/offset values.
+ * @returns The parsed JSON response body.
+ * @throws Error when the response status is not OK.
+ */
+export async function listExecutions(
+  pipelineId: string,
+  params: {
+    status?: string;
+    limit?: number;
+    offset?: number;
+  } = {}
+): Promise<ExecutionStatus[]> {
+  const searchParams = new URLSearchParams();
+
+  if (params.status) {
+    searchParams.set('status', params.status);
+  }
+  if (params.limit) {
+    searchParams.set('limit', params.limit.toString());
+  }
+  if (params.offset) {
+    searchParams.set('offset', params.offset.toString());
+  }
+
+  // Encode the path segment so an ID containing '/', '?' or '#' cannot
+  // change which endpoint the request hits.
+  const url = `${API_BASE}/api/pipelines/${encodeURIComponent(pipelineId)}/executions?${searchParams}`;
+  const response = await fetch(url);
+
+  if (!response.ok) {
+    throw new Error(`Failed to fetch executions: ${response.statusText}`);
+  }
+
+  return response.json();
+}
+
+/**
+ * Enable a pipeline.
+ *
+ * Issues a body-less POST to the pipeline's enable endpoint.
+ *
+ * @param pipelineId - Pipeline identifier; URL-encoded into the request path.
+ * @throws Error when the response status is not OK.
+ */
+export async function enablePipeline(pipelineId: string): Promise<void> {
+  const url = `${API_BASE}/api/pipelines/${encodeURIComponent(pipelineId)}/enable`;
+  const response = await fetch(url, { method: 'POST' });
+
+  if (!response.ok) {
+    throw new Error(`Failed to enable pipeline: ${response.statusText}`);
+  }
+}
+
+/**
+ * Disable a pipeline.
+ *
+ * Issues a body-less POST to the pipeline's disable endpoint.
+ *
+ * @param pipelineId - Pipeline identifier; URL-encoded into the request path.
+ * @throws Error when the response status is not OK.
+ */
+export async function disablePipeline(pipelineId: string): Promise<void> {
+  const url = `${API_BASE}/api/pipelines/${encodeURIComponent(pipelineId)}/disable`;
+  const response = await fetch(url, { method: 'POST' });
+
+  if (!response.ok) {
+    throw new Error(`Failed to disable pipeline: ${response.statusText}`);
+  }
+}
+
+/**
+ * Check pipeline health.
+ */ +export async function checkPipelineHealth( + pipelineId: string +): Promise<{ + pipeline_id: string; + healthy: boolean; + checks?: Record; + message?: string; + error?: string; +}> { + const url = `${API_BASE}/api/pipelines/${pipelineId}/health`; + const response = await fetch(url); + + if (!response.ok) { + throw new Error(`Failed to check pipeline health: ${response.statusText}`); + } + + return response.json(); +} diff --git a/web/lib/pipeline-types.ts b/web/lib/pipeline-types.ts new file mode 100644 index 0000000..b96bd1d --- /dev/null +++ b/web/lib/pipeline-types.ts @@ -0,0 +1,194 @@ +/** + * TypeScript types for the pipeline system. + * + * These types mirror the Python contracts in pipeline_core/contracts.py + */ + +// Widget types supported by the dashboard +export type WidgetType = + | 'stat_card' + | 'line_chart' + | 'bar_chart' + | 'pie_chart' + | 'table' + | 'heatmap' + | 'area_chart' + | 'gauge'; + +// Grid position for dashboard layout +export interface GridPosition { + x: number; + y: number; + w: number; + h: number; +} + +// Widget configuration +export interface WidgetConfig { + id: string; + type: WidgetType; + title: string; + grid: GridPosition; + config: Record; + data_endpoint?: string; + refresh_interval?: number; +} + +// Dashboard section containing widgets +export interface DashboardSection { + id: string; + title: string; + description?: string; + widgets: WidgetConfig[]; + collapsed?: boolean; +} + +// Full dashboard configuration +export interface DashboardConfig { + pipeline_id: string; + title: string; + description?: string; + sections: DashboardSection[]; + default_time_range?: string; + refresh_interval?: number; +} + +// Pipeline info (summary) +export interface PipelineInfo { + id: string; + name: string; + description: string; + version: string; + is_enabled: boolean; + stages: string[]; + input_type: string; +} + +// Pipeline detail (full info) +export interface PipelineDetail extends PipelineInfo { + module_path: 
string; + config?: Record; + created_at?: string; + updated_at?: string; +} + +// Execution status +export interface ExecutionStatus { + id: string; + pipeline_id: string; + job_id?: string; + business_id?: string; + status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'; + stages_requested: string[]; + stages_completed: string[]; + current_stage?: string; + progress: number; + error_message?: string; + started_at?: string; + completed_at?: string; + created_at?: string; +} + +// Widget-specific data types + +export interface StatCardData { + [key: string]: number | string; +} + +export interface ChartDataPoint { + [key: string]: number | string; +} + +export interface ChartData { + data: ChartDataPoint[]; +} + +export interface TableData { + data: Record[]; + total: number; +} + +export type WidgetData = StatCardData | ChartData | TableData; + +// Widget props base +export interface WidgetProps { + config: WidgetConfig; + data: WidgetData | null; + loading: boolean; + error?: string; + onRefresh?: () => void; +} + +// Stat card specific config +export interface StatCardConfig { + value_key: string; + label?: string; + format?: string; + trend_key?: string; + trend_format?: string; + icon?: string; + color?: string; +} + +// Chart config +export interface ChartAxisConfig { + key: string; + label?: string; + type?: 'number' | 'category' | 'time'; + format?: string; +} + +export interface ChartSeriesConfig { + key: string; + name: string; + color?: string; + type?: 'line' | 'bar' | 'area'; +} + +export interface ChartWidgetConfig { + x_axis: ChartAxisConfig; + y_axis: ChartAxisConfig; + series: ChartSeriesConfig[]; + stacked?: boolean; + show_legend?: boolean; + show_grid?: boolean; +} + +// Pie chart config +export interface PieChartConfig { + value_key: string; + label_key: string; + colors?: string[]; + show_legend?: boolean; + show_labels?: boolean; + inner_radius?: number; +} + +// Table config +export interface TableColumnConfig { + key: string; + 
header: string; + width?: number; + align?: 'left' | 'center' | 'right'; + format?: string; + sortable?: boolean; +} + +export interface TableWidgetConfig { + columns: TableColumnConfig[]; + row_key: string; + page_size?: number; + show_pagination?: boolean; + sortable?: boolean; + filterable?: boolean; +} + +// Heatmap config +export interface HeatmapConfig { + x_key: string; + y_key: string; + value_key: string; + color_scale?: string[]; + show_values?: boolean; + format?: string; +}