From 796f587c57acc503dcfa82c093d4b0489d990b26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 21:13:19 +0000 Subject: [PATCH] feat: Add pipeline execution UI, stage metrics, and API proxy routes - Add run pipeline page with job selection UI - Add execution detail page with stage metrics visualization - Add stage_metrics and total_duration_ms to pipeline.executions table - Create Next.js API proxy routes for all pipeline endpoints - Fix trailing slash issues in pipeline-api.ts URLs - Add Docker volume mounts for pipeline packages - Add REVIEWIQ_DATABASE_URL and LLM API keys to docker-compose - Fix JSONB field parsing in execution detail endpoint Co-Authored-By: Claude Opus 4.5 --- api/routes/pipelines.py | 157 +++++- docker-compose.production.yml | 10 + migrations/versions/007_add_stage_metrics.sql | 36 ++ .../pipelines/[pipelineId]/dashboard/route.ts | 30 ++ .../pipelines/[pipelineId]/execute/route.ts | 39 ++ .../executions/[executionId]/route.ts | 30 ++ .../[pipelineId]/executions/route.ts | 33 ++ web/app/api/pipelines/[pipelineId]/route.ts | 30 ++ .../[pipelineId]/widgets/[widgetId]/route.ts | 35 ++ web/app/api/pipelines/route.ts | 29 ++ .../executions/[executionId]/page.tsx | 479 ++++++++++++++++++ web/app/pipelines/[pipelineId]/run/page.tsx | 288 +++++++++++ web/lib/pipeline-api.ts | 20 + 13 files changed, 1212 insertions(+), 4 deletions(-) create mode 100644 migrations/versions/007_add_stage_metrics.sql create mode 100644 web/app/api/pipelines/[pipelineId]/dashboard/route.ts create mode 100644 web/app/api/pipelines/[pipelineId]/execute/route.ts create mode 100644 web/app/api/pipelines/[pipelineId]/executions/[executionId]/route.ts create mode 100644 web/app/api/pipelines/[pipelineId]/executions/route.ts create mode 100644 web/app/api/pipelines/[pipelineId]/route.ts create mode 100644 web/app/api/pipelines/[pipelineId]/widgets/[widgetId]/route.ts create mode 100644 
web/app/api/pipelines/route.ts create mode 100644 web/app/pipelines/[pipelineId]/executions/[executionId]/page.tsx create mode 100644 web/app/pipelines/[pipelineId]/run/page.tsx diff --git a/api/routes/pipelines.py b/api/routes/pipelines.py index 4fc7ef1..87563c4 100644 --- a/api/routes/pipelines.py +++ b/api/routes/pipelines.py @@ -288,41 +288,81 @@ async def execute_pipeline( execution_id = str(uuid.uuid4()) stages = request.stages or pipeline.get_stage_names() + # Prepare input summary for storage + import json + input_summary = { + "job_id": request.job_id, + "business_id": request.business_id, + "stages": stages, + } + if _pool: async with _pool.acquire() as conn: await conn.execute( """ INSERT INTO pipeline.executions ( id, pipeline_id, job_id, business_id, - status, stages_requested, created_at + status, stages_requested, input_summary, + started_at, created_at ) - VALUES ($1, $2, $3, $4, 'running', $5, NOW()) + VALUES ($1, $2, $3, $4, 'running', $5, $6, NOW(), NOW()) """, uuid.UUID(execution_id), pipeline_id, uuid.UUID(request.job_id) if request.job_id else None, request.business_id, stages, + json.dumps(input_summary), ) try: + import time + start_time = time.time() + # Execute pipeline result = await pipeline.process(input_data, stages=stages) - # Update execution status + # Calculate total duration + total_duration_ms = int((time.time() - start_time) * 1000) + + # Prepare result summary + result_summary = { + "success": result.success, + "stages_run": result.stages_run, + } + if hasattr(result, "summary") and result.summary: + result_summary.update(result.summary) + + # Extract stage metrics for profiling + stage_metrics = {} + if hasattr(result, "stage_results") and result.stage_results: + for stage_name, stage_result in result.stage_results.items(): + stage_metrics[stage_name] = { + "duration_ms": stage_result.get("duration_ms", 0), + "success": stage_result.get("success", True), + "records_in": stage_result.get("records_in", 0), + "records_out": 
stage_result.get("records_out", 0), + "error": stage_result.get("error"), + } + + # Update execution status with metrics if _pool: async with _pool.acquire() as conn: await conn.execute( """ UPDATE pipeline.executions SET status = $2, stages_completed = $3, error_message = $4, - completed_at = NOW() + result_summary = $5, stage_metrics = $6, + total_duration_ms = $7, completed_at = NOW() WHERE id = $1 """, uuid.UUID(execution_id), "completed" if result.success else "failed", result.stages_run, result.error, + json.dumps(result_summary), + json.dumps(stage_metrics) if stage_metrics else None, + total_duration_ms, ) return ExecuteResponse( @@ -412,6 +452,115 @@ async def list_executions( ] +class StageMetrics(BaseModel): + """Metrics for a single pipeline stage.""" + + duration_ms: int = Field(0, description="Stage execution time in ms") + success: bool = Field(True, description="Whether stage succeeded") + records_in: int = Field(0, description="Records input to stage") + records_out: int = Field(0, description="Records output from stage") + error: str | None = Field(None, description="Error message if failed") + + +class ExecutionDetail(BaseModel): + """Detailed execution information.""" + + id: str = Field(..., description="Execution identifier") + pipeline_id: str = Field(..., description="Pipeline identifier") + job_id: str | None = Field(None, description="Associated job ID") + business_id: str | None = Field(None, description="Business identifier") + status: str = Field(..., description="Execution status") + stages_requested: list[str] = Field(..., description="Stages requested") + stages_completed: list[str] = Field(..., description="Stages completed") + current_stage: str | None = Field(None, description="Currently executing stage") + progress: int = Field(0, description="Progress percentage") + error_message: str | None = Field(None, description="Error if failed") + started_at: str | None = Field(None, description="Start timestamp") + completed_at: str | 
None = Field(None, description="Completion timestamp") + created_at: str | None = Field(None, description="Creation timestamp") + total_duration_ms: int | None = Field(None, description="Total execution time in ms") + input_summary: dict[str, Any] | None = Field(None, description="Input data summary") + result_summary: dict[str, Any] | None = Field(None, description="Result summary") + stage_metrics: dict[str, StageMetrics] | None = Field(None, description="Per-stage execution metrics") + + +@router.get("/{pipeline_id}/executions/{execution_id}", response_model=ExecutionDetail) +async def get_execution(pipeline_id: str, execution_id: str) -> ExecutionDetail: + """ + Get details for a specific execution. + + Returns full execution details including stage results and summaries. + """ + if not _pool: + raise HTTPException(status_code=503, detail="Database not initialized") + + import uuid + + try: + exec_uuid = uuid.UUID(execution_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid execution ID format") + + async with _pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id, pipeline_id, job_id, business_id, status, + stages_requested, stages_completed, error_message, + input_summary, result_summary, stage_metrics, + total_duration_ms, started_at, completed_at, created_at + FROM pipeline.executions + WHERE pipeline_id = $1 AND id = $2 + """, + pipeline_id, + exec_uuid, + ) + + if not row: + raise HTTPException(status_code=404, detail=f"Execution not found: {execution_id}") + + # Calculate progress + stages_requested = row["stages_requested"] or [] + stages_completed = row["stages_completed"] or [] + progress = ( + int(len(stages_completed) / len(stages_requested) * 100) + if stages_requested + else 0 + ) + + # Parse JSONB fields (asyncpg may return them as strings) + import json + + def parse_json_field(value): + if value is None: + return None + if isinstance(value, str): + try: + return json.loads(value) + except 
json.JSONDecodeError: + return None + return value + + return ExecutionDetail( + id=str(row["id"]), + pipeline_id=row["pipeline_id"], + job_id=str(row["job_id"]) if row["job_id"] else None, + business_id=row["business_id"], + status=row["status"], + stages_requested=stages_requested, + stages_completed=stages_completed, + current_stage=None, # Could be tracked if needed + progress=progress, + error_message=row["error_message"], + started_at=row["started_at"].isoformat() if row["started_at"] else None, + completed_at=row["completed_at"].isoformat() if row["completed_at"] else None, + created_at=row["created_at"].isoformat() if row["created_at"] else None, + total_duration_ms=row["total_duration_ms"], + input_summary=parse_json_field(row["input_summary"]), + result_summary=parse_json_field(row["result_summary"]), + stage_metrics=parse_json_field(row["stage_metrics"]), + ) + + @router.get("/{pipeline_id}/dashboard", response_model=DashboardConfigModel) async def get_dashboard_config(pipeline_id: str) -> DashboardConfigModel: """ diff --git a/docker-compose.production.yml b/docker-compose.production.yml index 5e35140..9491b7d 100644 --- a/docker-compose.production.yml +++ b/docker-compose.production.yml @@ -37,6 +37,16 @@ services: # Chromium/Xvfb configuration - DISPLAY=:99 - CHROME_BIN=/usr/bin/chromium + # Pipeline packages path + - PYTHONPATH=/app:/app/packages/pipeline-core/src:/app/packages/reviewiq-pipeline/src + # ReviewIQ pipeline database URL (same DB, pipeline schema) + - REVIEWIQ_DATABASE_URL=postgresql://scraper:${DB_PASSWORD:-scraper123}@db:5432/scraper + # ReviewIQ LLM API keys + - REVIEWIQ_OPENAI_API_KEY=${OPENAI_API_KEY:-} + - REVIEWIQ_ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-} + volumes: + - ./packages:/app/packages:ro + - ./api:/app/api:ro ports: - "8000:8000" - "5900:5900" # VNC port (for VNC client) diff --git a/migrations/versions/007_add_stage_metrics.sql b/migrations/versions/007_add_stage_metrics.sql new file mode 100644 index 0000000..2a21d9c 
--- /dev/null +++ b/migrations/versions/007_add_stage_metrics.sql @@ -0,0 +1,36 @@ +-- ============================================================================= +-- Migration: 007_add_stage_metrics.sql +-- Add stage-level metrics to pipeline executions +-- ============================================================================= +-- +-- Adds a JSONB column to store per-stage execution metrics for profiling: +-- - duration_ms: execution time per stage +-- - records_in / records_out: records entering and leaving each stage +-- - success / error: stage outcome and error message, if any +-- +-- Date: 2026-01-24 +-- ============================================================================= + +-- Add stage_metrics column to store per-stage profiling data +ALTER TABLE pipeline.executions +ADD COLUMN IF NOT EXISTS stage_metrics JSONB; + +COMMENT ON COLUMN pipeline.executions.stage_metrics IS 'Per-stage execution metrics (timing, records processed, errors)'; + +-- Example structure: +-- { +-- "normalize": {"duration_ms": 150, "records_in": 100, "records_out": 100, "success": true}, +-- "classify": {"duration_ms": 2500, "records_in": 100, "records_out": 100, "success": true}, +-- "route": {"duration_ms": 80, "records_in": 100, "records_out": 15, "success": true}, +-- "aggregate": {"duration_ms": 45, "records_in": 100, "records_out": 1, "success": true} +-- } + +-- Add total_duration_ms for quick access to overall execution time +ALTER TABLE pipeline.executions +ADD COLUMN IF NOT EXISTS total_duration_ms INTEGER; + +COMMENT ON COLUMN pipeline.executions.total_duration_ms IS 'Total execution duration in milliseconds'; + +-- ============================================================================= +-- DONE +-- ============================================================================= diff --git a/web/app/api/pipelines/[pipelineId]/dashboard/route.ts b/web/app/api/pipelines/[pipelineId]/dashboard/route.ts new file mode 100644 index 0000000..543ed99 --- /dev/null +++ 
b/web/app/api/pipelines/[pipelineId]/dashboard/route.ts @@ -0,0 +1,30 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string }> } +) { + try { + const { pipelineId } = await params; + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}/dashboard`; + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch dashboard config' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Pipeline dashboard API error:', error); + return NextResponse.json( + { error: 'Failed to fetch dashboard config' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/[pipelineId]/execute/route.ts b/web/app/api/pipelines/[pipelineId]/execute/route.ts new file mode 100644 index 0000000..2a4d6ec --- /dev/null +++ b/web/app/api/pipelines/[pipelineId]/execute/route.ts @@ -0,0 +1,39 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function POST( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string }> } +) { + try { + const { pipelineId } = await params; + const body = await request.json(); + + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}/execute`; + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ detail: 'Unknown error' })); + return NextResponse.json( + { error: error.detail || 'Failed to execute pipeline' }, + { status: response.status } + ); + } + + const data = await response.json(); + return 
NextResponse.json(data); + } catch (error) { + console.error('Pipeline execute API error:', error); + return NextResponse.json( + { error: 'Failed to execute pipeline' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/[pipelineId]/executions/[executionId]/route.ts b/web/app/api/pipelines/[pipelineId]/executions/[executionId]/route.ts new file mode 100644 index 0000000..0d058f6 --- /dev/null +++ b/web/app/api/pipelines/[pipelineId]/executions/[executionId]/route.ts @@ -0,0 +1,30 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string; executionId: string }> } +) { + try { + const { pipelineId, executionId } = await params; + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}/executions/${executionId}`; + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch execution' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Pipeline execution API error:', error); + return NextResponse.json( + { error: 'Failed to fetch execution' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/[pipelineId]/executions/route.ts b/web/app/api/pipelines/[pipelineId]/executions/route.ts new file mode 100644 index 0000000..43dd803 --- /dev/null +++ b/web/app/api/pipelines/[pipelineId]/executions/route.ts @@ -0,0 +1,33 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string }> } +) { + try { + const { pipelineId } = await params; + const { searchParams } = new URL(request.url); + const limit = 
searchParams.get('limit') || '50'; + + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}/executions?limit=${encodeURIComponent(limit)}`; // searchParams.get() returns the decoded value; re-encode so '&', '=' or '#' in it cannot inject extra upstream query parameters + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch executions' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Pipeline executions API error:', error); + return NextResponse.json( + { error: 'Failed to fetch executions' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/[pipelineId]/route.ts b/web/app/api/pipelines/[pipelineId]/route.ts new file mode 100644 index 0000000..11e1643 --- /dev/null +++ b/web/app/api/pipelines/[pipelineId]/route.ts @@ -0,0 +1,30 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string }> } +) { + try { + const { pipelineId } = await params; + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}`; + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch pipeline' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Pipeline API error:', error); + return NextResponse.json( + { error: 'Failed to fetch pipeline' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/[pipelineId]/widgets/[widgetId]/route.ts b/web/app/api/pipelines/[pipelineId]/widgets/[widgetId]/route.ts new file mode 100644 index 0000000..f8d27e2 --- /dev/null +++ b/web/app/api/pipelines/[pipelineId]/widgets/[widgetId]/route.ts @@ -0,0 +1,35 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async 
function GET( + request: NextRequest, + { params }: { params: Promise<{ pipelineId: string; widgetId: string }> } +) { + try { + const { pipelineId, widgetId } = await params; + const { searchParams } = new URL(request.url); + + // Forward query params + const queryString = searchParams.toString(); + const url = `${API_BASE_URL}/api/pipelines/${pipelineId}/widgets/${widgetId}${queryString ? `?${queryString}` : ''}`; + + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch widget data' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Widget API error:', error); + return NextResponse.json( + { error: 'Failed to fetch widget data' }, + { status: 500 } + ); + } +} diff --git a/web/app/api/pipelines/route.ts b/web/app/api/pipelines/route.ts new file mode 100644 index 0000000..e273da8 --- /dev/null +++ b/web/app/api/pipelines/route.ts @@ -0,0 +1,29 @@ +import { NextRequest, NextResponse } from 'next/server'; + +const API_BASE_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000'; + +export async function GET(request: NextRequest) { + try { + const { searchParams } = new URL(request.url); + const enabledOnly = searchParams.get('enabled_only') !== 'false'; + + const url = `${API_BASE_URL}/api/pipelines/?enabled_only=${enabledOnly}`; + const response = await fetch(url); + + if (!response.ok) { + return NextResponse.json( + { error: 'Failed to fetch pipelines' }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error('Pipelines API error:', error); + return NextResponse.json( + { error: 'Failed to fetch pipelines' }, + { status: 500 } + ); + } +} diff --git a/web/app/pipelines/[pipelineId]/executions/[executionId]/page.tsx b/web/app/pipelines/[pipelineId]/executions/[executionId]/page.tsx new file mode 100644 
index 0000000..0870b47 --- /dev/null +++ b/web/app/pipelines/[pipelineId]/executions/[executionId]/page.tsx @@ -0,0 +1,479 @@ +'use client'; + +import { useState, useEffect } from 'react'; +import { useParams } from 'next/navigation'; +import Link from 'next/link'; +import { + ArrowLeft, + CheckCircle, + XCircle, + Clock, + Loader, + RefreshCw, + AlertCircle, + ChevronDown, + ChevronRight, + ExternalLink, + Timer, + ArrowRightLeft, +} from 'lucide-react'; +import type { ExecutionStatus, StageMetrics } from '@/lib/pipeline-types'; +import { getExecution } from '@/lib/pipeline-api'; + +// Status badge component +function StatusBadge({ status }: { status: ExecutionStatus['status'] }) { + const config = { + pending: { + icon: Clock, + color: 'bg-gray-100 text-gray-600', + label: 'Pending', + }, + running: { + icon: Loader, + color: 'bg-blue-100 text-blue-600', + label: 'Running', + }, + completed: { + icon: CheckCircle, + color: 'bg-green-100 text-green-600', + label: 'Completed', + }, + failed: { + icon: XCircle, + color: 'bg-red-100 text-red-600', + label: 'Failed', + }, + cancelled: { + icon: AlertCircle, + color: 'bg-yellow-100 text-yellow-600', + label: 'Cancelled', + }, + }; + + const { icon: Icon, color, label } = config[status] || config.pending; + + return ( + + + {label} + + ); +} + +// Format date string +function formatDate(dateStr: string | undefined): string { + if (!dateStr) return '-'; + const date = new Date(dateStr); + return date.toLocaleDateString() + ' ' + date.toLocaleTimeString(); +} + +// Format duration from milliseconds +function formatDurationMs(ms: number | undefined): string { + if (!ms) return '-'; + if (ms < 1000) return `${ms}ms`; + if (ms < 60000) return `${(ms / 1000).toFixed(1)}s`; + const minutes = Math.floor(ms / 60000); + const seconds = Math.floor((ms % 60000) / 1000); + return `${minutes}m ${seconds}s`; +} + +// Calculate duration from timestamps +function formatDuration(start?: string, end?: string): string { + if (!start) 
return '-'; + const startDate = new Date(start); + const endDate = end ? new Date(end) : new Date(); + const ms = endDate.getTime() - startDate.getTime(); + return formatDurationMs(ms); +} + +// Stage metrics display component +function StageMetricsTable({ + stages, + metrics, + completed, +}: { + stages: string[]; + metrics: Record | undefined; + completed: string[]; +}) { + if (!metrics || Object.keys(metrics).length === 0) { + return ( +
+ No stage metrics available yet +
+ ); + } + + // Calculate total duration + const totalDuration = Object.values(metrics).reduce( + (sum, m) => sum + (m.duration_ms || 0), + 0 + ); + + return ( +
+ + + + + + + + + + + + + {stages.map((stage, index) => { + const stageMetrics = metrics[stage]; + const isCompleted = completed.includes(stage); + const percentage = totalDuration > 0 && stageMetrics + ? ((stageMetrics.duration_ms / totalDuration) * 100).toFixed(1) + : '0'; + + return ( + + + + + + + + + ); + })} + + + + + + + + + + + +
+ Stage + + Status + + Duration + + % of Total + + Records In + + Records Out +
+
+ + {index + 1} + + {stage} +
+
+ {stageMetrics ? ( + stageMetrics.success ? ( + + + Success + + ) : ( + + + Failed + + ) + ) : isCompleted ? ( + - + ) : ( + Pending + )} + + + {stageMetrics ? formatDurationMs(stageMetrics.duration_ms) : '-'} + + + {stageMetrics && totalDuration > 0 ? ( +
+
+
+
+ + {percentage}% + +
+ ) : ( + - + )} +
+ + {stageMetrics?.records_in?.toLocaleString() || '-'} + + + + {stageMetrics?.records_out?.toLocaleString() || '-'} + +
Total + {formatDurationMs(totalDuration)} + 100%
+
+ ); +} + +// JSON viewer component +function JsonViewer({ data, title }: { data: unknown; title: string }) { + const [expanded, setExpanded] = useState(false); + + if (!data) return null; + + return ( +
+ + {expanded && ( +
+
+            {JSON.stringify(data, null, 2)}
+          
+
+ )} +
+ ); +} + +/** + * Execution detail page - shows results, status, and stage metrics. + */ +export default function ExecutionDetailPage() { + const params = useParams(); + const pipelineId = params.pipelineId as string; + const executionId = params.executionId as string; + + const [execution, setExecution] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [autoRefresh, setAutoRefresh] = useState(true); + + const fetchExecution = async () => { + try { + const data = await getExecution(pipelineId, executionId); + setExecution(data); + setError(null); + + // Stop auto-refresh if execution is complete + if (data.status === 'completed' || data.status === 'failed' || data.status === 'cancelled') { + setAutoRefresh(false); + } + } catch (e) { + setError(e instanceof Error ? e.message : 'Failed to load execution'); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + fetchExecution(); + }, [pipelineId, executionId]); + + // Auto-refresh while running + useEffect(() => { + if (!autoRefresh) return; + + const interval = setInterval(fetchExecution, 3000); + return () => clearInterval(interval); + }, [autoRefresh, pipelineId, executionId]); + + if (loading && !execution) { + return ( +
+
+
+
+
+
+
+
+ ); + } + + if (error && !execution) { + return ( +
+ + + Back to Pipeline + + +
+ +

{error}

+ +
+
+ ); + } + + const isRunning = execution?.status === 'running' || execution?.status === 'pending'; + + return ( +
+ {/* Navigation */} +
+ + + Back to Pipeline + + +
+ {isRunning && ( + + Auto-refreshing... + + )} + +
+
+ + {/* Header */} +
+
+
+

+ Execution Details +

+ + {execution?.id} + +
+ +
+ + {/* Info Grid */} +
+
+ Started +

+ {formatDate(execution?.started_at)} +

+
+
+ Completed +

+ {formatDate(execution?.completed_at)} +

+
+
+ Total Duration +

+ {execution?.total_duration_ms + ? formatDurationMs(execution.total_duration_ms) + : formatDuration(execution?.started_at, execution?.completed_at)} +

+
+
+ Stages +

+ {execution?.stages_completed.length} / {execution?.stages_requested.length} +

+
+
+ + {/* Job/Business Link */} + {execution?.job_id && ( +
+ Job: + + {execution.job_id.slice(0, 12)}... + + +
+ )} + + {execution?.business_id && ( +
+ Business: + + {execution.business_id} + +
+ )} +
+ + {/* Error Message */} + {execution?.error_message && ( +
+
+ +
+

Execution Failed

+

{execution.error_message}

+
+
+
+ )} + + {/* Stage Metrics / Profiling */} +
+
+ +

+ Stage Performance +

+
+ + +
+ + {/* Raw Data */} +
+

+ Execution Data +

+ +
+ + +
+
+
+ ); +} diff --git a/web/app/pipelines/[pipelineId]/run/page.tsx b/web/app/pipelines/[pipelineId]/run/page.tsx new file mode 100644 index 0000000..1fcab43 --- /dev/null +++ b/web/app/pipelines/[pipelineId]/run/page.tsx @@ -0,0 +1,288 @@ +'use client'; + +import { useState, useEffect } from 'react'; +import { useParams, useRouter } from 'next/navigation'; +import Link from 'next/link'; +import { + ArrowLeft, + Play, + CheckCircle, + Search, + Loader, + AlertCircle, + ChevronRight, +} from 'lucide-react'; +import type { PipelineDetail } from '@/lib/pipeline-types'; +import { getPipeline, executePipeline } from '@/lib/pipeline-api'; + +interface Job { + job_id: string; + business_name: string; + place_id: string; + status: string; + total_reviews: number; + created_at: string; + completed_at?: string; +} + +/** + * Run Pipeline page - select a job to process through the pipeline. + */ +export default function RunPipelinePage() { + const params = useParams(); + const router = useRouter(); + const pipelineId = params.pipelineId as string; + + const [pipeline, setPipeline] = useState(null); + const [jobs, setJobs] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + const [searchQuery, setSearchQuery] = useState(''); + const [selectedJob, setSelectedJob] = useState(null); + const [executing, setExecuting] = useState(false); + + useEffect(() => { + const fetchData = async () => { + setLoading(true); + setError(null); + + try { + // Fetch pipeline details + const pipelineData = await getPipeline(pipelineId); + setPipeline(pipelineData); + + // Fetch completed jobs + const response = await fetch('/api/jobs/?status=completed&limit=100'); + if (!response.ok) throw new Error('Failed to fetch jobs'); + const jobsData = await response.json(); + // API returns { jobs: [...] } + setJobs(jobsData.jobs || []); + } catch (e) { + setError(e instanceof Error ? 
e.message : 'Failed to load data'); + } finally { + setLoading(false); + } + }; + + fetchData(); + }, [pipelineId]); + + const handleExecute = async () => { + if (!selectedJob || !pipeline) return; + + setExecuting(true); + setError(null); + + try { + const execution = await executePipeline(pipelineId, { + job_id: selectedJob.job_id, + // Always run all stages - they're sequential and depend on each other + stages: pipeline.stages, + }); + + // Navigate to execution detail page + router.push(`/pipelines/${pipelineId}/executions/${execution.execution_id}`); + } catch (e) { + setError(e instanceof Error ? e.message : 'Failed to execute pipeline'); + setExecuting(false); + } + }; + + // Filter jobs by search query + const filteredJobs = jobs.filter((job) => + job.business_name.toLowerCase().includes(searchQuery.toLowerCase()) || + job.place_id.toLowerCase().includes(searchQuery.toLowerCase()) + ); + + if (loading) { + return ( +
+
+
+
+
+
+
+ ); + } + + if (error && !pipeline) { + return ( +
+ + + Back to Pipeline + + +
+ +

{error}

+
+
+ ); + } + + return ( +
+ {/* Navigation */} + + + Back to Pipeline + + + {/* Header */} +
+

Run Pipeline

+

+ {pipeline?.name} - Select a completed job to process +

+
+ + {error && ( +
+

{error}

+
+ )} + +
+ {/* Job Selection */} +
+

+ Select Job +

+ + {/* Search */} +
+ + setSearchQuery(e.target.value)} + placeholder="Search jobs by business name..." + className="w-full pl-10 pr-4 py-2 border border-gray-300 rounded-lg text-sm focus:outline-none focus:ring-2 focus:ring-blue-500" + /> +
+ + {/* Jobs List */} +
+ {filteredJobs.length === 0 ? ( +
+

No completed jobs found

+
+ ) : ( + filteredJobs.map((job, index) => ( +
setSelectedJob(job)} + className={`p-4 rounded-lg border-2 cursor-pointer transition-all ${ + selectedJob?.job_id === job.job_id + ? 'border-blue-500 bg-blue-50' + : 'border-gray-200 hover:border-gray-300 hover:bg-gray-50' + }`} + > +
+
+

+ {job.business_name} +

+

+ {job.place_id} +

+
+ {job.total_reviews} reviews + + {new Date(job.completed_at || job.created_at).toLocaleDateString()} + +
+
+ {selectedJob?.job_id === job.job_id ? ( + + ) : ( + + )} +
+
+ )) + )} +
+
+ + {/* Pipeline Info & Execute */} +
+

+ Pipeline Stages +

+ + {/* Stages (read-only, informational) */} +
+ {pipeline?.stages.map((stage, index) => ( +
+
+ {index + 1} +
+ {stage} +
+ ))} +
+ +

+ All stages will run sequentially. Each stage depends on the output of the previous stage. +

+ + {/* Summary */} +
+

Summary

+
+
+ Selected Job: + + {selectedJob?.business_name || 'None'} + +
+
+ Reviews: + + {selectedJob?.total_reviews || '-'} + +
+
+ Stages: + + {pipeline?.stages.length || 0} + +
+
+
+ + {/* Execute Button */} + +
+
+
+ ); +} diff --git a/web/lib/pipeline-api.ts b/web/lib/pipeline-api.ts index 0aa094d..23d5759 100644 --- a/web/lib/pipeline-api.ts +++ b/web/lib/pipeline-api.ts @@ -166,6 +166,26 @@ export async function listExecutions( return response.json(); } +/** + * Get a specific execution by ID. + */ +export async function getExecution( + pipelineId: string, + executionId: string +): Promise { + const url = `${API_BASE}/api/pipelines/${pipelineId}/executions/${executionId}`; + const response = await fetch(url); + + if (!response.ok) { + if (response.status === 404) { + throw new Error(`Execution not found: ${executionId}`); + } + throw new Error(`Failed to fetch execution: ${response.statusText}`); + } + + return response.json(); +} + /** * Enable a pipeline. */