#!/usr/bin/env python3
"""
Generic Pipeline API endpoints.

Provides a unified API for all registered pipelines:
- List available pipelines
- Get pipeline details and metadata
- Execute pipelines
- Get dashboard configuration
- Get widget data
- List execution history
"""

import importlib
import logging
import uuid
from typing import Any

import asyncpg
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field

log = logging.getLogger(__name__)

# Create router
router = APIRouter(prefix="/api/pipelines", tags=["pipelines"])

# Database pool (set by main server)
_pool: asyncpg.Pool | None = None

# Pipeline instances cache, keyed by pipeline_id. Entries are dropped when a
# pipeline is enabled/disabled so the next request re-reads the registry.
_pipeline_instances: dict[str, Any] = {}


def set_database(pool: asyncpg.Pool) -> None:
    """Set the database pool for pipeline operations."""
    global _pool
    _pool = pool


# ==================== Pydantic Models ====================


class PipelineInfo(BaseModel):
    """Summary information about a pipeline."""

    id: str = Field(..., description="Pipeline identifier")
    name: str = Field(..., description="Display name")
    description: str = Field(..., description="Human-readable description")
    version: str = Field(..., description="Semantic version")
    is_enabled: bool = Field(..., description="Whether pipeline is enabled")
    stages: list[str] = Field(..., description="Available stages")
    input_type: str = Field(..., description="Expected input type")


class PipelineDetail(PipelineInfo):
    """Detailed pipeline information."""

    module_path: str = Field(..., description="Python module path")
    config: dict[str, Any] | None = Field(None, description="Pipeline configuration")
    created_at: str | None = Field(None, description="Registration timestamp")
    updated_at: str | None = Field(None, description="Last update timestamp")


class ExecuteRequest(BaseModel):
    """Request to execute a pipeline."""

    job_id: str | None = Field(None, description="Job ID to process")
    business_id: str | None = Field(None, description="Business identifier")
    input_data: dict[str, Any] | None = Field(None, description="Direct input data")
    stages: list[str] | None = Field(None, description="Stages to run (default: all)")
    options: dict[str, Any] | None = Field(None, description="Pipeline-specific options")


class ExecuteResponse(BaseModel):
    """Response from pipeline execution."""

    execution_id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline that was executed")
    success: bool = Field(..., description="Whether execution succeeded")
    stages_run: list[str] = Field(..., description="Stages that were run")
    error: str | None = Field(None, description="Error message if failed")


class ExecutionSummary(BaseModel):
    """Summary of a pipeline execution."""

    id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline identifier")
    job_id: str | None = Field(None, description="Associated job ID")
    business_id: str | None = Field(None, description="Business identifier")
    status: str = Field(..., description="Execution status")
    stages_requested: list[str] = Field(..., description="Stages requested")
    stages_completed: list[str] = Field(..., description="Stages completed")
    error_message: str | None = Field(None, description="Error if failed")
    started_at: str | None = Field(None, description="Start timestamp")
    completed_at: str | None = Field(None, description="Completion timestamp")
    created_at: str | None = Field(None, description="Creation timestamp")


class DashboardSectionModel(BaseModel):
    """Dashboard section configuration."""

    id: str
    title: str
    description: str | None = None
    widgets: list[dict[str, Any]]
    collapsed: bool | None = None


class DashboardConfigModel(BaseModel):
    """Dashboard configuration for a pipeline."""

    pipeline_id: str
    title: str
    description: str | None = None
    sections: list[DashboardSectionModel]
    default_time_range: str | None = None
    refresh_interval: int | None = None


# ==================== Helper Functions ====================


def _require_pool() -> asyncpg.Pool:
    """Return the configured database pool, or raise HTTP 503 if it is unset."""
    if _pool is None:
        raise HTTPException(status_code=503, detail="Database not initialized")
    return _pool


def _iso(timestamp: Any) -> str | None:
    """Return ``timestamp.isoformat()``, or None when the value is empty."""
    return timestamp.isoformat() if timestamp else None


def _pipeline_not_found(pipeline_id: str) -> HTTPException:
    """Build the HTTP 404 error used whenever a registry lookup finds no row."""
    return HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")


