""" Pipeline class - main public API for the ReviewIQ pipeline. Provides a unified interface for running pipeline stages and implements the BasePipeline interface for the extensible pipeline system. """ from __future__ import annotations import json import logging import re import time from datetime import date, datetime, timedelta from typing import TYPE_CHECKING, Any from pipeline_core import ( BasePipeline, DashboardConfig, DashboardSection, PipelineMetadata, PipelineResult as BasePipelineResult, StageResult, WidgetConfig, ) from reviewiq_pipeline.config import Config from reviewiq_pipeline.contracts import ( ClassificationConfig, NormalizedReview, ReviewToClassify, ScraperOutput, SpanToRoute, Stage1Input, Stage1Output, Stage2Input, Stage2Output, Stage3Input, Stage3Output, Stage4Input, Stage4Output, ValidationResult, ) from reviewiq_pipeline.db.connection import DatabasePool from reviewiq_pipeline.db.repositories import ( FactRepository, IssueRepository, ReviewRepository, SpanRepository, ) from reviewiq_pipeline.services.embeddings import EmbeddingService from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer from reviewiq_pipeline.stages.stage2_classify import Stage2Classifier from reviewiq_pipeline.stages.stage3_route import Stage3Router from reviewiq_pipeline.stages.stage4_aggregate import Stage4Aggregator from reviewiq_pipeline.stages.stage5_synthesize import Stage5Synthesizer from reviewiq_pipeline.services.llm_client import LLMClient from reviewiq_pipeline.validation.validators import ( validate_stage1_output, validate_stage2_output, validate_stage3_output, validate_stage4_output, ) if TYPE_CHECKING: pass logger = logging.getLogger(__name__) # Stage name to number mapping STAGE_NAMES = ["normalize", "classify", "route", "aggregate", "synthesize"] STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4, "synthesize": 5} STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate", 5: "synthesize"} def 
_parse_relative_date(date_str: str | None, default_to_now: bool = True) -> datetime | None: """Parse relative date strings like '10 months ago' into datetime objects. Args: date_str: A relative date string (e.g., "10 months ago", "2 weeks ago") or an ISO date string, or None. default_to_now: If True, returns current datetime when parsing fails. Returns: A datetime object, or None if parsing fails and default_to_now is False. """ now = datetime.now() if not date_str: return now if default_to_now else None # Try to parse as ISO date first try: return datetime.fromisoformat(date_str.replace('Z', '+00:00')) except (ValueError, AttributeError): pass # Parse relative dates like "10 months ago", "2 weeks ago", "a day ago" date_str = date_str.lower().strip() # Handle "a/an" as 1 date_str = re.sub(r'\b(a|an)\s+', '1 ', date_str) # Extract number and unit match = re.match(r'(\d+)\s*(second|minute|hour|day|week|month|year)s?\s*ago', date_str) if match: amount = int(match.group(1)) unit = match.group(2) if unit == 'second': return now - timedelta(seconds=amount) elif unit == 'minute': return now - timedelta(minutes=amount) elif unit == 'hour': return now - timedelta(hours=amount) elif unit == 'day': return now - timedelta(days=amount) elif unit == 'week': return now - timedelta(weeks=amount) elif unit == 'month': # Approximate months as 30 days return now - timedelta(days=amount * 30) elif unit == 'year': # Approximate years as 365 days return now - timedelta(days=amount * 365) # If we can't parse it, return now or None logger.warning(f"Could not parse relative date: {date_str}") return now if default_to_now else None class PipelineResult: """Result from running the full pipeline (legacy format).""" def __init__( self, stage1: Stage1Output | None = None, stage2: Stage2Output | None = None, stage3: Stage3Output | None = None, stage4: Stage4Output | None = None, validation: dict[str, ValidationResult] | None = None, ): self.stage1 = stage1 self.stage2 = stage2 self.stage3 = 
stage3 self.stage4 = stage4 self.validation = validation or {} @property def success(self) -> bool: """Check if all ran stages passed validation.""" return all(v["passed"] for v in self.validation.values()) def to_dict(self) -> dict[str, Any]: """Convert to dictionary.""" return { "stage1": self.stage1, "stage2": self.stage2, "stage3": self.stage3, "stage4": self.stage4, "validation": self.validation, "success": self.success, } class ReviewIQPipeline(BasePipeline): """ ReviewIQ Classification Pipeline. Implements the BasePipeline interface for the extensible pipeline system. Classifies reviews using URT taxonomy, detects issues, and aggregates metrics. Stages: - normalize: Text cleaning, language detection, deduplication - classify: LLM-powered span extraction with URT codes - route: Route negative spans to issues - aggregate: Pre-aggregate metrics for dashboards Usage: config = Config(database_url="...", llm_provider="openai", ...) pipeline = ReviewIQPipeline(config) await pipeline.initialize() # Run full pipeline result = await pipeline.process(scraper_output) # Or run specific stages result = await pipeline.process(scraper_output, stages=["normalize", "classify"]) """ def __init__(self, config: Config | None = None): """ Initialize the pipeline. Args: config: Pipeline configuration. If None, loads from environment. 
""" self._config = config or Config() self._db: DatabasePool | None = None self._review_repo: ReviewRepository | None = None self._span_repo: SpanRepository | None = None self._issue_repo: IssueRepository | None = None self._fact_repo: FactRepository | None = None self._embedding_service: EmbeddingService | None = None self._initialized = False @property def config(self) -> Config: """Get pipeline configuration.""" return self._config @property def metadata(self) -> PipelineMetadata: """Get pipeline metadata.""" return PipelineMetadata( id="reviewiq", name="ReviewIQ Classification Pipeline", description="Classifies reviews using URT taxonomy, detects issues, and aggregates metrics for dashboards", version="1.0.0", stages=STAGE_NAMES, input_type="ScraperV1Output", ) async def initialize(self) -> None: """Initialize database connections and services.""" if self._initialized: return logger.info("Initializing ReviewIQ pipeline...") # Initialize database self._db = DatabasePool(self._config) await self._db.initialize() # Initialize repositories self._review_repo = ReviewRepository(self._db) self._span_repo = SpanRepository(self._db) self._issue_repo = IssueRepository(self._db) self._fact_repo = FactRepository(self._db) # Initialize embedding service self._embedding_service = EmbeddingService(self._config) self._initialized = True logger.info("ReviewIQ pipeline initialized") async def close(self) -> None: """Close all connections and cleanup resources.""" if self._db: await self._db.close() self._db = None self._initialized = False logger.info("ReviewIQ pipeline closed") async def migrate(self) -> int: """ Run database migrations. Returns: Number of migrations run """ if not self._db: self._db = DatabasePool(self._config) await self._db.initialize() return await self._db.run_migrations() async def process( self, input_data: dict[str, Any], stages: list[str] | None = None, ) -> BasePipelineResult: """ Process input data through the pipeline. 
Args: input_data: ScraperV1Output or compatible dictionary stages: List of stage names to run (default: all) Returns: BasePipelineResult with stage outputs """ await self.initialize() # Default to all stages stages = stages or STAGE_NAMES stages_run: list[str] = [] stage_results: dict[str, StageResult] = {} # Convert input to ScraperOutput if needed (may fetch from DB) scraper_output = await self._ensure_scraper_output(input_data) # Extract job_id for linking issues to pipeline executions job_id = scraper_output.get("job_id") # Track intermediate results for stage dependencies stage1_result: Stage1Output | None = None stage2_result: Stage2Output | None = None try: # Stage 1: Normalize if "normalize" in stages: start = time.time() logger.info("Running Stage 1: Normalization") try: stage1_result = await self._run_normalize(scraper_output) duration_ms = int((time.time() - start) * 1000) stages_run.append("normalize") stage_results["normalize"] = StageResult( stage="normalize", success=True, data={"stats": stage1_result.get("stats", {})}, error=None, duration_ms=duration_ms, ) except Exception as e: logger.exception("Stage 1 failed") stage_results["normalize"] = StageResult( stage="normalize", success=False, data={}, error=str(e), duration_ms=int((time.time() - start) * 1000), ) return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=False, error=f"normalize failed: {e}", ) # Stage 2: Classify # If classify is requested but we don't have stage1_result, try to fetch from DB if "classify" in stages and not stage1_result and job_id: logger.info("No stage1_result, fetching existing normalized reviews from database") stage1_result = await self._fetch_normalized_reviews_from_db(job_id) if stage1_result: logger.info(f"Loaded {len(stage1_result.get('reviews_normalized', []))} reviews from DB for reclassification") # Clean up old spans and issues before reclassification if self._span_repo: deactivated = await 
self._span_repo.deactivate_spans_for_job(job_id) logger.info(f"Deactivated {deactivated} existing spans for job {job_id}") if self._issue_repo: deleted = await self._issue_repo.delete_issues_for_job(job_id) logger.info(f"Deleted {deleted} existing issues for job {job_id}") if "classify" in stages and stage1_result: start = time.time() logger.info("Running Stage 2: Classification") try: stage2_result = await self._run_classify(stage1_result) duration_ms = int((time.time() - start) * 1000) stages_run.append("classify") stage_results["classify"] = StageResult( stage="classify", success=True, data={"stats": stage2_result.get("stats", {})}, error=None, duration_ms=duration_ms, ) except Exception as e: logger.exception("Stage 2 failed") stage_results["classify"] = StageResult( stage="classify", success=False, data={}, error=str(e), duration_ms=int((time.time() - start) * 1000), ) return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=False, error=f"classify failed: {e}", ) # Stage 3: Route if "route" in stages and stage2_result: start = time.time() logger.info("Running Stage 3: Issue Routing") try: stage3_result = await self._run_route(stage2_result, job_id=job_id) duration_ms = int((time.time() - start) * 1000) stages_run.append("route") stage_results["route"] = StageResult( stage="route", success=True, data={"stats": stage3_result.get("stats", {})}, error=None, duration_ms=duration_ms, ) except Exception as e: logger.exception("Stage 3 failed") stage_results["route"] = StageResult( stage="route", success=False, data={}, error=str(e), duration_ms=int((time.time() - start) * 1000), ) return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=False, error=f"route failed: {e}", ) # Stage 4: Aggregate if "aggregate" in stages: start = time.time() logger.info("Running Stage 4: Aggregation") try: stage4_result = await self._run_aggregate( scraper_output["business_id"], 
date.today().isoformat(), ) duration_ms = int((time.time() - start) * 1000) stages_run.append("aggregate") stage_results["aggregate"] = StageResult( stage="aggregate", success=True, data={"stats": stage4_result.get("stats", {})}, error=None, duration_ms=duration_ms, ) except Exception as e: logger.exception("Stage 4 failed") stage_results["aggregate"] = StageResult( stage="aggregate", success=False, data={}, error=str(e), duration_ms=int((time.time() - start) * 1000), ) return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=False, error=f"aggregate failed: {e}", ) # Stage 5: Synthesize (AI-generated narratives) # Requires job_id and execution_id from pipeline execution tracking if "synthesize" in stages and job_id: start = time.time() logger.info("Running Stage 5: Synthesis") try: # Get the execution_id for this pipeline run execution_id = input_data.get("execution_id") if execution_id: stage5_result = await self._run_synthesize(job_id, execution_id) duration_ms = int((time.time() - start) * 1000) stages_run.append("synthesize") stage_results["synthesize"] = StageResult( stage="synthesize", success=True, data={ "actions_generated": len(stage5_result.action_plan) if stage5_result else 0, "has_narrative": bool(stage5_result and stage5_result.executive_narrative), }, error=None, duration_ms=duration_ms, ) else: logger.warning("No execution_id provided, skipping synthesis") except Exception as e: logger.exception("Stage 5 failed") stage_results["synthesize"] = StageResult( stage="synthesize", success=False, data={}, error=str(e), duration_ms=int((time.time() - start) * 1000), ) # Synthesis failure is non-fatal - pipeline still succeeds logger.warning(f"Synthesis failed but continuing: {e}") return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=True, ) except Exception as e: logger.exception("Pipeline failed with unexpected error") return BasePipelineResult( 
pipeline_id="reviewiq", stages_run=stages_run, stage_results=stage_results, success=False, error=str(e), ) def get_dashboard_config(self) -> DashboardConfig: """Get the dashboard configuration for ReviewIQ.""" return DashboardConfig( pipeline_id="reviewiq", title="ReviewIQ Analytics", description="Review classification insights and issue tracking", sections=[ DashboardSection( id="overview", title="Overview", description="Key metrics at a glance", widgets=[ WidgetConfig( id="total_reviews", type="stat_card", title="Total Reviews", grid={"x": 0, "y": 0, "w": 3, "h": 1}, config={ "value_key": "total_reviews", "format": "{value:,}", "icon": "message-square", "color": "blue", }, ), WidgetConfig( id="reviews_processed", type="stat_card", title="Reviews Processed", grid={"x": 3, "y": 0, "w": 3, "h": 1}, config={ "value_key": "reviews_processed", "format": "{value:,}", "trend_key": "processed_change", "icon": "check-circle", "color": "green", }, ), WidgetConfig( id="issues_found", type="stat_card", title="Issues Found", grid={"x": 6, "y": 0, "w": 3, "h": 1}, config={ "value_key": "issues_count", "format": "{value:,}", "icon": "alert-triangle", "color": "red", }, ), WidgetConfig( id="avg_rating", type="stat_card", title="Avg Rating", grid={"x": 9, "y": 0, "w": 3, "h": 1}, config={ "value_key": "avg_rating", "format": "{value:.1f}", "trend_key": "rating_change", "icon": "star", "color": "yellow", }, ), ], collapsed=False, ), DashboardSection( id="sentiment", title="Sentiment Analysis", description="Review sentiment distribution", widgets=[ WidgetConfig( id="sentiment_distribution", type="pie_chart", title="Sentiment Distribution", grid={"x": 0, "y": 0, "w": 4, "h": 2}, config={ "value_key": "count", "label_key": "sentiment", "colors": ["#22c55e", "#ef4444", "#6b7280", "#eab308"], "show_legend": True, }, ), WidgetConfig( id="sentiment_trend", type="line_chart", title="Sentiment Over Time", grid={"x": 4, "y": 0, "w": 8, "h": 2}, config={ "x_axis": {"key": "date", "type": 
"time"}, "y_axis": {"key": "count", "label": "Reviews"}, "series": [ {"key": "positive", "name": "Positive", "color": "#22c55e"}, {"key": "negative", "name": "Negative", "color": "#ef4444"}, {"key": "neutral", "name": "Neutral", "color": "#6b7280"}, ], "show_legend": True, }, ), ], collapsed=False, ), DashboardSection( id="classification", title="URT Classification", description="Review classification by URT domain", widgets=[ WidgetConfig( id="urt_distribution", type="bar_chart", title="URT Domain Distribution", grid={"x": 0, "y": 0, "w": 6, "h": 2}, config={ "x_axis": {"key": "domain", "type": "category"}, "y_axis": {"key": "count", "label": "Spans"}, "series": [{"key": "count", "name": "Spans"}], }, ), WidgetConfig( id="intensity_heatmap", type="heatmap", title="Domain x Intensity", grid={"x": 6, "y": 0, "w": 6, "h": 2}, config={ "x_key": "intensity", "y_key": "domain", "value_key": "count", "color_scale": ["#f0fdf4", "#22c55e"], "show_values": True, }, ), ], collapsed=False, ), DashboardSection( id="issues", title="Issues", description="Identified issues from negative reviews", widgets=[ WidgetConfig( id="issues_table", type="table", title="Active Issues", grid={"x": 0, "y": 0, "w": 8, "h": 2}, config={ "columns": [ {"key": "domain", "header": "Domain", "width": 100}, {"key": "subcode", "header": "Code", "width": 120}, {"key": "span_count", "header": "Mentions", "width": 80, "align": "right"}, {"key": "max_intensity", "header": "Intensity", "width": 80}, {"key": "state", "header": "State", "width": 80}, ], "row_key": "issue_id", "page_size": 10, "sortable": True, }, ), WidgetConfig( id="issues_by_domain", type="pie_chart", title="Issues by Domain", grid={"x": 8, "y": 0, "w": 4, "h": 2}, config={ "value_key": "count", "label_key": "domain", "show_legend": True, }, ), ], collapsed=False, ), DashboardSection( id="classified_reviews", title="Classified Reviews", description="All reviews with URT classification codes and human-readable meanings", widgets=[ 
async def get_widget_data(
    self,
    widget_id: str,
    params: dict[str, Any],
) -> dict[str, Any]:
    """
    Fetch the data payload for one dashboard widget.

    Args:
        widget_id: Identifier of the widget to load
        params: Query parameters; ``business_id``, ``job_id`` and
            ``time_range`` (default "30d") are read here, and the table
            widgets receive the whole dictionary

    Returns:
        The widget's data dictionary, or ``{"error": ...}`` when the
        widget id is not recognised
    """
    await self.initialize()

    business_id = params.get("business_id")
    job_id = params.get("job_id")
    time_range = params.get("time_range", "30d")

    # Each entry is lazy: the query coroutine is created only for the
    # widget that was actually requested.
    loaders = {
        # Overview stats
        "total_reviews": lambda: self._get_review_count(business_id, job_id),
        "reviews_processed": lambda: self._get_processed_count(business_id, job_id, time_range),
        "issues_found": lambda: self._get_issues_count(business_id, job_id),
        "avg_rating": lambda: self._get_avg_rating(business_id, job_id, time_range),
        # Sentiment
        "sentiment_distribution": lambda: self._get_sentiment_distribution(business_id, job_id),
        "sentiment_trend": lambda: self._get_sentiment_trend(business_id, job_id, time_range),
        # Classification
        "urt_distribution": lambda: self._get_urt_distribution(business_id, job_id),
        "intensity_heatmap": lambda: self._get_intensity_heatmap(business_id, job_id),
        # Issues
        "issues_table": lambda: self._get_issues_table(business_id, job_id, params),
        "issues_by_domain": lambda: self._get_issues_by_domain(business_id, job_id),
        # Classified Reviews
        "classified_reviews_table": lambda: self._get_classified_reviews(business_id, job_id, params),
    }

    load = loaders.get(widget_id)
    if load is None:
        logger.warning(f"Unknown widget: {widget_id}")
        return {"error": f"Unknown widget: {widget_id}"}
    return await load()
async def _ensure_scraper_output(self, input_data: dict[str, Any]) -> ScraperOutput:
    """Ensure input data is in ScraperOutput format.

    Resolution order:
      1. If ``input_data`` already has job_id, business_id, place_id and
         reviews, it is returned unchanged.
      2. If it has a job_id but no reviews and a database pool exists, the
         job row is read from ``public.jobs`` and its ``reviews_data`` is
         converted to raw reviews. Entries that are not dicts are skipped
         with a warning.
      3. Otherwise the input is wrapped with "unknown"/empty defaults.

    Args:
        input_data: ScraperOutput-compatible dictionary, possibly partial.

    Returns:
        A ScraperOutput built from the input, the database, or defaults.
    """
    # If it has all required fields, use as-is
    required = ["job_id", "business_id", "place_id", "reviews"]
    if all(k in input_data for k in required):
        return input_data  # type: ignore

    # If we have a job_id but missing reviews, fetch from database
    job_id = input_data.get("job_id")
    if job_id and not input_data.get("reviews") and self._db:
        logger.info(f"Fetching job data from database for job_id: {job_id}")
        # Hold the pooled connection only for the query itself.
        async with self._db.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT job_id, status, reviews_data, reviews_count,
                       metadata->>'business_name' as business_name,
                       metadata->>'place_id' as place_id,
                       metadata->>'address' as address,
                       metadata->>'category' as category,
                       metadata->>'total_reviews' as total_reviews,
                       metadata->>'average_rating' as average_rating,
                       scraper_version
                FROM public.jobs
                WHERE job_id = $1::uuid
                """,
                str(job_id),
            )

        if row and row["reviews_data"]:
            reviews_data = row["reviews_data"]

            # asyncpg may return JSONB as a string - parse it if needed
            if isinstance(reviews_data, str):
                logger.info("Parsing reviews_data JSON string")
                reviews_data = json.loads(reviews_data)

            # Convert reviews_data to RawReview format
            reviews = []
            for i, review in enumerate(reviews_data):
                raw_review = self._job_review_to_raw(i, review)
                if raw_review is not None:
                    reviews.append(raw_review)

            logger.info(f"Loaded {len(reviews)} reviews from job {job_id}")

            return ScraperOutput(
                job_id=str(row["job_id"]),
                status=row["status"] or "completed",
                business_id=row["business_name"] or "unknown",
                place_id=row["place_id"] or "unknown",
                business_info={
                    "name": row["business_name"] or "",
                    "address": row["address"] or "",
                    "category": row["category"] or "",
                    "total_reviews": int(row["total_reviews"]) if row["total_reviews"] else 0,
                    "average_rating": float(row["average_rating"]) if row["average_rating"] else 0.0,
                },
                reviews=reviews,
                scrape_time_ms=0,
                reviews_scraped=len(reviews),
                scraper_version=row["scraper_version"] or "unknown",
            )

        logger.warning(f"No reviews found in database for job_id: {job_id}")

    # Otherwise, wrap it with empty/default values
    return ScraperOutput(
        job_id=input_data.get("job_id", "unknown"),
        status=input_data.get("status", "completed"),
        business_id=input_data.get("business_id", "unknown"),
        place_id=input_data.get("place_id", "unknown"),
        business_info=input_data.get("business_info", {}),
        reviews=input_data.get("reviews", []),
        scrape_time_ms=input_data.get("scrape_time_ms", 0),
        reviews_scraped=len(input_data.get("reviews", [])),
        scraper_version=input_data.get("scraper_version", "unknown"),
    )


