Files
whyrating-engine-legacy/api/routes/pipelines.py
2026-02-02 18:19:00 +00:00

714 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Generic Pipeline API endpoints.
Provides a unified API for all registered pipelines:
- List available pipelines
- Get pipeline details and metadata
- Execute pipelines
- Get dashboard configuration
- Get widget data
- List execution history
"""
import logging
from typing import Any
import asyncpg
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Router carrying every endpoint defined in this file; all routes live under
# the /api/pipelines prefix and are tagged "pipelines".
router = APIRouter(prefix="/api/pipelines", tags=["pipelines"])
# Shared asyncpg connection pool (set by the main server via set_database()).
# It stays None until then; endpoints that need the database answer 503.
_pool: asyncpg.Pool | None = None
# Cache of initialized pipeline instances, keyed by pipeline_id. Entries are
# added by _get_pipeline_instance() and dropped when a pipeline is enabled or
# disabled.
_pipeline_instances: dict[str, Any] = {}
def set_database(pool: asyncpg.Pool) -> None:
    """Install the connection pool used by the pipeline endpoints.

    Rebinds the module-level ``_pool``. Until this has been called, the
    endpoints that need the database respond with HTTP 503.

    Args:
        pool: The asyncpg pool to use for all subsequent queries.
    """
    global _pool
    _pool = pool
# ==================== Pydantic Models ====================
class PipelineInfo(BaseModel):
    """Public summary of one registered pipeline.

    This is the element type of the pipeline listing response; every field
    is required.
    """

    # Identity and presentation
    id: str = Field(..., description="Pipeline identifier")
    name: str = Field(..., description="Display name")
    description: str = Field(..., description="Human-readable description")
    version: str = Field(..., description="Semantic version")

    # Runtime characteristics
    is_enabled: bool = Field(..., description="Whether pipeline is enabled")
    stages: list[str] = Field(..., description="Available stages")
    input_type: str = Field(..., description="Expected input type")
class PipelineDetail(PipelineInfo):
    """Full registry record for a single pipeline.

    Adds the import location, the stored configuration and the registry
    timestamps to the summary fields inherited from ``PipelineInfo``.
    Timestamps are carried as strings.
    """

    module_path: str = Field(..., description="Python module path")
    config: dict[str, Any] | None = Field(
        default=None, description="Pipeline configuration"
    )
    created_at: str | None = Field(
        default=None, description="Registration timestamp"
    )
    updated_at: str | None = Field(
        default=None, description="Last update timestamp"
    )
class ExecuteRequest(BaseModel):
    """Body of a pipeline execution request.

    Every field is optional. ``job_id`` and ``business_id`` are merged into
    the pipeline input by the execute endpoint, and ``stages`` defaults to
    all of the pipeline's stages when omitted.
    """

    # What to process
    job_id: str | None = Field(default=None, description="Job ID to process")
    business_id: str | None = Field(
        default=None, description="Business identifier"
    )
    input_data: dict[str, Any] | None = Field(
        default=None, description="Direct input data"
    )

    # How to process it
    stages: list[str] | None = Field(
        default=None, description="Stages to run (default: all)"
    )
    options: dict[str, Any] | None = Field(
        default=None, description="Pipeline-specific options"
    )
class ExecuteResponse(BaseModel):
    """Outcome of a pipeline execution request.

    ``error`` is the only optional field; it carries the failure message
    when ``success`` is false.
    """

    execution_id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline that was executed")
    success: bool = Field(..., description="Whether execution succeeded")
    stages_run: list[str] = Field(..., description="Stages that were run")
    error: str | None = Field(
        default=None, description="Error message if failed"
    )
class ExecutionSummary(BaseModel):
    """One row of a pipeline's execution history.

    Identifiers and timestamps are carried as strings.
    """

    # Identity
    id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline identifier")
    job_id: str | None = Field(default=None, description="Associated job ID")
    business_id: str | None = Field(
        default=None, description="Business identifier"
    )

    # Outcome
    status: str = Field(..., description="Execution status")
    stages_requested: list[str] = Field(..., description="Stages requested")
    stages_completed: list[str] = Field(..., description="Stages completed")
    error_message: str | None = Field(
        default=None, description="Error if failed"
    )

    # Timeline
    started_at: str | None = Field(default=None, description="Start timestamp")
    completed_at: str | None = Field(
        default=None, description="Completion timestamp"
    )
    created_at: str | None = Field(
        default=None, description="Creation timestamp"
    )
class DashboardSectionModel(BaseModel):
    """One section of a pipeline dashboard.

    A section groups widget definitions under a title. Widgets are passed
    through as free-form dictionaries.
    """

    id: str
    title: str
    description: str | None = None
    widgets: list[dict[str, Any]]
    collapsed: bool | None = None
class DashboardConfigModel(BaseModel):
    """Complete dashboard layout for one pipeline.

    Holds the dashboard's title and ordered sections plus two optional
    display hints (default time range and refresh interval).
    """

    pipeline_id: str
    title: str
    description: str | None = None
    sections: list[DashboardSectionModel]
    default_time_range: str | None = None
    refresh_interval: int | None = None
# ==================== Helper Functions ====================
async def _get_pipeline_instance(pipeline_id: str) -> Any:
    """Return the initialized pipeline for ``pipeline_id``, loading it on first use.

    A cached instance is returned immediately. Otherwise the pipeline is
    looked up in ``pipeline.registry``, its class is imported from the
    registered ``module_path`` (split on the last ``:`` into module and class
    name), instantiated without arguments, initialized with
    ``await instance.initialize()`` and stored in the module-level cache.

    Args:
        pipeline_id: Registry identifier of the pipeline to load.

    Returns:
        The initialized pipeline instance.

    Raises:
        HTTPException: 503 if the database pool has not been set, 404 if the
            pipeline is not registered, 400 if it is disabled, and 500 if
            importing, instantiating or initializing it fails.
    """
    if pipeline_id in _pipeline_instances:
        return _pipeline_instances[pipeline_id]

    if _pool is None:
        raise HTTPException(status_code=503, detail="Database not initialized")

    # Look up pipeline in registry
    async with _pool.acquire() as conn:
        row = await conn.fetchrow(
            "\n"
            "SELECT pipeline_id, module_path, is_enabled\n"
            "FROM pipeline.registry\n"
            "WHERE pipeline_id = $1\n",
            pipeline_id,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")
    if not row["is_enabled"]:
        raise HTTPException(status_code=400, detail=f"Pipeline is disabled: {pipeline_id}")

    # Import and instantiate
    try:
        import importlib

        module_name, class_name = row["module_path"].rsplit(":", 1)
        module = importlib.import_module(module_name)
        instance = getattr(module, class_name)()
        # Initialize before caching so a half-initialized pipeline is never
        # handed out by a later call.
        await instance.initialize()
        _pipeline_instances[pipeline_id] = instance
        return instance
    except Exception as e:
        log.exception("Failed to load pipeline %s", pipeline_id)
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(
            status_code=500, detail=f"Failed to load pipeline: {e}"
        ) from e
# ==================== API Endpoints ====================
@router.get("/", response_model=list[PipelineInfo])
async def list_pipelines(
    enabled_only: bool = Query(True, description="Only return enabled pipelines"),
) -> list[PipelineInfo]:
    """
    List registered pipelines, ordered by name.

    Each entry carries the pipeline's name, version, available stages and
    enabled flag. Disabled pipelines are left out unless ``enabled_only`` is
    false. Missing descriptions, stages and input types are reported as
    ``""``, ``[]`` and ``"dict"`` respectively.
    """
    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    # The two variants of the query differ only in the optional WHERE line.
    sql = (
        "\n"
        "SELECT pipeline_id, name, description, version,\n"
        "is_enabled, stages, input_type\n"
        "FROM pipeline.registry\n"
    )
    if enabled_only:
        sql += "WHERE is_enabled = TRUE\n"
    sql += "ORDER BY name\n"

    async with _pool.acquire() as conn:
        records = await conn.fetch(sql)

    return [
        PipelineInfo(
            id=record["pipeline_id"],
            name=record["name"],
            description=record["description"] or "",
            version=record["version"],
            is_enabled=record["is_enabled"],
            stages=record["stages"] or [],
            input_type=record["input_type"] or "dict",
        )
        for record in records
    ]
@router.get("/{pipeline_id}", response_model=PipelineDetail)
async def get_pipeline(pipeline_id: str) -> PipelineDetail:
    """
    Get detailed information about a pipeline.

    Includes metadata, configuration, and registration timestamps.

    Raises:
        HTTPException: 503 if the database pool has not been set, 404 if no
            pipeline with this id is registered.
    """
    import json

    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    async with _pool.acquire() as conn:
        row = await conn.fetchrow(
            "\n"
            "SELECT pipeline_id, name, description, version, module_path,\n"
            "is_enabled, stages, input_type, config,\n"
            "created_at, updated_at\n"
            "FROM pipeline.registry\n"
            "WHERE pipeline_id = $1\n",
            pipeline_id,
        )

    if not row:
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    # asyncpg may hand JSON/JSONB columns back as text. Decode it here so the
    # dict-typed ``config`` field validates; undecodable text becomes None,
    # matching how execution JSON fields are read elsewhere in this module.
    config = row["config"]
    if isinstance(config, str):
        try:
            config = json.loads(config)
        except json.JSONDecodeError:
            config = None

    return PipelineDetail(
        id=row["pipeline_id"],
        name=row["name"],
        description=row["description"] or "",
        version=row["version"],
        is_enabled=row["is_enabled"],
        stages=row["stages"] or [],
        input_type=row["input_type"] or "dict",
        module_path=row["module_path"],
        config=config,
        created_at=row["created_at"].isoformat() if row["created_at"] else None,
        updated_at=row["updated_at"].isoformat() if row["updated_at"] else None,
    )
@router.post("/{pipeline_id}/execute", response_model=ExecuteResponse)
async def execute_pipeline(
    pipeline_id: str,
    request: ExecuteRequest,
) -> ExecuteResponse:
    """
    Execute a pipeline.

    The pipeline can be executed with:
    - A job_id to process an existing scraper job
    - Direct input_data for testing
    - Specific stages to run (default: all)

    When a database pool is available, an execution row is inserted with
    status ``running`` before the run and updated afterwards with the final
    status, completed stages, result summary, per-stage metrics and total
    duration.

    Raises:
        HTTPException: 400 if ``job_id`` is not a valid UUID, 500 if the
            pipeline run (or recording its result) fails, plus whatever
            loading the pipeline raises (503/404/400/500).
    """
    import json
    import time
    import uuid

    pipeline = await _get_pipeline_instance(pipeline_id)

    # Reject a malformed job_id as a client error before anything is recorded;
    # previously it surfaced as an unhandled ValueError (HTTP 500).
    job_uuid = None
    if request.job_id:
        try:
            job_uuid = uuid.UUID(request.job_id)
        except ValueError:
            raise HTTPException(
                status_code=400, detail="Invalid job ID format"
            ) from None

    # Create execution record
    execution_uuid = uuid.uuid4()
    execution_id = str(execution_uuid)

    # Prepare input data (copied so the request model is not mutated)
    input_data = dict(request.input_data or {})
    if request.job_id:
        input_data["job_id"] = request.job_id
    if request.business_id:
        input_data["business_id"] = request.business_id
    # Pass execution_id so Stage 5 synthesis can store results
    input_data["execution_id"] = execution_id

    stages = request.stages or pipeline.get_stage_names()

    # Prepare input summary for storage
    input_summary = {
        "job_id": request.job_id,
        "business_id": request.business_id,
        "stages": stages,
    }

    if _pool:
        async with _pool.acquire() as conn:
            await conn.execute(
                "\n"
                "INSERT INTO pipeline.executions (\n"
                "id, pipeline_id, job_id, business_id,\n"
                "status, stages_requested, input_summary,\n"
                "started_at, created_at\n"
                ")\n"
                "VALUES ($1, $2, $3, $4, 'running', $5, $6, NOW(), NOW())\n",
                execution_uuid,
                pipeline_id,
                job_uuid,
                request.business_id,
                stages,
                json.dumps(input_summary),
            )

    try:
        # Monotonic clock: the measured duration cannot be skewed by
        # wall-clock adjustments during the run.
        started = time.monotonic()

        # Execute pipeline
        result = await pipeline.process(input_data, stages=stages)

        # Calculate total duration
        total_duration_ms = int((time.monotonic() - started) * 1000)

        # Prepare result summary
        result_summary = {
            "success": result.success,
            "stages_run": result.stages_run,
        }
        if hasattr(result, "summary") and result.summary:
            result_summary.update(result.summary)

        # Extract stage metrics for profiling
        stage_results = getattr(result, "stage_results", None) or {}
        stage_metrics = {
            stage_name: {
                "duration_ms": stage_result.get("duration_ms", 0),
                "success": stage_result.get("success", True),
                "records_in": stage_result.get("records_in", 0),
                "records_out": stage_result.get("records_out", 0),
                "error": stage_result.get("error"),
            }
            for stage_name, stage_result in stage_results.items()
        }

        # Update execution status with metrics
        if _pool:
            async with _pool.acquire() as conn:
                await conn.execute(
                    "\n"
                    "UPDATE pipeline.executions\n"
                    "SET status = $2, stages_completed = $3, error_message = $4,\n"
                    "result_summary = $5, stage_metrics = $6,\n"
                    "total_duration_ms = $7, completed_at = NOW()\n"
                    "WHERE id = $1\n",
                    execution_uuid,
                    "completed" if result.success else "failed",
                    result.stages_run,
                    result.error,
                    json.dumps(result_summary),
                    json.dumps(stage_metrics) if stage_metrics else None,
                    total_duration_ms,
                )

        return ExecuteResponse(
            execution_id=execution_id,
            pipeline_id=pipeline_id,
            success=result.success,
            stages_run=result.stages_run,
            error=result.error,
        )
    except Exception as e:
        log.exception("Pipeline execution failed: %s", e)

        # Update execution status. This is best effort: if the bookkeeping
        # write fails too, log it and still report the original failure
        # rather than letting the secondary error replace it.
        if _pool:
            try:
                async with _pool.acquire() as conn:
                    await conn.execute(
                        "\n"
                        "UPDATE pipeline.executions\n"
                        "SET status = 'failed', error_message = $2, completed_at = NOW()\n"
                        "WHERE id = $1\n",
                        execution_uuid,
                        str(e),
                    )
            except Exception:
                log.exception(
                    "Failed to record failure for execution %s", execution_id
                )

        raise HTTPException(status_code=500, detail=f"Execution failed: {e}") from e
@router.get("/{pipeline_id}/executions", response_model=list[ExecutionSummary])
async def list_executions(
    pipeline_id: str,
    status: str | None = Query(None, description="Filter by status"),
    limit: int = Query(50, ge=1, le=200, description="Max results"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> list[ExecutionSummary]:
    """
    Return a page of a pipeline's execution history, newest first.

    Results can be narrowed to a single status and are paginated with
    ``limit`` / ``offset``.
    """
    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    # Placeholders are numbered from the argument list, so the optional
    # status filter and the trailing LIMIT/OFFSET always line up.
    filters = ["pipeline_id = $1"]
    args: list[Any] = [pipeline_id]
    if status:
        args.append(status)
        filters.append(f"status = ${len(args)}")
    limit_pos = len(args) + 1
    offset_pos = limit_pos + 1
    predicate = " AND ".join(filters)

    sql = (
        "\n"
        "SELECT id, pipeline_id, job_id, business_id, status,\n"
        "stages_requested, stages_completed, error_message,\n"
        "started_at, completed_at, created_at\n"
        "FROM pipeline.executions\n"
        f"WHERE {predicate}\n"
        "ORDER BY created_at DESC\n"
        f"LIMIT ${limit_pos} OFFSET ${offset_pos}\n"
    )

    async with _pool.acquire() as conn:
        records = await conn.fetch(sql, *args, limit, offset)

    def iso(moment):
        # Timestamps are exposed as ISO-8601 strings; missing ones stay None.
        return moment.isoformat() if moment else None

    return [
        ExecutionSummary(
            id=str(record["id"]),
            pipeline_id=record["pipeline_id"],
            job_id=str(record["job_id"]) if record["job_id"] else None,
            business_id=record["business_id"],
            status=record["status"],
            stages_requested=record["stages_requested"] or [],
            stages_completed=record["stages_completed"] or [],
            error_message=record["error_message"],
            started_at=iso(record["started_at"]),
            completed_at=iso(record["completed_at"]),
            created_at=iso(record["created_at"]),
        )
        for record in records
    ]
class StageMetrics(BaseModel):
    """Profiling figures recorded for one stage of an execution.

    Every field has a default, so a partially populated metrics entry still
    validates.
    """

    duration_ms: int = Field(default=0, description="Stage execution time in ms")
    success: bool = Field(default=True, description="Whether stage succeeded")
    records_in: int = Field(default=0, description="Records input to stage")
    records_out: int = Field(default=0, description="Records output from stage")
    error: str | None = Field(default=None, description="Error message if failed")
class ExecutionDetail(BaseModel):
    """Full record of a single pipeline execution.

    Carries the same identity, outcome and timeline fields as the history
    listing, plus progress, total duration, the stored input/result
    summaries and per-stage metrics.
    """

    # Identity
    id: str = Field(..., description="Execution identifier")
    pipeline_id: str = Field(..., description="Pipeline identifier")
    job_id: str | None = Field(default=None, description="Associated job ID")
    business_id: str | None = Field(
        default=None, description="Business identifier"
    )

    # State and progress
    status: str = Field(..., description="Execution status")
    stages_requested: list[str] = Field(..., description="Stages requested")
    stages_completed: list[str] = Field(..., description="Stages completed")
    current_stage: str | None = Field(
        default=None, description="Currently executing stage"
    )
    progress: int = Field(default=0, description="Progress percentage")
    error_message: str | None = Field(
        default=None, description="Error if failed"
    )

    # Timeline
    started_at: str | None = Field(default=None, description="Start timestamp")
    completed_at: str | None = Field(
        default=None, description="Completion timestamp"
    )
    created_at: str | None = Field(
        default=None, description="Creation timestamp"
    )
    total_duration_ms: int | None = Field(
        default=None, description="Total execution time in ms"
    )

    # Stored JSON payloads
    input_summary: dict[str, Any] | None = Field(
        default=None, description="Input data summary"
    )
    result_summary: dict[str, Any] | None = Field(
        default=None, description="Result summary"
    )
    stage_metrics: dict[str, StageMetrics] | None = Field(
        default=None, description="Per-stage execution metrics"
    )
@router.get("/{pipeline_id}/executions/{execution_id}", response_model=ExecutionDetail)
async def get_execution(pipeline_id: str, execution_id: str) -> ExecutionDetail:
    """
    Fetch one execution of a pipeline by its id.

    The response contains the stored input/result summaries and per-stage
    metrics, plus a progress percentage derived from how many of the
    requested stages have completed.
    """
    import json
    import uuid

    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    try:
        exec_uuid = uuid.UUID(execution_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid execution ID format")

    async with _pool.acquire() as conn:
        record = await conn.fetchrow(
            "\n"
            "SELECT id, pipeline_id, job_id, business_id, status,\n"
            "stages_requested, stages_completed, error_message,\n"
            "input_summary, result_summary, stage_metrics,\n"
            "total_duration_ms, started_at, completed_at, created_at\n"
            "FROM pipeline.executions\n"
            "WHERE pipeline_id = $1 AND id = $2\n",
            pipeline_id,
            exec_uuid,
        )

    if not record:
        raise HTTPException(status_code=404, detail=f"Execution not found: {execution_id}")

    # Progress is the share of requested stages that have completed.
    requested = record["stages_requested"] or []
    completed = record["stages_completed"] or []
    if requested:
        progress = int(len(completed) / len(requested) * 100)
    else:
        progress = 0

    def decode_jsonb(raw):
        # asyncpg may return JSONB columns as text. None and already-decoded
        # values pass through untouched; undecodable text becomes None.
        if not isinstance(raw, str):
            return raw
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return None

    def iso(moment):
        return moment.isoformat() if moment else None

    return ExecutionDetail(
        id=str(record["id"]),
        pipeline_id=record["pipeline_id"],
        job_id=str(record["job_id"]) if record["job_id"] else None,
        business_id=record["business_id"],
        status=record["status"],
        stages_requested=requested,
        stages_completed=completed,
        current_stage=None,  # Not tracked at present
        progress=progress,
        error_message=record["error_message"],
        started_at=iso(record["started_at"]),
        completed_at=iso(record["completed_at"]),
        created_at=iso(record["created_at"]),
        total_duration_ms=record["total_duration_ms"],
        input_summary=decode_jsonb(record["input_summary"]),
        result_summary=decode_jsonb(record["result_summary"]),
        stage_metrics=decode_jsonb(record["stage_metrics"]),
    )
@router.get("/{pipeline_id}/dashboard", response_model=DashboardConfigModel)
async def get_dashboard_config(pipeline_id: str) -> DashboardConfigModel:
    """
    Return the dashboard layout declared by a pipeline.

    The pipeline's own ``get_dashboard_config()`` result is converted into
    the typed response model that the frontend renders its dynamic dashboard
    from. Any failure while reading or converting it is reported as HTTP 500.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        raw = pipeline.get_dashboard_config()

        # Keys are read in the same order as the response fields.
        dashboard_id = raw["pipeline_id"]
        title = raw["title"]
        description = raw.get("description")
        sections = [
            DashboardSectionModel(
                id=section["id"],
                title=section["title"],
                description=section.get("description"),
                widgets=section["widgets"],
                collapsed=section.get("collapsed"),
            )
            for section in raw["sections"]
        ]
        default_time_range = raw.get("default_time_range")
        refresh_interval = raw.get("refresh_interval")

        return DashboardConfigModel(
            pipeline_id=dashboard_id,
            title=title,
            description=description,
            sections=sections,
            default_time_range=default_time_range,
            refresh_interval=refresh_interval,
        )
    except Exception as e:
        log.exception(f"Failed to get dashboard config: {e}")
        raise HTTPException(
            status_code=500, detail=f"Failed to get dashboard config: {e}"
        )
