feat: Add pipeline execution UI, stage metrics, and API proxy routes
- Add run pipeline page with job selection UI - Add execution detail page with stage metrics visualization - Add stage_metrics and total_duration_ms to pipeline.executions table - Create Next.js API proxy routes for all pipeline endpoints - Fix trailing slash issues in pipeline-api.ts URLs - Add Docker volume mounts for pipeline packages - Add REVIEWIQ_DATABASE_URL and LLM API keys to docker-compose - Fix JSONB field parsing in execution detail endpoint Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -288,41 +288,81 @@ async def execute_pipeline(
|
||||
execution_id = str(uuid.uuid4())
|
||||
stages = request.stages or pipeline.get_stage_names()
|
||||
|
||||
# Prepare input summary for storage
|
||||
import json
|
||||
input_summary = {
|
||||
"job_id": request.job_id,
|
||||
"business_id": request.business_id,
|
||||
"stages": stages,
|
||||
}
|
||||
|
||||
if _pool:
|
||||
async with _pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO pipeline.executions (
|
||||
id, pipeline_id, job_id, business_id,
|
||||
status, stages_requested, created_at
|
||||
status, stages_requested, input_summary,
|
||||
started_at, created_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, 'running', $5, NOW())
|
||||
VALUES ($1, $2, $3, $4, 'running', $5, $6, NOW(), NOW())
|
||||
""",
|
||||
uuid.UUID(execution_id),
|
||||
pipeline_id,
|
||||
uuid.UUID(request.job_id) if request.job_id else None,
|
||||
request.business_id,
|
||||
stages,
|
||||
json.dumps(input_summary),
|
||||
)
|
||||
|
||||
try:
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
# Execute pipeline
|
||||
result = await pipeline.process(input_data, stages=stages)
|
||||
|
||||
# Update execution status
|
||||
# Calculate total duration
|
||||
total_duration_ms = int((time.time() - start_time) * 1000)
|
||||
|
||||
# Prepare result summary
|
||||
result_summary = {
|
||||
"success": result.success,
|
||||
"stages_run": result.stages_run,
|
||||
}
|
||||
if hasattr(result, "summary") and result.summary:
|
||||
result_summary.update(result.summary)
|
||||
|
||||
# Extract stage metrics for profiling
|
||||
stage_metrics = {}
|
||||
if hasattr(result, "stage_results") and result.stage_results:
|
||||
for stage_name, stage_result in result.stage_results.items():
|
||||
stage_metrics[stage_name] = {
|
||||
"duration_ms": stage_result.get("duration_ms", 0),
|
||||
"success": stage_result.get("success", True),
|
||||
"records_in": stage_result.get("records_in", 0),
|
||||
"records_out": stage_result.get("records_out", 0),
|
||||
"error": stage_result.get("error"),
|
||||
}
|
||||
|
||||
# Update execution status with metrics
|
||||
if _pool:
|
||||
async with _pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE pipeline.executions
|
||||
SET status = $2, stages_completed = $3, error_message = $4,
|
||||
completed_at = NOW()
|
||||
result_summary = $5, stage_metrics = $6,
|
||||
total_duration_ms = $7, completed_at = NOW()
|
||||
WHERE id = $1
|
||||
""",
|
||||
uuid.UUID(execution_id),
|
||||
"completed" if result.success else "failed",
|
||||
result.stages_run,
|
||||
result.error,
|
||||
json.dumps(result_summary),
|
||||
json.dumps(stage_metrics) if stage_metrics else None,
|
||||
total_duration_ms,
|
||||
)
|
||||
|
||||
return ExecuteResponse(
|
||||
@@ -412,6 +452,115 @@ async def list_executions(
|
||||
]
|
||||
|
||||
|
||||
class StageMetrics(BaseModel):
|
||||
"""Metrics for a single pipeline stage."""
|
||||
|
||||
duration_ms: int = Field(0, description="Stage execution time in ms")
|
||||
success: bool = Field(True, description="Whether stage succeeded")
|
||||
records_in: int = Field(0, description="Records input to stage")
|
||||
records_out: int = Field(0, description="Records output from stage")
|
||||
error: str | None = Field(None, description="Error message if failed")
|
||||
|
||||
|
||||
class ExecutionDetail(BaseModel):
|
||||
"""Detailed execution information."""
|
||||
|
||||
id: str = Field(..., description="Execution identifier")
|
||||
pipeline_id: str = Field(..., description="Pipeline identifier")
|
||||
job_id: str | None = Field(None, description="Associated job ID")
|
||||
business_id: str | None = Field(None, description="Business identifier")
|
||||
status: str = Field(..., description="Execution status")
|
||||
stages_requested: list[str] = Field(..., description="Stages requested")
|
||||
stages_completed: list[str] = Field(..., description="Stages completed")
|
||||
current_stage: str | None = Field(None, description="Currently executing stage")
|
||||
progress: int = Field(0, description="Progress percentage")
|
||||
error_message: str | None = Field(None, description="Error if failed")
|
||||
started_at: str | None = Field(None, description="Start timestamp")
|
||||
completed_at: str | None = Field(None, description="Completion timestamp")
|
||||
created_at: str | None = Field(None, description="Creation timestamp")
|
||||
total_duration_ms: int | None = Field(None, description="Total execution time in ms")
|
||||
input_summary: dict[str, Any] | None = Field(None, description="Input data summary")
|
||||
result_summary: dict[str, Any] | None = Field(None, description="Result summary")
|
||||
stage_metrics: dict[str, StageMetrics] | None = Field(None, description="Per-stage execution metrics")
|
||||
|
||||
|
||||
@router.get("/{pipeline_id}/executions/{execution_id}", response_model=ExecutionDetail)
|
||||
async def get_execution(pipeline_id: str, execution_id: str) -> ExecutionDetail:
|
||||
"""
|
||||
Get details for a specific execution.
|
||||
|
||||
Returns full execution details including stage results and summaries.
|
||||
"""
|
||||
if not _pool:
|
||||
raise HTTPException(status_code=503, detail="Database not initialized")
|
||||
|
||||
import uuid
|
||||
|
||||
try:
|
||||
exec_uuid = uuid.UUID(execution_id)
|
||||
except ValueError:
|
||||
raise HTTPException(status_code=400, detail="Invalid execution ID format")
|
||||
|
||||
async with _pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT id, pipeline_id, job_id, business_id, status,
|
||||
stages_requested, stages_completed, error_message,
|
||||
input_summary, result_summary, stage_metrics,
|
||||
total_duration_ms, started_at, completed_at, created_at
|
||||
FROM pipeline.executions
|
||||
WHERE pipeline_id = $1 AND id = $2
|
||||
""",
|
||||
pipeline_id,
|
||||
exec_uuid,
|
||||
)
|
||||
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail=f"Execution not found: {execution_id}")
|
||||
|
||||
# Calculate progress
|
||||
stages_requested = row["stages_requested"] or []
|
||||
stages_completed = row["stages_completed"] or []
|
||||
progress = (
|
||||
int(len(stages_completed) / len(stages_requested) * 100)
|
||||
if stages_requested
|
||||
else 0
|
||||
)
|
||||
|
||||
# Parse JSONB fields (asyncpg may return them as strings)
|
||||
import json
|
||||
|
||||
def parse_json_field(value):
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return json.loads(value)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
return value
|
||||
|
||||
return ExecutionDetail(
|
||||
id=str(row["id"]),
|
||||
pipeline_id=row["pipeline_id"],
|
||||
job_id=str(row["job_id"]) if row["job_id"] else None,
|
||||
business_id=row["business_id"],
|
||||
status=row["status"],
|
||||
stages_requested=stages_requested,
|
||||
stages_completed=stages_completed,
|
||||
current_stage=None, # Could be tracked if needed
|
||||
progress=progress,
|
||||
error_message=row["error_message"],
|
||||
started_at=row["started_at"].isoformat() if row["started_at"] else None,
|
||||
completed_at=row["completed_at"].isoformat() if row["completed_at"] else None,
|
||||
created_at=row["created_at"].isoformat() if row["created_at"] else None,
|
||||
total_duration_ms=row["total_duration_ms"],
|
||||
input_summary=parse_json_field(row["input_summary"]),
|
||||
result_summary=parse_json_field(row["result_summary"]),
|
||||
stage_metrics=parse_json_field(row["stage_metrics"]),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{pipeline_id}/dashboard", response_model=DashboardConfigModel)
|
||||
async def get_dashboard_config(pipeline_id: str) -> DashboardConfigModel:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user