#!/usr/bin/env python3
"""
Generic Pipeline API endpoints.

Provides a unified API for all registered pipelines:
- List available pipelines
- Get pipeline details and metadata
- Execute pipelines
- Get dashboard configuration
- Get widget data
- List execution history
"""

import importlib
import json
import logging
import time
import uuid
from typing import Any

import asyncpg
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field

log = logging.getLogger(__name__)

# Create router
router = APIRouter(prefix="/api/pipelines", tags=["pipelines"])

# Database pool (set by main server)
_pool: asyncpg.Pool | None = None

# Pipeline instances cache, keyed by pipeline_id. Entries are dropped by the
# enable/disable endpoints so the next request re-reads the registry.
_pipeline_instances: dict[str, Any] = {}


def set_database(pool: asyncpg.Pool) -> None:
    """Set the database pool for pipeline operations."""
    global _pool
    _pool = pool


# ==================== Pydantic Models ====================


class PipelineInfo(BaseModel):
    """Summary information about a pipeline."""

    id: str = Field(..., description="Pipeline identifier")
    name: str = Field(..., description="Display name")
    description: str = Field(..., description="Human-readable description")
    version: str = Field(..., description="Semantic version")
    is_enabled: bool = Field(..., description="Whether pipeline is enabled")
    stages: list[str] = Field(..., description="Available stages")
    input_type: str = Field(..., description="Expected input type")


class PipelineDetail(PipelineInfo):
    """Detailed pipeline information."""

    module_path: str = Field(..., description="Python module path")
    config: dict[str, Any] | None = Field(None, description="Pipeline configuration")
    created_at: str | None = Field(None, description="Registration timestamp")
    updated_at: str | None = Field(None, description="Last update timestamp")


class ExecuteRequest(BaseModel):
    """Request to execute a pipeline."""

    job_id: str | None = Field(None, description="Job ID to process")
    business_id: str | None = Field(None, description="Business identifier")
    input_data: dict[str, Any] | None = Field(None, description="Direct input data")
    stages: list[str] | None = Field(None, description="Stages to run (default: all)")
    options: dict[str, Any] | None = Field(None, description="Pipeline-specific options")


class ExecuteResponse(BaseModel):
    """Response from pipeline execution."""

    execution_id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline that was executed")
    success: bool = Field(..., description="Whether execution succeeded")
    stages_run: list[str] = Field(..., description="Stages that were run")
    error: str | None = Field(None, description="Error message if failed")


class ExecutionSummary(BaseModel):
    """Summary of a pipeline execution."""

    id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline identifier")
    job_id: str | None = Field(None, description="Associated job ID")
    business_id: str | None = Field(None, description="Business identifier")
    status: str = Field(..., description="Execution status")
    stages_requested: list[str] = Field(..., description="Stages requested")
    stages_completed: list[str] = Field(..., description="Stages completed")
    error_message: str | None = Field(None, description="Error if failed")
    started_at: str | None = Field(None, description="Start timestamp")
    completed_at: str | None = Field(None, description="Completion timestamp")
    created_at: str | None = Field(None, description="Creation timestamp")


class StageMetrics(BaseModel):
    """Metrics for a single pipeline stage."""

    duration_ms: int = Field(0, description="Stage execution time in ms")
    success: bool = Field(True, description="Whether stage succeeded")
    records_in: int = Field(0, description="Records input to stage")
    records_out: int = Field(0, description="Records output from stage")
    error: str | None = Field(None, description="Error message if failed")


class ExecutionDetail(BaseModel):
    """Detailed execution information."""

    id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline identifier")
    job_id: str | None = Field(None, description="Associated job ID")
    business_id: str | None = Field(None, description="Business identifier")
    status: str = Field(..., description="Execution status")
    stages_requested: list[str] = Field(..., description="Stages requested")
    stages_completed: list[str] = Field(..., description="Stages completed")
    current_stage: str | None = Field(None, description="Currently executing stage")
    progress: int = Field(0, description="Progress percentage")
    error_message: str | None = Field(None, description="Error if failed")
    started_at: str | None = Field(None, description="Start timestamp")
    completed_at: str | None = Field(None, description="Completion timestamp")
    created_at: str | None = Field(None, description="Creation timestamp")
    total_duration_ms: int | None = Field(None, description="Total execution time in ms")
    input_summary: dict[str, Any] | None = Field(None, description="Input data summary")
    result_summary: dict[str, Any] | None = Field(None, description="Result summary")
    stage_metrics: dict[str, StageMetrics] | None = Field(
        None, description="Per-stage execution metrics"
    )