async def _get_pipeline_instance(pipeline_id: str) -> Any:
    """
    Get or create a pipeline instance.

    A cached instance is returned as-is. Otherwise the registry row is read,
    the class named by ``module_path`` (``"package.module:ClassName"``) is
    imported, instantiated with no arguments, initialized via
    ``await instance.initialize()``, cached, and returned.

    Raises:
        HTTPException: 503 if no database pool is set, 404 if the pipeline is
            not registered, 400 if it is disabled, 500 if importing,
            constructing or initializing the pipeline fails.
    """
    if pipeline_id in _pipeline_instances:
        return _pipeline_instances[pipeline_id]

    pool = _require_pool()

    # Look up pipeline in registry
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT pipeline_id, module_path, is_enabled
            FROM pipeline.registry
            WHERE pipeline_id = $1
            """,
            pipeline_id,
        )

    if not row:
        raise _pipeline_not_found(pipeline_id)

    if not row["is_enabled"]:
        raise HTTPException(status_code=400, detail=f"Pipeline is disabled: {pipeline_id}")

    # Import and instantiate
    try:
        # A module_path without ":" makes the unpacking raise ValueError,
        # which is reported as a load failure like any other error here.
        module_name, class_name = row["module_path"].rsplit(":", 1)
        module = importlib.import_module(module_name)
        instance = getattr(module, class_name)()

        # Initialize the pipeline
        await instance.initialize()
    except Exception as e:
        log.exception("Failed to load pipeline %s", pipeline_id)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to load pipeline: {e}",
        ) from e

    _pipeline_instances[pipeline_id] = instance
    return instance


async def _set_pipeline_enabled(pipeline_id: str, *, enabled: bool) -> None:
    """
    Set the registry ``is_enabled`` flag and drop any cached instance.

    Raises:
        HTTPException: 503 if no database pool is set, 404 if the UPDATE
            matched no row.
    """
    pool = _require_pool()

    async with pool.acquire() as conn:
        result = await conn.execute(
            """
            UPDATE pipeline.registry
            SET is_enabled = $2, updated_at = NOW()
            WHERE pipeline_id = $1
            """,
            pipeline_id,
            enabled,
        )

    # The command status string ends with the affected row count ("UPDATE 0").
    if result.split()[-1] == "0":
        raise _pipeline_not_found(pipeline_id)

    # Clear cached instance
    _pipeline_instances.pop(pipeline_id, None)


# ==================== API Endpoints ====================


@router.get("/", response_model=list[PipelineInfo])
async def list_pipelines(
    enabled_only: bool = Query(True, description="Only return enabled pipelines"),
) -> list[PipelineInfo]:
    """
    List all registered pipelines.

    Returns summary information for each pipeline including
    name, version, available stages, and enabled status.
    """
    pool = _require_pool()

    # Only a constant SQL fragment is interpolated; no request data reaches
    # the query text.
    enabled_filter = "WHERE is_enabled = TRUE" if enabled_only else ""
    query = f"""
        SELECT pipeline_id, name, description, version,
               is_enabled, stages, input_type
        FROM pipeline.registry
        {enabled_filter}
        ORDER BY name
    """

    async with pool.acquire() as conn:
        rows = await conn.fetch(query)

    return [
        PipelineInfo(
            id=row["pipeline_id"],
            name=row["name"],
            description=row["description"] or "",
            version=row["version"],
            is_enabled=row["is_enabled"],
            stages=row["stages"] or [],
            input_type=row["input_type"] or "dict",
        )
        for row in rows
    ]


@router.get("/{pipeline_id}", response_model=PipelineDetail)
async def get_pipeline(pipeline_id: str) -> PipelineDetail:
    """
    Get detailed information about a pipeline.

    Includes metadata, configuration, and registration timestamps.
    """
    pool = _require_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT pipeline_id, name, description, version, module_path,
                   is_enabled, stages, input_type, config,
                   created_at, updated_at
            FROM pipeline.registry
            WHERE pipeline_id = $1
            """,
            pipeline_id,
        )

    if not row:
        raise _pipeline_not_found(pipeline_id)

    return PipelineDetail(
        id=row["pipeline_id"],
        name=row["name"],
        description=row["description"] or "",
        version=row["version"],
        is_enabled=row["is_enabled"],
        stages=row["stages"] or [],
        input_type=row["input_type"] or "dict",
        module_path=row["module_path"],
        config=row["config"],
        created_at=_iso(row["created_at"]),
        updated_at=_iso(row["updated_at"]),
    )


