diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py index b2471c3..e5874c2 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/stages/stage5_synthesize.py @@ -1,19 +1,19 @@ """ -Stage 5: Synthesize - Generate AI narratives and action plans. +Stage 5: Synthesize - Generate analyst-quality business report. -This stage runs after classification and routing to produce: -- Executive narrative (business-specific story) -- Section insights (sentiment, category, timeline) -- Action plan with prioritized recommendations -- Timeline annotations for key events -- Marketing angles from strengths +This stage runs after classification and produces a polished report with: +- Executive verdict (one-line insight + rating potential) +- Narrative story (2-3 paragraphs of consultant-quality prose) +- Section headlines (insight-first titles for each chart) +- Action plan (prioritized, owner-assigned recommendations) +- Key evidence (curated quotes that prove the narrative) """ from __future__ import annotations import json import logging -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import TYPE_CHECKING, Any @@ -25,168 +25,400 @@ from reviewiq_pipeline.services.llm_client import LLMClientBase logger = logging.getLogger(__name__) +# ============================================================================= +# NEW 6-SECTION REPORT STRUCTURE (€60 Business Value Report) +# ============================================================================= + +@dataclass +class RiskIndicator: + """A single risk metric for the scorecard.""" + name: str # "Staff Behavior" + score: int # 1-10 (10 = excellent, 1 = critical) + trend: str # 'improving' | 'declining' | 'stable' + complaint_count: int # Number of related complaints + color: str # 'green' | 'yellow' | 'red' + + +@dataclass +class CriticalIssue: + """A critical issue requiring immediate attention.""" + rank: int # 1, 2, or 3 + title: str # "Hidden Fees Destroying Trust" + urt_code: str # "V1.03" + complaint_count: int # 94 + revenue_impact: str # "€12,000/month at risk" + evidence: list[str] # 2-3 damning quotes + root_cause: str # Why this keeps happening + solution: str # Specific fix (from taxonomy) + effort: str # 'quick_win' | 'moderate' | 'strategic' + timeline: str # "1 week" | "2-4 weeks" | "1-2 months" + + +@dataclass +class StrengthToProtect: + """A competitive advantage to protect and leverage.""" + title: str # "Exceptional Staff Service" + mention_count: int # 168 + percentage: float # 42.0 (% of positive reviews) + top_quotes: list[str] # 2-3 best quotes + risk_of_loss: str # What could erode this strength + leverage_action: str # How to amplify in marketing + + @dataclass class ActionItem: - """A specific action recommendation.""" - id: str - title: str - why: str - what: str - who: str - impact: str - evidence: list[str] - estimated_rating_lift: float | None - complexity: str # 'quick' | 'medium' | 'complex' - priority: str # 'critical' | 'high' | 'medium' | 'low' - timeline: str - related_subcode: str + """A prioritized action in the action matrix.""" + action: str # What to do (imperative) + owner: str # Who owns it + effort: str # 'low' | 'medium' | 'high' + impact: str # 'low' | 'medium' | 'high' + quadrant: str # 'quick_win' | 'major_project' | 'fill_in' | 'deprioritize' + expected_lift: str # "+0.3★" + deadline: str # "Week 1" | "Week 2-4" | "Month 2-3" + success_metric: str # Measurable KPI @dataclass -class TimelineAnnotation: - """An annotation for a key event on the timeline.""" - date: str +class TrackingKPI: + """A KPI for the 90-day tracking framework.""" + metric: str # "Deposit Complaints" + current_value: str # "47/month" + target_30_day: str # "< 25/month" + target_60_day: str # "< 15/month" + target_90_day: str # "< 5/month" + measurement: str # How to measure this + + +@dataclass +class ExecutiveSummary: + """Section 1: Executive Summary.""" + health_score: int # 1-100 overall health + health_label: str # "Needs Attention" | "Stable" | "Strong" + one_liner: str # Single sentence verdict + current_rating: float # 3.71 + potential_rating: float # 4.2 + rating_gap: float # 0.49 + estimated_revenue_at_risk: str # "€15,000/month" + key_insight: str # The most important finding + momentum: str # 'improving' | 'declining' | 'stable' + momentum_detail: str # Explanation + + +@dataclass +class RiskScorecard: + """Section 2: Risk Scorecard.""" + overall_risk: str # 'low' | 'medium' | 'high' | 'critical' + indicators: list[RiskIndicator] + highest_risk_area: str # "Value Perception" + immediate_attention: str # What needs fixing NOW + + +@dataclass +class ChartDataPoint: + """A single data point for charts.""" label: str - description: str - type: str # 'positive' | 'negative' | 'neutral' | 'event' + value: float + color: str | None = None @dataclass -class Synthesis: - """Complete synthesis output from Stage 5.""" - executive_narrative: str - sentiment_insight: str - category_insight: str - timeline_insight: str - priority_domain: str | None - priority_issue: str | None - action_plan: list[ActionItem] - issue_actions: dict[str, str] - timeline_annotations: list[TimelineAnnotation] - marketing_angles: list[str] - competitor_context: str | None +class TimeSeriesPoint: + """A time-series data point.""" + month: str # "Jan", "Feb", etc. + month_date: str # "2025-01" for sorting + value: float + + +@dataclass +class DualSeriesPoint: + """A dual time-series data point (positive/negative).""" + month: str + month_date: str + positive: int + negative: int + + +@dataclass +class ReportCharts: + """All chart data for the report.""" + # Chart 1: Rating gauge + rating_current: float + rating_target: float + rating_min: float = 1.0 + rating_max: float = 5.0 + + # Chart 2: Sentiment pie (positive vs negative) + sentiment_pie: list[ChartDataPoint] = field(default_factory=list) + + # Chart 3: Issue categories pie + issues_pie: list[ChartDataPoint] = field(default_factory=list) + + # Chart 4: Rating distribution bar + rating_distribution: list[ChartDataPoint] = field(default_factory=list) + + # Chart 5: Complaints trend line + complaints_trend: list[TimeSeriesPoint] = field(default_factory=list) + + # Chart 6: Rating trend line + rating_trend: list[TimeSeriesPoint] = field(default_factory=list) + + # Chart 7: Momentum dual line (positive vs negative over time) + momentum_trend: list[DualSeriesPoint] = field(default_factory=list) + + +@dataclass +class ReportSynthesis: + """ + Complete €60 Business Reputation Report. + + 6-Section Structure: + 1. Executive Summary - Health snapshot + ROI framing + 2. Risk Scorecard - Visual risk assessment + 3. Critical Issues - Top 3 problems with evidence + 4. Protect Your Strengths - Competitive advantages + 5. Action Matrix - Prioritized effort/impact grid + 6. 90-Day Tracking - KPIs and milestones + """ + # Section 1: Executive Summary + executive_summary: ExecutiveSummary + + # Section 2: Risk Scorecard + risk_scorecard: RiskScorecard + + # Section 3: Critical Issues (Top 3) + critical_issues: list[CriticalIssue] + + # Section 4: Protect Your Strengths + strengths: list[StrengthToProtect] + + # Section 5: Action Matrix + action_matrix: list[ActionItem] + + # Section 6: 90-Day Tracking Framework + tracking_kpis: list[TrackingKPI] + + # Report metadata + report_title: str # "Reputation Health Report: Soho Club" + report_date: str # "January 2026" + business_name: str generated_at: str + review_count: int + insight_count: int + analysis_period: str # "Last 12 months" + + # Charts data (for visualization) - optional, must be last due to default + charts: ReportCharts | None = None -SYNTHESIS_SYSTEM_PROMPT = """You are an expert business analyst specializing in customer experience and review analysis. +REPORT_SYSTEM_PROMPT = """You are a senior reputation consultant generating a €60 Business Reputation Report. -Your task is to analyze classified review data and generate actionable business insights. +This report helps SMB owners understand their online reputation and take concrete action. +The business owner KNOWS their business - you must be evidence-grounded, never speculative. -You will receive: -1. Summary statistics (total reviews, rating, sentiment distribution) -2. Top issues by category with example quotes -3. Top strengths with example quotes -4. Domain breakdown (what customers talk about most) +## CRITICAL RULES -Generate a JSON response with these fields: +1. **CITE EVIDENCE**: Every claim must reference data provided (quote, count, percentage) +2. **NO SPECULATION**: Never guess at causes - only state what the data shows +3. **HYPER-SPECIFIC SOLUTIONS**: Transform taxonomy guidance into IMMEDIATELY ACTIONABLE steps. + + First, INFER the business type from the name and quotes (restaurant, hotel, nightclub, retail, clinic, etc.) + Then generate solutions appropriate for THAT business type's operations. + + QUALITY LEVELS: + - LEVEL 1 (REJECT): Generic advice like "Train staff" or "Improve cleanliness" + - LEVEL 2 (ACCEPTABLE): Contextual like "Train front-desk staff on greeting protocol" + - LEVEL 3 (EXCELLENT): Operational with WHAT/WHO/WHEN/HOW details + + Each solution MUST include: + - WHAT physically changes (sign, checklist, script, schedule, policy, equipment) + - WHO executes it (specific role: manager, front desk, kitchen staff, cleaning crew) + - WHEN/HOW OFTEN (time of day, frequency, trigger event) + - WHY this fixes it (reference the specific quote that reveals the problem) + + SOLUTION PATTERN: + "[Action verb] [specific thing] [when/where]: [details]. Quote '[excerpt]' indicates [what this fixes]." + + THINK ABOUT: + - What can be done THIS WEEK vs next month? + - What's a visible change customers will notice? + - What's the cheapest fix that addresses the root cause? + - What quote excerpt PROVES this solution targets the right problem? +4. **BUSINESS OWNER TONE**: Write for someone who runs the business daily, not consultants +5. **ROI FRAMING**: Connect issues to business impact (revenue, customers lost) + +## HEALTH SCORE CALCULATION + +Calculate health_score (1-100) based on: +- Base: (avg_rating / 5) × 60 points +- Momentum: +10 if improving, -10 if declining +- Issue severity: -5 per critical issue (>20 complaints) +- Strength bonus: +5 per strong positive area (>30 mentions) + +## RISK SCORE COLORS + +- 8-10: green (healthy) +- 5-7: yellow (needs attention) +- 1-4: red (critical) + +## OUTPUT FORMAT (JSON) { - "executive_narrative": "2-3 paragraph story explaining the business situation, key problems, and path forward. Be specific with numbers and examples.", + "executive_summary": { + "health_score": 62, + "health_label": "Needs Attention", + "one_liner": "Strong service undercut by pricing transparency issues", + "potential_rating": 4.2, + "estimated_revenue_at_risk": "€8,000/month", + "key_insight": "47 customers mentioned feeling 'surprised' by costs - fixable with upfront disclosure", + "momentum_override": null + }, - "sentiment_insight": "1-2 sentences explaining WHY sentiment is distributed this way. Connect to specific issues.", + "risk_scorecard": { + "overall_risk": "medium", + "indicators": [ + {"name": "Staff & Service", "score": 8, "trend": "stable", "complaint_count": 5, "color": "green"}, + {"name": "Value & Pricing", "score": 3, "trend": "declining", "complaint_count": 47, "color": "red"}, + {"name": "Facility & Cleanliness", "score": 6, "trend": "stable", "complaint_count": 12, "color": "yellow"}, + {"name": "Experience & Journey", "score": 7, "trend": "improving", "complaint_count": 8, "color": "yellow"} + ], + "highest_risk_area": "Value Perception", + "immediate_attention": "Price transparency requires immediate action - 47 complaints in 90 days" + }, - "category_insight": "1-2 sentences about the pattern in categories. Which domain needs most attention and why?", - - "timeline_insight": "1-2 sentences about trends if data shows changes over time.", - - "priority_domain": "Single letter code (P/V/J/O/A/E/R) for the domain needing most attention, or null", - - "priority_issue": "The subcode (e.g., 'V1.03') that should be fixed first, or null", - - "action_plan": [ + "critical_issues": [ { - "id": "action_1", - "title": "Clear action title", - "why": "Root cause from the reviews", - "what": "Specific steps to take", - "who": "Department or role responsible", - "impact": "Expected outcome", - "evidence": ["Quote 1", "Quote 2"], - "estimated_rating_lift": 0.3, - "complexity": "quick|medium|complex", - "priority": "critical|high|medium|low", - "timeline": "This week|This month|This quarter", - "related_subcode": "V1.03" + "rank": 1, + "title": "[Specific problem] [Location/touchpoint where it occurs]", + "urt_code": "P1.02", + "complaint_count": 13, + "revenue_impact": "€X/month (estimated N lost customers × €Y avg transaction)", + "evidence": [ + "Exact quote from data showing the problem", + "Another quote that reinforces the pattern" + ], + "root_cause": "WHY this keeps happening - the systemic issue, not just symptoms", + "solution": "[Verb] [specific change] [when/where]: (1) [First step with detail], (2) [Second step], (3) [Verification method]. Quote '[excerpt]' shows [what this fixes].", + "effort": "quick_win|moderate|strategic", + "timeline": "1 week|2-4 weeks|1-2 months" } ], - "timeline_annotations": [ + "strengths": [ { - "date": "2024-01-15", - "label": "Short label", - "description": "What happened", - "type": "positive|negative|neutral|event" + "title": "Exceptional Staff Service", + "mention_count": 68, + "percentage": 42.0, + "top_quotes": [ + "Maria was incredibly helpful and patient", + "Staff went above and beyond" + ], + "risk_of_loss": "Staff turnover could erode this; ensure recognition program exists", + "leverage_action": "Feature Maria by name in Google Business posts: 'Our team member Maria consistently delights guests - here's why'. Add 'Meet Our Team' section to website with photos and quotes from reviews." } ], - "marketing_angles": [ - "Way to promote strength 1", - "Way to promote strength 2" + NOTE ON STRENGTHS: + - leverage_action MUST be specific and actionable, not generic "use in marketing" + - If staff names appear in quotes, USE THOSE NAMES in the leverage action + - Suggest specific channels: Google Business posts, Instagram, website section, email signature + - risk_of_loss should identify what could erode this strength (turnover, complacency, competitor copying) + + "action_matrix": [ + { + "action": "[Verb] [specific thing] [where/when]: [implementation details]", + "owner": "Role responsible (Manager, Operations, Front Desk, Kitchen, etc.)", + "effort": "low|medium|high", + "impact": "low|medium|high", + "quadrant": "quick_win (low effort + high impact) | major_project (high effort + high impact) | fill_in (low effort + low impact) | deprioritize (high effort + low impact)", + "expected_lift": "+0.1★ to +0.4★ based on complaint volume addressed", + "deadline": "Week 1|Week 2|Month 2", + "success_metric": "'[keyword from complaints]' mentions drop from X/month to Synthesis: - """ - Generate synthesis for a completed pipeline execution. + async def run(self, job_id: str, execution_id: str) -> ReportSynthesis: + """Generate report synthesis for a completed pipeline execution.""" + logger.info(f"Stage 5: Generating report for job {job_id}") - Args: - job_id: The scraping job ID - execution_id: The pipeline execution ID - - Returns: - Synthesis object with all generated insights - """ - logger.info(f"Stage 5: Generating synthesis for job {job_id}") - - # Gather all the data we need + # Gather context for LLM context = await self._gather_context(job_id) - # Generate synthesis via LLM - synthesis = await self._generate_synthesis(context) + # Gather chart data (parallel with LLM call would be nice, but sequential for now) + chart_data = await self._gather_chart_data(job_id) - # Store synthesis in database + # Generate report via LLM + synthesis = await self._generate_report(context, chart_data) + + # Store in database await self._store_synthesis(execution_id, synthesis) - logger.info(f"Stage 5: Synthesis complete - {len(synthesis.action_plan)} actions generated") + logger.info(f"Stage 5: Report complete - {len(synthesis.action_matrix)} actions, {len(synthesis.critical_issues)} issues, {len(chart_data.complaints_trend)} chart points") return synthesis async def _gather_context(self, job_id: str) -> dict[str, Any]: - """Gather all context needed for synthesis.""" + """Gather all context needed for report generation.""" - # Get overview stats + # Overview stats overview = await self.pool.fetchrow(""" SELECT COUNT(DISTINCT r.review_id) as total_reviews, - AVG(r.rating) as avg_rating, + ROUND(AVG(r.rating)::numeric, 2) as avg_rating, COUNT(s.span_id) as total_spans FROM pipeline.reviews_enriched r LEFT JOIN pipeline.review_spans s ON s.review_id = r.review_id WHERE r.job_id = $1::uuid """, job_id) - # Get sentiment distribution + # Sentiment distribution sentiment = await self.pool.fetch(""" SELECT valence, @@ -198,278 +430,743 @@ class Stage5Synthesizer: ORDER BY count DESC """, job_id) - # Get top issues (weaknesses) + # Top issues with quotes AND taxonomy solutions top_issues = await self.pool.fetch(""" SELECT s.urt_primary as subcode, sc.name as subcode_name, sc.definition, - d.code as domain, + sc.solution as taxonomy_solution, + sc.solution_complexity, d.name as domain_name, - COUNT(*) as span_count, COUNT(*) FILTER (WHERE s.valence = 'V-') as negative_count, - ARRAY_AGG(s.span_text ORDER BY s.intensity DESC) FILTER (WHERE s.valence = 'V-') as example_quotes + ARRAY_AGG(DISTINCT s.span_text ORDER BY s.span_text) + FILTER (WHERE s.valence = 'V-' AND LENGTH(s.span_text) > 20) as quotes FROM pipeline.review_spans s LEFT JOIN pipeline.urt_subcodes sc ON sc.code = s.urt_primary LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) WHERE s.job_id = $1::uuid AND s.valence = 'V-' AND s.is_active = TRUE - GROUP BY s.urt_primary, sc.name, sc.definition, d.code, d.name + GROUP BY s.urt_primary, sc.name, sc.definition, sc.solution, sc.solution_complexity, d.name + HAVING COUNT(*) FILTER (WHERE s.valence = 'V-') >= 3 ORDER BY negative_count DESC - LIMIT 10 + LIMIT 8 """, job_id) - # Get top strengths + # Top strengths with quotes top_strengths = await self.pool.fetch(""" SELECT s.urt_primary as subcode, sc.name as subcode_name, - sc.definition, - d.code as domain, d.name as domain_name, - COUNT(*) as span_count, COUNT(*) FILTER (WHERE s.valence = 'V+') as positive_count, - ARRAY_AGG(s.span_text ORDER BY s.intensity DESC) FILTER (WHERE s.valence = 'V+') as example_quotes + ARRAY_AGG(DISTINCT s.span_text ORDER BY s.span_text) + FILTER (WHERE s.valence = 'V+' AND LENGTH(s.span_text) > 20) as quotes FROM pipeline.review_spans s LEFT JOIN pipeline.urt_subcodes sc ON sc.code = s.urt_primary LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) WHERE s.job_id = $1::uuid AND s.valence = 'V+' AND s.is_active = TRUE - GROUP BY s.urt_primary, sc.name, sc.definition, d.code, d.name + GROUP BY s.urt_primary, sc.name, d.name + HAVING COUNT(*) FILTER (WHERE s.valence = 'V+') >= 3 ORDER BY positive_count DESC LIMIT 5 """, job_id) - # Get domain distribution + # Domain summary domains = await self.pool.fetch(""" SELECT - SUBSTRING(urt_primary, 1, 1) as domain, d.name as domain_name, - COUNT(*) as total_count, - COUNT(*) FILTER (WHERE valence = 'V+') as positive_count, - COUNT(*) FILTER (WHERE valence = 'V-') as negative_count + COUNT(*) as total, + COUNT(*) FILTER (WHERE valence = 'V+') as positive, + COUNT(*) FILTER (WHERE valence = 'V-') as negative, + ROUND(100.0 * COUNT(*) FILTER (WHERE valence = 'V-') / NULLIF(COUNT(*), 0), 1) as negative_pct FROM pipeline.review_spans s LEFT JOIN pipeline.urt_domains d ON d.code = SUBSTRING(s.urt_primary, 1, 1) WHERE s.job_id = $1::uuid AND s.is_active = TRUE - GROUP BY SUBSTRING(urt_primary, 1, 1), d.name - ORDER BY total_count DESC + GROUP BY d.name + ORDER BY negative DESC """, job_id) - # Get business name if available - business = await self.pool.fetchrow(""" - SELECT DISTINCT business_id as business_name - FROM pipeline.reviews_enriched - WHERE job_id = $1::uuid AND business_id IS NOT NULL - LIMIT 1 + # Business name + business = await self.pool.fetchval(""" + SELECT DISTINCT business_id FROM pipeline.reviews_enriched + WHERE job_id = $1::uuid LIMIT 1 + """, job_id) + + # MOMENTUM: Calculate from data (not LLM guess) + momentum_data = await self.pool.fetchrow(""" + SELECT + COUNT(*) FILTER ( + WHERE review_time > NOW() - INTERVAL '3 months' AND valence = 'V-' + ) as recent_negative, + COUNT(*) FILTER ( + WHERE review_time BETWEEN NOW() - INTERVAL '6 months' AND NOW() - INTERVAL '3 months' + AND valence = 'V-' + ) as prior_negative, + COUNT(*) FILTER ( + WHERE review_time > NOW() - INTERVAL '3 months' AND valence = 'V+' + ) as recent_positive, + COUNT(*) FILTER ( + WHERE review_time BETWEEN NOW() - INTERVAL '6 months' AND NOW() - INTERVAL '3 months' + AND valence = 'V+' + ) as prior_positive + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND is_active = TRUE + """, job_id) + + # Calculate momentum direction + recent_neg = momentum_data["recent_negative"] or 0 + prior_neg = momentum_data["prior_negative"] or 1 # Avoid division by zero + recent_pos = momentum_data["recent_positive"] or 0 + prior_pos = momentum_data["prior_positive"] or 1 + + neg_change = (recent_neg - prior_neg) / prior_neg if prior_neg > 0 else 0 + pos_change = (recent_pos - prior_pos) / prior_pos if prior_pos > 0 else 0 + + # Calculate net sentiment change using weighted approach + # Negative changes are weighted heavier (1.5x) because they hurt ratings more + net_sentiment = pos_change - (neg_change * 1.5) + + if net_sentiment > 0.2: + calculated_momentum = "improving" + if neg_change < -0.2: + momentum_detail = f"Complaints down {abs(neg_change)*100:.0f}% vs prior 3 months" + elif pos_change > 0.2: + momentum_detail = f"Positive feedback up {pos_change*100:.0f}%, outpacing complaints" + else: + momentum_detail = "Overall sentiment trending positive" + elif net_sentiment < -0.2: + calculated_momentum = "declining" + if neg_change > 0.2: + momentum_detail = f"Complaints up {neg_change*100:.0f}% vs prior 3 months" + elif pos_change < -0.2: + momentum_detail = f"Positive feedback down {abs(pos_change)*100:.0f}%" + else: + momentum_detail = "Overall sentiment trending negative" + else: + calculated_momentum = "stable" + momentum_detail = "Feedback patterns consistent with prior 3 months" + + # RISK ALERTS: Scan for dangerous keywords + risk_alerts = await self.pool.fetch(""" + SELECT DISTINCT span_text + FROM pipeline.review_spans + WHERE job_id = $1::uuid + AND is_active = TRUE + AND span_text ~* '(health.?department|food.?poison|lawyer|lawsuit|sued|police|assault|racist|discriminat|allerg)' + LIMIT 10 + """, job_id) + + # STAFF RECOGNITION: Aggregate named staff mentions + staff_mentions = await self.pool.fetch(""" + SELECT + entity as name, + COUNT(*) as total_mentions, + COUNT(*) FILTER (WHERE valence = 'V+') as positive_mentions, + COUNT(*) FILTER (WHERE valence = 'V-') as negative_mentions, + ARRAY_AGG(span_text ORDER BY review_time DESC) FILTER (WHERE valence = 'V+') as positive_quotes + FROM pipeline.review_spans + WHERE job_id = $1::uuid + AND is_active = TRUE + AND entity IS NOT NULL + AND entity_type IN ('staff', 'person') + GROUP BY entity + ORDER BY positive_mentions DESC + LIMIT 10 """, job_id) return { - "business_name": business["business_name"] if business else "This business", - "overview": dict(overview) if overview else {}, + "business_name": business or "This business", + "total_reviews": overview["total_reviews"] or 0, + "avg_rating": float(overview["avg_rating"] or 0), + "total_spans": overview["total_spans"] or 0, "sentiment": [dict(r) for r in sentiment], "top_issues": [dict(r) for r in top_issues], "top_strengths": [dict(r) for r in top_strengths], "domains": [dict(r) for r in domains], + # New data-driven fields + "calculated_momentum": calculated_momentum, + "momentum_detail": momentum_detail, + "momentum_data": { + "recent_negative": recent_neg, + "prior_negative": prior_neg, + "recent_positive": recent_pos, + "prior_positive": prior_pos, + }, + "risk_alerts": [r["span_text"] for r in risk_alerts], + "staff_mentions": [dict(r) for r in staff_mentions], } - async def _generate_synthesis(self, context: dict[str, Any]) -> Synthesis: - """Generate synthesis using LLM.""" + async def _gather_chart_data(self, job_id: str) -> ReportCharts: + """Gather all data needed for report charts.""" - # Build the user prompt with context - user_prompt = f"""Analyze this review data for {context['business_name']}: + # Chart 1 & 6: Rating data (current + trend) + rating_data = await self.pool.fetchrow(""" + SELECT + ROUND(AVG(rating)::numeric, 2) as current_rating, + COUNT(*) as total_reviews + FROM pipeline.reviews_enriched + WHERE job_id = $1::uuid + """, job_id) -## Overview -- Total Reviews: {context['overview'].get('total_reviews', 0)} -- Average Rating: {context['overview'].get('avg_rating', 'N/A')} -- Total Insights Extracted: {context['overview'].get('total_spans', 0)} + # Chart 2: Sentiment pie + sentiment_data = await self.pool.fetch(""" + SELECT + CASE + WHEN valence = 'V+' THEN 'Positive' + WHEN valence = 'V-' THEN 'Negative' + ELSE 'Neutral' + END as label, + COUNT(*) as value + FROM pipeline.review_spans + WHERE job_id = $1::uuid AND is_active = TRUE + GROUP BY 1 + ORDER BY value DESC + """, job_id) -## Sentiment Distribution -{self._format_sentiment(context['sentiment'])} + # Chart 3: Issue categories pie + issues_data = await self.pool.fetch(""" + SELECT + COALESCE(sc.name, 'Other') as label, + COUNT(*) as value + FROM pipeline.review_spans s + LEFT JOIN pipeline.urt_subcodes sc ON sc.code = s.urt_primary + WHERE s.job_id = $1::uuid + AND s.valence = 'V-' + AND s.is_active = TRUE + GROUP BY sc.name + ORDER BY value DESC + LIMIT 5 + """, job_id) -## Top Issues (Problems) -{self._format_issues(context['top_issues'])} + # Chart 4: Rating distribution + rating_dist = await self.pool.fetch(""" + SELECT + rating::text as label, + COUNT(*) as value + FROM pipeline.reviews_enriched + WHERE job_id = $1::uuid + GROUP BY rating + ORDER BY rating DESC + """, job_id) -## Top Strengths -{self._format_strengths(context['top_strengths'])} + # Charts 5, 6, 7: Monthly trends (complaints, rating, momentum) + monthly_data = await self.pool.fetch(""" + SELECT + TO_CHAR(DATE_TRUNC('month', r.review_time), 'Mon') as month, + TO_CHAR(DATE_TRUNC('month', r.review_time), 'YYYY-MM') as month_date, + COUNT(*) FILTER (WHERE s.valence = 'V-') as complaints, + COUNT(*) FILTER (WHERE s.valence = 'V+') as positive, + ROUND(AVG(r.rating)::numeric, 2) as avg_rating + FROM pipeline.review_spans s + JOIN pipeline.reviews_enriched r ON r.review_id = s.review_id + WHERE s.job_id = $1::uuid + AND s.is_active = TRUE + AND r.review_time > NOW() - INTERVAL '12 months' + GROUP BY 1, 2 + ORDER BY month_date + """, job_id) -## Domain Breakdown -{self._format_domains(context['domains'])} + # Build chart data structures + sentiment_colors = {"Positive": "#4CAF50", "Negative": "#F44336", "Neutral": "#9E9E9E"} + issue_colors = ["#E53935", "#FB8C00", "#FDD835", "#43A047", "#9E9E9E"] + rating_colors = { + "5": "#4CAF50", "4": "#8BC34A", "3": "#FFC107", + "2": "#FF9800", "1": "#F44336" + } -Generate a complete synthesis with actionable insights. -""" + return ReportCharts( + # Chart 1: Rating gauge + rating_current=float(rating_data["current_rating"] or 0), + rating_target=4.0, # Default target + + # Chart 2: Sentiment pie + sentiment_pie=[ + ChartDataPoint( + label=row["label"], + value=row["value"], + color=sentiment_colors.get(row["label"], "#9E9E9E") + ) + for row in sentiment_data + ], + + # Chart 3: Issues pie + issues_pie=[ + ChartDataPoint( + label=row["label"], + value=row["value"], + color=issue_colors[i] if i < len(issue_colors) else "#9E9E9E" + ) + for i, row in enumerate(issues_data) + ], + + # Chart 4: Rating distribution + rating_distribution=[ + ChartDataPoint( + label=f"{row['label']}★", + value=row["value"], + color=rating_colors.get(row["label"], "#9E9E9E") + ) + for row in rating_dist + ], + + # Chart 5: Complaints trend + complaints_trend=[ + TimeSeriesPoint( + month=row["month"], + month_date=row["month_date"], + value=row["complaints"] or 0 + ) + for row in monthly_data + ], + + # Chart 6: Rating trend + rating_trend=[ + TimeSeriesPoint( + month=row["month"], + month_date=row["month_date"], + value=float(row["avg_rating"] or 0) + ) + for row in monthly_data + ], + + # Chart 7: Momentum dual line + momentum_trend=[ + DualSeriesPoint( + month=row["month"], + month_date=row["month_date"], + positive=row["positive"] or 0, + negative=row["complaints"] or 0 + ) + for row in monthly_data + ], + ) + + async def _generate_report(self, context: dict[str, Any], chart_data: ReportCharts) -> ReportSynthesis: + """Generate the 6-section €60 report using LLM.""" + + # Build user prompt + user_prompt = self._build_user_prompt(context) - # Call LLM try: response = await self.llm_client.generate( - system_prompt=SYNTHESIS_SYSTEM_PROMPT, + system_prompt=REPORT_SYSTEM_PROMPT, user_prompt=user_prompt, - temperature=0.7, # Allow some creativity + temperature=0.7, max_tokens=4000, ) - # Parse JSON response - result = json.loads(response) + data = json.loads(response) - # Convert to Synthesis object - return Synthesis( - executive_narrative=result.get("executive_narrative", ""), - sentiment_insight=result.get("sentiment_insight", ""), - category_insight=result.get("category_insight", ""), - timeline_insight=result.get("timeline_insight", ""), - priority_domain=result.get("priority_domain"), - priority_issue=result.get("priority_issue"), - action_plan=[ - ActionItem( - id=a.get("id", f"action_{i}"), - title=a.get("title", ""), - why=a.get("why", ""), - what=a.get("what", ""), - who=a.get("who", ""), - impact=a.get("impact", ""), - evidence=a.get("evidence", []), - estimated_rating_lift=a.get("estimated_rating_lift"), - complexity=a.get("complexity", "medium"), - priority=a.get("priority", "medium"), - timeline=a.get("timeline", "This month"), - related_subcode=a.get("related_subcode", ""), - ) - for i, a in enumerate(result.get("action_plan", [])) - ], - issue_actions={}, # Can be populated from action_plan - timeline_annotations=[ - TimelineAnnotation( - date=t.get("date", ""), - label=t.get("label", ""), - description=t.get("description", ""), - type=t.get("type", "neutral"), - ) - for t in result.get("timeline_annotations", []) - ], - marketing_angles=result.get("marketing_angles", []), - competitor_context=result.get("competitor_context"), - generated_at=datetime.utcnow().isoformat(), + # Parse Executive Summary + exec_data = data.get("executive_summary", {}) + executive_summary = ExecutiveSummary( + health_score=int(exec_data.get("health_score", 50)), + health_label=exec_data.get("health_label", "Needs Attention"), + one_liner=exec_data.get("one_liner", "Analysis complete"), + current_rating=context["avg_rating"], + potential_rating=float(exec_data.get("potential_rating", context["avg_rating"] + 0.3)), + rating_gap=float(exec_data.get("potential_rating", context["avg_rating"] + 0.3)) - context["avg_rating"], + estimated_revenue_at_risk=exec_data.get("estimated_revenue_at_risk", "Unknown"), + key_insight=exec_data.get("key_insight", ""), + # Use calculated momentum, allow LLM override only if strongly justified + momentum=exec_data.get("momentum_override") or context.get("calculated_momentum", "stable"), + momentum_detail=context.get("momentum_detail", ""), + ) + + # Parse Risk Scorecard + risk_data = data.get("risk_scorecard", {}) + risk_scorecard = RiskScorecard( + overall_risk=risk_data.get("overall_risk", "medium"), + indicators=[ + RiskIndicator( + name=ind.get("name", ""), + score=int(ind.get("score", 5)), + trend=ind.get("trend", "stable"), + complaint_count=int(ind.get("complaint_count", 0)), + color=ind.get("color", "yellow"), + ) + for ind in risk_data.get("indicators", []) + ], + highest_risk_area=risk_data.get("highest_risk_area", ""), + immediate_attention=risk_data.get("immediate_attention", ""), + ) + + # Parse Critical Issues + critical_issues = [ + CriticalIssue( + rank=int(issue.get("rank", i + 1)), + title=issue.get("title", ""), + urt_code=issue.get("urt_code", ""), + complaint_count=int(issue.get("complaint_count", 0)), + revenue_impact=issue.get("revenue_impact", ""), + evidence=issue.get("evidence", [])[:3], # Max 3 quotes + root_cause=issue.get("root_cause", ""), + solution=issue.get("solution", ""), + effort=issue.get("effort", "moderate"), + timeline=issue.get("timeline", "2-4 weeks"), + ) + for i, issue in enumerate(data.get("critical_issues", [])[:3]) + ] + + # Parse Strengths + strengths = [ + StrengthToProtect( + title=s.get("title", ""), + mention_count=int(s.get("mention_count", 0)), + percentage=float(s.get("percentage", 0)), + top_quotes=s.get("top_quotes", [])[:3], + risk_of_loss=s.get("risk_of_loss", ""), + leverage_action=s.get("leverage_action", ""), + ) + for s in data.get("strengths", []) + ] + + # Parse Action Matrix + action_matrix = [ + ActionItem( + action=a.get("action", ""), + owner=a.get("owner", ""), + effort=a.get("effort", "medium"), + impact=a.get("impact", "medium"), + quadrant=a.get("quadrant", "major_project"), + expected_lift=a.get("expected_lift", ""), + deadline=a.get("deadline", ""), + success_metric=a.get("success_metric", ""), + ) + for a in data.get("action_matrix", []) + ] + + # Parse Tracking KPIs + tracking_kpis = [ + TrackingKPI( + metric=kpi.get("metric", ""), + current_value=kpi.get("current_value", ""), + target_30_day=kpi.get("target_30_day", ""), + target_60_day=kpi.get("target_60_day", ""), + target_90_day=kpi.get("target_90_day", ""), + measurement=kpi.get("measurement", ""), + ) + for kpi in data.get("tracking_kpis", []) + ] + + # Build report title + business_name = context.get("business_name", "Business") + report_date = datetime.utcnow().strftime("%B %Y") + + return ReportSynthesis( + executive_summary=executive_summary, + risk_scorecard=risk_scorecard, + critical_issues=critical_issues, + strengths=strengths, + action_matrix=action_matrix, + tracking_kpis=tracking_kpis, + report_title=f"Reputation Health Report: {business_name}", + report_date=report_date, + business_name=business_name, + generated_at=datetime.utcnow().isoformat(), + review_count=context["total_reviews"], + insight_count=context["total_spans"], + analysis_period="Last 12 months", + charts=chart_data, ) - except json.JSONDecodeError as e: - logger.error(f"Failed to parse LLM response: {e}") - return self._create_fallback_synthesis() except Exception as e: - logger.error(f"Synthesis generation failed: {e}") - return self._create_fallback_synthesis() + logger.error(f"Report generation failed: {e}") + return self._create_fallback(context, chart_data) - def _format_sentiment(self, sentiment: list[dict]) -> str: - """Format sentiment data for prompt.""" - lines = [] - for s in sentiment: - valence = s.get("valence", "Unknown") - count = s.get("count", 0) - reviews = s.get("review_count", 0) - label = {"V+": "Positive", "V-": "Negative", "V0": "Neutral", "V±": "Mixed"}.get(valence, valence) - lines.append(f"- {label}: {count} mentions ({reviews} reviews)") - return "\n".join(lines) or "No sentiment data" + def _build_user_prompt(self, ctx: dict[str, Any]) -> str: + """Build the user prompt with all context.""" - def _format_issues(self, issues: list[dict]) -> str: - """Format issues for prompt.""" - lines = [] - for i, issue in enumerate(issues[:5], 1): - subcode = issue.get("subcode", "") - name = issue.get("subcode_name", "") - domain = issue.get("domain_name", "") - count = issue.get("negative_count", 0) - quotes = issue.get("example_quotes", [])[:2] + # Format sentiment + sentiment_lines = [] + for s in ctx["sentiment"]: + label = {"V+": "Positive", "V-": "Negative", "V0": "Neutral", "V±": "Mixed"}.get(s["valence"], s["valence"]) + sentiment_lines.append(f"- {label}: {s['count']} mentions across {s['review_count']} reviews") - lines.append(f"{i}. [{subcode}] {name} ({domain})") - lines.append(f" - {count} negative mentions") - for q in quotes: - if q: - lines.append(f' - Example: "{q[:100]}..."' if len(q) > 100 else f' - Example: "{q}"') - return "\n".join(lines) or "No issues found" + # Format issues WITH taxonomy solutions AND multiple quotes for context + issue_lines = [] + for i, issue in enumerate(ctx["top_issues"][:5], 1): + quotes = (issue.get("quotes") or [])[:4] # More quotes for LLM context + solution = issue.get("taxonomy_solution") or "" + complexity = issue.get("solution_complexity") or "medium" - def _format_strengths(self, strengths: list[dict]) -> str: - """Format strengths for prompt.""" - lines = [] - for i, strength in enumerate(strengths[:3], 1): - subcode = strength.get("subcode", "") - name = strength.get("subcode_name", "") - domain = strength.get("domain_name", "") - count = strength.get("positive_count", 0) - quotes = strength.get("example_quotes", [])[:2] + # Build detailed issue block + lines = [ + f"{i}. **{issue['subcode_name'] or issue['subcode']}** ({issue['domain_name']}) - {issue['negative_count']} complaints" + ] + if solution: + lines.append(f" TAXONOMY GUIDANCE ({complexity}): {solution}") + if quotes: + lines.append(" CUSTOMER QUOTES (use these to understand the specific problem):") + for q in quotes: + truncated = q[:120] + "..." if len(q) > 120 else q + lines.append(f' - "{truncated}"') - lines.append(f"{i}. [{subcode}] {name} ({domain})") - lines.append(f" - {count} positive mentions") - for q in quotes: - if q: - lines.append(f' - Example: "{q[:100]}..."' if len(q) > 100 else f' - Example: "{q}"') - return "\n".join(lines) or "No strengths found" + issue_lines.append("\n".join(lines)) - def _format_domains(self, domains: list[dict]) -> str: - """Format domain distribution for prompt.""" - lines = [] - for d in domains: - domain = d.get("domain", "") - name = d.get("domain_name", "") - total = d.get("total_count", 0) - positive = d.get("positive_count", 0) - negative = d.get("negative_count", 0) - lines.append(f"- {domain} ({name}): {total} total ({positive} positive, {negative} negative)") - return "\n".join(lines) or "No domain data" + # Format strengths + strength_lines = [] + for i, s in enumerate(ctx["top_strengths"][:3], 1): + quotes = (s.get("quotes") or [])[:1] + quote_str = f' | "{quotes[0][:60]}..."' if quotes else "" + strength_lines.append( + f"{i}. {s['subcode_name'] or s['subcode']} ({s['domain_name']}): " + f"{s['positive_count']} praises{quote_str}" + ) - def _create_fallback_synthesis(self) -> Synthesis: - """Create a minimal synthesis when LLM fails.""" - return Synthesis( - executive_narrative="Unable to generate detailed analysis. Please review the data manually.", - sentiment_insight="", - category_insight="", - timeline_insight="", - priority_domain=None, - priority_issue=None, - action_plan=[], - issue_actions={}, - timeline_annotations=[], - marketing_angles=[], - competitor_context=None, + # Format domains + domain_lines = [] + for d in ctx["domains"]: + if d["domain_name"]: + domain_lines.append( + f"- {d['domain_name']}: {d['positive']} positive, {d['negative']} negative " + f"({d['negative_pct']}% complaint rate)" + ) + + # Format risk alerts + risk_lines = [] + for alert in ctx.get("risk_alerts", [])[:5]: + risk_lines.append(f'⚠️ "{alert[:100]}..."' if len(alert) > 100 else f'⚠️ "{alert}"') + + # Format staff mentions + staff_lines = [] + for staff in ctx.get("staff_mentions", [])[:5]: + name = staff.get("name", "Unknown") + pos = staff.get("positive_mentions", 0) + neg = staff.get("negative_mentions", 0) + quotes = staff.get("positive_quotes") or [] + quote_str = f' — "{quotes[0][:50]}..."' if quotes else "" + staff_lines.append(f"- {name}: {pos} positive, {neg} negative mentions{quote_str}") + + # Get calculated momentum (FACT, not for LLM to guess) + momentum = ctx.get("calculated_momentum", "stable") + momentum_detail = ctx.get("momentum_detail", "") + + # Infer business type from quotes and name for better context + business_name = ctx['business_name'] + + return f"""Analyze this customer feedback data for {business_name}: + +IMPORTANT: First, infer the business type from the name and quotes (nightclub, restaurant, hotel, retail, etc.) +Then tailor ALL solutions to be operationally realistic for THAT type of business. + +OVERVIEW +- Total Reviews: {ctx['total_reviews']} +- Current Rating: {ctx['avg_rating']:.1f} ★ +- Total Insights Extracted: {ctx['total_spans']} + +SENTIMENT BREAKDOWN +{chr(10).join(sentiment_lines) or 'No sentiment data'} + +MOMENTUM (calculated from data - USE THIS, do not override) +- Direction: {momentum.upper()} +- Detail: {momentum_detail} + +TOP ISSUES (ranked by complaint volume, with recommended fixes from taxonomy) +{chr(10).join(issue_lines) or 'No significant issues found'} + +TOP STRENGTHS (what customers love) +{chr(10).join(strength_lines) or 'No clear strengths identified'} + +DOMAIN PERFORMANCE +{chr(10).join(domain_lines) or 'No domain data'} + +STAFF RECOGNITION (employees mentioned by name) +{chr(10).join(staff_lines) or 'No staff mentioned by name'} + +RISK ALERTS (requires immediate attention) +{chr(10).join(risk_lines) or 'No risk indicators detected'} + +IMPORTANT INSTRUCTIONS: +1. Use the RECOMMENDED FIX from taxonomy for each issue - do not invent new solutions +2. Use the MOMENTUM direction provided above - it was calculated from data +3. If RISK ALERTS exist, mention them prominently in the report +4. Include STAFF RECOGNITION data in the strengths section if staff were mentioned positively + +Generate a comprehensive analyst report based on this data.""" + + def _create_fallback(self, ctx: dict[str, Any], chart_data: ReportCharts | None = None) -> ReportSynthesis: + """Create minimal report when LLM fails.""" + business_name = ctx.get("business_name", "Business") + report_date = datetime.utcnow().strftime("%B %Y") + + return ReportSynthesis( + executive_summary=ExecutiveSummary( + health_score=50, + health_label="Needs Attention", + one_liner="Automated analysis could not be completed", + current_rating=ctx["avg_rating"], + potential_rating=ctx["avg_rating"], + rating_gap=0, + estimated_revenue_at_risk="Unknown", + key_insight="Please review the data manually", + momentum=ctx.get("calculated_momentum", "stable"), + momentum_detail=ctx.get("momentum_detail", ""), + ), + risk_scorecard=RiskScorecard( + overall_risk="medium", + indicators=[], + highest_risk_area="Unable to determine", + immediate_attention="Manual review required", + ), + critical_issues=[], + strengths=[], + action_matrix=[], + tracking_kpis=[], + report_title=f"Reputation Health Report: {business_name}", + report_date=report_date, + business_name=business_name, generated_at=datetime.utcnow().isoformat(), + review_count=ctx["total_reviews"], + insight_count=ctx["total_spans"], + analysis_period="Last 12 months", + charts=chart_data, ) - async def _store_synthesis(self, execution_id: str, synthesis: Synthesis) -> None: - """Store synthesis in database.""" + async def _store_synthesis(self, execution_id: str, synthesis: ReportSynthesis) -> None: + """Store the 6-section report in database.""" + es = synthesis.executive_summary + rs = synthesis.risk_scorecard + await self.pool.execute(""" UPDATE pipeline.executions SET synthesis = $2 WHERE id = $1::uuid """, execution_id, json.dumps({ - "executive_narrative": synthesis.executive_narrative, - "sentiment_insight": synthesis.sentiment_insight, - "category_insight": synthesis.category_insight, - "timeline_insight": synthesis.timeline_insight, - "priority_domain": synthesis.priority_domain, - "priority_issue": synthesis.priority_issue, - "action_plan": [ - { - "id": a.id, - "title": a.title, - "why": a.why, - "what": a.what, - "who": a.who, - "impact": a.impact, - "evidence": a.evidence, - "estimated_rating_lift": a.estimated_rating_lift, - "complexity": a.complexity, - "priority": a.priority, - "timeline": a.timeline, - "related_subcode": a.related_subcode, - } - for a in synthesis.action_plan - ], - "issue_actions": synthesis.issue_actions, - "timeline_annotations": [ - { - "date": t.date, - "label": t.label, - "description": t.description, - "type": t.type, - } - for t in synthesis.timeline_annotations - ], - "marketing_angles": synthesis.marketing_angles, - "competitor_context": synthesis.competitor_context, + # Report metadata + "report_version": "2.0", # New 6-section format + "report_title": synthesis.report_title, + "report_date": synthesis.report_date, + "business_name": synthesis.business_name, "generated_at": synthesis.generated_at, + "review_count": synthesis.review_count, + "insight_count": synthesis.insight_count, + "analysis_period": synthesis.analysis_period, + + # Section 1: Executive Summary + "executive_summary": { + "health_score": es.health_score, + "health_label": es.health_label, + "one_liner": es.one_liner, + "current_rating": es.current_rating, + "potential_rating": es.potential_rating, + "rating_gap": es.rating_gap, + "estimated_revenue_at_risk": es.estimated_revenue_at_risk, + "key_insight": es.key_insight, + "momentum": es.momentum, + "momentum_detail": es.momentum_detail, + }, + + # Section 2: Risk Scorecard + "risk_scorecard": { + "overall_risk": rs.overall_risk, + "indicators": [ + { + "name": ind.name, + "score": ind.score, + "trend": ind.trend, + "complaint_count": ind.complaint_count, + "color": ind.color, + } + for ind in rs.indicators + ], + "highest_risk_area": rs.highest_risk_area, + "immediate_attention": rs.immediate_attention, + }, + + # Section 3: Critical Issues + "critical_issues": [ + { + "rank": issue.rank, + "title": issue.title, + "urt_code": issue.urt_code, + "complaint_count": issue.complaint_count, + "revenue_impact": issue.revenue_impact, + "evidence": issue.evidence, + "root_cause": issue.root_cause, + "solution": issue.solution, + "effort": issue.effort, + "timeline": issue.timeline, + } + for issue in synthesis.critical_issues + ], + + # Section 4: Strengths to Protect + "strengths": [ + { + "title": s.title, + "mention_count": s.mention_count, + "percentage": s.percentage, + "top_quotes": s.top_quotes, + "risk_of_loss": s.risk_of_loss, + "leverage_action": s.leverage_action, + } + for s in synthesis.strengths + ], + + # Section 5: Action Matrix + "action_matrix": [ + { + "action": a.action, + "owner": a.owner, + "effort": a.effort, + "impact": a.impact, + "quadrant": a.quadrant, + "expected_lift": a.expected_lift, + "deadline": a.deadline, + "success_metric": a.success_metric, + } + for a in synthesis.action_matrix + ], + + # Section 6: 90-Day Tracking + "tracking_kpis": [ + { + "metric": kpi.metric, + "current_value": kpi.current_value, + "target_30_day": kpi.target_30_day, + "target_60_day": kpi.target_60_day, + "target_90_day": kpi.target_90_day, + "measurement": kpi.measurement, + } + for kpi in synthesis.tracking_kpis + ], + + # Charts for visualization + "charts": self._serialize_charts(synthesis.charts) if synthesis.charts else None, })) + + def _serialize_charts(self, charts: ReportCharts) -> dict: + """Serialize chart data for JSON storage.""" + return { + "rating_gauge": { + "current": charts.rating_current, + "target": charts.rating_target, + "min": charts.rating_min, + "max": charts.rating_max, + }, + "sentiment_pie": [ + {"label": p.label, "value": p.value, "color": p.color} + for p in charts.sentiment_pie + ], + "issues_pie": [ + {"label": p.label, "value": p.value, "color": p.color} + for p in charts.issues_pie + ], + "rating_distribution": [ + {"label": p.label, "value": p.value, "color": p.color} + for p in charts.rating_distribution + ], + "complaints_trend": [ + {"month": p.month, "month_date": p.month_date, "value": p.value} + for p in charts.complaints_trend + ], + "rating_trend": [ + {"month": p.month, "month_date": p.month_date, "value": p.value} + for p in charts.rating_trend + ], + "momentum_trend": [ + {"month": p.month, "month_date": p.month_date, "positive": p.positive, "negative": p.negative} + for p in charts.momentum_trend + ], + }