class DashboardSectionModel(BaseModel):
    """Dashboard section configuration."""

    id: str
    title: str
    description: str | None = None
    widgets: list[dict[str, Any]]
    collapsed: bool | None = None


class DashboardConfigModel(BaseModel):
    """Dashboard configuration for a pipeline."""

    pipeline_id: str
    title: str
    description: str | None = None
    sections: list[DashboardSectionModel]
    default_time_range: str | None = None
    refresh_interval: int | None = None


# ==================== Helper Functions ====================


def _require_pool() -> asyncpg.Pool:
    """Return the configured pool, or raise HTTP 503 if none has been set."""
    if _pool is None:
        raise HTTPException(status_code=503, detail="Database not initialized")
    return _pool


def _iso(value: Any) -> str | None:
    """Return ``value.isoformat()``, or None when the value is falsy (e.g. NULL)."""
    return value.isoformat() if value else None


def _parse_json_field(value: Any) -> Any:
    """
    Decode a JSON column value that may arrive as a string.

    None stays None, strings are decoded with ``json.loads`` (undecodable
    strings yield None), and any other value is returned unchanged.
    """
    if value is None:
        return None
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return None
    return value


def _collect_stage_metrics(result: Any) -> dict[str, dict[str, Any]]:
    """
    Build the per-stage metrics mapping from a pipeline result.

    Returns an empty dict when the result has no (or an empty)
    ``stage_results`` attribute. Each stage result is read with ``.get``,
    so it is presumably a dict — TODO confirm against the pipeline classes.
    """
    stage_results = getattr(result, "stage_results", None)
    if not stage_results:
        return {}
    return {
        stage_name: {
            "duration_ms": stage_result.get("duration_ms", 0),
            "success": stage_result.get("success", True),
            "records_in": stage_result.get("records_in", 0),
            "records_out": stage_result.get("records_out", 0),
            "error": stage_result.get("error"),
        }
        for stage_name, stage_result in stage_results.items()
    }


async def _mark_execution_failed(execution_uuid: uuid.UUID, error: Exception) -> None:
    """
    Best-effort: flag an execution row as failed with the error text.

    Does nothing when no pool is configured. A database error here is logged
    rather than raised so it cannot replace the pipeline error being reported
    to the client.
    """
    if _pool is None:
        return
    try:
        async with _pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE pipeline.executions
                SET status = 'failed',
                    error_message = $2,
                    completed_at = NOW()
                WHERE id = $1
                """,
                execution_uuid,
                str(error),
            )
    except Exception:
        log.exception("Could not record failure for execution %s", execution_uuid)


async def _set_pipeline_enabled(pipeline_id: str, enabled: bool) -> dict[str, str]:
    """
    Set ``is_enabled`` for a registry row and evict its cached instance.

    Raises:
        HTTPException: 503 if no pool is configured, 404 if the UPDATE
            affected no rows.
    """
    pool = _require_pool()

    async with pool.acquire() as conn:
        result = await conn.execute(
            """
            UPDATE pipeline.registry
            SET is_enabled = $2, updated_at = NOW()
            WHERE pipeline_id = $1
            """,
            pipeline_id,
            enabled,
        )

    # The command status string ends with the affected row count.
    if result.split()[-1] == "0":
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    # Clear cached instance
    _pipeline_instances.pop(pipeline_id, None)

    return {
        "status": "enabled" if enabled else "disabled",
        "pipeline_id": pipeline_id,
    }


async def _get_pipeline_instance(pipeline_id: str) -> Any:
    """
    Get or create a pipeline instance.

    A cached instance is returned directly. Otherwise the registry row is
    looked up, the class named by ``module_path`` (``"module.path:ClassName"``)
    is imported and instantiated with no arguments, ``initialize()`` is
    awaited, and the instance is cached.

    Raises:
        HTTPException: 503 if no pool is configured, 404 if the pipeline is
            not registered, 400 if it is disabled, 500 if importing,
            instantiating or initializing it fails.
    """
    if pipeline_id in _pipeline_instances:
        return _pipeline_instances[pipeline_id]

    pool = _require_pool()

    # Look up pipeline in registry
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT pipeline_id, module_path, is_enabled
            FROM pipeline.registry
            WHERE pipeline_id = $1
            """,
            pipeline_id,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    if not row["is_enabled"]:
        raise HTTPException(status_code=400, detail=f"Pipeline is disabled: {pipeline_id}")

    # Import and instantiate
    try:
        module_name, class_name = row["module_path"].rsplit(":", 1)
        module = importlib.import_module(module_name)
        instance = getattr(module, class_name)()

        # Initialize the pipeline
        await instance.initialize()

        _pipeline_instances[pipeline_id] = instance
        return instance
    except Exception as e:
        log.exception("Failed to load pipeline %s", pipeline_id)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to load pipeline: {e}",
        ) from e