def _job_review_to_raw(self, index: int, review: Any) -> dict[str, Any] | None:
    """Convert one stored job review into a raw-review dictionary.

    Handles both the API format (review_id, author, rating) and the scraper
    format (reviewId, name, stars).

    Args:
        index: Position of the entry; used for the fallback review id and
            in the warning message.
        review: The stored entry, expected to be a dict.

    Returns:
        The raw-review dictionary, or None when ``review`` is not a dict.
    """
    if not isinstance(review, dict):
        # Strings, nulls, or nested lists cannot be read with .get().
        logger.warning(f"Skipping review {index}: got {type(review).__name__} instead of dict")
        return None

    # Parse the review time (may be relative like "10 months ago")
    raw_time = review.get("timestamp") or review.get("publishedAtDate") or ""
    parsed_time = _parse_relative_date(raw_time)

    # An absent or malformed owner response yields None for both fields.
    owner_response = review.get("responseFromOwner")
    if not isinstance(owner_response, dict):
        owner_response = {}

    return {
        "review_id": review.get("review_id") or review.get("reviewId") or f"review_{index}",
        "author_name": review.get("author") or review.get("name") or "Anonymous",
        "author_id": review.get("reviewerId"),
        "rating": review.get("rating") or review.get("stars") or 0,
        "text": review.get("text"),
        "review_time": parsed_time,
        "response_text": owner_response.get("text"),
        "response_time": owner_response.get("publishedAtDate"),
        "photos": review.get("reviewImageUrls"),
        "raw_payload": review,
    }