@router.post("/{pipeline_id}/execute", response_model=ExecuteResponse)
async def execute_pipeline(
    pipeline_id: str,
    request: ExecuteRequest,
) -> ExecuteResponse:
    """
    Execute a pipeline.

    The pipeline can be executed with:
    - A job_id to process an existing scraper job
    - Direct input_data for testing
    - Specific stages to run (default: all)
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    # job_id is bound to the INSERT as a UUID. Reject malformed values here
    # with a client error instead of letting uuid.UUID() raise mid-query.
    job_uuid: uuid.UUID | None = None
    if request.job_id:
        try:
            job_uuid = uuid.UUID(request.job_id)
        except ValueError as e:
            raise HTTPException(
                status_code=422,
                detail=f"Invalid job_id (expected a UUID): {request.job_id}",
            ) from e

    # Prepare input data (on a copy, so the request model is left untouched)
    input_data: dict[str, Any] = dict(request.input_data or {})
    if request.job_id:
        input_data["job_id"] = request.job_id
    if request.business_id:
        input_data["business_id"] = request.business_id

    # Create execution record
    execution_uuid = uuid.uuid4()
    execution_id = str(execution_uuid)
    stages = request.stages or pipeline.get_stage_names()

    # A cached pipeline instance can be returned without a pool check, so the
    # bookkeeping below is skipped when no pool is configured.
    if _pool is not None:
        async with _pool.acquire() as conn:
            # NOTE(review): started_at is not set by this INSERT although the
            # row is created as 'running' — presumably a column default fills
            # it; confirm against the schema.
            await conn.execute(
                """
                INSERT INTO pipeline.executions (
                    id, pipeline_id, job_id, business_id,
                    status, stages_requested, created_at
                ) VALUES ($1, $2, $3, $4, 'running', $5, NOW())
                """,
                execution_uuid,
                pipeline_id,
                job_uuid,
                request.business_id,
                stages,
            )

    try:
        # Execute pipeline
        result = await pipeline.process(input_data, stages=stages)

        # Update execution status
        if _pool is not None:
            async with _pool.acquire() as conn:
                await conn.execute(
                    """
                    UPDATE pipeline.executions
                    SET status = $2,
                        stages_completed = $3,
                        error_message = $4,
                        completed_at = NOW()
                    WHERE id = $1
                    """,
                    execution_uuid,
                    "completed" if result.success else "failed",
                    result.stages_run,
                    result.error,
                )

        return ExecuteResponse(
            execution_id=execution_id,
            pipeline_id=pipeline_id,
            success=result.success,
            stages_run=result.stages_run,
            error=result.error,
        )

    except Exception as e:
        log.exception("Pipeline execution failed: %s", e)

        # Update execution status. Best effort: a failure here must not
        # replace the original error reported to the client.
        if _pool is not None:
            try:
                async with _pool.acquire() as conn:
                    await conn.execute(
                        """
                        UPDATE pipeline.executions
                        SET status = 'failed',
                            error_message = $2,
                            completed_at = NOW()
                        WHERE id = $1
                        """,
                        execution_uuid,
                        str(e),
                    )
            except Exception:
                log.exception(
                    "Failed to record failure for execution %s", execution_id
                )

        raise HTTPException(status_code=500, detail=f"Execution failed: {e}") from e


@router.get("/{pipeline_id}/executions", response_model=list[ExecutionSummary])
async def list_executions(
    pipeline_id: str,
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=200, description="Max results"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> list[ExecutionSummary]:
    """
    List execution history for a pipeline.

    Can filter by status and paginate results.
    """
    pool = _require_pool()

    # Only "$n" placeholders are interpolated into the SQL text; every value
    # is passed as a bound parameter.
    conditions = ["pipeline_id = $1"]
    params: list[Any] = [pipeline_id]

    if status:
        params.append(status)
        conditions.append(f"status = ${len(params)}")

    where_clause = " AND ".join(conditions)
    limit_idx = len(params) + 1
    offset_idx = limit_idx + 1

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT id, pipeline_id, job_id, business_id, status,
                   stages_requested, stages_completed, error_message,
                   started_at, completed_at, created_at
            FROM pipeline.executions
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${limit_idx} OFFSET ${offset_idx}
            """,
            *params,
            limit,
            offset,
        )

    return [
        ExecutionSummary(
            id=str(row["id"]),
            pipeline_id=row["pipeline_id"],
            job_id=str(row["job_id"]) if row["job_id"] else None,
            business_id=row["business_id"],
            status=row["status"],
            stages_requested=row["stages_requested"] or [],
            stages_completed=row["stages_completed"] or [],
            error_message=row["error_message"],
            started_at=_iso(row["started_at"]),
            completed_at=_iso(row["completed_at"]),
            created_at=_iso(row["created_at"]),
        )
        for row in rows
    ]