# ==================== API Endpoints ====================


@router.get("/", response_model=list[PipelineInfo])
async def list_pipelines(
    enabled_only: bool = Query(True, description="Only return enabled pipelines"),
) -> list[PipelineInfo]:
    """
    List all registered pipelines.

    Returns summary information for each pipeline including
    name, version, available stages, and enabled status.
    """
    pool = _require_pool()

    # The optional clause is a fixed literal, never user input.
    where_clause = "WHERE is_enabled = TRUE" if enabled_only else ""

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT pipeline_id, name, description, version,
                   is_enabled, stages, input_type
            FROM pipeline.registry
            {where_clause}
            ORDER BY name
            """
        )

    return [
        PipelineInfo(
            id=row["pipeline_id"],
            name=row["name"],
            description=row["description"] or "",
            version=row["version"],
            is_enabled=row["is_enabled"],
            stages=row["stages"] or [],
            input_type=row["input_type"] or "dict",
        )
        for row in rows
    ]


@router.get("/{pipeline_id}", response_model=PipelineDetail)
async def get_pipeline(pipeline_id: str) -> PipelineDetail:
    """
    Get detailed information about a pipeline.

    Includes metadata, configuration, and registration timestamps.
    Responds 404 when the pipeline is not in the registry.
    """
    pool = _require_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT pipeline_id, name, description, version, module_path,
                   is_enabled, stages, input_type, config,
                   created_at, updated_at
            FROM pipeline.registry
            WHERE pipeline_id = $1
            """,
            pipeline_id,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    return PipelineDetail(
        id=row["pipeline_id"],
        name=row["name"],
        description=row["description"] or "",
        version=row["version"],
        is_enabled=row["is_enabled"],
        stages=row["stages"] or [],
        input_type=row["input_type"] or "dict",
        module_path=row["module_path"],
        config=row["config"],
        created_at=_iso(row["created_at"]),
        updated_at=_iso(row["updated_at"]),
    )


