From 9b667e69a79079738d892d9901c00b5bf11081a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Thu, 29 Jan 2026 03:12:53 +0000 Subject: [PATCH] feat(pipeline): Add Stage 5 Synthesis for AI-generated narratives - Add Stage5Synthesizer class that generates AI narratives and action plans - Add generate() method to LLMClient for synthesis generation - Integrate Stage 5 into pipeline runner after route stage - Add synthesis JSONB column to pipeline.executions table - Update reviewiq_analytics API to return synthesis data - Synthesis includes: executive narrative, sentiment/category/timeline insights, action plan, marketing angles, and priority recommendations Co-Authored-By: Claude Opus 4.5 --- api/routes/reviewiq_analytics.py | 1762 +++++++++++++++++ .../013_add_synthesis_to_executions.sql | 12 + .../src/reviewiq_pipeline/pipeline.py | 603 +++++- .../reviewiq_pipeline/services/llm_client.py | 342 +++- .../stages/stage5_synthesize.py | 477 +++++ 5 files changed, 3129 insertions(+), 67 deletions(-) create mode 100644 api/routes/reviewiq_analytics.py create mode 100644 migrations/versions/013_add_synthesis_to_executions.sql create mode 100644 packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py diff --git a/api/routes/reviewiq_analytics.py b/api/routes/reviewiq_analytics.py new file mode 100644 index 0000000..58d689f --- /dev/null +++ b/api/routes/reviewiq_analytics.py @@ -0,0 +1,1762 @@ +#!/usr/bin/env python3 +""" +Optimized ReviewIQ Analytics endpoint. + +Provides a single API endpoint returning all dashboard data with optimized SQL queries. +Replaces multiple widget queries with 4-5 efficient queries. +""" + +import logging +from datetime import datetime, timedelta +from typing import Any + +import asyncpg +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel, Field + +log = logging.getLogger(__name__) + +# Create router +router = APIRouter(prefix="/api/pipelines/reviewiq", tags=["reviewiq-analytics"]) + +# Database pool (set by main server) +_pool: asyncpg.Pool | None = None + + +def set_database(pool: asyncpg.Pool) -> None: + """Set the database pool for analytics operations.""" + global _pool + _pool = pool + + +# ==================== Pydantic Models ==================== + + +class OverviewStats(BaseModel): + """Overview statistics for the dashboard.""" + + total_reviews: int = Field(0, description="Total reviews processed") + total_spans: int = Field(0, description="Total classified spans") + open_issues: int = Field(0, description="Open issues count") + avg_rating: float | None = Field(None, description="Average review rating") + positive_count: int = Field(0, description="Positive sentiment count") + negative_count: int = Field(0, description="Negative sentiment count") + neutral_count: int = Field(0, description="Neutral sentiment count") + mixed_count: int = Field(0, description="Mixed sentiment count") + + +class SentimentDataPoint(BaseModel): + """Single data point for sentiment distribution.""" + + valence: str = Field(..., description="Valence label (V+, V-, V0, V±)") + count: int = Field(..., description="Count of spans (mentions)") + review_count: int = Field(0, description="Count of distinct reviews") + percentage: float = Field(..., description="Percentage of total reviews") + + +class SentimentTrendPoint(BaseModel): + """Single data point for sentiment trend over time.""" + + period: str = Field(..., description="Time period (e.g., '2024-W01')") + positive: int = Field(0, description="Positive count") + negative: int = Field(0, description="Negative count") + neutral: int = Field(0, description="Neutral count") + mixed: int = Field(0, description="Mixed count") + + +class SentimentData(BaseModel): + """Sentiment distribution and trend data.""" + + distribution: list[SentimentDataPoint] = Field(default_factory=list) + trend: list[SentimentTrendPoint] = Field(default_factory=list) + + +class URTDomainPoint(BaseModel): + """URT domain distribution point with sentiment breakdown.""" + + domain: str = Field(..., description="Domain code (P, J, O, A)") + domain_name: str = Field(..., description="Domain display name") + count: int = Field(..., description="Count of spans (mentions)") + review_count: int = Field(0, description="Count of distinct reviews affected") + percentage: float = Field(..., description="Percentage of total reviews") + positive_count: int = Field(0, description="Positive sentiment spans") + negative_count: int = Field(0, description="Negative sentiment spans") + neutral_count: int = Field(0, description="Neutral sentiment spans") + positive_reviews: int = Field(0, description="Reviews with positive sentiment") + negative_reviews: int = Field(0, description="Reviews with negative sentiment") + + +class IntensityPoint(BaseModel): + """Intensity distribution by domain.""" + + domain: str = Field(..., description="Domain code") + intensity: str = Field(..., description="Intensity level (I1, I2, I3)") + count: int = Field(..., description="Count of spans") + + +class URTData(BaseModel): + """URT domain distribution and heatmap data.""" + + domains: list[URTDomainPoint] = Field(default_factory=list) + intensity_heatmap: list[IntensityPoint] = Field(default_factory=list) + + +class IssueItem(BaseModel): + """Single issue item with enriched URT metadata.""" + + issue_id: str = Field(..., description="Issue identifier") + primary_subcode: str = Field(..., description="URT subcode") + subcode_name: str | None = Field(None, description="Subcode display name") + subcode_definition: str | None = Field(None, description="Subcode definition") + solution: str | None = Field(None, description="Recommended action") + solution_complexity: str | None = Field(None, description="Solution complexity (simple/medium/complex)") + domain: str = Field(..., description="Domain code") + domain_name: str | None = Field(None, description="Domain display name") + category_name: str | None = Field(None, description="Category display name") + default_owner: str | None = Field(None, description="Default owner team") + negative_example: str | None = Field(None, description="Example negative review text") + entity: str | None = Field(None, description="Related entity") + state: str = Field(..., description="Issue state") + priority_score: float = Field(..., description="Priority score") + span_count: int = Field(..., description="Number of related spans") + max_intensity: str | None = Field(None, description="Maximum intensity") + created_at: str | None = Field(None, description="Creation timestamp") + + +class PaginatedIssues(BaseModel): + """Paginated issues list.""" + + items: list[IssueItem] = Field(default_factory=list) + total: int = Field(0, description="Total count") + page: int = Field(1, description="Current page") + page_size: int = Field(10, description="Items per page") + + +class SpanItem(BaseModel): + """Single classified span.""" + + span_id: str = Field(..., description="Span identifier") + span_text: str = Field(..., description="Span text content") + urt_primary: str | None = Field(None, description="Primary URT code") + valence: str | None = Field(None, description="Valence") + intensity: str | None = Field(None, description="Intensity") + review_time: str | None = Field(None, description="Review timestamp") + source_review_id: str | None = Field(None, description="Source review ID") + entity: str | None = Field(None, description="Entity mentioned") + + +class PaginatedSpans(BaseModel): + """Paginated spans list.""" + + items: list[SpanItem] = Field(default_factory=list) + total: int = Field(0, description="Total count") + page: int = Field(1, description="Current page") + page_size: int = Field(10, description="Items per page") + + +class TimelinePoint(BaseModel): + """Single point on the timeline chart.""" + + date: str = Field(..., description="Date string (YYYY-MM-DD or YYYY-WXX)") + review_count: int = Field(0, description="Number of reviews") + span_count: int = Field(0, description="Number of spans") + avg_rating: float | None = Field(None, description="Average rating") + positive_count: int = Field(0, description="Positive sentiment count") + negative_count: int = Field(0, description="Negative sentiment count") + + +# ==================== Domain Scores & Insights ==================== + + +class DomainScore(BaseModel): + """Domain-level KPI score.""" + + domain: str = Field(..., description="Domain code") + name: str = Field(..., description="Domain display name") + score: float = Field(..., description="Score 0-100") + status: str = Field(..., description="Status: good/warning/critical") + trend: str | None = Field(None, description="Trend vs previous period (e.g., '+3.2')") + positive_count: int = Field(0, description="Positive spans") + negative_count: int = Field(0, description="Negative spans") + total_count: int = Field(0, description="Total spans") + + +class StrengthItem(BaseModel): + """A strength (highly positive subcode).""" + + rank: int = Field(..., description="Rank order") + subcode: str = Field(..., description="URT subcode") + subcode_name: str = Field(..., description="Subcode display name") + domain: str = Field(..., description="Domain code") + domain_name: str = Field(..., description="Domain display name") + positive_percentage: float = Field(..., description="% positive sentiment") + span_count: int = Field(..., description="Total mentions") + marketing_angle: str | None = Field(None, description="Marketing suggestion") + + +class WeaknessItem(BaseModel): + """A weakness (negative issue to fix).""" + + rank: int = Field(..., description="Rank order") + issue_id: str | None = Field(None, description="Related issue ID if exists") + subcode: str = Field(..., description="URT subcode") + subcode_name: str = Field(..., description="Subcode display name") + domain: str = Field(..., description="Domain code") + domain_name: str = Field(..., description="Domain display name") + negative_percentage: float = Field(..., description="% negative sentiment") + span_count: int = Field(..., description="Total negative mentions") + intensity: str | None = Field(None, description="Max intensity") + solution: str | None = Field(None, description="Recommended action") + solution_complexity: str | None = Field(None, description="Complexity") + projected_rating_impact: float | None = Field(None, description="Potential rating gain if fixed") + owner: str | None = Field(None, description="Default owner team") + + +class RatingSimulator(BaseModel): + """Rating impact simulation.""" + + current_rating: float = Field(..., description="Current average rating") + if_fix_top_1: float | None = Field(None, description="Projected rating if top 1 issue fixed") + if_fix_top_3: float | None = Field(None, description="Projected rating if top 3 issues fixed") + potential_gain: float = Field(0, description="Maximum potential rating gain") + + +class OpportunitySpan(BaseModel): + """A span (customer feedback) related to an opportunity item.""" + + span_id: str = Field(..., description="Span identifier") + span_text: str = Field(..., description="The classified span text") + review_text: str | None = Field(None, description="Full review text for context") + rating: int | None = Field(None, description="Source review rating") + review_id: str | None = Field(None, description="Source review ID for navigation") + review_date: str | None = Field(None, description="Review date") + + +class OpportunityItem(BaseModel): + """An item in the opportunity matrix with coordinates and detail data.""" + + subcode: str = Field(..., description="URT subcode") + name: str = Field(..., description="Human-readable subcode name") + x: float = Field(..., description="X position (0-1, frequency within quadrant)") + y: float = Field(..., description="Y position (0-1, effort within quadrant)") + # Detail data for hover/click + domain: str = Field(..., description="Domain code (P, J, O, etc.)") + domain_name: str = Field(..., description="Domain display name") + negative_pct: float = Field(..., description="Percentage of negative mentions") + span_count: int = Field(..., description="Number of mentions") + solution: str | None = Field(None, description="Suggested solution from taxonomy") + complexity: str = Field(..., description="Solution complexity (simple/medium/complex)") + rating_impact: float | None = Field(None, description="Projected rating improvement") + owner: str | None = Field(None, description="Suggested owner/team") + example: str | None = Field(None, description="Example negative quote") + spans: list[OpportunitySpan] = Field(default_factory=list, description="Sample customer feedback spans") + + +class OpportunityMatrix(BaseModel): + """2x2 opportunity matrix.""" + + quick_wins: list[OpportunityItem] = Field(default_factory=list, description="High freq + simple") + critical: list[OpportunityItem] = Field(default_factory=list, description="High freq + complex") + nice_to_have: list[OpportunityItem] = Field(default_factory=list, description="Low freq + simple") + strategic: list[OpportunityItem] = Field(default_factory=list, description="Low freq + complex") + + +class Insights(BaseModel): + """Business insights including strengths and weaknesses.""" + + strengths: list[StrengthItem] = Field(default_factory=list) + weaknesses: list[WeaknessItem] = Field(default_factory=list) + rating_simulator: RatingSimulator | None = Field(None) + opportunity_matrix: OpportunityMatrix | None = Field(None) + executive_summary: str = Field("", description="Auto-generated summary") + + +# ==================== AI Synthesis Models ==================== + + +class ActionItemResponse(BaseModel): + """A specific action recommendation from AI synthesis.""" + + id: str = Field(..., description="Action identifier") + title: str = Field(..., description="Clear action title") + why: str = Field("", description="Root cause from reviews") + what: str = Field("", description="Specific steps to take") + who: str = Field("", description="Department or role responsible") + impact: str = Field("", description="Expected outcome") + evidence: list[str] = Field(default_factory=list, description="Supporting quotes") + estimated_rating_lift: float | None = Field(None, description="Projected rating improvement") + complexity: str = Field("medium", description="quick/medium/complex") + priority: str = Field("medium", description="critical/high/medium/low") + timeline: str = Field("This month", description="When to implement") + related_subcode: str = Field("", description="Related URT subcode") + + +class TimelineAnnotationResponse(BaseModel): + """A key event annotation for timeline visualization.""" + + date: str = Field(..., description="Event date (YYYY-MM-DD)") + label: str = Field(..., description="Short label") + description: str = Field("", description="What happened") + type: str = Field("neutral", description="positive/negative/neutral/event") + + +class SynthesisResponse(BaseModel): + """AI-generated synthesis with narratives and action plans.""" + + executive_narrative: str = Field("", description="2-3 paragraph business story") + sentiment_insight: str = Field("", description="Why sentiment is distributed this way") + category_insight: str = Field("", description="Pattern in categories") + timeline_insight: str = Field("", description="Trends over time") + priority_domain: str | None = Field(None, description="Domain needing most attention (P/V/J/O/A/E/R)") + priority_issue: str | None = Field(None, description="Subcode to fix first (e.g., V1.03)") + action_plan: list[ActionItemResponse] = Field(default_factory=list, description="Prioritized actions") + timeline_annotations: list[TimelineAnnotationResponse] = Field(default_factory=list, description="Key events") + marketing_angles: list[str] = Field(default_factory=list, description="Ways to promote strengths") + competitor_context: str | None = Field(None, description="Industry comparison") + generated_at: str | None = Field(None, description="When synthesis was generated") + + +class ReviewIQAnalyticsResponse(BaseModel): + """Complete analytics response for ReviewIQ dashboard.""" + + overview: OverviewStats = Field(default_factory=OverviewStats) + sentiment: SentimentData = Field(default_factory=SentimentData) + urt: URTData = Field(default_factory=URTData) + domain_scores: list[DomainScore] = Field(default_factory=list) + overall_experience_index: float | None = Field(None, description="OEI composite score") + insights: Insights = Field(default_factory=Insights) + issues: PaginatedIssues = Field(default_factory=PaginatedIssues) + spans: PaginatedSpans = Field(default_factory=PaginatedSpans) + timeline: list[TimelinePoint] = Field(default_factory=list) + synthesis: SynthesisResponse | None = Field(None, description="AI-generated synthesis") + filters_applied: dict[str, Any] = Field(default_factory=dict) + + +# ==================== Helper Functions ==================== + + +def _parse_time_range(time_range: str) -> datetime: + """Parse time range string to start datetime.""" + now = datetime.now() + + if time_range == "7d": + return now - timedelta(days=7) + elif time_range == "14d": + return now - timedelta(days=14) + elif time_range == "30d": + return now - timedelta(days=30) + elif time_range == "90d": + return now - timedelta(days=90) + elif time_range == "1y": + return now - timedelta(days=365) + elif time_range == "all": + return datetime(2000, 1, 1) # Effectively no time filter + else: + # Default to 30 days + return now - timedelta(days=30) + + +# Domain configuration +DOMAIN_CONFIG = { + "O": {"name": "Offering", "owner": "Operations / Product", "green": 80, "yellow": 60, "weight": 0.20}, + "P": {"name": "People", "owner": "HR / Training", "green": 85, "yellow": 70, "weight": 0.18}, + "J": {"name": "Journey", "owner": "Operations / Process", "green": 75, "yellow": 55, "weight": 0.15}, + "E": {"name": "Environment", "owner": "Facilities / IT", "green": 80, "yellow": 65, "weight": 0.12}, + "A": {"name": "Access", "owner": "Compliance / Design", "green": 85, "yellow": 70, "weight": 0.10}, + "V": {"name": "Value", "owner": "Finance / Pricing", "green": 70, "yellow": 50, "weight": 0.12}, + "R": {"name": "Relationship", "owner": "Leadership / CX", "green": 80, "yellow": 60, "weight": 0.13}, +} + +# Intensity weights for scoring +INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0} + +# Legacy mapping for backward compatibility +DOMAIN_NAMES = {k: v["name"] for k, v in DOMAIN_CONFIG.items()} + + +# ==================== API Endpoint ==================== + + +@router.get("/analytics", response_model=ReviewIQAnalyticsResponse) +async def get_reviewiq_analytics( + job_id: str | None = Query(None, description="Filter by job ID"), + business_id: str | None = Query(None, description="Filter by business ID"), + time_range: str = Query("30d", description="Time range (7d, 14d, 30d, 90d, 1y, all)"), + sentiment: str | None = Query(None, description="Filter by sentiment (comma-separated: positive,negative)"), + urt_domain: str | None = Query(None, description="Filter by URT domain (P, J, O, A)"), + intensity: str | None = Query(None, description="Filter by intensity (I1, I2, I3)"), + issues_page: int = Query(1, ge=1, description="Issues page number"), + issues_page_size: int = Query(10, ge=1, le=100, description="Issues per page"), + spans_page: int = Query(1, ge=1, description="Spans page number"), + spans_page_size: int = Query(10, ge=1, le=100, description="Spans per page"), +) -> ReviewIQAnalyticsResponse: + """ + Get all analytics data for ReviewIQ dashboard in a single call. + + Returns overview stats, sentiment distribution, URT breakdown, issues, and spans. + Supports cross-filtering by sentiment, URT domain, and intensity. + """ + if not _pool: + raise HTTPException(status_code=503, detail="Database not initialized") + + # Parse filters + start_date = _parse_time_range(time_range) + sentiment_filter = sentiment.split(",") if sentiment else None + + # Build filter conditions + filters_applied = { + "time_range": time_range, + "start_date": start_date.isoformat(), + } + if job_id: + filters_applied["job_id"] = job_id + if business_id: + filters_applied["business_id"] = business_id + if sentiment_filter: + filters_applied["sentiment"] = sentiment_filter + if urt_domain: + filters_applied["urt_domain"] = urt_domain + if intensity: + filters_applied["intensity"] = intensity + + async with _pool.acquire() as conn: + # Query 1: Overview Stats + overview = await _get_overview_stats( + conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity + ) + + # Query 2: Sentiment Distribution + URT Domain Distribution + sentiment_data, urt_data = await _get_distributions( + conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity + ) + + # Query 3: Timeline Data + timeline = await _get_timeline_data( + conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity + ) + + # Query 4: Issues (paginated) - now with enriched URT data + issues = await _get_issues( + conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, + issues_page, issues_page_size + ) + + # Query 5: Spans (paginated) + spans = await _get_spans( + conn, job_id, business_id, start_date, sentiment_filter, urt_domain, intensity, + spans_page, spans_page_size + ) + + # Query 6: Domain KPI Scores + domain_scores, oei = await _get_domain_scores( + conn, job_id, business_id, start_date + ) + + # Query 7: Insights (strengths, weaknesses, recommendations) + insights = await _get_insights( + conn, job_id, business_id, start_date, + overview.avg_rating, overview.total_reviews + ) + + # Query 8: AI Synthesis (if available) + synthesis = await _get_synthesis(conn, job_id) + + return ReviewIQAnalyticsResponse( + overview=overview, + sentiment=sentiment_data, + urt=urt_data, + domain_scores=domain_scores, + overall_experience_index=oei, + insights=insights, + issues=issues, + spans=spans, + timeline=timeline, + synthesis=synthesis, + filters_applied=filters_applied, + ) + + +async def _get_overview_stats( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + sentiment_filter: list[str] | None, + urt_domain: str | None, + intensity: str | None, +) -> OverviewStats: + """Get overview statistics with a single optimized query.""" + + # Build WHERE conditions for spans + conditions = ["rs.review_time >= $1"] + params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + conditions.append(f"rs.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"rs.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + if urt_domain: + conditions.append(f"LEFT(rs.urt_primary, 1) = ${param_idx}") + params.append(urt_domain) + param_idx += 1 + + if intensity: + conditions.append(f"rs.intensity = ${param_idx}") + params.append(intensity) + param_idx += 1 + + # Valence filter + valence_condition = "" + if sentiment_filter: + valence_codes = [] + if "positive" in sentiment_filter: + valence_codes.append("V+") + if "negative" in sentiment_filter: + valence_codes.extend(["V-", "V±"]) + if "neutral" in sentiment_filter: + valence_codes.append("V0") + if valence_codes: + conditions.append(f"rs.valence = ANY(${param_idx}::text[])") + params.append(valence_codes) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + query = f""" + SELECT + COUNT(DISTINCT re.id) as total_reviews, + COUNT(rs.span_id) as total_spans, + AVG(re.rating) as avg_rating, + COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, + COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count, + COUNT(*) FILTER (WHERE rs.valence = 'V0') as neutral_count, + COUNT(*) FILTER (WHERE rs.valence = 'V±') as mixed_count + FROM pipeline.review_spans rs + LEFT JOIN pipeline.reviews_enriched re ON ( + re.source = rs.source + AND re.review_id = rs.review_id + AND re.review_version = rs.review_version + ) + WHERE {where_clause} + """ + + row = await conn.fetchrow(query, *params) + + # Get open issues count separately + issue_conditions = ["i.state = 'open'"] + issue_params: list[Any] = [] + issue_param_idx = 1 + + if job_id: + issue_conditions.append(f"i.job_id = ${issue_param_idx}::uuid") + issue_params.append(job_id) + issue_param_idx += 1 + + if business_id: + issue_conditions.append(f"i.business_id = ${issue_param_idx}") + issue_params.append(business_id) + issue_param_idx += 1 + + issue_where = " AND ".join(issue_conditions) + issue_count = await conn.fetchval( + f"SELECT COUNT(*) FROM pipeline.issues i WHERE {issue_where}", + *issue_params + ) + + return OverviewStats( + total_reviews=row["total_reviews"] or 0, + total_spans=row["total_spans"] or 0, + open_issues=issue_count or 0, + avg_rating=float(row["avg_rating"]) if row["avg_rating"] else None, + positive_count=row["positive_count"] or 0, + negative_count=row["negative_count"] or 0, + neutral_count=row["neutral_count"] or 0, + mixed_count=row["mixed_count"] or 0, + ) + + +async def _get_distributions( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + sentiment_filter: list[str] | None, + urt_domain: str | None, + intensity: str | None, +) -> tuple[SentimentData, URTData]: + """Get sentiment and URT distributions with cross-filtering support.""" + + # Build base WHERE conditions (job, business, time) + base_conditions = ["rs.review_time >= $1"] + base_params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + base_conditions.append(f"rs.job_id = ${param_idx}::uuid") + base_params.append(job_id) + param_idx += 1 + + if business_id: + base_conditions.append(f"rs.business_id = ${param_idx}") + base_params.append(business_id) + param_idx += 1 + + base_where = " AND ".join(base_conditions) + + # Convert sentiment filter to valence codes + valence_codes = [] + if sentiment_filter: + if "positive" in sentiment_filter: + valence_codes.append("V+") + if "negative" in sentiment_filter: + valence_codes.extend(["V-", "V±"]) + if "neutral" in sentiment_filter: + valence_codes.append("V0") + + # ========== Sentiment Distribution (filtered by domain) ========== + sentiment_conditions = list(base_conditions) + sentiment_params = list(base_params) + sentiment_param_idx = param_idx + + # Apply domain filter to sentiment (cross-filter: domain → sentiment) + if urt_domain: + sentiment_conditions.append(f"LEFT(rs.urt_primary, 1) = ${sentiment_param_idx}") + sentiment_params.append(urt_domain) + sentiment_param_idx += 1 + + # Apply intensity filter + if intensity: + sentiment_conditions.append(f"rs.intensity = ${sentiment_param_idx}") + sentiment_params.append(intensity) + sentiment_param_idx += 1 + + sentiment_where = " AND ".join(sentiment_conditions) + + # Updated query with review-based counting to avoid bias from verbose reviews + sentiment_query = f""" + SELECT + valence, + COUNT(*) as span_count, + COUNT(DISTINCT review_id) as review_count + FROM pipeline.review_spans rs + WHERE {sentiment_where} AND valence IS NOT NULL + GROUP BY valence + ORDER BY review_count DESC + """ + + sentiment_rows = await conn.fetch(sentiment_query, *sentiment_params) + # Use review_count for percentages to avoid bias from verbose reviews + total_reviews = sum(r["review_count"] for r in sentiment_rows) + + sentiment_distribution = [ + SentimentDataPoint( + valence=row["valence"], + count=row["span_count"], + review_count=row["review_count"], + percentage=(row["review_count"] / total_reviews * 100) if total_reviews > 0 else 0, + ) + for row in sentiment_rows + ] + + # ========== Sentiment Trend (filtered by domain) ========== + trend_query = f""" + SELECT + TO_CHAR(DATE_TRUNC('week', rs.review_time), 'IYYY-"W"IW') as period, + COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive, + COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative, + COUNT(*) FILTER (WHERE rs.valence = 'V0') as neutral, + COUNT(*) FILTER (WHERE rs.valence = 'V±') as mixed + FROM pipeline.review_spans rs + WHERE {sentiment_where} + GROUP BY DATE_TRUNC('week', rs.review_time) + ORDER BY DATE_TRUNC('week', rs.review_time) + """ + + trend_rows = await conn.fetch(trend_query, *sentiment_params) + sentiment_trend = [ + SentimentTrendPoint( + period=row["period"], + positive=row["positive"] or 0, + negative=row["negative"] or 0, + neutral=row["neutral"] or 0, + mixed=row["mixed"] or 0, + ) + for row in trend_rows + ] + + # ========== URT Domain Distribution (filtered by sentiment) ========== + urt_conditions = list(base_conditions) + urt_params = list(base_params) + urt_param_idx = param_idx + + # Apply sentiment filter to URT domains (cross-filter: sentiment → domain) + if valence_codes: + urt_conditions.append(f"rs.valence = ANY(${urt_param_idx}::text[])") + urt_params.append(valence_codes) + urt_param_idx += 1 + + # Apply intensity filter + if intensity: + urt_conditions.append(f"rs.intensity = ${urt_param_idx}") + urt_params.append(intensity) + urt_param_idx += 1 + + urt_where = " AND ".join(urt_conditions) + + # Updated query with review-based counting to avoid bias from verbose reviews + urt_query = f""" + SELECT + LEFT(urt_primary, 1) as domain, + COUNT(*) as span_count, + COUNT(DISTINCT review_id) as review_count, + COUNT(*) FILTER (WHERE valence = 'V+') as positive_spans, + COUNT(*) FILTER (WHERE valence IN ('V-', 'V±')) as negative_spans, + COUNT(*) FILTER (WHERE valence = 'V0') as neutral_spans, + COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V+') as positive_reviews, + COUNT(DISTINCT review_id) FILTER (WHERE valence IN ('V-', 'V±')) as negative_reviews + FROM pipeline.review_spans rs + WHERE {urt_where} AND urt_primary IS NOT NULL + GROUP BY LEFT(urt_primary, 1) + ORDER BY review_count DESC + """ + + urt_rows = await conn.fetch(urt_query, *urt_params) + # Use review_count for percentages to avoid bias from verbose reviews + total_reviews = sum(r["review_count"] for r in urt_rows) + + domains = [ + URTDomainPoint( + domain=row["domain"], + domain_name=DOMAIN_NAMES.get(row["domain"], row["domain"]), + count=row["span_count"], + review_count=row["review_count"], + percentage=(row["review_count"] / total_reviews * 100) if total_reviews > 0 else 0, + positive_count=row["positive_spans"] or 0, + negative_count=row["negative_spans"] or 0, + neutral_count=row["neutral_spans"] or 0, + positive_reviews=row["positive_reviews"] or 0, + negative_reviews=row["negative_reviews"] or 0, + ) + for row in urt_rows + ] + + # ========== Intensity Heatmap (filtered by both sentiment and domain) ========== + heatmap_conditions = list(base_conditions) + heatmap_params = list(base_params) + heatmap_param_idx = param_idx + + # Apply domain filter + if urt_domain: + heatmap_conditions.append(f"LEFT(rs.urt_primary, 1) = ${heatmap_param_idx}") + heatmap_params.append(urt_domain) + heatmap_param_idx += 1 + + # Apply sentiment filter + if valence_codes: + heatmap_conditions.append(f"rs.valence = ANY(${heatmap_param_idx}::text[])") + heatmap_params.append(valence_codes) + heatmap_param_idx += 1 + + heatmap_where = " AND ".join(heatmap_conditions) + + heatmap_query = f""" + SELECT + LEFT(urt_primary, 1) as domain, + intensity, + COUNT(*) as count + FROM pipeline.review_spans rs + WHERE {heatmap_where} + AND urt_primary IS NOT NULL + AND intensity IS NOT NULL + GROUP BY LEFT(urt_primary, 1), intensity + ORDER BY domain, intensity + """ + + heatmap_rows = await conn.fetch(heatmap_query, *heatmap_params) + intensity_heatmap = [ + IntensityPoint( + domain=row["domain"], + intensity=row["intensity"], + count=row["count"], + ) + for row in heatmap_rows + ] + + return ( + SentimentData(distribution=sentiment_distribution, trend=sentiment_trend), + URTData(domains=domains, intensity_heatmap=intensity_heatmap), + ) + + +async def _get_timeline_data( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + sentiment_filter: list[str] | None, + urt_domain: str | None, + intensity: str | None, +) -> list[TimelinePoint]: + """Get timeline data for the brush chart.""" + + # Build WHERE conditions + conditions = ["rs.review_time >= $1"] + params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + conditions.append(f"rs.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"rs.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + query = f""" + SELECT + TO_CHAR(DATE_TRUNC('week', rs.review_time), 'IYYY-"W"IW') as date, + COUNT(DISTINCT CONCAT(rs.source, ':', rs.review_id)) as review_count, + COUNT(*) as span_count, + AVG(re.rating) as avg_rating, + COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, + COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count + FROM pipeline.review_spans rs + LEFT JOIN pipeline.reviews_enriched re ON ( + re.source = rs.source + AND re.review_id = rs.review_id + AND re.review_version = rs.review_version + ) + WHERE {where_clause} + GROUP BY DATE_TRUNC('week', rs.review_time) + ORDER BY DATE_TRUNC('week', rs.review_time) + """ + + rows = await conn.fetch(query, *params) + + return [ + TimelinePoint( + date=row["date"], + review_count=row["review_count"] or 0, + span_count=row["span_count"] or 0, + avg_rating=float(row["avg_rating"]) if row["avg_rating"] else None, + positive_count=row["positive_count"] or 0, + negative_count=row["negative_count"] or 0, + ) + for row in rows + ] + + +async def _get_issues( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + sentiment_filter: list[str] | None, + urt_domain: str | None, + intensity: str | None, + page: int, + page_size: int, +) -> PaginatedIssues: + """Get paginated issues.""" + + # Build WHERE conditions + conditions = ["1=1"] + params: list[Any] = [] + param_idx = 1 + + if job_id: + conditions.append(f"i.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"i.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + if urt_domain: + conditions.append(f"i.domain = ${param_idx}") + params.append(urt_domain) + param_idx += 1 + + if intensity: + conditions.append(f"i.max_intensity = ${param_idx}") + params.append(intensity) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + # Count query + count_query = f"SELECT COUNT(*) FROM pipeline.issues i WHERE {where_clause}" + total = await conn.fetchval(count_query, *params) + + # Items query with pagination - enriched with URT metadata + offset = (page - 1) * page_size + items_query = f""" + SELECT + i.issue_id, + i.primary_subcode, + s.name as subcode_name, + s.definition as subcode_definition, + s.solution, + s.solution_complexity, + s.negative_example, + i.domain, + d.name as domain_name, + d.default_owner, + c.name as category_name, + i.entity, + i.state, + i.priority_score, + i.span_count, + i.max_intensity, + i.created_at + FROM pipeline.issues i + LEFT JOIN pipeline.urt_subcodes s ON i.primary_subcode = s.code + LEFT JOIN pipeline.urt_domains d ON i.domain = d.code + LEFT JOIN pipeline.urt_categories c ON s.category_code = c.code + WHERE {where_clause} + ORDER BY i.priority_score DESC, i.created_at DESC + LIMIT ${param_idx} OFFSET ${param_idx + 1} + """ + + rows = await conn.fetch(items_query, *params, page_size, offset) + + items = [ + IssueItem( + issue_id=row["issue_id"], + primary_subcode=row["primary_subcode"], + subcode_name=row["subcode_name"], + subcode_definition=row["subcode_definition"], + solution=row["solution"], + solution_complexity=row["solution_complexity"], + domain=row["domain"], + domain_name=row["domain_name"], + category_name=row["category_name"], + default_owner=row["default_owner"], + negative_example=row["negative_example"], + entity=row["entity"], + state=row["state"], + priority_score=float(row["priority_score"]) if row["priority_score"] else 0, + span_count=row["span_count"] or 0, + max_intensity=row["max_intensity"], + created_at=row["created_at"].isoformat() if row["created_at"] else None, + ) + for row in rows + ] + + return PaginatedIssues( + items=items, + total=total or 0, + page=page, + page_size=page_size, + ) + + +async def _get_spans( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + sentiment_filter: list[str] | None, + urt_domain: str | None, + intensity: str | None, + page: int, + page_size: int, +) -> PaginatedSpans: + """Get paginated spans.""" + + # Build WHERE conditions + conditions = ["rs.review_time >= $1"] + params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + conditions.append(f"rs.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"rs.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + if urt_domain: + conditions.append(f"LEFT(rs.urt_primary, 1) = ${param_idx}") + params.append(urt_domain) + param_idx += 1 + + if intensity: + conditions.append(f"rs.intensity = ${param_idx}") + params.append(intensity) + param_idx += 1 + + # Valence filter + if sentiment_filter: + valence_codes = [] + if "positive" in sentiment_filter: + valence_codes.append("V+") + if "negative" in sentiment_filter: + valence_codes.extend(["V-", "V±"]) + if "neutral" in sentiment_filter: + valence_codes.append("V0") + if valence_codes: + conditions.append(f"rs.valence = ANY(${param_idx}::text[])") + params.append(valence_codes) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + # Count query + count_query = f"SELECT COUNT(*) FROM pipeline.review_spans rs WHERE {where_clause}" + total = await conn.fetchval(count_query, *params) + + # Items query with pagination + offset = (page - 1) * page_size + items_query = f""" + SELECT + rs.span_id, + rs.span_text, + rs.urt_primary, + rs.valence, + rs.intensity, + rs.review_time, + rs.review_id as source_review_id, + rs.entity + FROM pipeline.review_spans rs + WHERE {where_clause} + ORDER BY rs.review_time DESC + LIMIT ${param_idx} OFFSET ${param_idx + 1} + """ + + rows = await conn.fetch(items_query, *params, page_size, offset) + + items = [ + SpanItem( + span_id=row["span_id"], + span_text=row["span_text"], + urt_primary=row["urt_primary"], + valence=row["valence"], + intensity=row["intensity"], + review_time=row["review_time"].isoformat() if row["review_time"] else None, + source_review_id=row["source_review_id"], + entity=row["entity"], + ) + for row in rows + ] + + return PaginatedSpans( + items=items, + total=total or 0, + page=page, + page_size=page_size, + ) + + +async def _get_domain_scores( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, +) -> tuple[list[DomainScore], float | None]: + """Calculate domain-level KPI scores using intensity-weighted scoring.""" + + # Build WHERE conditions + conditions = ["rs.review_time >= $1"] + params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + conditions.append(f"rs.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"rs.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + # Query to get sentiment counts by domain with intensity weighting + query = f""" + SELECT + LEFT(rs.urt_primary, 1) as domain, + rs.valence, + rs.intensity, + COUNT(*) as count + FROM pipeline.review_spans rs + WHERE {where_clause} + AND rs.urt_primary IS NOT NULL + AND rs.valence IS NOT NULL + GROUP BY LEFT(rs.urt_primary, 1), rs.valence, rs.intensity + ORDER BY domain + """ + + rows = await conn.fetch(query, *params) + + # Aggregate by domain + domain_data: dict[str, dict[str, float]] = {} + for row in rows: + domain = row["domain"] + if domain not in domain_data: + domain_data[domain] = { + "positive_weight": 0, "negative_weight": 0, "total_weight": 0, + "positive_count": 0, "negative_count": 0, "total_count": 0 + } + + intensity = row["intensity"] or "I1" + weight = INTENSITY_WEIGHTS.get(intensity, 1.0) + count = row["count"] + + domain_data[domain]["total_weight"] += weight * count + domain_data[domain]["total_count"] += count + + if row["valence"] == "V+": + domain_data[domain]["positive_weight"] += weight * count + domain_data[domain]["positive_count"] += count + elif row["valence"] in ("V-", "V±"): + domain_data[domain]["negative_weight"] += weight * count + domain_data[domain]["negative_count"] += count + + # Calculate scores + domain_scores = [] + for domain, cfg in DOMAIN_CONFIG.items(): + data = domain_data.get(domain, { + "positive_weight": 0, "negative_weight": 0, "total_weight": 0, + "positive_count": 0, "negative_count": 0, "total_count": 0 + }) + + total = data["total_weight"] + if total > 0: + # Score = 50 + (positive - negative) / total * 50 + # This gives 0-100 scale where 50 is neutral + score = 50 + ((data["positive_weight"] - data["negative_weight"]) / total) * 50 + score = max(0, min(100, score)) + else: + score = 50 # Neutral if no data + + # Determine status based on thresholds + if score >= cfg["green"]: + status = "good" + elif score >= cfg["yellow"]: + status = "warning" + else: + status = "critical" + + domain_scores.append(DomainScore( + domain=domain, + name=cfg["name"], + score=round(score, 1), + status=status, + trend=None, # TODO: Calculate trend vs previous period + positive_count=int(data["positive_count"]), + negative_count=int(data["negative_count"]), + total_count=int(data["total_count"]), + )) + + # Calculate Overall Experience Index (OEI) + oei = None + if domain_scores: + weighted_sum = sum( + ds.score * DOMAIN_CONFIG[ds.domain]["weight"] + for ds in domain_scores + if ds.domain in DOMAIN_CONFIG + ) + total_weight = sum( + DOMAIN_CONFIG[ds.domain]["weight"] + for ds in domain_scores + if ds.domain in DOMAIN_CONFIG + ) + if total_weight > 0: + oei = round(weighted_sum / total_weight, 1) + + return domain_scores, oei + + +async def _get_insights( + conn: asyncpg.Connection, + job_id: str | None, + business_id: str | None, + start_date: datetime, + avg_rating: float | None, + total_reviews: int, +) -> Insights: + """Generate strengths, weaknesses, and business insights.""" + + # Build WHERE conditions + conditions = ["rs.review_time >= $1"] + params: list[Any] = [start_date] + param_idx = 2 + + if job_id: + conditions.append(f"rs.job_id = ${param_idx}::uuid") + params.append(job_id) + param_idx += 1 + + if business_id: + conditions.append(f"rs.business_id = ${param_idx}") + params.append(business_id) + param_idx += 1 + + where_clause = " AND ".join(conditions) + + # Query: Get subcode-level sentiment distribution with URT metadata + query = f""" + SELECT + rs.urt_primary as subcode, + s.name as subcode_name, + s.solution, + s.solution_complexity, + s.marketing_angle, + s.negative_example, + LEFT(rs.urt_primary, 1) as domain, + d.name as domain_name, + d.default_owner, + COUNT(*) as total_count, + COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, + COUNT(*) FILTER (WHERE rs.valence IN ('V-', 'V±')) as negative_count, + MAX(rs.intensity) as max_intensity + FROM pipeline.review_spans rs + LEFT JOIN pipeline.urt_subcodes s ON rs.urt_primary = s.code + LEFT JOIN pipeline.urt_domains d ON LEFT(rs.urt_primary, 1) = d.code + WHERE {where_clause} + AND rs.urt_primary IS NOT NULL + GROUP BY rs.urt_primary, s.name, s.solution, s.solution_complexity, + s.marketing_angle, s.negative_example, LEFT(rs.urt_primary, 1), + d.name, d.default_owner + HAVING COUNT(*) >= 2 + ORDER BY COUNT(*) DESC + """ + + rows = await conn.fetch(query, *params) + + # Separate into strengths and weaknesses + strengths = [] + weaknesses = [] + # Store tuples of (subcode, count, complexity) for coordinate calculation + quick_wins_raw = [] + critical_raw = [] + nice_to_have_raw = [] + strategic_raw = [] + + # Calculate median for opportunity matrix + counts = [r["total_count"] for r in rows] + median_count = sorted(counts)[len(counts) // 2] if counts else 0 + max_count = max(counts) if counts else 1 + min_count = min(counts) if counts else 0 + + for row in rows: + total = row["total_count"] + positive = row["positive_count"] + negative = row["negative_count"] + pos_pct = (positive / total * 100) if total > 0 else 0 + neg_pct = (negative / total * 100) if total > 0 else 0 + + subcode = row["subcode"] + complexity = row["solution_complexity"] or "medium" + + # Strengths: >= 70% positive + if pos_pct >= 70 and len(strengths) < 5: + strengths.append(StrengthItem( + rank=len(strengths) + 1, + subcode=subcode, + subcode_name=row["subcode_name"] or subcode, + domain=row["domain"], + domain_name=row["domain_name"] or row["domain"], + positive_percentage=round(pos_pct, 1), + span_count=total, + marketing_angle=row["marketing_angle"], + )) + + # Weaknesses: >= 40% negative + if neg_pct >= 40 and len(weaknesses) < 5: + # Calculate projected rating impact + impact = None + if avg_rating and total_reviews > 0: + # Simplified model: impact = (negative_spans / total_reviews) * avg_intensity_loss + intensity_loss = {"I1": 0.5, "I2": 1.0, "I3": 2.0}.get(row["max_intensity"], 0.5) + impact = round((negative / total_reviews) * intensity_loss, 2) + + weaknesses.append(WeaknessItem( + rank=len(weaknesses) + 1, + issue_id=None, # Could link to issue if exists + subcode=subcode, + subcode_name=row["subcode_name"] or subcode, + domain=row["domain"], + domain_name=row["domain_name"] or row["domain"], + negative_percentage=round(neg_pct, 1), + span_count=negative, + intensity=row["max_intensity"], + solution=row["solution"], + solution_complexity=complexity, + projected_rating_impact=impact, + owner=row["default_owner"], + )) + + # Opportunity matrix (for weaknesses only) + if neg_pct >= 40: + is_high_freq = total >= median_count + is_simple = complexity == "simple" + name = row["subcode_name"] or subcode + + # Build detail dict for the opportunity item + item_data = { + "subcode": subcode, + "name": name, + "count": total, + "complexity": complexity, + "domain": row["domain"], + "domain_name": row["domain_name"] or row["domain"], + "negative_pct": round(neg_pct, 1), + "span_count": negative, + "solution": row["solution"], + "rating_impact": impact, + "owner": row["default_owner"], + "example": row["negative_example"], + } + + if is_high_freq and is_simple: + quick_wins_raw.append(item_data) + elif is_high_freq and not is_simple: + critical_raw.append(item_data) + elif not is_high_freq and is_simple: + nice_to_have_raw.append(item_data) + else: + strategic_raw.append(item_data) + + # Helper to compute coordinates for opportunity items and fetch spans + async def compute_opportunity_items( + items: list[dict], is_high_freq: bool + ) -> list[OpportunityItem]: + if not items: + return [] + # Get min/max counts within this quadrant for x-axis normalization + quadrant_counts = [item["count"] for item in items] + q_min = min(quadrant_counts) + q_max = max(quadrant_counts) + q_range = q_max - q_min if q_max > q_min else 1 + + result = [] + for item in items[:5]: + count = item["count"] + complexity = item["complexity"] + # X: frequency within quadrant (0.1 to 0.9 to keep items away from edges) + x = 0.1 + 0.8 * ((count - q_min) / q_range) + # Y: effort based on complexity (simple=0.2, medium=0.5, complex=0.8) + effort_map = {"simple": 0.2, "medium": 0.5, "complex": 0.8} + y = effort_map.get(complexity, 0.5) + # Add small jitter to prevent overlap + import random + x = max(0.05, min(0.95, x + random.uniform(-0.05, 0.05))) + y = max(0.05, min(0.95, y + random.uniform(-0.08, 0.08))) + + # Fetch sample spans for this subcode (negative sentiment only) + # Use original 'text' column since spans were extracted from it + spans_query = """ + SELECT + rs.span_id, + rs.span_text, + re.rating, + rs.review_id, + re.review_time::text as review_date, + re.text as review_text + FROM pipeline.review_spans rs + LEFT JOIN pipeline.reviews_enriched re ON ( + re.source = rs.source + AND re.review_id = rs.review_id + AND re.review_version = rs.review_version + ) + WHERE rs.urt_primary = $1 + AND rs.valence IN ('V-', 'V±') + ORDER BY re.review_time DESC NULLS LAST + LIMIT 15 + """ + span_rows = await conn.fetch(spans_query, item["subcode"]) + spans = [ + OpportunitySpan( + span_id=row["span_id"], + span_text=row["span_text"] or "", + review_text=row["review_text"][:500] if row["review_text"] else None, + rating=row["rating"], + review_id=row["review_id"], + review_date=row["review_date"][:10] if row["review_date"] else None, + ) + for row in span_rows + ] + + result.append(OpportunityItem( + subcode=item["subcode"], + name=item["name"], + x=round(x, 3), + y=round(y, 3), + domain=item["domain"], + domain_name=item["domain_name"], + negative_pct=item["negative_pct"], + span_count=item["span_count"], + solution=item["solution"], + complexity=complexity, + rating_impact=item["rating_impact"], + owner=item["owner"], + example=item["example"], + spans=spans, + )) + return result + + quick_wins = await compute_opportunity_items(quick_wins_raw, is_high_freq=True) + critical = await compute_opportunity_items(critical_raw, is_high_freq=True) + nice_to_have = await compute_opportunity_items(nice_to_have_raw, is_high_freq=False) + strategic = await compute_opportunity_items(strategic_raw, is_high_freq=False) + + # Generate executive summary + summary = "" + if weaknesses: + top_weakness = weaknesses[0] + summary = f"Your biggest opportunity is improving {top_weakness.subcode_name} ({top_weakness.domain_name}) - {top_weakness.negative_percentage:.0f}% of mentions are negative. " + if strengths: + top_strength = strengths[0] + summary += f"{top_strength.subcode_name} is your strongest asset with {top_strength.positive_percentage:.0f}% positive sentiment." + + # Rating simulator + rating_simulator = None + if avg_rating and weaknesses: + impacts = [w.projected_rating_impact or 0 for w in weaknesses] + if_fix_top_1 = round(avg_rating + impacts[0], 2) if len(impacts) >= 1 else None + if_fix_top_3 = round(avg_rating + sum(impacts[:3]), 2) if len(impacts) >= 3 else None + potential = round(sum(impacts[:5]), 2) + + rating_simulator = RatingSimulator( + current_rating=round(avg_rating, 2), + if_fix_top_1=if_fix_top_1, + if_fix_top_3=if_fix_top_3, + potential_gain=potential, + ) + + return Insights( + strengths=strengths, + weaknesses=weaknesses, + rating_simulator=rating_simulator, + opportunity_matrix=OpportunityMatrix( + quick_wins=quick_wins, + critical=critical, + nice_to_have=nice_to_have, + strategic=strategic, + ), + executive_summary=summary, + ) + + +async def _get_synthesis( + conn: asyncpg.Connection, + job_id: str | None, +) -> SynthesisResponse | None: + """Fetch AI-generated synthesis from pipeline execution.""" + if not job_id: + return None + + try: + # Get the latest execution with synthesis for this job + row = await conn.fetchrow(""" + SELECT synthesis + FROM pipeline.executions + WHERE job_id = $1::uuid + AND synthesis IS NOT NULL + ORDER BY created_at DESC + LIMIT 1 + """, job_id) + + if not row or not row["synthesis"]: + return None + + data = row["synthesis"] + # asyncpg may return JSONB as dict or string + if isinstance(data, str): + import json + data = json.loads(data) + + # Convert to response model + action_plan = [ + ActionItemResponse( + id=a.get("id", f"action_{i}"), + title=a.get("title", ""), + why=a.get("why", ""), + what=a.get("what", ""), + who=a.get("who", ""), + impact=a.get("impact", ""), + evidence=a.get("evidence", []), + estimated_rating_lift=a.get("estimated_rating_lift"), + complexity=a.get("complexity", "medium"), + priority=a.get("priority", "medium"), + timeline=a.get("timeline", "This month"), + related_subcode=a.get("related_subcode", ""), + ) + for i, a in enumerate(data.get("action_plan", [])) + ] + + timeline_annotations = [ + TimelineAnnotationResponse( + date=t.get("date", ""), + label=t.get("label", ""), + description=t.get("description", ""), + type=t.get("type", "neutral"), + ) + for t in data.get("timeline_annotations", []) + ] + + return SynthesisResponse( + executive_narrative=data.get("executive_narrative", ""), + sentiment_insight=data.get("sentiment_insight", ""), + category_insight=data.get("category_insight", ""), + timeline_insight=data.get("timeline_insight", ""), + priority_domain=data.get("priority_domain"), + priority_issue=data.get("priority_issue"), + action_plan=action_plan, + timeline_annotations=timeline_annotations, + marketing_angles=data.get("marketing_angles", []), + competitor_context=data.get("competitor_context"), + generated_at=data.get("generated_at"), + ) + + except Exception as e: + log.warning(f"Failed to fetch synthesis for job {job_id}: {e}") + return None + + +# ==================== Drill-down Endpoints ==================== + + +@router.get("/issues/{issue_id}/spans", response_model=list[SpanItem]) +async def get_issue_spans(issue_id: str) -> list[SpanItem]: + """Get all spans related to a specific issue.""" + if not _pool: + raise HTTPException(status_code=503, detail="Database not initialized") + + async with _pool.acquire() as conn: + query = """ + SELECT + rs.span_id, + rs.span_text, + rs.urt_primary, + rs.valence, + rs.intensity, + rs.review_time, + rs.review_id as source_review_id, + rs.entity + FROM pipeline.review_spans rs + JOIN pipeline.issue_spans iss ON rs.span_id = iss.span_id + WHERE iss.issue_id = $1 + ORDER BY rs.review_time DESC + """ + rows = await conn.fetch(query, issue_id) + + return [ + SpanItem( + span_id=row["span_id"], + span_text=row["span_text"], + urt_primary=row["urt_primary"], + valence=row["valence"], + intensity=row["intensity"], + review_time=row["review_time"].isoformat() if row["review_time"] else None, + source_review_id=row["source_review_id"], + entity=row["entity"], + ) + for row in rows + ] + + +# ==================== Full Review Drill-Down ==================== + + +class ReviewSpan(BaseModel): + """A span within a review with its classification.""" + + span_id: str + span_text: str + start_offset: int | None = Field(None, description="Character offset in original text") + end_offset: int | None = Field(None, description="Character end offset") + urt_primary: str | None + urt_secondary: list[str] | None = None + valence: str | None + intensity: str | None + entity: str | None + + +class FullReview(BaseModel): + """Complete review with all spans and metadata for drill-down.""" + + review_id: str + source: str + rating: int | None + review_text: str | None + text_normalized: str | None = None # Text used for span offset calculation + review_time: str | None + author_name: str | None = None + author_url: str | None = None + review_url: str | None = None + business_name: str | None = None + # Composite URT (derived from spans) + urt_primary: str | None = None + urt_secondary: list[str] | None = None + # All classified spans + spans: list[ReviewSpan] = Field(default_factory=list) + + +@router.get("/reviews/{review_id}", response_model=FullReview) +async def get_full_review( + review_id: str, + source: str = Query("google", description="Review source (default: google)"), +) -> FullReview: + """ + Get a full review with all its classified spans. + + This enables drill-down from any aggregate metric to the raw source data. + Spans are returned with their classifications, allowing the UI to highlight + them within the original review text. + """ + if not _pool: + raise HTTPException(status_code=503, detail="Database not initialized") + + async with _pool.acquire() as conn: + # Get the review with latest version + # Join with reviews_raw to get author info + # Note: span offsets are computed against text_normalized, so we return both + review_query = """ + SELECT + re.review_id, + re.source, + re.rating, + re.text as review_text, + re.text_normalized, + re.review_time, + rr.reviewer_name as author_name, + re.business_id, + re.place_id, + re.urt_primary, + re.urt_secondary + FROM pipeline.reviews_enriched re + LEFT JOIN pipeline.reviews_raw rr ON re.raw_id = rr.id + WHERE re.review_id = $1 AND re.source = $2 + ORDER BY re.review_version DESC + LIMIT 1 + """ + review_row = await conn.fetchrow(review_query, review_id, source) + + if not review_row: + # Try without source filter in case source is different + review_row = await conn.fetchrow(""" + SELECT + re.review_id, + re.source, + re.rating, + re.text as review_text, + re.text_normalized, + re.review_time, + rr.reviewer_name as author_name, + re.business_id, + re.place_id, + re.urt_primary, + re.urt_secondary + FROM pipeline.reviews_enriched re + LEFT JOIN pipeline.reviews_raw rr ON re.raw_id = rr.id + WHERE re.review_id = $1 + ORDER BY re.review_version DESC + LIMIT 1 + """, review_id) + + if not review_row: + raise HTTPException(status_code=404, detail=f"Review {review_id} not found") + + # Get all spans for this review (use the actual source from found review) + actual_source = review_row["source"] + spans_query = """ + SELECT + rs.span_id, + rs.span_text, + rs.span_start as start_offset, + rs.span_end as end_offset, + rs.urt_primary, + rs.urt_secondary, + rs.valence, + rs.intensity, + rs.entity + FROM pipeline.review_spans rs + WHERE rs.review_id = $1 AND rs.source = $2 + ORDER BY rs.span_start, rs.span_id + """ + span_rows = await conn.fetch(spans_query, review_id, actual_source) + + spans = [ + ReviewSpan( + span_id=row["span_id"], + span_text=row["span_text"], + start_offset=row.get("start_offset"), + end_offset=row.get("end_offset"), + urt_primary=row["urt_primary"], + urt_secondary=row.get("urt_secondary"), + valence=row["valence"], + intensity=row["intensity"], + entity=row.get("entity"), + ) + for row in span_rows + ] + + # Construct Google Maps review URL if we have place_id + place_id = review_row.get("place_id") + review_url = None + if place_id and review_row["source"] == "google": + review_url = f"https://www.google.com/maps/place/?q=place_id:{place_id}" + + return FullReview( + review_id=review_row["review_id"], + source=review_row["source"], + rating=review_row["rating"], + review_text=review_row["review_text"], + text_normalized=review_row.get("text_normalized"), + review_time=review_row["review_time"].isoformat() if review_row["review_time"] else None, + author_name=review_row.get("author_name"), + author_url=None, # Not stored in DB + review_url=review_url, + business_name=review_row.get("business_id"), # Use business_id as fallback + urt_primary=review_row.get("urt_primary"), + urt_secondary=review_row.get("urt_secondary"), + spans=spans, + ) + + +@router.get("/reviews", response_model=PaginatedSpans) +async def get_reviews_by_filter( + job_id: str | None = Query(None, description="Filter by job ID"), + urt_domain: str | None = Query(None, description="Filter by URT domain"), + sentiment: str | None = Query(None, description="Filter by sentiment"), + intensity: str | None = Query(None, description="Filter by intensity"), + page: int = Query(1, ge=1), + page_size: int = Query(20, ge=1, le=100), +) -> PaginatedSpans: + """ + Get reviews matching specific filters. + + Used for drilling down from chart segments to see contributing reviews. + """ + if not _pool: + raise HTTPException(status_code=503, detail="Database not initialized") + + # Reuse _get_spans with the filters + async with _pool.acquire() as conn: + sentiment_filter = sentiment.split(",") if sentiment else None + start_date = datetime(2000, 1, 1) # No time filter for drill-down + + return await _get_spans( + conn, job_id, None, start_date, sentiment_filter, + urt_domain, intensity, page, page_size + ) diff --git a/migrations/versions/013_add_synthesis_to_executions.sql b/migrations/versions/013_add_synthesis_to_executions.sql new file mode 100644 index 0000000..2f3fe44 --- /dev/null +++ b/migrations/versions/013_add_synthesis_to_executions.sql @@ -0,0 +1,12 @@ +-- Migration: Add synthesis column to pipeline.executions +-- This stores the AI-generated synthesis from Stage 4 + +ALTER TABLE pipeline.executions +ADD COLUMN IF NOT EXISTS synthesis JSONB DEFAULT NULL; + +-- Add index for querying executions with synthesis +CREATE INDEX IF NOT EXISTS idx_executions_has_synthesis +ON pipeline.executions ((synthesis IS NOT NULL)); + +-- Comment +COMMENT ON COLUMN pipeline.executions.synthesis IS 'AI-generated synthesis containing narratives, action plan, and insights from Stage 4'; diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py index d1996dd..bda3ae5 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py @@ -7,9 +7,11 @@ the BasePipeline interface for the extensible pipeline system. from __future__ import annotations +import json import logging +import re import time -from datetime import date +from datetime import date, datetime, timedelta from typing import TYPE_CHECKING, Any from pipeline_core import ( @@ -51,6 +53,8 @@ from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer from reviewiq_pipeline.stages.stage2_classify import Stage2Classifier from reviewiq_pipeline.stages.stage3_route import Stage3Router from reviewiq_pipeline.stages.stage4_aggregate import Stage4Aggregator +from reviewiq_pipeline.stages.stage5_synthesize import Stage5Synthesizer +from reviewiq_pipeline.services.llm_client import LLMClient from reviewiq_pipeline.validation.validators import ( validate_stage1_output, validate_stage2_output, @@ -64,9 +68,65 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) # Stage name to number mapping -STAGE_NAMES = ["normalize", "classify", "route", "aggregate"] -STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4} -STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate"} +STAGE_NAMES = ["normalize", "classify", "route", "aggregate", "synthesize"] +STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4, "synthesize": 5} +STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate", 5: "synthesize"} + + +def _parse_relative_date(date_str: str | None, default_to_now: bool = True) -> datetime | None: + """Parse relative date strings like '10 months ago' into datetime objects. + + Args: + date_str: A relative date string (e.g., "10 months ago", "2 weeks ago") + or an ISO date string, or None. + default_to_now: If True, returns current datetime when parsing fails. + + Returns: + A datetime object, or None if parsing fails and default_to_now is False. + """ + now = datetime.now() + + if not date_str: + return now if default_to_now else None + + # Try to parse as ISO date first + try: + return datetime.fromisoformat(date_str.replace('Z', '+00:00')) + except (ValueError, AttributeError): + pass + + # Parse relative dates like "10 months ago", "2 weeks ago", "a day ago" + date_str = date_str.lower().strip() + + # Handle "a/an" as 1 + date_str = re.sub(r'\b(a|an)\s+', '1 ', date_str) + + # Extract number and unit + match = re.match(r'(\d+)\s*(second|minute|hour|day|week|month|year)s?\s*ago', date_str) + if match: + amount = int(match.group(1)) + unit = match.group(2) + + if unit == 'second': + return now - timedelta(seconds=amount) + elif unit == 'minute': + return now - timedelta(minutes=amount) + elif unit == 'hour': + return now - timedelta(hours=amount) + elif unit == 'day': + return now - timedelta(days=amount) + elif unit == 'week': + return now - timedelta(weeks=amount) + elif unit == 'month': + # Approximate months as 30 days + return now - timedelta(days=amount * 30) + elif unit == 'year': + # Approximate years as 365 days + return now - timedelta(days=amount * 365) + + # If we can't parse it, return now or None + logger.warning(f"Could not parse relative date: {date_str}") + return now if default_to_now else None class PipelineResult: @@ -228,8 +288,11 @@ class ReviewIQPipeline(BasePipeline): stages_run: list[str] = [] stage_results: dict[str, StageResult] = {} - # Convert input to ScraperOutput if needed - scraper_output = self._ensure_scraper_output(input_data) + # Convert input to ScraperOutput if needed (may fetch from DB) + scraper_output = await self._ensure_scraper_output(input_data) + + # Extract job_id for linking issues to pipeline executions + job_id = scraper_output.get("job_id") # Track intermediate results for stage dependencies stage1_result: Stage1Output | None = None @@ -270,6 +333,20 @@ class ReviewIQPipeline(BasePipeline): ) # Stage 2: Classify + # If classify is requested but we don't have stage1_result, try to fetch from DB + if "classify" in stages and not stage1_result and job_id: + logger.info("No stage1_result, fetching existing normalized reviews from database") + stage1_result = await self._fetch_normalized_reviews_from_db(job_id) + if stage1_result: + logger.info(f"Loaded {len(stage1_result.get('reviews_normalized', []))} reviews from DB for reclassification") + # Clean up old spans and issues before reclassification + if self._span_repo: + deactivated = await self._span_repo.deactivate_spans_for_job(job_id) + logger.info(f"Deactivated {deactivated} existing spans for job {job_id}") + if self._issue_repo: + deleted = await self._issue_repo.delete_issues_for_job(job_id) + logger.info(f"Deleted {deleted} existing issues for job {job_id}") + if "classify" in stages and stage1_result: start = time.time() logger.info("Running Stage 2: Classification") @@ -308,7 +385,7 @@ class ReviewIQPipeline(BasePipeline): logger.info("Running Stage 3: Issue Routing") try: - stage3_result = await self._run_route(stage2_result) + stage3_result = await self._run_route(stage2_result, job_id=job_id) duration_ms = int((time.time() - start) * 1000) stages_run.append("route") stage_results["route"] = StageResult( @@ -371,6 +448,43 @@ class ReviewIQPipeline(BasePipeline): error=f"aggregate failed: {e}", ) + # Stage 5: Synthesize (AI-generated narratives) + # Requires job_id and execution_id from pipeline execution tracking + if "synthesize" in stages and job_id: + start = time.time() + logger.info("Running Stage 5: Synthesis") + + try: + # Get the execution_id for this pipeline run + execution_id = input_data.get("execution_id") + if execution_id: + stage5_result = await self._run_synthesize(job_id, execution_id) + duration_ms = int((time.time() - start) * 1000) + stages_run.append("synthesize") + stage_results["synthesize"] = StageResult( + stage="synthesize", + success=True, + data={ + "actions_generated": len(stage5_result.action_plan) if stage5_result else 0, + "has_narrative": bool(stage5_result and stage5_result.executive_narrative), + }, + error=None, + duration_ms=duration_ms, + ) + else: + logger.warning("No execution_id provided, skipping synthesis") + except Exception as e: + logger.exception("Stage 5 failed") + stage_results["synthesize"] = StageResult( + stage="synthesize", + success=False, + data={}, + error=str(e), + duration_ms=int((time.time() - start) * 1000), + ) + # Synthesis failure is non-fatal - pipeline still succeeds + logger.warning(f"Synthesis failed but continuing: {e}") + return BasePipelineResult( pipeline_id="reviewiq", stages_run=stages_run, @@ -558,6 +672,34 @@ class ReviewIQPipeline(BasePipeline): ], collapsed=False, ), + DashboardSection( + id="classified_reviews", + title="Classified Reviews", + description="All reviews with URT classification codes and human-readable meanings", + widgets=[ + WidgetConfig( + id="classified_reviews_table", + type="table", + title="Reviews with URT Codes", + grid={"x": 0, "y": 0, "w": 12, "h": 3}, + config={ + "columns": [ + {"key": "span_text", "header": "Review Excerpt", "width": 300}, + {"key": "urt_code", "header": "Code", "width": 80}, + {"key": "code_name", "header": "Category", "width": 150}, + {"key": "domain_name", "header": "Domain", "width": 100}, + {"key": "valence", "header": "Sentiment", "width": 80}, + {"key": "intensity", "header": "Intensity", "width": 80}, + {"key": "rating", "header": "Stars", "width": 60, "align": "center"}, + ], + "row_key": "span_id", + "page_size": 15, + "sortable": True, + }, + ), + ], + collapsed=False, + ), ], default_time_range="30d", refresh_interval=300, @@ -573,7 +715,7 @@ class ReviewIQPipeline(BasePipeline): Args: widget_id: Widget identifier - params: Query parameters (business_id, time_range, etc.) + params: Query parameters (business_id, job_id, time_range, etc.) Returns: Widget data dictionary @@ -581,36 +723,41 @@ class ReviewIQPipeline(BasePipeline): await self.initialize() business_id = params.get("business_id") + job_id = params.get("job_id") time_range = params.get("time_range", "30d") match widget_id: # Overview stats case "total_reviews": - return await self._get_review_count(business_id) + return await self._get_review_count(business_id, job_id) case "reviews_processed": - return await self._get_processed_count(business_id, time_range) + return await self._get_processed_count(business_id, job_id, time_range) case "issues_found": - return await self._get_issues_count(business_id) + return await self._get_issues_count(business_id, job_id) case "avg_rating": - return await self._get_avg_rating(business_id, time_range) + return await self._get_avg_rating(business_id, job_id, time_range) # Sentiment case "sentiment_distribution": - return await self._get_sentiment_distribution(business_id) + return await self._get_sentiment_distribution(business_id, job_id) case "sentiment_trend": - return await self._get_sentiment_trend(business_id, time_range) + return await self._get_sentiment_trend(business_id, job_id, time_range) # Classification case "urt_distribution": - return await self._get_urt_distribution(business_id) + return await self._get_urt_distribution(business_id, job_id) case "intensity_heatmap": - return await self._get_intensity_heatmap(business_id) + return await self._get_intensity_heatmap(business_id, job_id) # Issues case "issues_table": - return await self._get_issues_table(business_id, params) + return await self._get_issues_table(business_id, job_id, params) case "issues_by_domain": - return await self._get_issues_by_domain(business_id) + return await self._get_issues_by_domain(business_id, job_id) + + # Classified Reviews + case "classified_reviews_table": + return await self._get_classified_reviews(business_id, job_id, params) case _: logger.warning(f"Unknown widget: {widget_id}") @@ -643,6 +790,9 @@ class ReviewIQPipeline(BasePipeline): result = PipelineResult() validation_results: dict[str, ValidationResult] = {} + # Extract job_id for linking issues + job_id = scraper_output.get("job_id") + # Stage 1: Normalize if 1 in stages: logger.info("Running Stage 1: Normalization") @@ -668,7 +818,7 @@ class ReviewIQPipeline(BasePipeline): # Stage 3: Route if 3 in stages and result.stage2: logger.info("Running Stage 3: Issue Routing") - result.stage3 = await self._run_route(result.stage2) + result.stage3 = await self._run_route(result.stage2, job_id=job_id) if validate: validation_results["stage3"] = await validate_stage3_output( @@ -700,10 +850,10 @@ class ReviewIQPipeline(BasePipeline): await self.initialize() return await self._run_classify(stage1_output) - async def route(self, stage2_output: Stage2Output) -> Stage3Output: + async def route(self, stage2_output: Stage2Output, job_id: str | None = None) -> Stage3Output: """Run Stage 3: Issue Routing (legacy method).""" await self.initialize() - return await self._run_route(stage2_output) + return await self._run_route(stage2_output, job_id=job_id) async def aggregate( self, @@ -719,14 +869,91 @@ class ReviewIQPipeline(BasePipeline): # Internal Stage Implementations # ========================================================================= - def _ensure_scraper_output(self, input_data: dict[str, Any]) -> ScraperOutput: - """Ensure input data is in ScraperOutput format.""" + async def _ensure_scraper_output(self, input_data: dict[str, Any]) -> ScraperOutput: + """Ensure input data is in ScraperOutput format. + + If only job_id is provided, fetches job data from the database. + """ # If it has all required fields, use as-is required = ["job_id", "business_id", "place_id", "reviews"] if all(k in input_data for k in required): return input_data # type: ignore - # Otherwise, wrap it + # If we have a job_id but missing reviews, fetch from database + job_id = input_data.get("job_id") + if job_id and not input_data.get("reviews") and self._db: + logger.info(f"Fetching job data from database for job_id: {job_id}") + async with self._db.pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT job_id, status, reviews_data, reviews_count, + metadata->>'business_name' as business_name, + metadata->>'place_id' as place_id, + metadata->>'address' as address, + metadata->>'category' as category, + metadata->>'total_reviews' as total_reviews, + metadata->>'average_rating' as average_rating, + scraper_version + FROM public.jobs + WHERE job_id = $1::uuid + """, + str(job_id), + ) + + if row and row["reviews_data"]: + reviews_data = row["reviews_data"] + # asyncpg may return JSONB as a string - parse it if needed + if isinstance(reviews_data, str): + logger.info("Parsing reviews_data JSON string") + reviews_data = json.loads(reviews_data) + # Convert reviews_data to RawReview format + # Handle both API format (review_id, author, rating) and scraper format (reviewId, name, stars) + reviews = [] + for i, review in enumerate(reviews_data): + if isinstance(review, str): + # Skip if review is somehow a string + logger.warning(f"Skipping review {i}: got string instead of dict") + continue + # Parse the review time (may be relative like "10 months ago") + raw_time = review.get("timestamp") or review.get("publishedAtDate") or "" + parsed_time = _parse_relative_date(raw_time) + + reviews.append({ + "review_id": review.get("review_id") or review.get("reviewId") or f"review_{i}", + "author_name": review.get("author") or review.get("name") or "Anonymous", + "author_id": review.get("reviewerId"), + "rating": review.get("rating") or review.get("stars") or 0, + "text": review.get("text"), + "review_time": parsed_time, + "response_text": review.get("responseFromOwner", {}).get("text") if review.get("responseFromOwner") else None, + "response_time": review.get("responseFromOwner", {}).get("publishedAtDate") if review.get("responseFromOwner") else None, + "photos": review.get("reviewImageUrls"), + "raw_payload": review, + }) + + logger.info(f"Loaded {len(reviews)} reviews from job {job_id}") + + return ScraperOutput( + job_id=str(row["job_id"]), + status=row["status"] or "completed", + business_id=row["business_name"] or "unknown", + place_id=row["place_id"] or "unknown", + business_info={ + "name": row["business_name"] or "", + "address": row["address"] or "", + "category": row["category"] or "", + "total_reviews": int(row["total_reviews"]) if row["total_reviews"] else 0, + "average_rating": float(row["average_rating"]) if row["average_rating"] else 0.0, + }, + reviews=reviews, + scrape_time_ms=0, + reviews_scraped=len(reviews), + scraper_version=row["scraper_version"] or "unknown", + ) + else: + logger.warning(f"No reviews found in database for job_id: {job_id}") + + # Otherwise, wrap it with empty/default values return ScraperOutput( job_id=input_data.get("job_id", "unknown"), status=input_data.get("status", "completed"), @@ -739,6 +966,70 @@ class ReviewIQPipeline(BasePipeline): scraper_version=input_data.get("scraper_version", "unknown"), ) + async def _fetch_normalized_reviews_from_db(self, job_id: str) -> Stage1Output | None: + """Fetch existing normalized reviews from DB for reclassification. + + Used when running classify stage standalone without normalize. + """ + if not self._db: + return None + + async with self._db.pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT + source, + review_id, + review_version, + business_id, + place_id, + text, + text_normalized, + rating, + review_time + FROM pipeline.reviews_enriched + WHERE job_id = $1::uuid + AND is_latest = TRUE + ORDER BY review_time DESC + """, + job_id, + ) + + if not rows: + logger.warning(f"No normalized reviews found in DB for job_id: {job_id}") + return None + + reviews_normalized = [ + NormalizedReview( + source=row["source"], + review_id=row["review_id"], + review_version=row["review_version"], + business_id=row["business_id"], + place_id=row["place_id"], + text=row["text"], + text_normalized=row["text_normalized"], + rating=row["rating"], + review_time=row["review_time"], + ) + for row in rows + ] + + logger.info(f"Fetched {len(reviews_normalized)} normalized reviews from DB for job {job_id}") + + return Stage1Output( + job_id=job_id, + reviews_normalized=reviews_normalized, + reviews_skipped=[], + duplicates_found=[], + stats={ + "total_input": len(reviews_normalized), + "processed": len(reviews_normalized), + "skipped": 0, + "duplicates": 0, + "from_db": True, + }, + ) + async def _run_normalize(self, scraper_output: ScraperOutput) -> Stage1Output: """Run normalization stage.""" stage1 = Stage1Normalizer( @@ -788,6 +1079,7 @@ class ReviewIQPipeline(BasePipeline): taxonomy_version=self._config.taxonomy_version, profile=self._config.classification_profile, max_spans_per_review=self._config.max_spans_per_review, + job_id=stage1_output.get("job_id"), ), ) @@ -796,7 +1088,7 @@ class ReviewIQPipeline(BasePipeline): finally: await stage2.close() - async def _run_route(self, stage2_output: Stage2Output) -> Stage3Output: + async def _run_route(self, stage2_output: Stage2Output, job_id: str | None = None) -> Stage3Output: """Run issue routing stage.""" stage3 = Stage3Router( self._config, @@ -806,9 +1098,12 @@ class ReviewIQPipeline(BasePipeline): ) spans_to_route = [] + now = datetime.now() for review in stage2_output["reviews_classified"]: for span in review.get("spans", []): if span["valence"] in ("V-", "V±"): + # Use current datetime as fallback for missing review_time + review_time = review.get("review_time") or now spans_to_route.append( SpanToRoute( span_id=span["span_id"], @@ -818,13 +1113,13 @@ class ReviewIQPipeline(BasePipeline): valence=span["valence"], intensity=span["intensity"], entity_normalized=span.get("entity_normalized"), - review_time=review.get("review_time", ""), + review_time=review_time, confidence=span.get("confidence", "medium"), trust_score=review.get("trust_score", 0.5), ) ) - return await stage3.process(Stage3Input(spans=spans_to_route)) + return await stage3.process(Stage3Input(spans=spans_to_route, job_id=job_id)) async def _run_aggregate( self, @@ -848,17 +1143,39 @@ class ReviewIQPipeline(BasePipeline): return await stage4.process(input_data) + async def _run_synthesize(self, job_id: str, execution_id: str): + """Run AI synthesis stage to generate narratives and action plans.""" + from reviewiq_pipeline.stages.stage5_synthesize import Synthesis + + # Create LLM client for synthesis + llm_client = LLMClient.create(self._config) + + try: + stage5 = Stage5Synthesizer( + pool=self._db.pool, + llm_client=llm_client, + ) + + return await stage5.run(job_id, execution_id) + finally: + await llm_client.close() + # ========================================================================= # Widget Data Methods # ========================================================================= - async def _get_review_count(self, business_id: str | None) -> dict[str, Any]: + async def _get_review_count(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get total review count.""" if not self._db: return {"total_reviews": 0} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE job_id = $1::uuid", + job_id, + ) + elif business_id: count = await conn.fetchval( "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE business_id = $1", business_id, @@ -871,7 +1188,7 @@ class ReviewIQPipeline(BasePipeline): return {"total_reviews": count or 0} async def _get_processed_count( - self, business_id: str | None, time_range: str + self, business_id: str | None, job_id: str | None, time_range: str ) -> dict[str, Any]: """Get processed review count with trend.""" if not self._db: @@ -881,7 +1198,14 @@ class ReviewIQPipeline(BasePipeline): days = self._parse_time_range(time_range) async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + # When filtering by job_id, just return count for that job + current = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE job_id = $1::uuid", + job_id, + ) + return {"reviews_processed": current or 0, "processed_change": 0} + elif business_id: current = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.reviews_enriched @@ -929,13 +1253,21 @@ class ReviewIQPipeline(BasePipeline): "processed_change": round(change, 1), } - async def _get_issues_count(self, business_id: str | None) -> dict[str, Any]: + async def _get_issues_count(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get open issues count.""" if not self._db: return {"issues_count": 0} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + count = await conn.fetchval( + """ + SELECT COUNT(*) FROM pipeline.issues + WHERE job_id = $1::uuid AND state = 'open' + """, + job_id, + ) + elif business_id: count = await conn.fetchval( """ SELECT COUNT(*) FROM pipeline.issues @@ -951,7 +1283,7 @@ class ReviewIQPipeline(BasePipeline): return {"issues_count": count or 0} async def _get_avg_rating( - self, business_id: str | None, time_range: str + self, business_id: str | None, job_id: str | None, time_range: str ) -> dict[str, Any]: """Get average rating with trend.""" if not self._db: @@ -960,7 +1292,13 @@ class ReviewIQPipeline(BasePipeline): days = self._parse_time_range(time_range) async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + current = await conn.fetchval( + "SELECT AVG(rating) FROM pipeline.reviews_enriched WHERE job_id = $1::uuid", + job_id, + ) + return {"avg_rating": round(float(current), 2) if current else 0, "rating_change": 0} + elif business_id: current = await conn.fetchval( """ SELECT AVG(rating) FROM pipeline.reviews_enriched @@ -1009,14 +1347,26 @@ class ReviewIQPipeline(BasePipeline): } async def _get_sentiment_distribution( - self, business_id: str | None + self, business_id: str | None, job_id: str | None = None ) -> dict[str, Any]: """Get sentiment distribution for pie chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT + valence, + COUNT(*) as count + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND is_active = TRUE + GROUP BY valence + """, + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT @@ -1059,7 +1409,7 @@ class ReviewIQPipeline(BasePipeline): return {"data": data} async def _get_sentiment_trend( - self, business_id: str | None, time_range: str + self, business_id: str | None, job_id: str | None, time_range: str ) -> dict[str, Any]: """Get sentiment trend over time for line chart.""" if not self._db: @@ -1068,7 +1418,23 @@ class ReviewIQPipeline(BasePipeline): days = self._parse_time_range(time_range) async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT + DATE(review_time) as date, + COUNT(*) FILTER (WHERE valence = 'V+') as positive, + COUNT(*) FILTER (WHERE valence = 'V-') as negative, + COUNT(*) FILTER (WHERE valence = 'V0') as neutral + FROM pipeline.review_spans + WHERE job_id = $1::uuid + AND is_active = TRUE + GROUP BY DATE(review_time) + ORDER BY date + """, + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT @@ -1115,13 +1481,26 @@ class ReviewIQPipeline(BasePipeline): return {"data": data} - async def _get_urt_distribution(self, business_id: str | None) -> dict[str, Any]: + async def _get_urt_distribution(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get URT domain distribution for bar chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT + SUBSTRING(urt_primary, 1, 1) as domain, + COUNT(*) as count + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND is_active = TRUE + GROUP BY SUBSTRING(urt_primary, 1, 1) + ORDER BY count DESC + """, + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT @@ -1168,13 +1547,26 @@ class ReviewIQPipeline(BasePipeline): return {"data": data} - async def _get_intensity_heatmap(self, business_id: str | None) -> dict[str, Any]: + async def _get_intensity_heatmap(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get domain x intensity heatmap data.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT + SUBSTRING(urt_primary, 1, 1) as domain, + intensity, + COUNT(*) as count + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND is_active = TRUE + GROUP BY SUBSTRING(urt_primary, 1, 1), intensity + """, + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT @@ -1222,7 +1614,7 @@ class ReviewIQPipeline(BasePipeline): return {"data": data} async def _get_issues_table( - self, business_id: str | None, params: dict[str, Any] + self, business_id: str | None, job_id: str | None, params: dict[str, Any] ) -> dict[str, Any]: """Get issues table data.""" if not self._db: @@ -1233,7 +1625,30 @@ class ReviewIQPipeline(BasePipeline): offset = (page - 1) * page_size async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT + issue_id, + domain, + primary_subcode as subcode, + span_count, + max_intensity, + state + FROM pipeline.issues + WHERE job_id = $1::uuid + ORDER BY span_count DESC, created_at DESC + LIMIT $2 OFFSET $3 + """, + job_id, + page_size, + offset, + ) + total = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issues WHERE job_id = $1::uuid", + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT @@ -1279,13 +1694,24 @@ class ReviewIQPipeline(BasePipeline): return {"data": data, "total": total or 0} - async def _get_issues_by_domain(self, business_id: str | None) -> dict[str, Any]: + async def _get_issues_by_domain(self, business_id: str | None, job_id: str | None = None) -> dict[str, Any]: """Get issues grouped by domain for pie chart.""" if not self._db: return {"data": []} async with self._db._pool.acquire() as conn: - if business_id: + if job_id: + rows = await conn.fetch( + """ + SELECT domain, COUNT(*) as count + FROM pipeline.issues + WHERE job_id = $1::uuid + GROUP BY domain + ORDER BY count DESC + """, + job_id, + ) + elif business_id: rows = await conn.fetch( """ SELECT domain, COUNT(*) as count @@ -1310,6 +1736,89 @@ class ReviewIQPipeline(BasePipeline): return {"data": data} + async def _get_classified_reviews( + self, business_id: str | None, job_id: str | None, params: dict[str, Any] + ) -> dict[str, Any]: + """Get classified reviews with URT codes and human-readable names.""" + if not self._db: + return {"data": [], "total": 0} + + page = params.get("page", 1) + page_size = params.get("page_size", 15) + offset = (page - 1) * page_size + + async with self._db._pool.acquire() as conn: + # Build the query with JOINs to get human-readable code names + base_query = """ + SELECT + s.span_id, + s.span_text, + s.urt_primary as urt_code, + COALESCE(sub.name, cat.name, dom.name) as code_name, + COALESCE(sub.definition, dom.description) as code_definition, + dom.name as domain_name, + CASE s.valence + WHEN 'V+' THEN 'Positive' + WHEN 'V-' THEN 'Negative' + WHEN 'V0' THEN 'Neutral' + WHEN 'V±' THEN 'Mixed' + ELSE s.valence + END as valence, + CASE s.intensity + WHEN 'I1' THEN 'Mild' + WHEN 'I2' THEN 'Moderate' + WHEN 'I3' THEN 'Strong' + ELSE s.intensity + END as intensity, + e.rating, + s.review_time + FROM pipeline.review_spans s + LEFT JOIN pipeline.reviews_enriched e ON s.review_id = e.review_id AND s.review_version = e.review_version + LEFT JOIN pipeline.urt_domains dom ON SUBSTRING(s.urt_primary, 1, 1) = dom.code + LEFT JOIN pipeline.urt_categories cat ON SUBSTRING(s.urt_primary, 1, 2) = cat.code + LEFT JOIN pipeline.urt_subcodes sub ON s.urt_primary = sub.code + WHERE s.is_active = TRUE + """ + count_query = """ + SELECT COUNT(*) FROM pipeline.review_spans s + WHERE s.is_active = TRUE + """ + + if job_id: + base_query += " AND s.job_id = $1::uuid" + count_query += " AND s.job_id = $1::uuid" + base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" + rows = await conn.fetch(base_query, job_id) + total = await conn.fetchval(count_query, job_id) + elif business_id: + base_query += " AND s.business_id = $1" + count_query += " AND s.business_id = $1" + base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" + rows = await conn.fetch(base_query, business_id) + total = await conn.fetchval(count_query, business_id) + else: + base_query += f" ORDER BY s.review_time DESC LIMIT {page_size} OFFSET {offset}" + rows = await conn.fetch(base_query) + total = await conn.fetchval(count_query) + + data = [ + { + "span_id": row["span_id"], + "span_text": row["span_text"], + "urt_code": row["urt_code"], + "code_name": row["code_name"] or "Unknown", + "code_definition": row["code_definition"] or "", + "domain_name": row["domain_name"] or "Unknown", + "valence": row["valence"], + "intensity": row["intensity"], + "rating": row["rating"], + "review_time": row["review_time"].isoformat() if row["review_time"] else None, + } + for row in rows + ] + + return {"data": data, "total": total or 0} + def _parse_time_range(self, time_range: str) -> int: """Parse time range string to days.""" if time_range.endswith("d"): diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/services/llm_client.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/services/llm_client.py index 738b409..5c08549 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/services/llm_client.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/services/llm_client.py @@ -29,28 +29,205 @@ Your task is to extract semantic spans from customer reviews and classify each s ## SPAN EXTRACTION RULES -1. **Split on contrasting conjunctions**: but, however, although, despite, yet, though -2. **Split on topic/target change**: food → service → bathroom = 3 spans -3. **Split on valence change**: positive → negative = split -4. **Split on domain change**: O (Offering) → J (Journey) → E (Environment) = split -5. **Keep together**: cause→effect within same feedback unit ("X because Y" = 1 span) +**CRITICAL: Use TOPIC-BASED splitting, NOT sentence-based splitting.** + +A span = all consecutive text about the SAME topic/domain, regardless of sentence count. + +### When to KEEP TOGETHER (same span): +- Multiple sentences about the same topic: "The food was great. I loved the pasta. The sauce was perfect." → ONE span (all about Offering) +- Cause and effect: "The wait was long because they were understaffed" → ONE span +- Elaboration: "Staff was rude. They ignored us for 20 minutes." → ONE span (both about People) +- Single-topic reviews: Even if 5 sentences, if all about food → ONE span + +### When to SPLIT (separate spans): +- Contrasting conjunctions that change topic: "Food was great BUT service was slow" → TWO spans +- Domain change: food (O) → staff (P) → ambiance (E) = split at each change +- Target change: "The waiter was nice but the manager was rude" → TWO spans (different people) + +### Examples: +- "Amazing food. Best burger ever. Fries were crispy too." → 1 span (all Offering, V+) +- "Food was great but we waited an hour." → 2 spans (Offering V+, Journey V-) +- "I've been coming here for years. Always consistent quality." → 1 span (Relationship) +- "The staff are lovely and amazing with kids. More highchairs are definitely needed though." → 2 spans (People V+, Access V-) **Guardrails**: -- Max 3 spans per sentence (if 4+, re-check for over-splitting) -- Min 1 span per review (even single-word reviews) -- Spans must be non-overlapping and cover meaningful content +- Prefer FEWER, LARGER spans over many small ones +- Most reviews should have 1-3 spans, rarely more +- Min 1 span per review +- Spans must be non-overlapping -## URT DOMAINS (Tier-3 codes: X#.##) +## URT TAXONOMY - COMPLETE (138 codes, use EXACT codes) -| Domain | Code | Description | -|--------|------|-------------| -| Offering | O1-O4 | Product/service quality, features, variety | -| Price | P1-P4 | Value, pricing, promotions, payment | -| Journey | J1-J4 | Timing, process, convenience, accessibility | -| Environment | E1-E4 | Physical space, ambiance, cleanliness, digital UX | -| Attitude | A1-A4 | Staff behavior, helpfulness, professionalism | -| Voice | V1-V4 | Brand, communication, marketing, transparency | -| Relationship | R1-R4 | Loyalty, trust, consistency, personalization | +### O - OFFERING (Product/Service Quality) - 18 codes +O1.01 Works/Doesn't Work: Basic functionality success or failure +O1.02 Performance Level: How well it operates +O1.03 Durability: Longevity and resistance to wear +O1.04 Reliability: Consistency of function over time +O1.05 Outcome Achievement: Did customer accomplish their goal? +O2.01 Materials/Inputs: Quality of components or ingredients +O2.02 Craftsmanship: Skill of construction or execution +O2.03 Presentation: Visual and aesthetic quality +O2.04 Attention to Detail: Finishing touches and refinement +O2.05 Condition at Delivery: State when received +O3.01 All Components Present: Nothing missing from what was promised +O3.02 Feature Availability: Promised features actually work +O3.03 Scope Delivery: Full scope of work completed +O3.04 Documentation: Supporting materials provided +O4.01 Specification Match: Matches what was ordered +O4.02 Personalization: Adapted to individual preferences +O4.03 Flexibility: Can be modified or adjusted +O4.04 Appropriateness: Right solution for the need + +### P - PEOPLE (Staff Interactions) - 20 codes +P1.01 Warmth: Friendly and welcoming manner +P1.02 Respect: Treated with dignity +P1.03 Patience: Calm and tolerant approach +P1.04 Enthusiasm: Energy and engagement +P1.05 Empathy: Understanding feelings +P2.01 Knowledge: Expertise and understanding +P2.02 Skill: Technical ability +P2.03 Problem Solving: Ability to find solutions +P2.04 Advice Quality: Helpful recommendations +P2.05 Training Level: Staff training evident +P3.01 Attentiveness: Being present and engaged +P3.02 Initiative: Proactive help +P3.03 Follow-through: Completing promised actions +P3.04 Availability: Being available when needed +P3.05 Dedication: Commitment to helping +P4.01 Clarity: Clear communication +P4.02 Listening: Understanding customer needs +P4.03 Transparency: Honest and open +P4.04 Honesty: Truthful communication +P4.05 Proactive Updates: Keeping customer informed + +### J - JOURNEY (Process & Timing) - 20 codes +J1.01 Speed: How fast things happen +J1.02 Punctuality: On-time delivery +J1.03 Queue Management: Handling of waiting customers +J1.04 Punctuality: Meeting scheduled times +J1.05 Pacing: Appropriate speed (not rushed/dragged) +J2.01 Simplicity: Easy process +J2.02 Friction: Obstacles encountered +J2.03 Navigation: Finding what you need +J2.04 Booking Availability: Slots/capacity when needed +J2.05 Inventory: Stock availability +J3.01 Consistency: Same experience every time +J3.02 Accuracy: Getting it right +J3.03 Uptime: System availability +J3.04 Data Accuracy: Correct info in systems +J3.05 Integration: Systems work together +J4.01 Problem Recognition: Acknowledging issues +J4.02 Resolution Speed: How fast problems get fixed +J4.03 Resolution Fairness: Fair handling of issues +J4.04 Escalation: Getting to right person +J4.05 Closure: Issue fully resolved + +### E - ENVIRONMENT (Physical & Digital Space) - 20 codes +E1.01 Cleanliness: How clean the space is +E1.02 Comfort: Physical comfort +E1.03 Space Design: Layout and organization +E1.04 Ambiance: Atmosphere and vibe +E1.05 Comfort: Physical comfort +E2.01 Lighting: Light quality and level +E2.02 Sound/Noise: Audio environment +E2.03 Temperature: Climate control +E2.04 Visual Design: Aesthetics of interface +E2.05 Mobile Experience: Mobile usability +E3.01 Interface Design: Digital UX/UI +E3.02 App/Website Speed: Digital performance +E3.03 Usability: Ease of digital use +E3.04 Health Safety: Health precautions +E3.05 Cyber Security: Digital security +E4.01 Safety: Physical safety +E4.02 Security: Protection of belongings/data +E4.03 Health/Hygiene: Health standards +E4.04 Social Responsibility: Ethical practices +E4.05 Community Impact: Local community effect + +### A - ACCESS (Availability & Accessibility) - 20 codes +A1.01 Hours: Operating hours +A1.02 Booking Availability: Appointment slots +A1.03 Inventory: Product availability +A1.04 Wayfinding: Finding destination +A1.05 Physical Accessibility: Disability accommodations +A2.01 Physical Access: Mobility accessibility +A2.02 Language Access: Language accommodation +A2.03 Digital Accessibility: Screen reader/a11y +A2.04 Language Accessibility: Multilingual support +A2.05 Hours of Operation: Service availability times +A3.01 Diversity Welcome: All backgrounds welcome +A3.02 Accommodation: Special needs accommodation +A3.03 Response Time: Speed of getting answers +A3.04 Documentation Clarity: Clear instructions +A3.05 Support Accessibility: Getting help when needed +A4.01 Location: Physical location convenience +A4.02 Parking: Parking availability +A4.03 Multiple Channels: Ways to engage +A4.04 Payment Flexibility: Multiple payment options +A4.05 Refund Accessibility: Getting money back + +### V - VALUE (Pricing & Costs) - 20 codes ⚠️ USE FOR ALL PRICE/COST/FEE MENTIONS +V1.01 Price Level: Cost amount ("cheap", "expensive", "affordable", "€", "$") +V1.02 Price Fairness: Fair for what you get +V1.03 Hidden Costs: Unexpected charges, surprise fees, hidden fees, extra charges +V1.04 Price Transparency: Clear pricing upfront +V1.05 Price Stability: Consistent pricing +V2.01 Clear Pricing: Easy to understand costs +V2.02 Honest Billing: Accurate charges +V2.03 Policy Clarity: Clear terms and conditions +V2.04 Quality-Price Ratio: Worth vs cost +V2.05 Competitive Value: Compared to alternatives +V3.01 Time Investment: Time required +V3.02 Hassle Factor: Difficulty and inconvenience +V3.03 Mental Load: Cognitive effort required +V3.04 Promotion Clarity: Clear offer terms +V3.05 Reward Redemption: Using points/rewards +V4.01 Value for Money: Worth what you paid +V4.02 ROI: Return on investment +V4.03 Overall Satisfaction: Happy with the exchange +V4.04 Billing Accuracy: Correct charges +V4.05 Billing Resolution: Fixing billing issues + +### R - RELATIONSHIP (Trust & Loyalty) - 20 codes +R1.01 Honesty: Truthfulness +R1.02 Ethics: Ethical behavior, deceptive practices, scams +R1.03 Promises Kept: Following through on promises +R1.04 Ethics: Ethical behavior +R1.05 Accountability: Taking responsibility +R2.01 Consistency: Reliable over time +R2.02 Trustworthiness: Can be trusted +R2.03 Accountability: Takes responsibility +R2.04 Predictability: Consistent experience +R2.05 Standards: Meeting quality standards +R3.01 Error Acknowledgment: Admits mistakes +R3.02 Apology Quality: Sincere apologies +R3.03 Making It Right: Correcting mistakes +R3.04 Personal Connection: Human touch +R3.05 Going Extra Mile: Beyond expectations +R4.01 Customer Recognition: Remembers customers +R4.02 Loyalty Rewards: Rewards for loyalty +R4.03 Long-term Relationship: Builds relationships +R4.04 Service Recovery: Making things right +R4.05 Feedback Response: Acting on feedback + +## CLASSIFICATION EXAMPLES (Critical Distinctions) + +**PRICING/COSTS → V codes (Value), NOT P codes:** +- "Cheap prices", "good price", "€50" → V1.01 Price Level +- "Hidden charges", "surprise fees", "extra €35" → V1.03 Hidden Costs +- "Great value for money" → V4.01 Value for Money +- "Overcharged", "wrong amount" → V4.04 Billing Accuracy + +**STAFF BEHAVIOR → P codes (People):** +- "Staff was friendly", "welcoming" → P1.01 Warmth +- "Rude", "disrespectful", "ignored us" → P1.02 Respect +- "Patient", "took their time" → P1.03 Patience +- "Knowledgeable", "expert" → P2.01 Knowledge + +**DECEPTION/ETHICS → R codes (Relationship):** +- "They lied", "misleading" → R1.01 Honesty +- "Felt scammed", "dishonest practices" → R1.02 Ethics +- "Didn't honor the deal" → R1.03 Promises Kept ## DIMENSION CODES @@ -159,6 +336,20 @@ class LLMClientBase(ABC): self.config = config self.total_tokens_used = 0 self.total_cost_usd = 0.0 + self._custom_prompt: str | None = None + + def set_prompt(self, prompt: str) -> None: + """ + Set a custom system prompt (e.g., built dynamically from database). + + Args: + prompt: The system prompt to use for classification + """ + self._custom_prompt = prompt + + def get_prompt(self) -> str: + """Get the current system prompt (custom or default).""" + return self._custom_prompt or SYSTEM_PROMPT @abstractmethod async def classify( @@ -178,6 +369,28 @@ class LLMClientBase(ABC): """ pass + @abstractmethod + async def generate( + self, + system_prompt: str, + user_prompt: str, + temperature: float = 0.7, + max_tokens: int = 4000, + ) -> str: + """ + Generate text using the LLM (for synthesis, narratives, etc.). + + Args: + system_prompt: System instructions + user_prompt: User content/context + temperature: Creativity level (0-1) + max_tokens: Maximum response length + + Returns: + Generated text response + """ + pass + @abstractmethod async def close(self) -> None: """Close the client and cleanup resources.""" @@ -211,7 +424,7 @@ class OpenAIClient(LLMClientBase): start_time = time.time() messages = [ - {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "system", "content": self.get_prompt()}, { "role": "user", "content": f'Classify this review:\n\n"{review_text}"', @@ -255,6 +468,43 @@ class OpenAIClient(LLMClientBase): return result, metadata + async def generate( + self, + system_prompt: str, + user_prompt: str, + temperature: float = 0.7, + max_tokens: int = 4000, + ) -> str: + """Generate text using OpenAI.""" + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + response = await self.client.chat.completions.create( + model=self.model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + response_format={"type": "json_object"}, + timeout=self.config.llm_timeout_seconds, + ) + + content = response.choices[0].message.content + if not content: + raise ValueError("Empty response from OpenAI") + + # Track usage + if response.usage: + input_tokens = response.usage.prompt_tokens + output_tokens = response.usage.completion_tokens + pricing = self.PRICING.get(self.model, {"input": 0.15, "output": 0.60}) + cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000 + self.total_tokens_used += input_tokens + output_tokens + self.total_cost_usd += cost + + return content + async def close(self) -> None: """Close the OpenAI client.""" await self.client.close() @@ -289,7 +539,7 @@ class AnthropicClient(LLMClientBase): response = await self.client.messages.create( model=self.model, max_tokens=4096, - system=SYSTEM_PROMPT, + system=self.get_prompt(), messages=[ { "role": "user", @@ -329,6 +579,58 @@ class AnthropicClient(LLMClientBase): return result, metadata + async def generate( + self, + system_prompt: str, + user_prompt: str, + temperature: float = 0.7, + max_tokens: int = 4000, + ) -> str: + """Generate text using Anthropic.""" + response = await self.client.messages.create( + model=self.model, + max_tokens=max_tokens, + system=system_prompt, + messages=[{"role": "user", "content": user_prompt}], + temperature=temperature, + ) + + content = response.content[0].text if response.content else "" + if not content: + raise ValueError("Empty response from Anthropic") + + # Track usage + input_tokens = response.usage.input_tokens + output_tokens = response.usage.output_tokens + pricing = self.PRICING.get(self.model, {"input": 3.0, "output": 15.0}) + cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000 + self.total_tokens_used += input_tokens + output_tokens + self.total_cost_usd += cost + + # Extract JSON from response (handles code blocks) + return self._extract_json_string(content) + + def _extract_json_string(self, content: str) -> str: + """Extract JSON string from response, handling markdown code blocks.""" + import re + content = content.strip() + + # If it starts with {, return as-is + if content.startswith("{"): + return content + + # Try to find JSON in code blocks + json_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content) + if json_match: + return json_match.group(1) + + # Try to find JSON object + json_match = re.search(r"\{[\s\S]*\}", content) + if json_match: + return json_match.group(0) + + return content + def _extract_json(self, content: str) -> dict[str, Any]: """Extract JSON from response, handling markdown code blocks.""" content = content.strip() diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py new file mode 100644 index 0000000..65acb1b --- /dev/null +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py @@ -0,0 +1,477 @@ +""" +Stage 5: Synthesize - Generate AI narratives and action plans. + +This stage runs after classification and routing to produce: +- Executive narrative (business-specific story) +- Section insights (sentiment, category, timeline) +- Action plan with prioritized recommendations +- Timeline annotations for key events +- Marketing angles from strengths +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import asyncpg + +from reviewiq_pipeline.services.llm_client import LLMClientBase + +logger = logging.getLogger(__name__) + + +@dataclass +class ActionItem: + """A specific action recommendation.""" + id: str + title: str + why: str + what: str + who: str + impact: str + evidence: list[str] + estimated_rating_lift: float | None + complexity: str # 'quick' | 'medium' | 'complex' + priority: str # 'critical' | 'high' | 'medium' | 'low' + timeline: str + related_subcode: str + + +@dataclass +class TimelineAnnotation: + """An annotation for a key event on the timeline.""" + date: str + label: str + description: str + type: str # 'positive' | 'negative' | 'neutral' | 'event' + + +@dataclass +class Synthesis: + """Complete synthesis output from Stage 5.""" + executive_narrative: str + sentiment_insight: str + category_insight: str + timeline_insight: str + priority_domain: str | None + priority_issue: str | None + action_plan: list[ActionItem] + issue_actions: dict[str, str] + timeline_annotations: list[TimelineAnnotation] + marketing_angles: list[str] + competitor_context: str | None + generated_at: str + + +SYNTHESIS_SYSTEM_PROMPT = """You are an expert business analyst specializing in customer experience and review analysis. + +Your task is to analyze classified review data and generate actionable business insights. + +You will receive: +1. Summary statistics (total reviews, rating, sentiment distribution) +2. Top issues by category with example quotes +3. Top strengths with example quotes +4. Domain breakdown (what customers talk about most) + +Generate a JSON response with these fields: + +{ + "executive_narrative": "2-3 paragraph story explaining the business situation, key problems, and path forward. Be specific with numbers and examples.", + + "sentiment_insight": "1-2 sentences explaining WHY sentiment is distributed this way. Connect to specific issues.", + + "category_insight": "1-2 sentences about the pattern in categories. Which domain needs most attention and why?", + + "timeline_insight": "1-2 sentences about trends if data shows changes over time.", + + "priority_domain": "Single letter code (P/V/J/O/A/E/R) for the domain needing most attention, or null", + + "priority_issue": "The subcode (e.g., 'V1.03') that should be fixed first, or null", + + "action_plan": [ + { + "id": "action_1", + "title": "Clear action title", + "why": "Root cause from the reviews", + "what": "Specific steps to take", + "who": "Department or role responsible", + "impact": "Expected outcome", + "evidence": ["Quote 1", "Quote 2"], + "estimated_rating_lift": 0.3, + "complexity": "quick|medium|complex", + "priority": "critical|high|medium|low", + "timeline": "This week|This month|This quarter", + "related_subcode": "V1.03" + } + ], + + "timeline_annotations": [ + { + "date": "2024-01-15", + "label": "Short label", + "description": "What happened", + "type": "positive|negative|neutral|event" + } + ], + + "marketing_angles": [ + "Way to promote strength 1", + "Way to promote strength 2" + ], + + "competitor_context": "How this compares to industry/competitors, or null if unknown" +} + +Be specific, actionable, and business-focused. Use actual numbers and quotes from the data. +Prioritize actions by impact and feasibility. +""" + + +class Stage5Synthesizer: + """ + Stage 5: Generate AI synthesis from classified review data. + + This stage: + 1. Aggregates classification results + 2. Identifies patterns and priorities + 3. Generates narrative insights via LLM + 4. Produces actionable recommendations + """ + + def __init__(self, pool: asyncpg.Pool, llm_client: LLMClientBase): + self.pool = pool + self.llm_client = llm_client + + async def run(self, job_id: str, execution_id: str) -> Synthesis: + """ + Generate synthesis for a completed pipeline execution. + + Args: + job_id: The scraping job ID + execution_id: The pipeline execution ID + + Returns: + Synthesis object with all generated insights + """ + logger.info(f"Stage 5: Generating synthesis for job {job_id}") + + # Gather all the data we need + context = await self._gather_context(job_id) + + # Generate synthesis via LLM + synthesis = await self._generate_synthesis(context) + + # Store synthesis in database + await self._store_synthesis(execution_id, synthesis) + + logger.info(f"Stage 5: Synthesis complete - {len(synthesis.action_plan)} actions generated") + return synthesis + + async def _gather_context(self, job_id: str) -> dict[str, Any]: + """Gather all context needed for synthesis.""" + + # Get overview stats + overview = await self.pool.fetchrow(""" + SELECT + COUNT(DISTINCT r.review_id) as total_reviews, + AVG(r.rating) as avg_rating, + COUNT(s.span_id) as total_spans + FROM pipeline.reviews_enriched r + LEFT JOIN pipeline.review_spans s ON s.review_id = r.review_id + WHERE r.job_id = $1::uuid + """, job_id) + + # Get sentiment distribution + sentiment = await self.pool.fetch(""" + SELECT + valence, + COUNT(*) as count, + COUNT(DISTINCT review_id) as review_count + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND valence IS NOT NULL AND is_active = TRUE + GROUP BY valence + ORDER BY count DESC + """, job_id) + + # Get top issues (weaknesses) + top_issues = await self.pool.fetch(""" + SELECT + s.urt_primary as subcode, + sc.name as subcode_name, + sc.definition, + d.code as domain, + d.name as domain_name, + COUNT(*) as span_count, + COUNT(*) FILTER (WHERE s.valence = 'V-') as negative_count, + ARRAY_AGG(s.span_text ORDER BY s.intensity DESC) FILTER (WHERE s.valence = 'V-') as example_quotes + FROM pipeline.review_spans s + LEFT JOIN pipeline.urt_subcodes sc ON sc.code = s.urt_primary + LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) + WHERE s.job_id = $1::uuid AND s.valence = 'V-' AND s.is_active = TRUE + GROUP BY s.urt_primary, sc.name, sc.definition, d.code, d.name + ORDER BY negative_count DESC + LIMIT 10 + """, job_id) + + # Get top strengths + top_strengths = await self.pool.fetch(""" + SELECT + s.urt_primary as subcode, + sc.name as subcode_name, + sc.definition, + d.code as domain, + d.name as domain_name, + COUNT(*) as span_count, + COUNT(*) FILTER (WHERE s.valence = 'V+') as positive_count, + ARRAY_AGG(s.span_text ORDER BY s.intensity DESC) FILTER (WHERE s.valence = 'V+') as example_quotes + FROM pipeline.review_spans s + LEFT JOIN pipeline.urt_subcodes sc ON sc.code = s.urt_primary + LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) + WHERE s.job_id = $1::uuid AND s.valence = 'V+' AND s.is_active = TRUE + GROUP BY s.urt_primary, sc.name, sc.definition, d.code, d.name + ORDER BY positive_count DESC + LIMIT 5 + """, job_id) + + # Get domain distribution + domains = await self.pool.fetch(""" + SELECT + SUBSTRING(urt_primary, 1, 1) as domain, + d.name as domain_name, + COUNT(*) as total_count, + COUNT(*) FILTER (WHERE valence = 'V+') as positive_count, + COUNT(*) FILTER (WHERE valence = 'V-') as negative_count + FROM pipeline.review_spans s + LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) + WHERE s.job_id = $1::uuid AND s.is_active = TRUE + GROUP BY SUBSTRING(urt_primary, 1, 1), d.name + ORDER BY total_count DESC + """, job_id) + + # Get business name if available + business = await self.pool.fetchrow(""" + SELECT DISTINCT business_id as business_name + FROM pipeline.reviews_enriched + WHERE job_id = $1::uuid AND business_id IS NOT NULL + LIMIT 1 + """, job_id) + + return { + "business_name": business["business_name"] if business else "This business", + "overview": dict(overview) if overview else {}, + "sentiment": [dict(r) for r in sentiment], + "top_issues": [dict(r) for r in top_issues], + "top_strengths": [dict(r) for r in top_strengths], + "domains": [dict(r) for r in domains], + } + + async def _generate_synthesis(self, context: dict[str, Any]) -> Synthesis: + """Generate synthesis using LLM.""" + + # Build the user prompt with context + user_prompt = f"""Analyze this review data for {context['business_name']}: + +## Overview +- Total Reviews: {context['overview'].get('total_reviews', 0)} +- Average Rating: {context['overview'].get('avg_rating', 'N/A')} +- Total Insights Extracted: {context['overview'].get('total_spans', 0)} + +## Sentiment Distribution +{self._format_sentiment(context['sentiment'])} + +## Top Issues (Problems) +{self._format_issues(context['top_issues'])} + +## Top Strengths +{self._format_strengths(context['top_strengths'])} + +## Domain Breakdown +{self._format_domains(context['domains'])} + +Generate a complete synthesis with actionable insights. +""" + + # Call LLM + try: + response = await self.llm_client.generate( + system_prompt=SYNTHESIS_SYSTEM_PROMPT, + user_prompt=user_prompt, + temperature=0.7, # Allow some creativity + max_tokens=4000, + ) + + # Parse JSON response + result = json.loads(response) + + # Convert to Synthesis object + return Synthesis( + executive_narrative=result.get("executive_narrative", ""), + sentiment_insight=result.get("sentiment_insight", ""), + category_insight=result.get("category_insight", ""), + timeline_insight=result.get("timeline_insight", ""), + priority_domain=result.get("priority_domain"), + priority_issue=result.get("priority_issue"), + action_plan=[ + ActionItem( + id=a.get("id", f"action_{i}"), + title=a.get("title", ""), + why=a.get("why", ""), + what=a.get("what", ""), + who=a.get("who", ""), + impact=a.get("impact", ""), + evidence=a.get("evidence", []), + estimated_rating_lift=a.get("estimated_rating_lift"), + complexity=a.get("complexity", "medium"), + priority=a.get("priority", "medium"), + timeline=a.get("timeline", "This month"), + related_subcode=a.get("related_subcode", ""), + ) + for i, a in enumerate(result.get("action_plan", [])) + ], + issue_actions={}, # Can be populated from action_plan + timeline_annotations=[ + TimelineAnnotation( + date=t.get("date", ""), + label=t.get("label", ""), + description=t.get("description", ""), + type=t.get("type", "neutral"), + ) + for t in result.get("timeline_annotations", []) + ], + marketing_angles=result.get("marketing_angles", []), + competitor_context=result.get("competitor_context"), + generated_at=datetime.utcnow().isoformat(), + ) + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse LLM response: {e}") + return self._create_fallback_synthesis() + except Exception as e: + logger.error(f"Synthesis generation failed: {e}") + return self._create_fallback_synthesis() + + def _format_sentiment(self, sentiment: list[dict]) -> str: + """Format sentiment data for prompt.""" + lines = [] + for s in sentiment: + valence = s.get("valence", "Unknown") + count = s.get("count", 0) + reviews = s.get("review_count", 0) + label = {"V+": "Positive", "V-": "Negative", "V0": "Neutral", "V±": "Mixed"}.get(valence, valence) + lines.append(f"- {label}: {count} mentions ({reviews} reviews)") + return "\n".join(lines) or "No sentiment data" + + def _format_issues(self, issues: list[dict]) -> str: + """Format issues for prompt.""" + lines = [] + for i, issue in enumerate(issues[:5], 1): + subcode = issue.get("subcode", "") + name = issue.get("subcode_name", "") + domain = issue.get("domain_name", "") + count = issue.get("negative_count", 0) + quotes = issue.get("example_quotes", [])[:2] + + lines.append(f"{i}. [{subcode}] {name} ({domain})") + lines.append(f" - {count} negative mentions") + for q in quotes: + if q: + lines.append(f' - Example: "{q[:100]}..."' if len(q) > 100 else f' - Example: "{q}"') + return "\n".join(lines) or "No issues found" + + def _format_strengths(self, strengths: list[dict]) -> str: + """Format strengths for prompt.""" + lines = [] + for i, strength in enumerate(strengths[:3], 1): + subcode = strength.get("subcode", "") + name = strength.get("subcode_name", "") + domain = strength.get("domain_name", "") + count = strength.get("positive_count", 0) + quotes = strength.get("example_quotes", [])[:2] + + lines.append(f"{i}. [{subcode}] {name} ({domain})") + lines.append(f" - {count} positive mentions") + for q in quotes: + if q: + lines.append(f' - Example: "{q[:100]}..."' if len(q) > 100 else f' - Example: "{q}"') + return "\n".join(lines) or "No strengths found" + + def _format_domains(self, domains: list[dict]) -> str: + """Format domain distribution for prompt.""" + lines = [] + for d in domains: + domain = d.get("domain", "") + name = d.get("domain_name", "") + total = d.get("total_count", 0) + positive = d.get("positive_count", 0) + negative = d.get("negative_count", 0) + lines.append(f"- {domain} ({name}): {total} total ({positive} positive, {negative} negative)") + return "\n".join(lines) or "No domain data" + + def _create_fallback_synthesis(self) -> Synthesis: + """Create a minimal synthesis when LLM fails.""" + return Synthesis( + executive_narrative="Unable to generate detailed analysis. Please review the data manually.", + sentiment_insight="", + category_insight="", + timeline_insight="", + priority_domain=None, + priority_issue=None, + action_plan=[], + issue_actions={}, + timeline_annotations=[], + marketing_angles=[], + competitor_context=None, + generated_at=datetime.utcnow().isoformat(), + ) + + async def _store_synthesis(self, execution_id: str, synthesis: Synthesis) -> None: + """Store synthesis in database.""" + await self.pool.execute(""" + UPDATE pipeline.executions + SET + synthesis = $2, + updated_at = NOW() + WHERE execution_id = $1::uuid + """, execution_id, json.dumps({ + "executive_narrative": synthesis.executive_narrative, + "sentiment_insight": synthesis.sentiment_insight, + "category_insight": synthesis.category_insight, + "timeline_insight": synthesis.timeline_insight, + "priority_domain": synthesis.priority_domain, + "priority_issue": synthesis.priority_issue, + "action_plan": [ + { + "id": a.id, + "title": a.title, + "why": a.why, + "what": a.what, + "who": a.who, + "impact": a.impact, + "evidence": a.evidence, + "estimated_rating_lift": a.estimated_rating_lift, + "complexity": a.complexity, + "priority": a.priority, + "timeline": a.timeline, + "related_subcode": a.related_subcode, + } + for a in synthesis.action_plan + ], + "issue_actions": synthesis.issue_actions, + "timeline_annotations": [ + { + "date": t.date, + "label": t.label, + "description": t.description, + "type": t.type, + } + for t in synthesis.timeline_annotations + ], + "marketing_angles": synthesis.marketing_angles, + "competitor_context": synthesis.competitor_context, + "generated_at": synthesis.generated_at, + }))