@router.get("/{pipeline_id}/dashboard", response_model=DashboardConfigModel)
async def get_dashboard_config(pipeline_id: str) -> DashboardConfigModel:
    """
    Get dashboard configuration for a pipeline.

    Returns the widget configuration that the frontend uses
    to render the pipeline's dynamic dashboard.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        config = pipeline.get_dashboard_config()
        sections = [
            DashboardSectionModel(
                id=section["id"],
                title=section["title"],
                description=section.get("description"),
                widgets=section["widgets"],
                collapsed=section.get("collapsed"),
            )
            for section in config["sections"]
        ]
        return DashboardConfigModel(
            pipeline_id=config["pipeline_id"],
            title=config["title"],
            description=config.get("description"),
            sections=sections,
            default_time_range=config.get("default_time_range"),
            refresh_interval=config.get("refresh_interval"),
        )
    except Exception as e:
        log.exception("Failed to get dashboard config: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get dashboard config: {e}",
        ) from e


@router.get("/{pipeline_id}/widgets/{widget_id}")
async def get_widget_data(
    pipeline_id: str,
    widget_id: str,
    business_id: str | None = Query(None, description="Filter by business"),
    time_range: str = Query("30d", description="Time range (e.g., 7d, 30d, 90d)"),
    page: int = Query(1, ge=1, description="Page number for paginated widgets"),
    page_size: int = Query(10, ge=1, le=100, description="Items per page"),
) -> dict[str, Any]:
    """
    Get data for a specific dashboard widget.

    The response format depends on the widget type. Common formats:
    - stat_card: {value, trend, ...}
    - chart: {data: [{x, y, ...}, ...]}
    - table: {data: [...], total: n}
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    params = {
        "business_id": business_id,
        "time_range": time_range,
        "page": page,
        "page_size": page_size,
    }

    try:
        return await pipeline.get_widget_data(widget_id, params)
    except Exception as e:
        log.exception("Failed to get widget data: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get widget data: {e}",
        ) from e


@router.post("/{pipeline_id}/enable")
async def enable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Enable a disabled pipeline."""
    await _set_pipeline_enabled(pipeline_id, enabled=True)
    return {"status": "enabled", "pipeline_id": pipeline_id}


@router.post("/{pipeline_id}/disable")
async def disable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Disable a pipeline."""
    await _set_pipeline_enabled(pipeline_id, enabled=False)
    return {"status": "disabled", "pipeline_id": pipeline_id}


@router.get("/{pipeline_id}/health")
async def pipeline_health(pipeline_id: str) -> dict[str, Any]:
    """
    Check pipeline health.

    Returns health status and any issues detected.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        health = await pipeline.health_check()
        return {
            "pipeline_id": pipeline_id,
            **health,
        }
    except Exception as e:
        # A failing health check is reported in the payload rather than as an
        # HTTP error; log it so the cause is still visible server-side.
        log.exception("Health check failed for pipeline %s", pipeline_id)
        return {
            "pipeline_id": pipeline_id,
            "healthy": False,
            "error": str(e),
        }