@router.post("/{pipeline_id}/execute", response_model=ExecuteResponse)
async def execute_pipeline(
    pipeline_id: str,
    request: ExecuteRequest,
) -> ExecuteResponse:
    """
    Execute a pipeline.

    The pipeline can be executed with:
    - A job_id to process an existing scraper job
    - Direct input_data for testing
    - Specific stages to run (default: all)

    When a database pool is configured, an execution row is inserted before
    the run and updated with status, summaries and timing afterwards.

    Raises:
        HTTPException: 400 if job_id is not a valid UUID, 500 if the
            pipeline run (or recording its result) fails, plus any error
            raised while loading the pipeline.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    # The job_id is stored as a UUID; reject malformed values as a client
    # error instead of failing with an unhandled ValueError.
    job_uuid: uuid.UUID | None = None
    if request.job_id:
        try:
            job_uuid = uuid.UUID(request.job_id)
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid job ID format") from None

    # Prepare input data (copied so the request model is not mutated)
    input_data = dict(request.input_data or {})
    if request.job_id:
        input_data["job_id"] = request.job_id
    if request.business_id:
        input_data["business_id"] = request.business_id

    # Create execution record
    execution_uuid = uuid.uuid4()
    execution_id = str(execution_uuid)
    stages = request.stages or pipeline.get_stage_names()

    # Prepare input summary for storage
    input_summary = {
        "job_id": request.job_id,
        "business_id": request.business_id,
        "stages": stages,
    }

    if _pool is not None:
        async with _pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO pipeline.executions (
                    id, pipeline_id, job_id, business_id, status,
                    stages_requested, input_summary, started_at, created_at
                ) VALUES ($1, $2, $3, $4, 'running', $5, $6, NOW(), NOW())
                """,
                execution_uuid,
                pipeline_id,
                job_uuid,
                request.business_id,
                stages,
                json.dumps(input_summary),
            )

    try:
        # perf_counter is monotonic, so clock adjustments cannot skew the duration.
        start_time = time.perf_counter()

        # Execute pipeline
        result = await pipeline.process(input_data, stages=stages)

        # Calculate total duration
        total_duration_ms = int((time.perf_counter() - start_time) * 1000)

        # Prepare result summary
        result_summary = {
            "success": result.success,
            "stages_run": result.stages_run,
        }
        if hasattr(result, "summary") and result.summary:
            result_summary.update(result.summary)

        # Extract stage metrics for profiling
        stage_metrics = _collect_stage_metrics(result)

        # Update execution status with metrics
        if _pool is not None:
            async with _pool.acquire() as conn:
                await conn.execute(
                    """
                    UPDATE pipeline.executions
                    SET status = $2,
                        stages_completed = $3,
                        error_message = $4,
                        result_summary = $5,
                        stage_metrics = $6,
                        total_duration_ms = $7,
                        completed_at = NOW()
                    WHERE id = $1
                    """,
                    execution_uuid,
                    "completed" if result.success else "failed",
                    result.stages_run,
                    result.error,
                    json.dumps(result_summary),
                    json.dumps(stage_metrics) if stage_metrics else None,
                    total_duration_ms,
                )

        return ExecuteResponse(
            execution_id=execution_id,
            pipeline_id=pipeline_id,
            success=result.success,
            stages_run=result.stages_run,
            error=result.error,
        )

    except Exception as e:
        log.exception("Pipeline execution failed: %s", e)

        # Update execution status
        await _mark_execution_failed(execution_uuid, e)

        raise HTTPException(status_code=500, detail=f"Execution failed: {e}") from e