@router.get("/{pipeline_id}/widgets/{widget_id}")
async def get_widget_data(
    pipeline_id: str,
    widget_id: str,
    business_id: str | None = Query(None, description="Filter by business"),
    job_id: str | None = Query(None, description="Filter by job ID"),
    time_range: str = Query("30d", description="Time range (e.g., 7d, 30d, 90d)"),
    page: int = Query(1, ge=1, description="Page number for paginated widgets"),
    page_size: int = Query(10, ge=1, le=100, description="Items per page"),
) -> dict[str, Any]:
    """
    Fetch the data behind one dashboard widget.

    The query parameters are bundled into a dict and handed to the
    pipeline's ``get_widget_data``; whatever it returns is passed through.
    The shape depends on the widget type. Common formats:
    - stat_card: {value, trend, ...}
    - chart: {data: [{x, y, ...}, ...]}
    - table: {data: [...], total: n}
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        return await pipeline.get_widget_data(
            widget_id,
            {
                "business_id": business_id,
                "job_id": job_id,
                "time_range": time_range,
                "page": page,
                "page_size": page_size,
            },
        )
    except Exception as e:
        log.exception(f"Failed to get widget data: {e}")
        raise HTTPException(
            status_code=500, detail=f"Failed to get widget data: {e}"
        )
@router.post("/{pipeline_id}/enable")
async def enable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Mark a pipeline as enabled in the registry.

    Responds 404 when no registry row matches ``pipeline_id``. On success
    any cached instance is discarded.
    """
    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    async with _pool.acquire() as conn:
        command_tag = await conn.execute(
            "\n"
            "UPDATE pipeline.registry\n"
            "SET is_enabled = TRUE, updated_at = NOW()\n"
            "WHERE pipeline_id = $1\n",
            pipeline_id,
        )

    # The last token of the command status is the number of rows updated.
    rows_updated = command_tag.split()[-1]
    if rows_updated == "0":
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    # Clear cached instance
    _pipeline_instances.pop(pipeline_id, None)

    return {"status": "enabled", "pipeline_id": pipeline_id}