""" if not self._db: return None async with self._db.pool.acquire() as conn: rows = await conn.fetch( """ SELECT source, review_id, review_version, business_id, place_id, text, text_normalized, rating, review_time FROM pipeline.reviews_enriched WHERE job_id = $1::uuid AND is_latest = TRUE ORDER BY review_time DESC """, job_id, ) if not rows: logger.warning(f"No normalized reviews found in DB for job_id: {job_id}") return None reviews_normalized = [ NormalizedReview( source=row["source"], review_id=row["review_id"], review_version=row["review_version"], business_id=row["business_id"], place_id=row["place_id"], text=row["text"], text_normalized=row["text_normalized"], rating=row["rating"], review_time=row["review_time"], ) for row in rows ] logger.info(f"Fetched {len(reviews_normalized)} normalized reviews from DB for job {job_id}") return Stage1Output( job_id=job_id, reviews_normalized=reviews_normalized, reviews_skipped=[], duplicates_found=[], stats={ "total_input": len(reviews_normalized), "processed": len(reviews_normalized), "skipped": 0, "duplicates": 0, "from_db": True, }, ) async def _run_normalize(self, scraper_output: ScraperOutput) -> Stage1Output: """Run normalization stage.""" stage1 = Stage1Normalizer( self._config, self._db, self._review_repo, ) input_data = Stage1Input( job_id=scraper_output["job_id"], business_id=scraper_output["business_id"], place_id=scraper_output["place_id"], reviews=scraper_output["reviews"], ) return await stage1.process(input_data) async def _run_classify(self, stage1_output: Stage1Output) -> Stage2Output: """Run classification stage.""" stage2 = Stage2Classifier( self._config, self._db, self._review_repo, self._span_repo, self._embedding_service, ) reviews_to_classify = [ ReviewToClassify( source=r["source"], review_id=r["review_id"], review_version=r["review_version"], business_id=r["business_id"], place_id=r["place_id"], text=r["text"], text_normalized=r["text_normalized"], rating=r["rating"], review_time=r["review_time"], 
async def _run_synthesize(self, job_id: str, execution_id: str):
    """Run AI synthesis stage to generate narratives and action plans.

    Args:
        job_id: Job whose results are synthesized.
        execution_id: Pipeline execution the synthesis belongs to.

    Returns:
        Whatever ``Stage5Synthesizer.run`` returns; ``process`` reads its
        ``action_plan`` and ``executive_narrative`` attributes.

    Raises:
        RuntimeError: If the database pool has not been initialized.
    """
    if self._db is None:
        # Fail before an LLM client is created that would then need closing.
        raise RuntimeError("Database is not initialized; call initialize() before synthesis")

    # Create LLM client for synthesis
    llm_client = LLMClient.create(self._config)

    try:
        stage5 = Stage5Synthesizer(
            pool=self._db.pool,
            llm_client=llm_client,
        )
        return await stage5.run(job_id, execution_id)
    finally:
        # Always release the client, even when synthesis raises.
        await llm_client.close()
INTERVAL '1 day' * $2 AND processed_at < NOW() - INTERVAL '1 day' * $3 """, business_id, days * 2, days, ) else: current = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE processed_at >= NOW() - INTERVAL '1 day' * $1 """, days, ) previous = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE processed_at >= NOW() - INTERVAL '1 day' * $1 AND processed_at < NOW() - INTERVAL '1 day' * $2 """, days * 2, days, ) current = current or 0 previous = previous or 0 change = ((current - previous) / previous * 100) if previous > 0 else 0 return { "reviews_processed": current, "processed_change": round(change, 1), } async def _get_issues_count(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get open issues count.""" if not self._db: return {"issues_count": 0} async with self._db._pool.acquire() as conn: if job_id: count = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.issues WHERE job_id = $1::uuid AND state = 'open' """, job_id, ) elif business_id: count = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1 AND state = 'open' """, business_id, ) else: count = await conn.fetchval( "SELECT COUNT(*) FROM pipeline.issues WHERE state = 'open'" ) return {"issues_count": count or 0} async def _get_avg_rating( self, business_id: str | None, job_id: str | None, time_range: str ) -> dict[str, Any]: """Get average rating with trend.""" if not self._db: return {"avg_rating": 0, "rating_change": 0} days = self._parse_time_range(time_range) async with self._db._pool.acquire() as conn: if job_id: current = await conn.fetchval( "SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE job_id = $1::uuid", job_id, ) return {"avg_rating": round(float(current), 2) if current else 0, "rating_change": 0} elif business_id: current = await conn.fetchval( """ SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE business_id = $1 AND review_time >= NOW() - INTERVAL '1 day' * 
$2 """, business_id, days, ) previous = await conn.fetchval( """ SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE business_id = $1 AND review_time >= NOW() - INTERVAL '1 day' * $2 AND review_time < NOW() - INTERVAL '1 day' * $3 """, business_id, days * 2, days, ) else: current = await conn.fetchval( """ SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE review_time >= NOW() - INTERVAL '1 day' * $1 """, days, ) previous = await conn.fetchval( """ SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE review_time >= NOW() - INTERVAL '1 day' * $1 AND review_time < NOW() - INTERVAL '1 day' * $2 """, days * 2, days, ) current = float(current) if current else 0 previous = float(previous) if previous else 0 change = current - previous return { "avg_rating": round(current, 2), "rating_change": round(change, 2), } async def _get_sentiment_distribution( self, business_id: str | None, job_id: str | None = None ) -> dict[str, Any]: """Get sentiment distribution for pie chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT valence, COUNT(*) as count FROM pipeline.review_spans WHERE job_id = $1::uuid AND is_active = TRUE GROUP BY valence """, job_id, ) elif business_id: rows = await conn.fetch( """ SELECT valence, COUNT(*) as count FROM pipeline.review_spans WHERE business_id = $1 AND is_active = TRUE GROUP BY valence """, business_id, ) else: rows = await conn.fetch( """ SELECT valence, COUNT(*) as count FROM pipeline.review_spans WHERE is_active = TRUE GROUP BY valence """ ) # Map valence codes to labels valence_labels = { "V+": "Positive", "V-": "Negative", "V0": "Neutral", "V±": "Mixed", } data = [ { "sentiment": valence_labels.get(row["valence"], row["valence"]), "count": row["count"], } for row in rows ] return {"data": data} async def _get_sentiment_trend( self, business_id: str | None, job_id: str | None, time_range: str ) -> dict[str, Any]: """Get sentiment trend over time 
for line chart.""" if not self._db: return {"data": []} days = self._parse_time_range(time_range) async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT DATE(review_time) as date, COUNT(*) FILTER (WHERE valence = 'V+') as positive, COUNT(*) FILTER (WHERE valence = 'V-') as negative, COUNT(*) FILTER (WHERE valence = 'V0') as neutral FROM pipeline.review_spans WHERE job_id = $1::uuid AND is_active = TRUE GROUP BY DATE(review_time) ORDER BY date """, job_id, ) elif business_id: rows = await conn.fetch( """ SELECT DATE(review_time) as date, COUNT(*) FILTER (WHERE valence = 'V+') as positive, COUNT(*) FILTER (WHERE valence = 'V-') as negative, COUNT(*) FILTER (WHERE valence = 'V0') as neutral FROM pipeline.review_spans WHERE business_id = $1 AND review_time >= NOW() - INTERVAL '1 day' * $2 AND is_active = TRUE GROUP BY DATE(review_time) ORDER BY date """, business_id, days, ) else: rows = await conn.fetch( """ SELECT DATE(review_time) as date, COUNT(*) FILTER (WHERE valence = 'V+') as positive, COUNT(*) FILTER (WHERE valence = 'V-') as negative, COUNT(*) FILTER (WHERE valence = 'V0') as neutral FROM pipeline.review_spans WHERE review_time >= NOW() - INTERVAL '1 day' * $1 AND is_active = TRUE GROUP BY DATE(review_time) ORDER BY date """, days, ) data = [ { "date": row["date"].isoformat(), "positive": row["positive"], "negative": row["negative"], "neutral": row["neutral"], } for row in rows ] return {"data": data} async def _get_urt_distribution(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get URT domain distribution for bar chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, COUNT(*) as count FROM pipeline.review_spans WHERE job_id = $1::uuid AND is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1) ORDER BY count DESC """, job_id, ) elif business_id: rows = await 
conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, COUNT(*) as count FROM pipeline.review_spans WHERE business_id = $1 AND is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1) ORDER BY count DESC """, business_id, ) else: rows = await conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, COUNT(*) as count FROM pipeline.review_spans WHERE is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1) ORDER BY count DESC """ ) # Map domain codes to names domain_names = { "O": "Overall", "P": "People", "J": "Journey", "E": "Environment", "A": "Administrative", "V": "Value", "R": "Reliability", } data = [ { "domain": domain_names.get(row["domain"], row["domain"]), "count": row["count"], } for row in rows ] return {"data": data} async def _get_intensity_heatmap(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get domain x intensity heatmap data.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, intensity, COUNT(*) as count FROM pipeline.review_spans WHERE job_id = $1::uuid AND is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1), intensity """, job_id, ) elif business_id: rows = await conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, intensity, COUNT(*) as count FROM pipeline.review_spans WHERE business_id = $1 AND is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1), intensity """, business_id, ) else: rows = await conn.fetch( """ SELECT SUBSTRING(urt_primary, 1, 1) as domain, intensity, COUNT(*) as count FROM pipeline.review_spans WHERE is_active = TRUE GROUP BY SUBSTRING(urt_primary, 1, 1), intensity """ ) domain_names = { "O": "Overall", "P": "People", "J": "Journey", "E": "Environment", "A": "Administrative", "V": "Value", "R": "Reliability", } data = [ { "domain": domain_names.get(row["domain"], row["domain"]), "intensity": row["intensity"], "count": row["count"], } for row in rows 
] return {"data": data} async def _get_issues_table( self, business_id: str | None, job_id: str | None, params: dict[str, Any] ) -> dict[str, Any]: """Get issues table data.""" if not self._db: return {"data": [], "total": 0} page = params.get("page", 1) page_size = params.get("page_size", 10) offset = (page - 1) * page_size async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT issue_id, domain, primary_subcode as subcode, span_count, max_intensity, state FROM pipeline.issues WHERE job_id = $1::uuid ORDER BY span_count DESC, created_at DESC LIMIT $2 OFFSET $3 """, job_id, page_size, offset, ) total = await conn.fetchval( "SELECT COUNT(*) FROM pipeline.issues WHERE job_id = $1::uuid", job_id, ) elif business_id: rows = await conn.fetch( """ SELECT issue_id, domain, primary_subcode as subcode, span_count, max_intensity, state FROM pipeline.issues WHERE business_id = $1 ORDER BY span_count DESC, created_at DESC LIMIT $2 OFFSET $3 """, business_id, page_size, offset, ) total = await conn.fetchval( "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id, ) else: rows = await conn.fetch( """ SELECT issue_id, domain, primary_subcode as subcode, span_count, max_intensity, state FROM pipeline.issues ORDER BY span_count DESC, created_at DESC LIMIT $1 OFFSET $2 """, page_size, offset, ) total = await conn.fetchval("SELECT COUNT(*) FROM pipeline.issues") data = [dict(row) for row in rows] return {"data": data, "total": total or 0} async def _get_issues_by_domain(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get issues grouped by domain for pie chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: if job_id: rows = await conn.fetch( """ SELECT domain, COUNT(*) as count FROM pipeline.issues WHERE job_id = $1::uuid GROUP BY domain ORDER BY count DESC """, job_id, ) elif business_id: rows = await conn.fetch( """ SELECT domain, COUNT(*) as count FROM 
pipeline.issues WHERE business_id = $1 GROUP BY domain ORDER BY count DESC """, business_id, ) else: rows = await conn.fetch( """ SELECT domain, COUNT(*) as count FROM pipeline.issues GROUP BY domain ORDER BY count DESC """ ) data = [{"domain": row["domain"], "count": row["count"]} for row in rows] return {"data": data} async def _get_classified_reviews( self, business_id: str | None, job_id: str | None, params: dict[str, Any] ) -> dict[str, Any]: """Get classified reviews with URT codes and human-readable names.""" if not self._db: return {"data": [], "total": 0} page = params.get("page", 1) page_size = params.get("page_size", 15) offset = (page - 1) * page_size async with self._db._pool.acquire() as conn: # Build the query with JOINs to get human-readable code names base_query = """ SELECT s.span_id, s.span_text, s.urt_primary as urt_code, COALESCE(sub.name, cat.name, dom.name) as code_name, COALESCE(sub.definition, dom.description) as code_definition, dom.name as domain_name, CASE s.valence WHEN 'V+' THEN 'Positive' WHEN 'V-' THEN 'Negative' WHEN 'V0' THEN 'Neutral' WHEN 'V±' THEN 'Mixed' ELSE s.valence END as valence, CASE s.intensity WHEN 'I1' THEN 'Mild' WHEN 'I2' THEN 'Moderate' WHEN 'I3' THEN 'Strong' ELSE s.intensity END as intensity, e.rating, s.review_time FROM pipeline.review_spans s LEFT JOIN pipeline.reviews_enriched e ON s.review_id = e.review_id AND s.review_version = e.review_version LEFT JOIN pipeline.urt_domains dom ON SUBSTRING(s.urt_primary, 1, 1) = dom.code LEFT JOIN pipeline.urt_categories cat ON SUBSTRING(s.urt_primary, 1, 2) = cat.code LEFT JOIN pipeline.urt_subcodes sub ON s.urt_primary = sub.code WHERE s.is_active = TRUE """ count_query = """ SELECT COUNT(*) FROM pipeline.review_spans s WHERE s.is_active = TRUE """ if job_id: base_query += " AND s.job_id = $1::uuid" count_query += " AND s.job_id = $1::uuid" base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" rows = await conn.fetch(base_query, job_id) total = 
await conn.fetchval(count_query, job_id) elif business_id: base_query += " AND s.business_id = $1" count_query += " AND s.business_id = $1" base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" rows = await conn.fetch(base_query, business_id) total = await conn.fetchval(count_query, business_id) else: base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" rows = await conn.fetch(base_query) total = await conn.fetchval(count_query) data = [ { "span_id": row["span_id"], "span_text": row["span_text"], "urt_code": row["urt_code"], "code_name": row["code_name"] or "Unknown", "code_definition": row["code_definition"] or "", "domain_name": row["domain_name"] or "Unknown", "valence": row["valence"], "intensity": row["intensity"], "rating": row["rating"], "review_time": row["review_time"].isoformat() if row["review_time"] else None, } for row in rows ] return {"data": data, "total": total or 0} def _parse_time_range(self, time_range: str) -> int: """Parse time range string to days.""" if time_range.endswith("d"): return int(time_range[:-1]) elif time_range.endswith("w"): return int(time_range[:-1]) * 7 elif time_range.endswith("m"): return int(time_range[:-1]) * 30 else: return 30 # Default to 30 days # Alias for backward compatibility Pipeline = ReviewIQPipeline