@router.get("/{pipeline_id}/executions", response_model=list[ExecutionSummary])
async def list_executions(
    pipeline_id: str,
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=200, description="Max results"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> list[ExecutionSummary]:
    """
    List execution history for a pipeline.

    Can filter by status and paginate results. Rows are ordered newest
    first by creation time.
    """
    pool = _require_pool()

    # Only placeholder numbers are interpolated into the SQL text; every
    # value is passed as a bound parameter.
    conditions = ["pipeline_id = $1"]
    params: list[Any] = [pipeline_id]

    if status:
        params.append(status)
        conditions.append(f"status = ${len(params)}")

    where_clause = " AND ".join(conditions)
    limit_idx = len(params) + 1
    offset_idx = limit_idx + 1

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT id, pipeline_id, job_id, business_id, status,
                   stages_requested, stages_completed, error_message,
                   started_at, completed_at, created_at
            FROM pipeline.executions
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${limit_idx} OFFSET ${offset_idx}
            """,
            *params,
            limit,
            offset,
        )

    return [
        ExecutionSummary(
            id=str(row["id"]),
            pipeline_id=row["pipeline_id"],
            job_id=str(row["job_id"]) if row["job_id"] else None,
            business_id=row["business_id"],
            status=row["status"],
            stages_requested=row["stages_requested"] or [],
            stages_completed=row["stages_completed"] or [],
            error_message=row["error_message"],
            started_at=_iso(row["started_at"]),
            completed_at=_iso(row["completed_at"]),
            created_at=_iso(row["created_at"]),
        )
        for row in rows
    ]


@router.get("/{pipeline_id}/executions/{execution_id}", response_model=ExecutionDetail)
async def get_execution(pipeline_id: str, execution_id: str) -> ExecutionDetail:
    """
    Get details for a specific execution.

    Returns full execution details including stage results and summaries.

    Raises:
        HTTPException: 503 if no pool is configured, 400 if execution_id is
            not a valid UUID, 404 if no matching execution exists for this
            pipeline.
    """
    pool = _require_pool()

    try:
        exec_uuid = uuid.UUID(execution_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid execution ID format") from None

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT id, pipeline_id, job_id, business_id, status,
                   stages_requested, stages_completed, error_message,
                   input_summary, result_summary, stage_metrics,
                   total_duration_ms, started_at, completed_at, created_at
            FROM pipeline.executions
            WHERE pipeline_id = $1 AND id = $2
            """,
            pipeline_id,
            exec_uuid,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Execution not found: {execution_id}")

    # Calculate progress; clamped because the completed list may be longer
    # than the requested list.
    stages_requested = row["stages_requested"] or []
    stages_completed = row["stages_completed"] or []
    if stages_requested:
        progress = min(100, int(len(stages_completed) / len(stages_requested) * 100))
    else:
        progress = 0

    # JSONB fields may come back from asyncpg as strings, hence the parsing.
    return ExecutionDetail(
        id=str(row["id"]),
        pipeline_id=row["pipeline_id"],
        job_id=str(row["job_id"]) if row["job_id"] else None,
        business_id=row["business_id"],
        status=row["status"],
        stages_requested=stages_requested,
        stages_completed=stages_completed,
        current_stage=None,  # Could be tracked if needed
        progress=progress,
        error_message=row["error_message"],
        started_at=_iso(row["started_at"]),
        completed_at=_iso(row["completed_at"]),
        created_at=_iso(row["created_at"]),
        total_duration_ms=row["total_duration_ms"],
        input_summary=_parse_json_field(row["input_summary"]),
        result_summary=_parse_json_field(row["result_summary"]),
        stage_metrics=_parse_json_field(row["stage_metrics"]),
    )


@router.get("/{pipeline_id}/dashboard", response_model=DashboardConfigModel)
async def get_dashboard_config(pipeline_id: str) -> DashboardConfigModel:
    """
    Get dashboard configuration for a pipeline.

    Returns the widget configuration that the frontend uses
    to render the pipeline's dynamic dashboard. Any failure while reading
    or converting the pipeline's configuration is reported as HTTP 500.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        config = pipeline.get_dashboard_config()
        sections = [
            DashboardSectionModel(
                id=s["id"],
                title=s["title"],
                description=s.get("description"),
                widgets=s["widgets"],
                collapsed=s.get("collapsed"),
            )
            for s in config["sections"]
        ]
        return DashboardConfigModel(
            pipeline_id=config["pipeline_id"],
            title=config["title"],
            description=config.get("description"),
            sections=sections,
            default_time_range=config.get("default_time_range"),
            refresh_interval=config.get("refresh_interval"),
        )
    except Exception as e:
        log.exception("Failed to get dashboard config: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get dashboard config: {e}",
        ) from e


@router.get("/{pipeline_id}/widgets/{widget_id}")
async def get_widget_data(
    pipeline_id: str,
    widget_id: str,
    business_id: str | None = Query(None, description="Filter by business"),
    time_range: str = Query("30d", description="Time range (e.g., 7d, 30d, 90d)"),
    page: int = Query(1, ge=1, description="Page number for paginated widgets"),
    page_size: int = Query(10, ge=1, le=100, description="Items per page"),
) -> dict[str, Any]:
    """
    Get data for a specific dashboard widget.

    The response format depends on the widget type. Common formats:
    - stat_card: {value, trend, ...}
    - chart: {data: [{x, y, ...}, ...]}
    - table: {data: [...], total: n}

    The query parameters are forwarded to the pipeline as a single dict;
    any failure in the pipeline call is reported as HTTP 500.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    params = {
        "business_id": business_id,
        "time_range": time_range,
        "page": page,
        "page_size": page_size,
    }

    try:
        return await pipeline.get_widget_data(widget_id, params)
    except Exception as e:
        log.exception("Failed to get widget data: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get widget data: {e}",
        ) from e


@router.post("/{pipeline_id}/enable")
async def enable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Enable a disabled pipeline."""
    return await _set_pipeline_enabled(pipeline_id, True)


@router.post("/{pipeline_id}/disable")
async def disable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Disable a pipeline."""
    return await _set_pipeline_enabled(pipeline_id, False)


@router.get("/{pipeline_id}/health")
async def pipeline_health(pipeline_id: str) -> dict[str, Any]:
    """
    Check pipeline health.

    Returns health status and any issues detected. A failing health check
    is reported in the payload (``healthy: False``) rather than as an HTTP
    error; errors while loading the pipeline still propagate.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        health = await pipeline.health_check()
    except Exception as e:
        # Logged so the failure is not visible only to the API caller.
        log.exception("Health check failed for pipeline %s", pipeline_id)
        return {
            "pipeline_id": pipeline_id,
            "healthy": False,
            "error": str(e),
        }

    return {
        "pipeline_id": pipeline_id,
        **health,
    }