#!/usr/bin/env python3 """ Optimized ReviewIQ Analytics endpoint. Provides a single API endpoint returning all dashboard data with optimized SQL queries. Replaces multiple widget queries with 4-5 efficient queries. """ import logging from datetime import datetime, timedelta from typing import Any import asyncpg from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel, Field log = logging.getLogger(__name__) # Create router router = APIRouter(prefix="/api/pipelines/reviewiq", tags=["reviewiq-analytics"]) # Database pool (set by main server) _pool: asyncpg.Pool | None = None def set_database(pool: asyncpg.Pool) -> None: """Set the database pool for analytics operations.""" global _pool _pool = pool # ==================== Pydantic Models ==================== class OverviewStats(BaseModel): """Overview statistics for the dashboard.""" total_reviews: int = Field(0, description="Total reviews processed") total_spans: int = Field(0, description="Total classified spans") open_issues: int = Field(0, description="Open issues count") avg_rating: float | None = Field(None, description="Average review rating") positive_count: int = Field(0, description="Positive sentiment count") negative_count: int = Field(0, description="Negative sentiment count") neutral_count: int = Field(0, description="Neutral sentiment count") mixed_count: int = Field(0, description="Mixed sentiment count") class SentimentDataPoint(BaseModel): """Single data point for sentiment distribution.""" valence: str = Field(..., description="Valence label (V+, V-, V0, V±)") count: int = Field(..., description="Count of spans (mentions)") review_count: int = Field(0, description="Count of distinct reviews") percentage: float = Field(..., description="Percentage of total reviews") class SentimentTrendPoint(BaseModel): """Single data point for sentiment trend over time.""" period: str = Field(..., description="Time period (e.g., '2024-W01')") positive: int = Field(0, description="Positive count") negative: int = Field(0, description="Negative count") neutral: int = Field(0, description="Neutral count") mixed: int = Field(0, description="Mixed count") class SentimentData(BaseModel): """Sentiment distribution and trend data.""" distribution: list[SentimentDataPoint] = Field(default_factory=list) trend: list[SentimentTrendPoint] = Field(default_factory=list) class URTDomainPoint(BaseModel): """URT domain distribution point with sentiment breakdown.""" domain: str = Field(..., description="Domain code (P, J, O, A)") domain_name: str = Field(..., description="Domain display name") count: int = Field(..., description="Count of spans (mentions)") review_count: int = Field(0, description="Count of distinct reviews affected") percentage: float = Field(..., description="Percentage of total reviews") positive_count: int = Field(0, description="Positive sentiment spans") negative_count: int = Field(0, description="Negative sentiment spans") neutral_count: int = Field(0, description="Neutral sentiment spans") positive_reviews: int = Field(0, description="Reviews with positive sentiment") negative_reviews: int = Field(0, description="Reviews with negative sentiment") class IntensityPoint(BaseModel): """Intensity distribution by domain.""" domain: str = Field(..., description="Domain code") intensity: str = Field(..., description="Intensity level (I1, I2, I3)") count: int = Field(..., description="Count of spans") class URTData(BaseModel): """URT domain distribution and heatmap data.""" domains: list[URTDomainPoint] = Field(default_factory=list) intensity_heatmap: list[IntensityPoint] = Field(default_factory=list) class IssueItem(BaseModel): """Single issue item with enriched URT metadata.""" issue_id: str = Field(..., description="Issue identifier") primary_subcode: str = Field(..., description="URT subcode") subcode_name: str | None = Field(None, description="Subcode display name") subcode_definition: str | None = Field(None, description="Subcode definition") solution: str | None = Field(None, description="Recommended action") solution_complexity: str | None = Field(None, description="Solution complexity (simple/medium/complex)") domain: str = Field(..., description="Domain code") domain_name: str | None = Field(None, description="Domain display name") category_name: str | None = Field(None, description="Category display name") default_owner: str | None = Field(None, description="Default owner team") negative_example: str | None = Field(None, description="Example negative review text") entity: str | None = Field(None, description="Related entity") state: str = Field(..., description="Issue state") priority_score: float = Field(..., description="Priority score") span_count: int = Field(..., description="Number of related spans") max_intensity: str | None = Field(None, description="Maximum intensity") created_at: str | None = Field(None, description="Creation timestamp") class PaginatedIssues(BaseModel): """Paginated issues list.""" items: list[IssueItem] = Field(default_factory=list) total: int = Field(0, description="Total count") page: int = Field(1, description="Current page") page_size: int = Field(10, description="Items per page") class SpanItem(BaseModel): """Single classified span.""" span_id: str = Field(..., description="Span identifier") span_text: str = Field(..., description="Span text content") urt_primary: str | None = Field(None, description="Primary URT code") valence: str | None = Field(None, description="Valence") intensity: str | None = Field(None, description="Intensity") review_time: str | None = Field(None, description="Review timestamp") source_review_id: str | None = Field(None, description="Source review ID") entity: str | None = Field(None, description="Entity mentioned") class PaginatedSpans(BaseModel): """Paginated spans list.""" items: list[SpanItem] = Field(default_factory=list) total: int = Field(0, description="Total count") page: int = Field(1, description="Current page") page_size: int = Field(10, description="Items per page") class TimelinePoint(BaseModel): """Single point on the timeline chart.""" date: str = Field(..., description="Date string (YYYY-MM-DD or YYYY-WXX)") review_count: int = Field(0, description="Number of reviews") span_count: int = Field(0, description="Number of spans") avg_rating: float | None = Field(None, description="Average rating") positive_count: int = Field(0, description="Positive sentiment count") negative_count: int = Field(0, description="Negative sentiment count") # ==================== Trend Models ==================== class TrendDataPoint(BaseModel): """Single data point for a trend item.""" date: str = Field(..., description="Date string (YYYY-MM-DD)") count: int = Field(0, description="Total span count") positive: int = Field(0, description="Positive sentiment count") negative: int = Field(0, description="Negative sentiment count") review_count: int = Field(0, description="Number of distinct reviews") # Sentiment trend sentiment_score: float = Field(0, description="Sentiment score: (positive-negative)/total * 100, range -100 to +100") # Rating impact metrics (the business value) avg_rating_negative: float | None = Field(None, description="Avg stars when complaints mention this category - THE DAMAGE METRIC") avg_rating_positive: float | None = Field(None, description="Avg stars when praise mentions this category - THE STRENGTH METRIC") class TrendItem(BaseModel): """A single trend line/series.""" id: str = Field(..., description="Item code (e.g., 'P' or 'P.FRIE')") label: str = Field(..., description="Human-readable label") color: str = Field(..., description="Color hex code") data: list[TrendDataPoint] = Field(default_factory=list, description="Trend data points") # ==================== Domain Scores & Insights ==================== class DomainScore(BaseModel): """Domain-level KPI score.""" domain: str = Field(..., description="Domain code") name: str = Field(..., description="Domain display name") score: float = Field(..., description="Score 0-100") status: str = Field(..., description="Status: good/warning/critical") trend: str | None = Field(None, description="Trend vs previous period (e.g., '+3.2')") positive_count: int = Field(0, description="Positive spans") negative_count: int = Field(0, description="Negative spans") total_count: int = Field(0, description="Total spans") class StrengthItem(BaseModel): """A strength (highly positive subcode).""" rank: int = Field(..., description="Rank order") subcode: str = Field(..., description="URT subcode") subcode_name: str = Field(..., description="Subcode display name") domain: str = Field(..., description="Domain code") domain_name: str = Field(..., description="Domain display name") positive_percentage: float = Field(..., description="% positive sentiment") span_count: int = Field(..., description="Total mentions") marketing_angle: str | None = Field(None, description="Marketing suggestion") class WeaknessItem(BaseModel): """A weakness (negative issue to fix).""" rank: int = Field(..., description="Rank order") issue_id: str | None = Field(None, description="Related issue ID if exists") subcode: str = Field(..., description="URT subcode") subcode_name: str = Field(..., description="Subcode display name") domain: str = Field(..., description="Domain code") domain_name: str = Field(..., description="Domain display name") negative_percentage: float = Field(..., description="% negative sentiment") span_count: int = Field(..., description="Total negative mentions") intensity: str | None = Field(None, description="Max intensity") solution: str | None = Field(None, description="Recommended action") solution_complexity: str | None = Field(None, description="Complexity") projected_rating_impact: float | None = Field(None, description="Potential rating gain if fixed") owner: str | None = Field(None, description="Default owner team") class RatingSimulator(BaseModel): """Rating impact simulation.""" current_rating: float = Field(..., description="Current average rating") if_fix_top_1: float | None = Field(None, description="Projected rating if top 1 issue fixed") if_fix_top_3: float | None = Field(None, description="Projected rating if top 3 issues fixed") potential_gain: float = Field(0, description="Maximum potential rating gain") class OpportunitySpan(BaseModel): """A span (customer feedback) related to an opportunity item.""" span_id: str = Field(..., description="Span identifier") span_text: str = Field(..., description="The classified span text") review_text: str | None = Field(None, description="Full review text for context") rating: int | None = Field(None, description="Source review rating") review_id: str | None = Field(None, description="Source review ID for navigation") review_date: str | None = Field(None, description="Review date") class OpportunityItem(BaseModel): """An item in the opportunity matrix with coordinates and detail data.""" subcode: str = Field(..., description="URT subcode") name: str = Field(..., description="Human-readable subcode name") x: float = Field(..., description="X position (0-1, frequency within quadrant)") y: float = Field(..., description="Y position (0-1, effort within quadrant)") # Detail data for hover/click domain: str = Field(..., description="Domain code (P, J, O, etc.)") domain_name: str = Field(..., description="Domain display name") negative_pct: float = Field(..., description="Percentage of negative mentions") span_count: int = Field(..., description="Number of mentions") solution: str | None = Field(None, description="Suggested solution from taxonomy") complexity: str = Field(..., description="Solution complexity (simple/medium/complex)") rating_impact: float | None = Field(None, description="Projected rating improvement") owner: str | None = Field(None, description="Suggested owner/team") example: str | None = Field(None, description="Example negative quote") spans: list[OpportunitySpan] = Field(default_factory=list, description="Sample customer feedback spans") class OpportunityMatrix(BaseModel): """2x2 opportunity matrix.""" quick_wins: list[OpportunityItem] = Field(default_factory=list, description="High freq + simple") critical: list[OpportunityItem] = Field(default_factory=list, description="High freq + complex") nice_to_have: list[OpportunityItem] = Field(default_factory=list, description="Low freq + simple") strategic: list[OpportunityItem] = Field(default_factory=list, description="Low freq + complex") class Insights(BaseModel): """Business insights including strengths and weaknesses.""" strengths: list[StrengthItem] = Field(default_factory=list) weaknesses: list[WeaknessItem] = Field(default_factory=list) rating_simulator: RatingSimulator | None = Field(None) opportunity_matrix: OpportunityMatrix | None = Field(None) executive_summary: str = Field("", description="Auto-generated summary") # ==================== Report Synthesis Models ==================== class ReportActionResponse(BaseModel): """A prioritized action item for the analyst report.""" priority: str = Field(..., description="critical/high/medium") action: str = Field(..., description="What to do") owner: str = Field(..., description="Who owns it") impact: str = Field(..., description="Expected result") impact_stars: float = Field(0.1, description="Numeric star impact") effort: str = Field("moderate", description="quick_win/moderate/strategic") evidence: str = Field("", description="Supporting quote") complaint_count: int = Field(0, description="Number of complaints addressed") success_metric: str = Field("", description="Measurable success KPI") class ReportEvidenceResponse(BaseModel): """A curated quote that supports the narrative.""" quote: str = Field(..., description="Customer words") context: str = Field(..., description="What this proves") sentiment: str = Field("damaging", description="damaging/praising") weight: str = Field("notable", description="critical/notable") class ReportStrengthResponse(BaseModel): """A key strength to protect and leverage.""" title: str = Field(..., description="Strength title") mention_count: int = Field(0, description="Number of mentions") quote: str = Field("", description="Supporting quote") marketing_angle: str = Field("", description="How to leverage in marketing") class SynthesisResponse(BaseModel): """Analyst report synthesis - consultant-quality business narrative. Supports both legacy format (v1) and new 6-section format (v2). Frontend uses type guards to determine which format to render. """ # Version indicator - "2.0" for new format, absent for legacy report_version: str | None = Field(None, description="Report format version") # ===== LEGACY FORMAT FIELDS (v1) ===== # The Verdict headline: str = Field("", description="One punchy insight line") verdict: str = Field("", description="One sentence executive summary") current_rating: float = Field(0.0, description="Current average rating") potential_rating: float = Field(0.0, description="Achievable rating if issues fixed") rating_gap: float = Field(0.0, description="Potential improvement") # The Story narrative: str = Field("", description="2-3 paragraph consultant-quality prose") # Section Headlines sentiment_headline: str = Field("", description="Insight-first title for sentiment chart") category_headline: str = Field("", description="Insight-first title for category breakdown") timeline_headline: str = Field("", description="Insight-first title for timeline") strengths_headline: str = Field("", description="Insight-first title for strengths") # The Diagnosis primary_problem: str = Field("", description="The #1 issue in plain English") primary_problem_code: str = Field("", description="URT code") root_cause: str = Field("", description="Why this keeps happening") # The Prescription (v1) actions: list[ReportActionResponse] = Field(default_factory=list, description="Prioritized actions") # The Evidence evidence: list[ReportEvidenceResponse] = Field(default_factory=list, description="Curated quotes") # The Strengths (can be v1 or v2 format depending on report_version) # V1: list[ReportStrengthResponse], V2: list[StrengthToProtect dict] # Using list[Any] to prevent Pydantic from coercing/dropping fields strengths: list[Any] = Field(default_factory=list, description="Key strengths to protect") # Momentum momentum: str = Field("stable", description="improving/declining/stable") momentum_detail: str = Field("", description="Trend explanation") # Metadata generated_at: str | None = Field(None, description="When report was generated") review_count: int = Field(0, description="Total reviews analyzed") insight_count: int = Field(0, description="Total insights extracted") # ===== NEW FORMAT FIELDS (v2 - 6-section report) ===== report_title: str = Field("", description="Report title for v2") report_date: str = Field("", description="Report date for v2") business_name: str = Field("", description="Business name for v2") analysis_period: str = Field("", description="Analysis period for v2") # Section 1: Executive Summary (v2) executive_summary: dict | None = Field(None, description="V2 executive summary section") # Section 2: Risk Scorecard (v2) risk_scorecard: dict | None = Field(None, description="V2 risk scorecard section") # Section 3: Critical Issues (v2) critical_issues: list[dict] = Field(default_factory=list, description="V2 critical issues") # Section 4: Strengths to Protect (v2) # Note: For V2 responses, 'strengths' contains StrengthToProtect objects # For V1 responses, 'strengths' contains ReportStrengthResponse objects # Section 5: Action Matrix (v2) action_matrix: list[dict] = Field(default_factory=list, description="V2 action matrix") # Section 6: 90-Day Tracking (v2) tracking_kpis: list[dict] = Field(default_factory=list, description="V2 tracking KPIs") # Charts for visualization (v2) charts: dict | None = Field(None, description="V2 chart data") class ReviewIQAnalyticsResponse(BaseModel): """Complete analytics response for ReviewIQ dashboard.""" overview: OverviewStats = Field(default_factory=OverviewStats) sentiment: SentimentData = Field(default_factory=SentimentData) urt: URTData = Field(default_factory=URTData) domain_scores: list[DomainScore] = Field(default_factory=list) overall_experience_index: float | None = Field(None, description="OEI composite score") insights: Insights = Field(default_factory=Insights) issues: PaginatedIssues = Field(default_factory=PaginatedIssues) spans: PaginatedSpans = Field(default_factory=PaginatedSpans) timeline: list[TimelinePoint] = Field(default_factory=list) synthesis: SynthesisResponse | None = Field(None, description="AI-generated synthesis") filters_applied: dict[str, Any] = Field(default_factory=dict) # ==================== Helper Functions ==================== def _parse_time_range(time_range: str) -> datetime: """Parse time range string to start datetime.""" now = datetime.now() if time_range == "7d": return now - timedelta(days=7) elif time_range == "14d": return now - timedelta(days=14) elif time_range == "30d": return now - timedelta(days=30) elif time_range == "90d": return now - timedelta(days=90) elif time_range == "1y": return now - timedelta(days=365) elif time_range == "all": return datetime(2000, 1, 1) # Effectively no time filter else: # Default to 30 days return now - timedelta(days=30) # Domain configuration DOMAIN_CONFIG = { "O": {"name": "Offering", "owner": "Operations / Product", "green": 80, "yellow": 60, "weight": 0.20}, "P": {"name": "People", "owner": "HR / Training", "green": 85, "yellow": 70, "weight": 0.18}, "J": {"name": "Journey", "owner": "Operations / Process", "green": 75, "yellow": 55, "weight": 0.15}, "E": {"name": "Environment", "owner": "Facilities / IT", "green": 80, "yellow": 65, "weight": 0.12}, "A": {"name": "Access", "owner": "Compliance / Design", "green": 85, "yellow": 70, "weight": 0.10}, "V": {"name": "Value", "owner": "Finance / Pricing", "green": 70, "yellow": 50, "weight": 0.12}, "R": {"name": "Relationship", "owner": "Leadership / CX", "green": 80, "yellow": 60, "weight": 0.13}, } # Labels and colors for trends endpoint DOMAIN_LABELS = { "P": "Staff & Service", "J": "Speed & Process", "O": "Product Quality", "E": "Facilities", "A": "Availability", "V": "Pricing & Value", "R": "Trust & Ethics", } DOMAIN_COLORS = { "P": "#3b82f6", "J": "#8b5cf6", "O": "#f97316", "E": "#06b6d4", "A": "#10b981", "V": "#ec4899", "R": "#f59e0b", } # Intensity weights for scoring INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0} # Legacy mapping for backward compatibility DOMAIN_NAMES = {k: v["name"] for k, v in DOMAIN_CONFIG.items()} # ==================== API Endpoint ==================== @router.get("/analytics", response_model=ReviewIQAnalyticsResponse) async def get_reviewiq_analytics( job_id: str | None = Query(None, description="Filter by job ID"), business_id: str | None = Query(None, description="Filter by business ID"), time_range: str = Query("30d", description="Time range (7d, 14d, 30d, 90d, 1y, all)"), granularity: str = Query("auto", description="Timeline granularity (day, week, month, year, auto)"), sentiment: str | None = Query(None, description="Filter by sentiment (comma-separated: positive,negative)"), urt_domain: str | None = Query(None, description="Filter by URT domain (P, J, O, A)"), intensity: str | None = Query(None, description="Filter by intensity (I1, I2, I3)"), issues_page: int = Query(1, ge=1, description="Issues page number"), issues_page_size: int = Query(10, ge=1, le=100, description="Issues per page"), spans_page: int = Query(1, ge=1, description="Spans page number"), spans_page_size: int = Query(10, ge=1, le=100, description="Spans per page"), ) -> ReviewIQAnalyticsResponse: """ Get all analytics data for ReviewIQ dashboard in a single call. Returns overview stats, sentiment distribution, URT breakdown, issues, and spans. Supports cross-filtering by sentiment, URT domain, and intensity. """ if not _pool: raise HTTPException(status_code=503, detail="Database not initialized") # Parse filters start_date = _parse_time_range(time_range) sentiment_filter = sentiment.split(",") if sentiment else None # Resolve auto granularity based on time range resolved_granularity = granularity if granularity == "auto": if time_range in ("7d", "14d"): resolved_granularity = "day" elif time_range in ("30d", "90d"): resolved_granularity = "week" elif time_range == "1y": resolved_granularity = "month" else: # "all" resolved_granularity = "month" # Build filter conditions filters_applied = { "time_range": time_range, "start_date": start_date.isoformat(), "granularity": resolved_granularity, } if job_id: filters_applied["job_id"] = job_id if business_id: filters_applied["business_id"] = business_id if sentiment_filter: filters_applied["sentiment"] = sentiment_filter if urt_domain: filters_applied["urt_domain"] = urt_domain if intensity: filters_applied["intensity"] = intensity async with _pool.acquire() as conn: # Query 1: Overview Stats overview = await _get_overview_stats( conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity ) # Query 2: Sentiment Distribution + URT Domain Distribution sentiment_data, urt_data = await _get_distributions( conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, resolved_granularity ) # Query 3: Timeline Data timeline = await _get_timeline_data( conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, resolved_granularity ) # Query 4: Issues (paginated) - now with enriched URT data issues = await _get_issues( conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, issues_page, issues_page_size ) # Query 5: Spans (paginated) spans = await _get_spans( conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, spans_page, spans_page_size ) # Query 6: Domain KPI Scores domain_scores, oei = await _get_domain_scores( conn, job_id, business_id, start_date ) # Query 7: Insights (strengths, weaknesses, recommendations) insights = await _get_insights( conn, job_id, business_id, start_date, overview.avg_rating, overview.total_reviews ) # Query 8: AI Synthesis (if available) synthesis = await _get_synthesis(conn, job_id) return ReviewIQAnalyticsResponse( overview=overview, sentiment=sentiment_data, urt=urt_data, domain_scores=domain_scores, overall_experience_index=oei, insights=insights, issues=issues, spans=spans, timeline=timeline, synthesis=synthesis, filters_applied=filters_applied, ) @router.get("/trends", response_model=list[TrendItem]) async def get_reviewiq_trends( job_id: str | None = Query(None, description="Filter by job ID"), business_id: str | None = Query(None, description="Filter by business ID"), items: str = Query(..., description="Comma-separated item codes (e.g., P,J,O or P.FRIE,J.WAIT)"), time_range: str = Query("1y", description="Time range"), granularity: str = Query("auto", description="Granularity (day, week, month, year, auto)"), ) -> list[TrendItem]: """ Get trend data for specified URT domains or subcodes. Items can be: - Single letter domain codes: P, J, O, E, A, V, R - Subcode prefixes with dot: P.FRIE, J.WAIT, O.QUAL Returns time series data for each item showing total count, positive, and negative over time. """ if not _pool: raise HTTPException(status_code=503, detail="Database not initialized") # Parse time range start_date = _parse_time_range(time_range) # Resolve auto granularity based on time range resolved_granularity = granularity if granularity == "auto": if time_range in ("7d", "14d"): resolved_granularity = "day" elif time_range in ("30d", "90d"): resolved_granularity = "week" elif time_range == "1y": resolved_granularity = "month" else: # "all" resolved_granularity = "month" # Map granularity to PostgreSQL DATE_TRUNC unit trunc_unit = { "day": "day", "week": "week", "month": "month", "year": "year", }.get(resolved_granularity, "week") # Parse items item_codes = [item.strip() for item in items.split(",") if item.strip()] if not item_codes: raise HTTPException(status_code=400, detail="At least one item code is required") result: list[TrendItem] = [] async with _pool.acquire() as conn: for item_code in item_codes: # Build WHERE conditions conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 # Determine filter type based on item code format if "." in item_code: # Subcode prefix (e.g., P.FRIE) - use LIKE conditions.append(f"rs.urt_primary LIKE ${param_idx}") params.append(f"{item_code}%") param_idx += 1 else: # Single letter domain (e.g., P) - use LEFT() conditions.append(f"LEFT(rs.urt_primary, 1) = ${param_idx}") params.append(item_code) param_idx += 1 where_clause = " AND ".join(conditions) # Query for trend data with sentiment and rating impact # Key insight: avg_rating_negative shows the damage caused by complaints in this category query = f""" SELECT TO_CHAR(DATE_TRUNC('{trunc_unit}', rs.review_time), 'YYYY-MM-DD') as date, COUNT(*) as count, COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive, COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative, COUNT(DISTINCT rs.review_id) as review_count, -- Avg rating of reviews with NEGATIVE mentions (the damage metric) AVG(re.rating) FILTER (WHERE rs.valence IN ('V-', 'V±')) as avg_rating_negative, -- Avg rating of reviews with POSITIVE mentions (the strength metric) AVG(re.rating) FILTER (WHERE rs.valence = 'V+') as avg_rating_positive FROM pipeline.review_spans rs LEFT JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version ) WHERE {where_clause} AND rs.urt_primary IS NOT NULL GROUP BY DATE_TRUNC('{trunc_unit}', rs.review_time) ORDER BY DATE_TRUNC('{trunc_unit}', rs.review_time) """ rows = await conn.fetch(query, *params) # Build data points with sentiment score and rating impact data_points = [] for row in rows: count = row["count"] or 0 positive = row["positive"] or 0 negative = row["negative"] or 0 # Sentiment score: -100 (all negative) to +100 (all positive) sentiment_score = ((positive - negative) / count * 100) if count > 0 else 0 data_points.append(TrendDataPoint( date=row["date"], count=count, positive=positive, negative=negative, review_count=row["review_count"] or 0, sentiment_score=round(sentiment_score, 1), # The damage: avg stars when people COMPLAIN about this category avg_rating_negative=round(float(row["avg_rating_negative"]), 2) if row["avg_rating_negative"] else None, # The strength: avg stars when people PRAISE this category avg_rating_positive=round(float(row["avg_rating_positive"]), 2) if row["avg_rating_positive"] else None, )) # Determine label and color if "." in item_code: # For subcodes, try to get name from database subcode_row = await conn.fetchrow( "SELECT name FROM pipeline.urt_subcodes WHERE code = $1", item_code ) label = subcode_row["name"] if subcode_row else item_code # Use domain color for subcodes domain_letter = item_code[0] color = DOMAIN_COLORS.get(domain_letter, "#6b7280") else: # For domains, use the DOMAIN_LABELS dict label = DOMAIN_LABELS.get(item_code, item_code) color = DOMAIN_COLORS.get(item_code, "#6b7280") result.append(TrendItem( id=item_code, label=label, color=color, data=data_points, )) return result async def _get_overview_stats( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, sentiment_filter: list[str] | None, urt_domain: str | None, intensity: str | None, ) -> OverviewStats: """Get overview statistics with a single optimized query.""" # Build WHERE conditions for spans conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 if urt_domain: conditions.append(f"LEFT(rs.urt_primary, 1) = ${param_idx}") params.append(urt_domain) param_idx += 1 if intensity: conditions.append(f"rs.intensity = ${param_idx}") params.append(intensity) param_idx += 1 # Valence filter valence_condition = "" if sentiment_filter: valence_codes = [] if "positive" in sentiment_filter: valence_codes.append("V+") if "negative" in sentiment_filter: valence_codes.extend(["V-", "V±"]) if "neutral" in sentiment_filter: valence_codes.append("V0") if valence_codes: conditions.append(f"rs.valence = ANY(${param_idx}::text[])") params.append(valence_codes) param_idx += 1 where_clause = " AND ".join(conditions) query = f""" SELECT COUNT(DISTINCT re.id) as total_reviews, COUNT(rs.span_id) as total_spans, AVG(re.rating) as avg_rating, COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count, COUNT(*) FILTER (WHERE rs.valence = 'V0') as neutral_count, COUNT(*) FILTER (WHERE rs.valence = 'V±') as mixed_count FROM pipeline.review_spans rs LEFT JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version ) WHERE {where_clause} """ row = await conn.fetchrow(query, *params) # Get open issues count separately issue_conditions = ["i.state = 'open'"] issue_params: list[Any] = [] issue_param_idx = 1 if job_id: issue_conditions.append(f"i.job_id = ${issue_param_idx}::uuid") issue_params.append(job_id) issue_param_idx += 1 if business_id: issue_conditions.append(f"i.business_id = ${issue_param_idx}") issue_params.append(business_id) issue_param_idx += 1 issue_where = " AND ".join(issue_conditions) issue_count = await conn.fetchval( f"SELECT COUNT(*) FROM pipeline.issues i WHERE {issue_where}", *issue_params ) return OverviewStats( total_reviews=row["total_reviews"] or 0, total_spans=row["total_spans"] or 0, open_issues=issue_count or 0, avg_rating=float(row["avg_rating"]) if row["avg_rating"] else None, positive_count=row["positive_count"] or 0, negative_count=row["negative_count"] or 0, neutral_count=row["neutral_count"] or 0, mixed_count=row["mixed_count"] or 0, ) async def _get_distributions( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, sentiment_filter: list[str] | None, urt_domain: str | None, intensity: str | None, granularity: str = "week", ) -> tuple[SentimentData, URTData]: """Get sentiment and URT distributions with cross-filtering support.""" # Build base WHERE conditions (job, business, time) base_conditions = ["rs.review_time >= $1"] base_params: list[Any] = [start_date] param_idx = 2 if job_id: base_conditions.append(f"rs.job_id = ${param_idx}::uuid") base_params.append(job_id) param_idx += 1 if business_id: base_conditions.append(f"rs.business_id = ${param_idx}") base_params.append(business_id) param_idx += 1 base_where = " AND ".join(base_conditions) # Convert sentiment filter to valence codes valence_codes = [] if sentiment_filter: if "positive" in sentiment_filter: valence_codes.append("V+") if "negative" in sentiment_filter: valence_codes.extend(["V-", "V±"]) if "neutral" in sentiment_filter: valence_codes.append("V0") # ========== Sentiment Distribution (filtered by domain) ========== sentiment_conditions = list(base_conditions) sentiment_params = list(base_params) sentiment_param_idx = param_idx # Apply domain filter to sentiment (cross-filter: domain → sentiment) if urt_domain: sentiment_conditions.append(f"LEFT(rs.urt_primary, 1) = ${sentiment_param_idx}") sentiment_params.append(urt_domain) sentiment_param_idx += 1 # Apply intensity filter if intensity: sentiment_conditions.append(f"rs.intensity = ${sentiment_param_idx}") sentiment_params.append(intensity) sentiment_param_idx += 1 sentiment_where = " AND ".join(sentiment_conditions) # Updated query with review-based counting to avoid bias from verbose reviews sentiment_query = f""" SELECT valence, COUNT(*) as span_count, COUNT(DISTINCT review_id) as review_count FROM pipeline.review_spans rs WHERE {sentiment_where} AND valence IS NOT NULL GROUP BY valence ORDER BY review_count DESC """ sentiment_rows = await conn.fetch(sentiment_query, *sentiment_params) # Use review_count for percentages to avoid bias from verbose reviews total_reviews = sum(r["review_count"] for r in sentiment_rows) sentiment_distribution = [ SentimentDataPoint( valence=row["valence"], count=row["span_count"], review_count=row["review_count"], percentage=(row["review_count"] / total_reviews * 100) if total_reviews > 0 else 0, ) for row in sentiment_rows ] # ========== Sentiment Trend (filtered by domain) ========== # Map granularity to PostgreSQL DATE_TRUNC unit trunc_unit = { "day": "day", "week": "week", "month": "month", "year": "year", }.get(granularity, "week") trend_query = f""" SELECT TO_CHAR(DATE_TRUNC('{trunc_unit}', rs.review_time), 'YYYY-MM-DD') as period, COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive, COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative, COUNT(*) FILTER (WHERE rs.valence = 'V0') as neutral, COUNT(*) FILTER (WHERE rs.valence = 'V±') as mixed FROM pipeline.review_spans rs WHERE {sentiment_where} GROUP BY DATE_TRUNC('{trunc_unit}', rs.review_time) ORDER BY DATE_TRUNC('{trunc_unit}', rs.review_time) """ trend_rows = await conn.fetch(trend_query, *sentiment_params) sentiment_trend = [ SentimentTrendPoint( period=row["period"], positive=row["positive"] or 0, negative=row["negative"] or 0, neutral=row["neutral"] or 0, mixed=row["mixed"] or 0, ) for row in trend_rows ] # ========== URT Domain Distribution (filtered by sentiment) ========== urt_conditions = list(base_conditions) urt_params = list(base_params) urt_param_idx = param_idx # Apply sentiment filter to URT domains (cross-filter: sentiment → domain) if valence_codes: urt_conditions.append(f"rs.valence = ANY(${urt_param_idx}::text[])") urt_params.append(valence_codes) urt_param_idx += 1 # Apply intensity filter if intensity: urt_conditions.append(f"rs.intensity = ${urt_param_idx}") urt_params.append(intensity) urt_param_idx += 1 urt_where = " AND ".join(urt_conditions) # Updated query with review-based counting to avoid bias from verbose reviews urt_query = f""" SELECT LEFT(urt_primary, 1) as domain, COUNT(*) as span_count, COUNT(DISTINCT review_id) as review_count, COUNT(*) FILTER (WHERE valence = 'V+') as positive_spans, COUNT(*) FILTER (WHERE valence IN ('V-', 'V±')) as negative_spans, COUNT(*) FILTER (WHERE valence = 'V0') as neutral_spans, COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V+') as positive_reviews, COUNT(DISTINCT review_id) FILTER (WHERE valence IN ('V-', 'V±')) as negative_reviews FROM pipeline.review_spans rs WHERE {urt_where} AND urt_primary IS NOT NULL GROUP BY LEFT(urt_primary, 1) ORDER BY review_count DESC """ urt_rows = await conn.fetch(urt_query, *urt_params) # Use review_count for percentages to avoid bias from verbose reviews total_reviews = sum(r["review_count"] for r in urt_rows) domains = [ URTDomainPoint( domain=row["domain"], domain_name=DOMAIN_NAMES.get(row["domain"], row["domain"]), count=row["span_count"], review_count=row["review_count"], percentage=(row["review_count"] / total_reviews * 100) if total_reviews > 0 else 0, positive_count=row["positive_spans"] or 0, negative_count=row["negative_spans"] or 0, neutral_count=row["neutral_spans"] or 0, positive_reviews=row["positive_reviews"] or 0, negative_reviews=row["negative_reviews"] or 0, ) for row in urt_rows ] # ========== Intensity Heatmap (filtered by both sentiment and domain) ========== heatmap_conditions = list(base_conditions) heatmap_params = list(base_params) heatmap_param_idx = param_idx # Apply domain filter if urt_domain: heatmap_conditions.append(f"LEFT(rs.urt_primary, 1) = ${heatmap_param_idx}") heatmap_params.append(urt_domain) heatmap_param_idx += 1 # Apply sentiment filter if valence_codes: heatmap_conditions.append(f"rs.valence = ANY(${heatmap_param_idx}::text[])") heatmap_params.append(valence_codes) heatmap_param_idx += 1 heatmap_where = " AND ".join(heatmap_conditions) heatmap_query = f""" SELECT LEFT(urt_primary, 1) as domain, intensity, COUNT(*) as count FROM pipeline.review_spans rs WHERE {heatmap_where} AND urt_primary IS NOT NULL AND intensity IS NOT NULL GROUP BY LEFT(urt_primary, 1), intensity ORDER BY domain, intensity """ heatmap_rows = await conn.fetch(heatmap_query, *heatmap_params) intensity_heatmap = [ IntensityPoint( domain=row["domain"], intensity=row["intensity"], count=row["count"], ) for row in heatmap_rows ] return ( SentimentData(distribution=sentiment_distribution, trend=sentiment_trend), URTData(domains=domains, intensity_heatmap=intensity_heatmap), ) async def _get_timeline_data( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, sentiment_filter: list[str] | None, urt_domain: str | None, intensity: str | None, granularity: str = "week", ) -> list[TimelinePoint]: """Get timeline data for the brush chart.""" # Build WHERE conditions conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 where_clause = " AND ".join(conditions) # Map granularity to PostgreSQL DATE_TRUNC unit trunc_unit = { "day": "day", "week": "week", "month": "month", "year": "year", }.get(granularity, "week") query = f""" SELECT TO_CHAR(DATE_TRUNC('{trunc_unit}', rs.review_time), 'YYYY-MM-DD') as date, COUNT(DISTINCT CONCAT(rs.source, ':', rs.review_id)) as review_count, COUNT(*) as span_count, AVG(re.rating) as avg_rating, COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count FROM pipeline.review_spans rs LEFT JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version ) WHERE {where_clause} GROUP BY DATE_TRUNC('{trunc_unit}', rs.review_time) ORDER BY DATE_TRUNC('{trunc_unit}', rs.review_time) """ rows = await conn.fetch(query, *params) # Convert rows to dict for easy lookup data_by_date = { row["date"]: TimelinePoint( date=row["date"], review_count=row["review_count"] or 0, span_count=row["span_count"] or 0, avg_rating=float(row["avg_rating"]) if row["avg_rating"] else None, positive_count=row["positive_count"] or 0, negative_count=row["negative_count"] or 0, ) for row in rows } if not data_by_date: return [] # Fill in missing periods with zero values dates = sorted(data_by_date.keys()) min_date = datetime.strptime(dates[0], "%Y-%m-%d") max_date = datetime.strptime(dates[-1], "%Y-%m-%d") def add_period(dt: datetime, gran: str) -> datetime: """Add one period to a datetime based on granularity.""" if gran == "day": return dt + timedelta(days=1) elif gran == "week": return dt + timedelta(weeks=1) elif gran == "month": # Add one month month = dt.month + 1 year = dt.year if month > 12: month = 1 year += 1 # Handle edge cases like Jan 31 -> Feb 28 day = min(dt.day, 28) # Safe for all months return dt.replace(year=year, month=month, day=1) # Use 1st of month for consistency elif gran == "year": return dt.replace(year=dt.year + 1) else: return dt + timedelta(weeks=1) result = [] current = min_date while current <= max_date: date_str = current.strftime("%Y-%m-%d") if date_str in data_by_date: result.append(data_by_date[date_str]) else: # Fill with zero values result.append(TimelinePoint( date=date_str, review_count=0, span_count=0, avg_rating=None, positive_count=0, negative_count=0, )) current = add_period(current, granularity) return result async def _get_issues( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, sentiment_filter: list[str] | None, urt_domain: str | None, intensity: str | None, page: int, page_size: int, ) -> PaginatedIssues: """Get paginated issues.""" # Build WHERE conditions conditions = ["1=1"] params: list[Any] = [] param_idx = 1 if job_id: conditions.append(f"i.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"i.business_id = ${param_idx}") params.append(business_id) param_idx += 1 if urt_domain: conditions.append(f"i.domain = ${param_idx}") params.append(urt_domain) param_idx += 1 if intensity: conditions.append(f"i.max_intensity = ${param_idx}") params.append(intensity) param_idx += 1 where_clause = " AND ".join(conditions) # Count query count_query = f"SELECT COUNT(*) FROM pipeline.issues i WHERE {where_clause}" total = await conn.fetchval(count_query, *params) # Items query with pagination - enriched with URT metadata offset = (page - 1) * page_size items_query = f""" SELECT i.issue_id, i.primary_subcode, s.name as subcode_name, s.definition as subcode_definition, s.solution, s.solution_complexity, s.negative_example, i.domain, d.name as domain_name, d.default_owner, c.name as category_name, i.entity, i.state, i.priority_score, i.span_count, i.max_intensity, i.created_at FROM pipeline.issues i LEFT JOIN pipeline.urt_subcodes s ON i.primary_subcode = s.code LEFT JOIN pipeline.urt_domains d ON i.domain = d.code LEFT JOIN pipeline.urt_categories c ON s.category_code = c.code WHERE {where_clause} ORDER BY i.priority_score DESC, i.created_at DESC LIMIT ${param_idx} OFFSET ${param_idx + 1} """ rows = await conn.fetch(items_query, *params, page_size, offset) items = [ IssueItem( issue_id=row["issue_id"], primary_subcode=row["primary_subcode"], subcode_name=row["subcode_name"], subcode_definition=row["subcode_definition"], solution=row["solution"], solution_complexity=row["solution_complexity"], domain=row["domain"], domain_name=row["domain_name"], category_name=row["category_name"], default_owner=row["default_owner"], negative_example=row["negative_example"], entity=row["entity"], state=row["state"], priority_score=float(row["priority_score"]) if row["priority_score"] else 0, span_count=row["span_count"] or 0, max_intensity=row["max_intensity"], created_at=row["created_at"].isoformat() if row["created_at"] else None, ) for row in rows ] return PaginatedIssues( items=items, total=total or 0, page=page, page_size=page_size, ) async def _get_spans( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, sentiment_filter: list[str] | None, urt_domain: str | None, intensity: str | None, page: int, page_size: int, ) -> PaginatedSpans: """Get paginated spans.""" # Build WHERE conditions conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 if urt_domain: conditions.append(f"LEFT(rs.urt_primary, 1) = ${param_idx}") params.append(urt_domain) param_idx += 1 if intensity: conditions.append(f"rs.intensity = ${param_idx}") params.append(intensity) param_idx += 1 # Valence filter if sentiment_filter: valence_codes = [] if "positive" in sentiment_filter: valence_codes.append("V+") if "negative" in sentiment_filter: valence_codes.extend(["V-", "V±"]) if "neutral" in sentiment_filter: valence_codes.append("V0") if valence_codes: conditions.append(f"rs.valence = ANY(${param_idx}::text[])") params.append(valence_codes) param_idx += 1 where_clause = " AND ".join(conditions) # Count query count_query = f"SELECT COUNT(*) FROM pipeline.review_spans rs WHERE {where_clause}" total = await conn.fetchval(count_query, *params) # Items query with pagination offset = (page - 1) * page_size items_query = f""" SELECT rs.span_id, rs.span_text, rs.urt_primary, rs.valence, rs.intensity, rs.review_time, rs.review_id as source_review_id, rs.entity FROM pipeline.review_spans rs WHERE {where_clause} ORDER BY rs.review_time DESC LIMIT ${param_idx} OFFSET ${param_idx + 1} """ rows = await conn.fetch(items_query, *params, page_size, offset) items = [ SpanItem( span_id=row["span_id"], span_text=row["span_text"], urt_primary=row["urt_primary"], valence=row["valence"], intensity=row["intensity"], review_time=row["review_time"].isoformat() if row["review_time"] else None, source_review_id=row["source_review_id"], entity=row["entity"], ) for row in rows ] return PaginatedSpans( items=items, total=total or 0, page=page, page_size=page_size, ) async def _get_domain_scores( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, ) -> tuple[list[DomainScore], float | None]: """Calculate domain-level KPI scores using intensity-weighted scoring.""" # Build WHERE conditions conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 where_clause = " AND ".join(conditions) # Query to get sentiment counts by domain with intensity weighting query = f""" SELECT LEFT(rs.urt_primary, 1) as domain, rs.valence, rs.intensity, COUNT(*) as count FROM pipeline.review_spans rs WHERE {where_clause} AND rs.urt_primary IS NOT NULL AND rs.valence IS NOT NULL GROUP BY LEFT(rs.urt_primary, 1), rs.valence, rs.intensity ORDER BY domain """ rows = await conn.fetch(query, *params) # Aggregate by domain domain_data: dict[str, dict[str, float]] = {} for row in rows: domain = row["domain"] if domain not in domain_data: domain_data[domain] = { "positive_weight": 0, "negative_weight": 0, "total_weight": 0, "positive_count": 0, "negative_count": 0, "total_count": 0 } intensity = row["intensity"] or "I1" weight = INTENSITY_WEIGHTS.get(intensity, 1.0) count = row["count"] domain_data[domain]["total_weight"] += weight * count domain_data[domain]["total_count"] += count if row["valence"] == "V+": domain_data[domain]["positive_weight"] += weight * count domain_data[domain]["positive_count"] += count elif row["valence"] in ("V-", "V±"): domain_data[domain]["negative_weight"] += weight * count domain_data[domain]["negative_count"] += count # Calculate scores domain_scores = [] for domain, cfg in DOMAIN_CONFIG.items(): data = domain_data.get(domain, { "positive_weight": 0, "negative_weight": 0, "total_weight": 0, "positive_count": 0, "negative_count": 0, "total_count": 0 }) total = data["total_weight"] if total > 0: # Score = 50 + (positive - negative) / total * 50 # This gives 0-100 scale where 50 is neutral score = 50 + ((data["positive_weight"] - data["negative_weight"]) / total) * 50 score = max(0, min(100, score)) else: score = 50 # Neutral if no data # Determine status based on thresholds if score >= cfg["green"]: status = "good" elif score >= cfg["yellow"]: status = "warning" else: status = "critical" domain_scores.append(DomainScore( domain=domain, name=cfg["name"], score=round(score, 1), status=status, trend=None, # TODO: Calculate trend vs previous period positive_count=int(data["positive_count"]), negative_count=int(data["negative_count"]), total_count=int(data["total_count"]), )) # Calculate Overall Experience Index (OEI) oei = None if domain_scores: weighted_sum = sum( ds.score * DOMAIN_CONFIG[ds.domain]["weight"] for ds in domain_scores if ds.domain in DOMAIN_CONFIG ) total_weight = sum( DOMAIN_CONFIG[ds.domain]["weight"] for ds in domain_scores if ds.domain in DOMAIN_CONFIG ) if total_weight > 0: oei = round(weighted_sum / total_weight, 1) return domain_scores, oei async def _get_insights( conn: asyncpg.Connection, job_id: str | None, business_id: str | None, start_date: datetime, avg_rating: float | None, total_reviews: int, ) -> Insights: """Generate strengths, weaknesses, and business insights.""" # Build WHERE conditions conditions = ["rs.review_time >= $1"] params: list[Any] = [start_date] param_idx = 2 if job_id: conditions.append(f"rs.job_id = ${param_idx}::uuid") params.append(job_id) param_idx += 1 if business_id: conditions.append(f"rs.business_id = ${param_idx}") params.append(business_id) param_idx += 1 where_clause = " AND ".join(conditions) # Query: Get subcode-level sentiment distribution with URT metadata query = f""" SELECT rs.urt_primary as subcode, s.name as subcode_name, s.solution, s.solution_complexity, s.marketing_angle, s.negative_example, LEFT(rs.urt_primary, 1) as domain, d.name as domain_name, d.default_owner, COUNT(*) as total_count, COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count, MAX(rs.intensity) as max_intensity FROM pipeline.review_spans rs LEFT JOIN pipeline.urt_subcodes s ON rs.urt_primary = s.code LEFT JOIN pipeline.urt_domains d ON LEFT(rs.urt_primary, 1) = d.code WHERE {where_clause} AND rs.urt_primary IS NOT NULL GROUP BY rs.urt_primary, s.name, s.solution, s.solution_complexity, s.marketing_angle, s.negative_example, LEFT(rs.urt_primary, 1), d.name, d.default_owner HAVING COUNT(*) >= 2 ORDER BY COUNT(*) DESC """ rows = await conn.fetch(query, *params) # Separate into strengths and weaknesses strengths = [] weaknesses = [] # Store tuples of (subcode, count, complexity) for coordinate calculation quick_wins_raw = [] critical_raw = [] nice_to_have_raw = [] strategic_raw = [] # Calculate median for opportunity matrix counts = [r["total_count"] for r in rows] median_count = sorted(counts)[len(counts) // 2] if counts else 0 max_count = max(counts) if counts else 1 min_count = min(counts) if counts else 0 for row in rows: total = row["total_count"] positive = row["positive_count"] negative = row["negative_count"] pos_pct = (positive / total * 100) if total > 0 else 0 neg_pct = (negative / total * 100) if total > 0 else 0 subcode = row["subcode"] complexity = row["solution_complexity"] or "medium" # Strengths: >= 70% positive if pos_pct >= 70 and len(strengths) < 5: strengths.append(StrengthItem( rank=len(strengths) + 1, subcode=subcode, subcode_name=row["subcode_name"] or subcode, domain=row["domain"], domain_name=row["domain_name"] or row["domain"], positive_percentage=round(pos_pct, 1), span_count=total, marketing_angle=row["marketing_angle"], )) # Weaknesses: >= 40% negative if neg_pct >= 40 and len(weaknesses) < 5: # Calculate projected rating impact impact = None if avg_rating and total_reviews > 0: # Simplified model: impact = (negative_spans / total_reviews) * avg_intensity_loss intensity_loss = {"I1": 0.5, "I2": 1.0, "I3": 2.0}.get(row["max_intensity"], 0.5) impact = round((negative / total_reviews) * intensity_loss, 2) weaknesses.append(WeaknessItem( rank=len(weaknesses) + 1, issue_id=None, # Could link to issue if exists subcode=subcode, subcode_name=row["subcode_name"] or subcode, domain=row["domain"], domain_name=row["domain_name"] or row["domain"], negative_percentage=round(neg_pct, 1), span_count=negative, intensity=row["max_intensity"], solution=row["solution"], solution_complexity=complexity, projected_rating_impact=impact, owner=row["default_owner"], )) # Opportunity matrix (for weaknesses only) if neg_pct >= 40: is_high_freq = total >= median_count is_simple = complexity == "simple" name = row["subcode_name"] or subcode # Build detail dict for the opportunity item item_data = { "subcode": subcode, "name": name, "count": total, "complexity": complexity, "domain": row["domain"], "domain_name": row["domain_name"] or row["domain"], "negative_pct": round(neg_pct, 1), "span_count": negative, "solution": row["solution"], "rating_impact": impact, "owner": row["default_owner"], "example": row["negative_example"], } if is_high_freq and is_simple: quick_wins_raw.append(item_data) elif is_high_freq and not is_simple: critical_raw.append(item_data) elif not is_high_freq and is_simple: nice_to_have_raw.append(item_data) else: strategic_raw.append(item_data) # Helper to compute coordinates for opportunity items and fetch spans async def compute_opportunity_items( items: list[dict], is_high_freq: bool ) -> list[OpportunityItem]: if not items: return [] # Get min/max counts within this quadrant for x-axis normalization quadrant_counts = [item["count"] for item in items] q_min = min(quadrant_counts) q_max = max(quadrant_counts) q_range = q_max - q_min if q_max > q_min else 1 result = [] for item in items[:5]: count = item["count"] complexity = item["complexity"] # X: frequency within quadrant (0.1 to 0.9 to keep items away from edges) x = 0.1 + 0.8 * ((count - q_min) / q_range) # Y: effort based on complexity (simple=0.2, medium=0.5, complex=0.8) effort_map = {"simple": 0.2, "medium": 0.5, "complex": 0.8} y = effort_map.get(complexity, 0.5) # Add small jitter to prevent overlap import random x = max(0.05, min(0.95, x + random.uniform(-0.05, 0.05))) y = max(0.05, min(0.95, y + random.uniform(-0.08, 0.08))) # Fetch sample spans for this subcode (negative sentiment only) # Use original 'text' column since spans were extracted from it spans_query = """ SELECT rs.span_id, rs.span_text, re.rating, rs.review_id, re.review_time::text as review_date, re.text as review_text FROM pipeline.review_spans rs LEFT JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version ) WHERE rs.urt_primary = $1 AND rs.valence IN ('V-', 'V±') ORDER BY re.review_time DESC NULLS LAST LIMIT 15 """ span_rows = await conn.fetch(spans_query, item["subcode"]) spans = [ OpportunitySpan( span_id=row["span_id"], span_text=row["span_text"] or "", review_text=row["review_text"][:500] if row["review_text"] else None, rating=row["rating"], review_id=row["review_id"], review_date=row["review_date"][:10] if row["review_date"] else None, ) for row in span_rows ] result.append(OpportunityItem( subcode=item["subcode"], name=item["name"], x=round(x, 3), y=round(y, 3), domain=item["domain"], domain_name=item["domain_name"], negative_pct=item["negative_pct"], span_count=item["span_count"], solution=item["solution"], complexity=complexity, rating_impact=item["rating_impact"], owner=item["owner"], example=item["example"], spans=spans, )) return result quick_wins = await compute_opportunity_items(quick_wins_raw, is_high_freq=True) critical = await compute_opportunity_items(critical_raw, is_high_freq=True) nice_to_have = await compute_opportunity_items(nice_to_have_raw, is_high_freq=False) strategic = await compute_opportunity_items(strategic_raw, is_high_freq=False) # Generate executive summary summary = "" if weaknesses: top_weakness = weaknesses[0] summary = f"Your biggest opportunity is improving {top_weakness.subcode_name} ({top_weakness.domain_name}) - {top_weakness.negative_percentage:.0f}% of mentions are negative. " if strengths: top_strength = strengths[0] summary += f"{top_strength.subcode_name} is your strongest asset with {top_strength.positive_percentage:.0f}% positive sentiment." # Rating simulator rating_simulator = None if avg_rating and weaknesses: impacts = [w.projected_rating_impact or 0 for w in weaknesses] if_fix_top_1 = round(avg_rating + impacts[0], 2) if len(impacts) >= 1 else None if_fix_top_3 = round(avg_rating + sum(impacts[:3]), 2) if len(impacts) >= 3 else None potential = round(sum(impacts[:5]), 2) rating_simulator = RatingSimulator( current_rating=round(avg_rating, 2), if_fix_top_1=if_fix_top_1, if_fix_top_3=if_fix_top_3, potential_gain=potential, ) return Insights( strengths=strengths, weaknesses=weaknesses, rating_simulator=rating_simulator, opportunity_matrix=OpportunityMatrix( quick_wins=quick_wins, critical=critical, nice_to_have=nice_to_have, strategic=strategic, ), executive_summary=summary, ) async def _get_synthesis( conn: asyncpg.Connection, job_id: str | None, ) -> SynthesisResponse | None: """Fetch analyst report synthesis from pipeline execution. Handles both legacy format (v1) and new 6-section format (v2). """ if not job_id: return None try: row = await conn.fetchrow(""" SELECT synthesis FROM pipeline.executions WHERE job_id = $1::uuid AND synthesis IS NOT NULL ORDER BY created_at DESC LIMIT 1 """, job_id) if not row or not row["synthesis"]: return None data = row["synthesis"] if isinstance(data, str): import json data = json.loads(data) # Check for v2 format (6-section report) report_version = data.get("report_version") if report_version == "2.0": # Parse v2 format exec_summary = data.get("executive_summary", {}) # For V2, return the data structure as the frontend expects it # The frontend type guard checks for executive_summary to detect V2 return SynthesisResponse( # Version report_version="2.0", # V2 metadata report_title=data.get("report_title", ""), report_date=data.get("report_date", ""), business_name=data.get("business_name", ""), analysis_period=data.get("analysis_period", ""), generated_at=data.get("generated_at"), review_count=data.get("review_count", 0), insight_count=data.get("insight_count", 0), # V2 sections (these are dicts/lists that frontend will parse) executive_summary=data.get("executive_summary"), risk_scorecard=data.get("risk_scorecard"), critical_issues=data.get("critical_issues", []), action_matrix=data.get("action_matrix", []), tracking_kpis=data.get("tracking_kpis", []), charts=data.get("charts"), # Legacy fields populated from v2 for backwards compat current_rating=exec_summary.get("current_rating", 0.0), potential_rating=exec_summary.get("potential_rating", 0.0), rating_gap=exec_summary.get("rating_gap", 0.0), headline=exec_summary.get("one_liner", ""), momentum=exec_summary.get("momentum", "stable"), momentum_detail=exec_summary.get("momentum_detail", ""), # V2 strengths are passed as raw dicts (StrengthToProtect format) # Frontend type guard will handle the different structure strengths=data.get("strengths", []), ) # Parse legacy v1 format actions = [ ReportActionResponse( priority=a.get("priority", "medium"), action=a.get("action", ""), owner=a.get("owner", ""), impact=a.get("impact", ""), impact_stars=float(a.get("impact_stars", 0.1)), effort=a.get("effort", "moderate"), evidence=a.get("evidence", ""), complaint_count=int(a.get("complaint_count", 0)), success_metric=a.get("success_metric", ""), ) for a in data.get("actions", []) ] evidence = [ ReportEvidenceResponse( quote=e.get("quote", ""), context=e.get("context", ""), sentiment=e.get("sentiment", "damaging"), weight=e.get("weight", "notable"), ) for e in data.get("evidence", []) ] strengths = [ ReportStrengthResponse( title=s.get("title", ""), mention_count=int(s.get("mention_count", 0)), quote=s.get("quote", ""), marketing_angle=s.get("marketing_angle", ""), ) for s in data.get("strengths", []) ] return SynthesisResponse( headline=data.get("headline", ""), verdict=data.get("verdict", ""), current_rating=data.get("current_rating", 0.0), potential_rating=data.get("potential_rating", 0.0), rating_gap=data.get("rating_gap", 0.0), narrative=data.get("narrative", ""), sentiment_headline=data.get("sentiment_headline", ""), category_headline=data.get("category_headline", ""), timeline_headline=data.get("timeline_headline", ""), strengths_headline=data.get("strengths_headline", ""), primary_problem=data.get("primary_problem", ""), primary_problem_code=data.get("primary_problem_code", ""), root_cause=data.get("root_cause", ""), actions=actions, evidence=evidence, strengths=strengths, momentum=data.get("momentum", "stable"), momentum_detail=data.get("momentum_detail", ""), generated_at=data.get("generated_at"), review_count=data.get("review_count", 0), insight_count=data.get("insight_count", 0), ) except Exception as e: log.warning(f"Failed to fetch synthesis for job {job_id}: {e}") return None # ==================== Drill-down Endpoints ==================== @router.get("/issues/{issue_id}/spans", response_model=list[SpanItem]) async def get_issue_spans(issue_id: str) -> list[SpanItem]: """Get all spans related to a specific issue.""" if not _pool: raise HTTPException(status_code=503, detail="Database not initialized") async with _pool.acquire() as conn: query = """ SELECT rs.span_id, rs.span_text, rs.urt_primary, rs.valence, rs.intensity, rs.review_time, rs.review_id as source_review_id, rs.entity FROM pipeline.review_spans rs JOIN pipeline.issue_spans iss ON rs.span_id = iss.span_id WHERE iss.issue_id = $1 ORDER BY rs.review_time DESC """ rows = await conn.fetch(query, issue_id) return [ SpanItem( span_id=row["span_id"], span_text=row["span_text"], urt_primary=row["urt_primary"], valence=row["valence"], intensity=row["intensity"], review_time=row["review_time"].isoformat() if row["review_time"] else None, source_review_id=row["source_review_id"], entity=row["entity"], ) for row in rows ] # ==================== Full Review Drill-Down ==================== class ReviewSpan(BaseModel): """A span within a review with its classification.""" span_id: str span_text: str start_offset: int | None = Field(None, description="Character offset in original text") end_offset: int | None = Field(None, description="Character end offset") urt_primary: str | None urt_secondary: list[str] | None = None valence: str | None intensity: str | None entity: str | None class FullReview(BaseModel): """Complete review with all spans and metadata for drill-down.""" review_id: str source: str rating: int | None review_text: str | None text_normalized: str | None = None # Text used for span offset calculation review_time: str | None author_name: str | None = None author_url: str | None = None review_url: str | None = None business_name: str | None = None # Composite URT (derived from spans) urt_primary: str | None = None urt_secondary: list[str] | None = None # All classified spans spans: list[ReviewSpan] = Field(default_factory=list) @router.get("/reviews/{review_id}", response_model=FullReview) async def get_full_review( review_id: str, source: str = Query("google", description="Review source (default: google)"), ) -> FullReview: """ Get a full review with all its classified spans. This enables drill-down from any aggregate metric to the raw source data. Spans are returned with their classifications, allowing the UI to highlight them within the original review text. """ if not _pool: raise HTTPException(status_code=503, detail="Database not initialized") async with _pool.acquire() as conn: # Get the review with latest version # Join with reviews_raw to get author info # Note: span offsets are computed against text_normalized, so we return both review_query = """ SELECT re.review_id, re.source, re.rating, re.text as review_text, re.text_normalized, re.review_time, rr.reviewer_name as author_name, re.business_id, re.place_id, re.urt_primary, re.urt_secondary FROM pipeline.reviews_enriched re LEFT JOIN pipeline.reviews_raw rr ON re.raw_id = rr.id WHERE re.review_id = $1 AND re.source = $2 ORDER BY re.review_version DESC LIMIT 1 """ review_row = await conn.fetchrow(review_query, review_id, source) if not review_row: # Try without source filter in case source is different review_row = await conn.fetchrow(""" SELECT re.review_id, re.source, re.rating, re.text as review_text, re.text_normalized, re.review_time, rr.reviewer_name as author_name, re.business_id, re.place_id, re.urt_primary, re.urt_secondary FROM pipeline.reviews_enriched re LEFT JOIN pipeline.reviews_raw rr ON re.raw_id = rr.id WHERE re.review_id = $1 ORDER BY re.review_version DESC LIMIT 1 """, review_id) if not review_row: raise HTTPException(status_code=404, detail=f"Review {review_id} not found") # Get all spans for this review (use the actual source from found review) actual_source = review_row["source"] spans_query = """ SELECT rs.span_id, rs.span_text, rs.span_start as start_offset, rs.span_end as end_offset, rs.urt_primary, rs.urt_secondary, rs.valence, rs.intensity, rs.entity FROM pipeline.review_spans rs WHERE rs.review_id = $1 AND rs.source = $2 ORDER BY rs.span_start, rs.span_id """ span_rows = await conn.fetch(spans_query, review_id, actual_source) spans = [ ReviewSpan( span_id=row["span_id"], span_text=row["span_text"], start_offset=row.get("start_offset"), end_offset=row.get("end_offset"), urt_primary=row["urt_primary"], urt_secondary=row.get("urt_secondary"), valence=row["valence"], intensity=row["intensity"], entity=row.get("entity"), ) for row in span_rows ] # Construct Google Maps review URL if we have place_id place_id = review_row.get("place_id") review_url = None if place_id and review_row["source"] == "google": review_url = f"https://www.google.com/maps/place/?q=place_id:{place_id}" return FullReview( review_id=review_row["review_id"], source=review_row["source"], rating=review_row["rating"], review_text=review_row["review_text"], text_normalized=review_row.get("text_normalized"), review_time=review_row["review_time"].isoformat() if review_row["review_time"] else None, author_name=review_row.get("author_name"), author_url=None, # Not stored in DB review_url=review_url, business_name=review_row.get("business_id"), # Use business_id as fallback urt_primary=review_row.get("urt_primary"), urt_secondary=review_row.get("urt_secondary"), spans=spans, ) @router.get("/reviews", response_model=PaginatedSpans) async def get_reviews_by_filter( job_id: str | None = Query(None, description="Filter by job ID"), urt_domain: str | None = Query(None, description="Filter by URT domain"), sentiment: str | None = Query(None, description="Filter by sentiment"), intensity: str | None = Query(None, description="Filter by intensity"), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), ) -> PaginatedSpans: """ Get reviews matching specific filters. Used for drilling down from chart segments to see contributing reviews. """ if not _pool: raise HTTPException(status_code=503, detail="Database not initialized") # Reuse _get_spans with the filters async with _pool.acquire() as conn: sentiment_filter = sentiment.split(",") if sentiment else None start_date = datetime(2000, 1, 1) # No time filter for drill-down return await _get_spans( conn, job_id, None, start_date, sentiment_filter, urt_domain, intensity, page, page_size )