@router.post("/{pipeline_id}/disable")
async def disable_pipeline(pipeline_id: str) -> dict[str, str]:
    """Mark a pipeline as disabled in the registry.

    Responds 404 when no registry row matches ``pipeline_id``. On success
    any cached instance is discarded.
    """
    if not _pool:
        raise HTTPException(status_code=503, detail="Database not initialized")

    async with _pool.acquire() as conn:
        command_tag = await conn.execute(
            "\n"
            "UPDATE pipeline.registry\n"
            "SET is_enabled = FALSE, updated_at = NOW()\n"
            "WHERE pipeline_id = $1\n",
            pipeline_id,
        )

    # The last token of the command status is the number of rows updated.
    rows_updated = command_tag.split()[-1]
    if rows_updated == "0":
        raise HTTPException(status_code=404, detail=f"Pipeline not found: {pipeline_id}")

    # Clear cached instance
    _pipeline_instances.pop(pipeline_id, None)

    return {"status": "disabled", "pipeline_id": pipeline_id}
@router.get("/{pipeline_id}/health")
async def pipeline_health(pipeline_id: str) -> dict[str, Any]:
    """
    Report the health of a pipeline.

    The pipeline's own ``health_check()`` result is returned with the
    pipeline id added. If the check raises, the response is
    ``healthy: False`` with the error text instead of an HTTP error.
    Failures to load the pipeline itself are not caught here.
    """
    pipeline = await _get_pipeline_instance(pipeline_id)

    try:
        report = await pipeline.health_check()
        # Keys from the report win over the pipeline_id set here.
        return {"pipeline_id": pipeline_id, **report}
    except Exception as e:
        return {
            "pipeline_id": pipeline_id,
            "healthy": False,
            "error": str(e),
        }