-- =============================================================================
-- Migration: 006_pipeline_registry.sql
-- Pipeline Registry and Execution History
-- =============================================================================
--
-- Creates tables for the extensible pipeline system:
--   pipeline.registry   - Registered pipelines and their metadata
--   pipeline.executions - Pipeline execution history
--
-- This enables dynamic pipeline discovery, registration, and execution tracking.
--
-- Every statement is written to be re-runnable (IF NOT EXISTS, OR REPLACE,
-- duplicate_object guard, ON CONFLICT upsert).
--
-- NOTE(review): gen_random_uuid() is built in from PostgreSQL 13; older
-- servers need the pgcrypto extension — confirm the target version.
--
-- Date: 2026-01-24
-- =============================================================================

-- Ensure pipeline schema exists (should already exist from 005)
CREATE SCHEMA IF NOT EXISTS pipeline;

-- =============================================================================
-- SECTION 1: PIPELINE REGISTRY
-- =============================================================================

-- Pipeline registry table
-- Stores registered pipelines and their metadata for dynamic discovery
CREATE TABLE IF NOT EXISTS pipeline.registry (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Pipeline identity
    pipeline_id VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    version VARCHAR(20) NOT NULL,

    -- Module information for dynamic loading
    module_path VARCHAR(255) NOT NULL,  -- e.g., "reviewiq_pipeline.pipeline:ReviewIQPipeline"

    -- Pipeline configuration
    stages TEXT[] NOT NULL DEFAULT '{}',
    input_type VARCHAR(100) NOT NULL DEFAULT 'dict',
    config JSONB,

    -- Status
    is_enabled BOOLEAN NOT NULL DEFAULT TRUE,

    -- Timestamps (updated_at is maintained by tr_registry_updated_at below)
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

COMMENT ON TABLE pipeline.registry IS 'Registered pipelines available for execution';
COMMENT ON COLUMN pipeline.registry.pipeline_id IS 'Unique pipeline identifier (e.g., "reviewiq")';
COMMENT ON COLUMN pipeline.registry.module_path IS 'Python module path for dynamic import (package.module:ClassName)';
COMMENT ON COLUMN pipeline.registry.stages IS 'Ordered list of stage names';
COMMENT ON COLUMN pipeline.registry.input_type IS 'Expected input type (for documentation/validation)';
COMMENT ON COLUMN pipeline.registry.config IS 'Pipeline-specific configuration as JSON';

-- Indexes for registry
-- Partial index: only enabled pipelines are indexed.
CREATE INDEX IF NOT EXISTS idx_registry_enabled
    ON pipeline.registry (is_enabled)
    WHERE is_enabled = TRUE;

-- =============================================================================
-- SECTION 2: EXECUTION HISTORY
-- =============================================================================

-- Execution status enum
-- CREATE TYPE has no IF NOT EXISTS, so swallow duplicate_object to keep the
-- migration re-runnable.
DO $$
BEGIN
    CREATE TYPE pipeline.execution_status AS ENUM (
        'pending',
        'running',
        'completed',
        'failed',
        'cancelled'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Pipeline execution history
-- Tracks each pipeline execution for monitoring and debugging
CREATE TABLE IF NOT EXISTS pipeline.executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Pipeline reference (no FK constraint is declared; the views below use a
    -- LEFT JOIN so executions of unregistered pipelines still appear)
    pipeline_id VARCHAR(50) NOT NULL,

    -- Optional associations
    job_id UUID,                -- Link to scraper job (soft FK to public.jobs)
    business_id VARCHAR(255),   -- Business being processed

    -- Execution status
    status pipeline.execution_status NOT NULL DEFAULT 'pending',

    -- Stage tracking
    stages_requested TEXT[] NOT NULL DEFAULT '{}',
    stages_completed TEXT[] NOT NULL DEFAULT '{}',
    current_stage VARCHAR(100),

    -- Input/output summaries (for quick reference without loading full data)
    input_summary JSONB,
    result_summary JSONB,

    -- Error tracking
    error_message TEXT,

    -- Timestamps
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

COMMENT ON TABLE pipeline.executions IS 'Pipeline execution history for monitoring and debugging';
COMMENT ON COLUMN pipeline.executions.pipeline_id IS 'Reference to pipeline.registry.pipeline_id';
COMMENT ON COLUMN pipeline.executions.job_id IS 'Optional link to scraper job (soft FK)';
COMMENT ON COLUMN pipeline.executions.stages_requested IS 'Stages requested to run';
COMMENT ON COLUMN pipeline.executions.stages_completed IS 'Stages that completed successfully';
COMMENT ON COLUMN pipeline.executions.input_summary IS 'Summary of input data (for display)';
COMMENT ON COLUMN pipeline.executions.result_summary IS 'Summary of results (for display)';

-- Indexes for execution queries
--
-- No standalone index on (pipeline_id): idx_executions_pipeline_status below
-- has pipeline_id as its leading column, so it already serves
-- "WHERE pipeline_id = ..." lookups. A separate single-column index would only
-- add write overhead. Databases created from an earlier revision of this
-- migration may still carry idx_executions_pipeline_id; it is redundant and
-- can be dropped.
CREATE INDEX IF NOT EXISTS idx_executions_job_id
    ON pipeline.executions (job_id)
    WHERE job_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_executions_business_id
    ON pipeline.executions (business_id)
    WHERE business_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_executions_status
    ON pipeline.executions (status);

CREATE INDEX IF NOT EXISTS idx_executions_created_at
    ON pipeline.executions (created_at DESC);

-- Composite index for common query patterns (also covers pipeline_id-only lookups)
CREATE INDEX IF NOT EXISTS idx_executions_pipeline_status
    ON pipeline.executions (pipeline_id, status, created_at DESC);

-- =============================================================================
-- SECTION 3: TRIGGER FOR UPDATED_AT
-- =============================================================================

-- Function to update updated_at timestamp.
-- Overwrites NEW.updated_at with NOW() on every row update.
CREATE OR REPLACE FUNCTION pipeline.update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger for registry table (dropped first so re-running the migration is safe)
DROP TRIGGER IF EXISTS tr_registry_updated_at ON pipeline.registry;
CREATE TRIGGER tr_registry_updated_at
    BEFORE UPDATE ON pipeline.registry
    FOR EACH ROW
    EXECUTE FUNCTION pipeline.update_updated_at_column();

-- =============================================================================
-- SECTION 4: INITIAL DATA
-- =============================================================================

-- Register the ReviewIQ pipeline (can be updated by the application on startup).
-- On re-run the descriptive columns are refreshed; is_enabled is intentionally
-- NOT overwritten, so a pipeline disabled later stays disabled.
INSERT INTO pipeline.registry (
    pipeline_id,
    name,
    description,
    version,
    module_path,
    stages,
    input_type,
    is_enabled
) VALUES (
    'reviewiq',
    'ReviewIQ Classification Pipeline',
    'Classifies reviews using URT taxonomy, detects issues, and aggregates metrics',
    '1.0.0',
    'reviewiq_pipeline.pipeline:ReviewIQPipeline',
    ARRAY['normalize', 'classify', 'route', 'aggregate'],
    'ScraperV1Output',
    TRUE
)
ON CONFLICT (pipeline_id) DO UPDATE SET
    name = EXCLUDED.name,
    description = EXCLUDED.description,
    version = EXCLUDED.version,
    module_path = EXCLUDED.module_path,
    stages = EXCLUDED.stages,
    input_type = EXCLUDED.input_type,
    updated_at = NOW();

-- =============================================================================
-- SECTION 5: VIEWS
-- =============================================================================

-- View for recent executions with pipeline info.
-- duration_seconds: live elapsed time for running executions, final elapsed
-- time once completed_at is set, otherwise NULL (NULL also when started_at is
-- NULL). Cast to INTEGER, i.e. whole seconds.
CREATE OR REPLACE VIEW pipeline.executions_with_pipeline AS
SELECT
    e.id,
    e.pipeline_id,
    r.name AS pipeline_name,
    e.job_id,
    e.business_id,
    e.status,
    e.stages_requested,
    e.stages_completed,
    e.current_stage,
    e.error_message,
    e.started_at,
    e.completed_at,
    e.created_at,
    CASE
        WHEN e.status = 'running'
            THEN EXTRACT(EPOCH FROM (NOW() - e.started_at))::INTEGER
        WHEN e.completed_at IS NOT NULL
            THEN EXTRACT(EPOCH FROM (e.completed_at - e.started_at))::INTEGER
        ELSE NULL
    END AS duration_seconds
FROM pipeline.executions AS e
LEFT JOIN pipeline.registry AS r
    ON e.pipeline_id = r.pipeline_id;

COMMENT ON VIEW pipeline.executions_with_pipeline IS 'Executions joined with pipeline metadata and duration';

-- View for pipeline execution statistics.
-- One row per pipeline_id with a count for every execution_status value.
-- pending_count is appended LAST on purpose: CREATE OR REPLACE VIEW only
-- permits adding new columns at the end of an existing view's column list.
CREATE OR REPLACE VIEW pipeline.execution_stats AS
SELECT
    pipeline_id,
    COUNT(*) AS total_executions,
    COUNT(*) FILTER (WHERE status = 'completed') AS completed_count,
    COUNT(*) FILTER (WHERE status = 'failed') AS failed_count,
    COUNT(*) FILTER (WHERE status = 'running') AS running_count,
    COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled_count,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at)))
        FILTER (WHERE status = 'completed') AS avg_duration_seconds,
    MAX(created_at) AS last_execution_at,
    COUNT(*) FILTER (WHERE status = 'pending') AS pending_count
FROM pipeline.executions
GROUP BY pipeline_id;

COMMENT ON VIEW pipeline.execution_stats IS 'Aggregated execution statistics per pipeline';

-- =============================================================================
-- DONE
-- =============================================================================