diff --git a/packages/reviewiq-pipeline/scripts/reputation_report.py b/packages/reviewiq-pipeline/scripts/reputation_report.py new file mode 100644 index 0000000..a79982b --- /dev/null +++ b/packages/reviewiq-pipeline/scripts/reputation_report.py @@ -0,0 +1,1600 @@ +#!/usr/bin/env python3 +""" +Reputation Report Generator v1.0 + +Generates business-facing, time-windowed, language-aware reputation reports +from detected_spans_v2 data. + +Usage: + python reputation_report.py --business "Pura Vida Hostel" --days 30 + python reputation_report.py --business "gokarts" --start 2026-01-01 --end 2026-01-31 + python reputation_report.py --business "gokarts" --run-id + +Backfill run_id for legacy rows (idempotent, only fills NULL run_ids): + + WITH job_groups AS ( + SELECT business_id, job_id, gen_random_uuid() AS new_run_id + FROM pipeline.detected_spans_v2 + WHERE job_id IS NOT NULL AND run_id IS NULL + GROUP BY business_id, job_id + ) + UPDATE pipeline.detected_spans_v2 s + SET run_id = g.new_run_id + FROM job_groups g + WHERE s.business_id = g.business_id + AND s.job_id = g.job_id + AND s.run_id IS NULL; +""" + +import argparse +import asyncio +import json +import os +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any + +import asyncpg + +# Optional: OpenAI for executive summary +try: + from openai import AsyncOpenAI + OPENAI_AVAILABLE = True +except ImportError: + OPENAI_AVAILABLE = False + +# Database URL +DB_URL = os.environ.get( + "DATABASE_URL", + "postgresql://scraper:scraper123@localhost:5437/scraper" +) + +# Schema version +SCHEMA_VERSION = "1.0" + +# ============================================================================= +# DOMAIN MAPPING (static) +# ============================================================================= + +DOMAIN_MAP = { + # O - Output/Product Quality + "TASTE": "O", + "CRAFT": "O", + "FRESHNESS": "O", + "TEMPERATURE": "O", + "EFFECTIVENESS": "O", + "ACCURACY": "O", + "CONDITION": "O", + 
"CONSISTENCY": "O", + + # P - People/Service + "MANNER": "P", + "COMPETENCE": "P", + "ATTENTIVENESS": "P", + "COMMUNICATION": "P", + + # J - Journey/Process + "SPEED": "J", + "FRICTION": "J", + "RELIABILITY": "J", + "AVAILABILITY": "J", + + # E - Environment + "CLEANLINESS": "E", + "COMFORT": "E", + "SAFETY": "E", + "AMBIANCE": "E", + "ACCESSIBILITY": "E", + "DIGITAL_UX": "E", + + # V - Value + "PRICE_LEVEL": "V", + "PRICE_FAIRNESS": "V", + "PRICE_TRANSPARENCY": "V", + "VALUE_FOR_MONEY": "V", + + # Meta primitives + "HONESTY": "meta", + "ETHICS": "meta", + "PROMISES": "meta", + "ACKNOWLEDGMENT": "meta", + "RESPONSE_QUALITY": "meta", + "RECOVERY": "meta", + "RETURN_INTENT": "meta", + "RECOMMEND": "meta", + "RECOGNITION": "meta", + "UNMAPPED": "meta", + "NON_INFORMATIVE": "meta", +} + +DOMAIN_NAMES = { + "O": "Output/Product", + "P": "People/Service", + "J": "Journey/Process", + "E": "Environment", + "V": "Value", + "meta": "Meta", +} + +# Valence mapping for scoring +VALENCE_NUM = { + "+": 1.0, + "-": -1.0, + "0": 0.0, + "±": 0.0, # Mixed = neutral for scoring +} + +# Alert thresholds (v1 defaults) +ALERT_THRESHOLDS = { + "unmapped_rate_content": {"warn": 0.10, "critical": 0.15}, + "safety_negative_share": {"warn": 0.05, "critical": 0.10}, + "avg_confidence": {"warn": 0.70}, # Below this triggers warn + "low_confidence_share": {"warn": 0.10}, +} + +# Executive summary prompt +EXEC_SUMMARY_SYSTEM = """You are a business analyst writing short performance summaries for business owners.""" + +EXEC_SUMMARY_USER = """Here is the performance data for a business: + +{json_data} + +Write a 3-5 sentence executive summary in clear business language. 
+ +Rules: +- Mention current performance score and what it means (0-100 scale; 80+ strong, 60-80 moderate, below 60 needs attention) +- Mention trend vs previous period if available (use comparisons.previous_window) +- Mention strongest positive driver from drivers.positives (highest impact) - what customers love +- For weaknesses: + - If drivers.negatives is non-empty: cite the top negative driver as the main weakness + - If drivers.negatives is empty but timeline shows a recent dip: cite the dip as the "main watch item" with month and metric (avg_rating or strength_score) + - Only cite a "recent dip" if that month is within 90 days of window.end AND review_count >= 3 for that bucket + - If multiple qualifying dips exist, cite the most recent one (closest to window.end) + - If the dip month is older or has low volume (<3 reviews), either skip or phrase as "a dip was observed in [month] (limited data)" + - If you mention a "dip", do NOT also say "no major issues identified" - instead say "no dominant negative driver identified overall" + - If no qualifying dip and no negative drivers: say "no persistent weaknesses surfaced" +- Suggested action must tie directly to (a) the cited weakness/dip (investigate or fix), or (b) the top positive driver (amplify it) + - For a dip with unknown cause, suggest investigating what changed that month (staffing, operations, etc.) 
+ +Do not mention: +- spans, primitives, confidence, models, algorithms +- technical implementation details +- "increase in negative feedback" unless drivers.negatives is non-empty + +Tone: neutral, professional, actionable.""" + + +# ============================================================================= +# HELPER FUNCTIONS +# ============================================================================= + +def to_utc_naive(dt: datetime | None) -> datetime | None: + """Convert a datetime to UTC-naive for DB compatibility.""" + if not dt: + return None + if dt.tzinfo is None: + return dt # assume already UTC-naive + return dt.astimezone(timezone.utc).replace(tzinfo=None) + + +def compute_span_weight(confidence: float, detail: int, intensity: int) -> float: + """ + Compute span weight for scoring. + + w = confidence * (0.75 + 0.25*(detail-1)) * (0.8 + 0.2*(intensity-1)) + """ + detail = detail or 1 + intensity = intensity or 1 + confidence = float(confidence) if confidence else 0.5 + + return confidence * (0.75 + 0.25 * (detail - 1)) * (0.8 + 0.2 * (intensity - 1)) + + +def compute_primitive_score(spans: list[dict]) -> float: + """ + Compute weighted score for a primitive. 
def compute_primitive_score(spans: list[dict]) -> float:
    """Compute the weighted valence score for one primitive.

    score = 100 * (sum(w * valence_num) / sum(w)), clamped to [-100, +100]

    Each span's weight comes from ``compute_span_weight`` and its numeric
    valence from ``VALENCE_NUM`` (unknown valence symbols count as 0.0).

    Args:
        spans: Span dicts; the keys read are "confidence", "detail",
            "intensity" and "valence".

    Returns:
        The score as a float, or 0.0 when *spans* is empty or the total
        weight is zero.
    """
    if not spans:
        return 0.0

    total_weight = 0.0
    weighted_valence = 0.0

    for span in spans:
        w = compute_span_weight(
            span.get("confidence", 0.5),
            span.get("detail", 1),
            span.get("intensity", 1),
        )
        valence_num = VALENCE_NUM.get(span.get("valence", "0"), 0.0)

        total_weight += w
        weighted_valence += w * valence_num

    if total_weight == 0:
        return 0.0

    score = 100.0 * (weighted_valence / total_weight)
    # Float bounds keep the return type a float even when the score sits
    # exactly on a bound (int bounds would make min/max hand back an int).
    return max(-100.0, min(100.0, score))


def percentile(values: list[float], p: float) -> float:
    """Compute the *p*-th percentile of *values* by linear interpolation.

    Args:
        values: Sample values, in any order.
        p: Percentile to compute, in the closed range [0, 100].

    Returns:
        0.0 when *values* is empty; otherwise the value interpolated between
        the two closest ranks of the sorted sample.

    Raises:
        ValueError: If *values* is non-empty and *p* is outside [0, 100].
    """
    if not values:
        return 0.0
    if not 0 <= p <= 100:
        # Out-of-range p would index past the sorted list or extrapolate
        # through negative indices, so reject it explicitly.
        raise ValueError(f"percentile must be between 0 and 100, got {p!r}")

    sorted_values = sorted(values)
    k = (len(sorted_values) - 1) * (p / 100)
    lower = int(k)
    # At the top rank there is no upper neighbour; reuse the lower one.
    upper = min(lower + 1, len(sorted_values) - 1)
    return sorted_values[lower] + (k - lower) * (sorted_values[upper] - sorted_values[lower])
Caller handles insufficient data.""" + if delta > threshold: + return "improving" + elif delta < -threshold: + return "declining" + else: + return "stable" + + +def iso_z(dt: datetime | None) -> str | None: + """Convert datetime to ISO-8601 UTC with Z suffix.""" + if not dt: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + + +# Minimum thresholds for trend comparisons +MIN_REVIEWS_FOR_COMPARISON = 10 # At least 10 reviews per period +MIN_COVERAGE_FOR_COMPARISON = 0.80 # At least 80% review_time coverage + + +def build_summary_input(report: dict) -> dict: + """Extract the minimal slice needed for LLM executive summary.""" + summary_input = { + "business": { + "name": report["business"]["business_id"], + "sector": report["business"]["sector_code"], + }, + "window": report["window"], + "scores": { + "overall": report["scores"]["overall"], + }, + "comparisons": report["comparisons"], + "drivers": { + # Include top driver for each (LLM only needs the highest-impact one) + "positives": [report["drivers"]["positives"][0]] if report["drivers"]["positives"] else [], + "negatives": [report["drivers"]["negatives"][0]] if report["drivers"]["negatives"] else [], + }, + "population": { + "reviews_processed": report["population"]["reviews_processed"], + }, + } + + # Add timeline (last 3 points only) + if report.get("timeline") and report["timeline"].get("points"): + summary_input["timeline"] = { + "granularity": report["timeline"]["granularity"], + "points": report["timeline"]["points"][-3:], + } + + return summary_input + + +def generate_fallback_summary(report: dict) -> str: + """Generate deterministic fallback summary from computed values.""" + score = report["scores"]["overall"]["score"] + + # Sentence 1: Score interpretation + if score >= 80: + band = "strong" + elif score >= 60: + band = "moderate" + elif score >= 40: + band = "mixed" + else: + band = "challenging" + + 
def generate_fallback_summary(report: dict) -> str:
    """Generate a deterministic fallback summary from computed report values.

    The text is up to three parts: a score sentence with a band label
    (>= 80 strong, >= 60 moderate, >= 40 mixed, otherwise challenging), an
    optional trend sentence taken from
    ``comparisons.previous_window.scores.overall.trend``, and optional
    sentences naming the first positive and first negative driver.

    Args:
        report: Report dict. ``report["scores"]["overall"]["score"]`` is
            required; "comparisons" and "drivers" may be missing or None.

    Returns:
        The summary text.

    Raises:
        KeyError: If the overall score is missing from *report*.
    """
    score = report["scores"]["overall"]["score"]

    # Sentence 1: score interpretation
    if score >= 80:
        band = "strong"
    elif score >= 60:
        band = "moderate"
    elif score >= 40:
        band = "mixed"
    else:
        band = "challenging"

    sentence1 = f"This business has a {band} performance score of {score}."

    # Sentence 2: trend, if available. `or {}` covers keys that are present
    # but None, which a `.get(key, {})` default does not.
    trend_sentences = {
        "improving": " Performance is trending upward compared to the previous period.",
        "declining": " Performance has declined compared to the previous period.",
        "stable": " Performance has remained stable compared to the previous period.",
    }
    prev = (report.get("comparisons") or {}).get("previous_window") or {}
    trend = ((prev.get("scores") or {}).get("overall") or {}).get("trend")
    sentence2 = trend_sentences.get(trend, "") if trend else ""

    # Sentence 3: top drivers
    drivers = report.get("drivers") or {}
    positives = drivers.get("positives") or []
    negatives = drivers.get("negatives") or []

    sentence3 = ""
    if positives:
        top_pos = positives[0]["primitive"].replace("_", " ").title()
        sentence3 += f" Customers respond positively to {top_pos}."

    if negatives:
        top_neg = negatives[0]["primitive"].replace("_", " ").title()
        sentence3 += f" {top_neg} is an area for improvement."

    return (sentence1 + sentence2 + sentence3).strip()
+ + Returns: + Tuple of (summary_text, meta_dict) + """ + meta = { + "enabled": enabled, + "generated": False, + "model": model, + "error": None, + "generated_at": None, + } + + if not enabled: + meta["error"] = "summary generation disabled via --no-summary" + return None, meta + + if not OPENAI_AVAILABLE: + meta["error"] = "openai package not installed" + return None, meta + + api_key = os.environ.get("OPENAI_API_KEY") + if not api_key: + meta["error"] = "OPENAI_API_KEY not set" + print("Warning: OPENAI_API_KEY not set, skipping executive summary generation.") + return None, meta + + try: + client = AsyncOpenAI(api_key=api_key) + response = await client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": EXEC_SUMMARY_SYSTEM}, + {"role": "user", "content": EXEC_SUMMARY_USER.format(json_data=json.dumps(summary_input, indent=2))}, + ], + temperature=0.3, + max_tokens=300, + ) + summary = response.choices[0].message.content.strip() + meta["generated"] = True + meta["generated_at"] = iso_z(datetime.now(timezone.utc)) + return summary, meta + except Exception as e: + meta["error"] = str(e) + print(f"Warning: Executive summary generation failed: {e}") + return None, meta + + +def select_granularity(days: int) -> str: + """Select appropriate time bucket granularity based on window size.""" + if days <= 14: + return "day" + elif days <= 90: + return "week" + else: + return "month" + + +async def resolve_business_id(pool: asyncpg.Pool, search_term: str) -> str | None: + """Resolve partial business name to canonical business_id.""" + import re + + # Try exact match first + row = await pool.fetchrow(""" + SELECT business_id + FROM pipeline.business_taxonomy_map + WHERE business_id = $1 + """, search_term) + if row: + return row["business_id"] + + # Build search patterns + search_patterns = [ + f"%{search_term}%", + f"%{search_term.replace(' ', '%')}%", + ] + + # CamelCase pattern + camel_pattern = re.sub(r'([a-z])([A-Z])', r'\1%\2', 
async def resolve_business_id(pool: asyncpg.Pool, search_term: str) -> str | None:
    """Resolve a full or partial business name to a canonical business_id.

    An exact match on ``business_id`` is tried first. If that fails, a
    series of case-insensitive LIKE patterns is tried in order and the
    first pattern that matches wins:

    1. the term as a substring,
    2. the term with spaces replaced by wildcards,
    3. the term with a wildcard at each lower-to-upper CamelCase boundary,
    4. for all-lowercase terms of 4+ characters, the term split in two at
       every inner position with a wildcard between the halves.

    Args:
        pool: Pool used to run the lookups.
        search_term: Business id or partial name. `%` and `_` inside the
            term are not escaped and act as LIKE wildcards.

    Returns:
        The matching business_id, or None when the term is empty/blank or
        nothing matches. When a pattern matches several businesses, the
        alphabetically first business_id is returned.
    """
    import re

    # A blank term would build the pattern "%%", which matches every row.
    if not search_term or not search_term.strip():
        return None

    # Try exact match first
    row = await pool.fetchrow("""
        SELECT business_id
        FROM pipeline.business_taxonomy_map
        WHERE business_id = $1
    """, search_term)
    if row:
        return row["business_id"]

    # Build search patterns, most specific first
    search_patterns = [
        f"%{search_term}%",
        f"%{search_term.replace(' ', '%')}%",
    ]

    # CamelCase pattern
    camel_pattern = re.sub(r'([a-z])([A-Z])', r'\1%\2', search_term)
    if camel_pattern != search_term:
        search_patterns.append(f"%{camel_pattern}%")

    # Lowercase concatenated words
    if search_term.islower() and len(search_term) >= 4:
        search_patterns.extend(
            f"%{search_term[:i]}%{search_term[i:]}%"
            for i in range(2, len(search_term) - 1)
        )

    # dict.fromkeys drops duplicate patterns (e.g. a term without spaces)
    # while keeping priority order, saving redundant round trips.
    for pattern in dict.fromkeys(search_patterns):
        # ORDER BY makes the pick deterministic when several rows match.
        row = await pool.fetchrow("""
            SELECT business_id
            FROM pipeline.business_taxonomy_map
            WHERE LOWER(business_id) LIKE LOWER($1)
            ORDER BY business_id
            LIMIT 1
        """, pattern)
        if row:
            return row["business_id"]

    return None
gbp_path +FROM pipeline.business_taxonomy_map +WHERE business_id = $1 +""" + +TIMESERIES_QUERY = """ +SELECT + date_trunc($4, f.review_time_utc) AS bucket, + COUNT(DISTINCT s.review_id) AS review_count, + COUNT(*) AS span_count, + COUNT(*) FILTER (WHERE s.valence = '+') AS positive_count, + COUNT(*) FILTER (WHERE s.valence = '-') AS negative_count, + COUNT(*) FILTER (WHERE s.valence = '0') AS neutral_count, + COUNT(*) FILTER (WHERE s.valence = '±') AS mixed_count, + ROUND(AVG(f.rating)::numeric, 2) AS avg_rating, + -- Strength score scaled to [-100, 100] + CASE WHEN SUM(s.confidence * s.intensity) > 0 THEN + ROUND((100 * SUM( + CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END + * s.confidence * s.intensity + ) / SUM(s.confidence * s.intensity))::numeric, 1) + ELSE 0 END AS strength_score +FROM pipeline.detected_spans_v2 s +JOIN pipeline.review_facts_v1 f + ON f.review_id = s.review_id + AND f.business_id = s.business_id +WHERE s.business_id = $1 + AND f.review_time_utc >= $2 + AND f.review_time_utc < $3 + AND s.primitive <> 'NON_INFORMATIVE' +GROUP BY 1 +ORDER BY 1 +""" + +PERIOD_SCORES_QUERY = """ +SELECT + COUNT(DISTINCT s.review_id) AS review_count, + COUNT(*) AS span_count, + COUNT(*) FILTER (WHERE s.valence = '+') AS positive_count, + COUNT(*) FILTER (WHERE s.valence = '-') AS negative_count, + ROUND(AVG(f.rating)::numeric, 2) AS avg_rating, + CASE WHEN SUM(s.confidence * s.intensity) > 0 THEN + ROUND((100 * SUM( + CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END + * s.confidence * s.intensity + ) / SUM(s.confidence * s.intensity))::numeric, 1) + ELSE 0 END AS overall_score +FROM pipeline.detected_spans_v2 s +JOIN pipeline.review_facts_v1 f + ON f.review_id = s.review_id + AND f.business_id = s.business_id +WHERE s.business_id = $1 + AND f.review_time_utc >= $2 + AND f.review_time_utc < $3 + AND s.primitive <> 'NON_INFORMATIVE' +""" + +REVIEW_TIME_COVERAGE_QUERY = """ +SELECT + COUNT(DISTINCT s.review_id) AS spans_reviews, + COUNT(DISTINCT 
f.review_id) AS facts_reviews, + MIN(f.review_time_utc) AS min_review_time, + MAX(f.review_time_utc) AS max_review_time +FROM pipeline.detected_spans_v2 s +LEFT JOIN pipeline.review_facts_v1 f + ON f.review_id = s.review_id + AND f.business_id = s.business_id +WHERE s.business_id = $1 + AND s.primitive <> 'NON_INFORMATIVE' + AND ($2::timestamptz IS NULL OR f.review_time_utc >= $2) + AND ($3::timestamptz IS NULL OR f.review_time_utc < $3) +""" + +SECTOR_BENCHMARK_QUERY = """ +SELECT + ROUND((100 * SUM( + CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END + * s.confidence * s.intensity + ) / NULLIF(SUM(s.confidence * s.intensity), 0))::numeric, 1) AS sector_overall_score, + COUNT(*) AS sector_span_count, + COUNT(DISTINCT s.business_id) AS sector_business_count +FROM pipeline.detected_spans_v2 s +JOIN pipeline.review_facts_v1 f + ON f.review_id = s.review_id + AND f.business_id = s.business_id +WHERE s.sector_code = $1 + AND f.review_time_utc >= $2 + AND f.review_time_utc < $3 + AND s.primitive <> 'NON_INFORMATIVE' +""" + +LATEST_RUN_QUERY = """ +SELECT run_id +FROM pipeline.detected_spans_v2 +WHERE business_id = $1 AND run_id IS NOT NULL +GROUP BY run_id +ORDER BY MAX(created_at) DESC, run_id DESC +LIMIT 1 +""" + + +# ============================================================================= +# REPORT GENERATOR +# ============================================================================= + +async def generate_report( + business_id: str, + start: datetime | None = None, + end: datetime | None = None, + run_id: str | None = None, + tz_name: str = "UTC", + pool: asyncpg.Pool | None = None, + summary_enabled: bool = True, + summary_model: str = "gpt-4o-mini", +) -> dict[str, Any]: + """ + Generate a reputation report for a business. 
+ + Args: + business_id: Business ID or search pattern + start: Window start (optional if run_id provided) + end: Window end (optional if run_id provided) + run_id: Specific run ID to use (overrides time window) + tz_name: IANA timezone for window + pool: Database pool (created if not provided) + + Returns: + ReputationReport JSON object + """ + close_pool = False + if pool is None: + pool = await asyncpg.create_pool(DB_URL) + close_pool = True + + try: + # Resolve business_id + canonical_id = await resolve_business_id(pool, business_id) + if not canonical_id: + return {"error": f"Business not found: {business_id}"} + + # Get business config + config_row = await pool.fetchrow(BUSINESS_CONFIG_QUERY, canonical_id) + if not config_row: + return {"error": f"Business not mapped: {canonical_id}"} + + # Determine window mode: time_window if start/end provided, otherwise latest_run + window_mode = "time_window" if start and end else "latest_run" + + # Convert to UTC-naive for DB compatibility + start_naive = to_utc_naive(start) + end_naive = to_utc_naive(end) + + # Fetch spans based on mode - SINGLE SOURCE OF TRUTH + run_uuid = None + if window_mode == "time_window": + # Time window mode: filter by review_time_utc + spans = await pool.fetch( + SPANS_QUERY_TIME_WINDOW, + canonical_id, + start_naive, + end_naive + ) + else: + # Latest run mode: filter by run_id + if run_id: + run_uuid = uuid.UUID(run_id) + else: + latest = await pool.fetchrow(LATEST_RUN_QUERY, canonical_id) + if latest and latest["run_id"]: + run_uuid = latest["run_id"] + + spans = await pool.fetch( + SPANS_QUERY_LATEST_RUN, + canonical_id, + run_uuid + ) + + if not spans: + return {"error": "No spans found for this business/window"} + + spans = [dict(s) for s in spans] + + # Determine actual window from data + if window_mode == "time_window": + # Use review_time_utc for time window mode + review_times = [s["review_time_utc"] for s in spans if s.get("review_time_utc")] + actual_start = min(review_times) if 
review_times else start + actual_end = max(review_times) if review_times else end + else: + # Use created_at for latest run mode + created_times = [s["created_at"] for s in spans if s["created_at"]] + actual_start = min(created_times) if created_times else datetime.now(timezone.utc) + actual_end = max(created_times) if created_times else datetime.now(timezone.utc) + + # Get unique run_ids (handle legacy data without run_id), sorted for determinism + run_ids_set = set(str(s["run_id"]) for s in spans if s["run_id"]) + runs_included = sorted(run_ids_set) if run_ids_set else ["legacy:run_id_missing"] + + # Get unique job_ids for traceability (ingestion vs classification debugging) + job_ids_set = set(str(s["job_id"]) for s in spans if s.get("job_id")) + jobs_included = sorted(job_ids_set) if job_ids_set else [] + + # Primary run_id: + # - In latest_run mode: the specific run we queried + # - In time_window mode: the newest run by max(created_at) + if window_mode == "latest_run": + primary_run_id = str(run_uuid) if run_uuid else None + else: + # time_window mode: find run with max created_at + run_max_created: dict[str, datetime] = {} + for s in spans: + rid = s.get("run_id") + if not rid: + continue + t = s.get("created_at") + rid_str = str(rid) + if t and (rid_str not in run_max_created or t > run_max_created[rid_str]): + run_max_created[rid_str] = t + primary_run_id = max(run_max_created, key=run_max_created.get) if run_max_created else None + + # Latest span timestamp for traceability (UTC) + latest_span_created_at_utc = actual_end + + # Get unique review_ids + review_ids = set(s["review_id"] for s in spans) + + # ================================================================= + # POPULATION STATS + # ================================================================= + + total_spans = len(spans) + non_informative = [s for s in spans if s["primitive"] == "NON_INFORMATIVE"] + content_spans = [s for s in spans if s["primitive"] != "NON_INFORMATIVE"] + unmapped_spans = 
[s for s in spans if s["primitive"] == "UNMAPPED"] + + # Normalize language codes: map internal markers to standard codes + def normalize_language(lang: str | None) -> str: + if not lang: + return "unknown" + lang = lang.lower().strip() + # Map internal markers to "unknown" + if lang in ("auto", "auto-detect", ""): + return "unknown" + return lang + + # Language distribution (by reviews - use majority span language per review) + review_lang_counts: dict[str, dict[str, int]] = {} + for s in spans: + rid = s["review_id"] + lang = normalize_language(s.get("language")) + if rid not in review_lang_counts: + review_lang_counts[rid] = {} + review_lang_counts[rid][lang] = review_lang_counts[rid].get(lang, 0) + 1 + + # Pick majority language for each review + review_langs = {rid: max(counts, key=counts.get) for rid, counts in review_lang_counts.items()} + + lang_review_counts = {} + for lang in review_langs.values(): + lang_review_counts[lang] = lang_review_counts.get(lang, 0) + 1 + total_reviews = len(review_langs) + lang_dist_reviews = {k: v / total_reviews for k, v in lang_review_counts.items()} if total_reviews > 0 else {} + + # Language distribution (by spans) + lang_span_counts = {} + for s in content_spans: + lang = normalize_language(s.get("language")) + lang_span_counts[lang] = lang_span_counts.get(lang, 0) + 1 + total_content = len(content_spans) + lang_dist_spans = {k: v / total_content for k, v in lang_span_counts.items()} if total_content > 0 else {} + + # Confidence stats + confidences = [float(s["confidence"]) for s in content_spans if s["confidence"]] + avg_conf = sum(confidences) / len(confidences) if confidences else 0.0 + low_conf_count = len([c for c in confidences if c < 0.5]) + + population = { + "runs_included": runs_included, + "jobs_included": jobs_included, + "latest_span_created_at_utc": iso_z(latest_span_created_at_utc), + "reviews_processed": total_reviews, + "spans_total": total_spans, + "spans_non_informative": len(non_informative), + 
"spans_content": total_content, + "unmapped_spans": len(unmapped_spans), + "unmapped_rate_raw": len(unmapped_spans) / total_spans if total_spans > 0 else 0, + "unmapped_rate_content": len(unmapped_spans) / total_content if total_content > 0 else 0, + "language_distribution_reviews": lang_dist_reviews, + "language_distribution_spans": lang_dist_spans, + "confidence": { + "avg": round(avg_conf, 3), + "p10": round(percentile(confidences, 10), 3) if confidences else 0, + "p50": round(percentile(confidences, 50), 3) if confidences else 0, + "p90": round(percentile(confidences, 90), 3) if confidences else 0, + "low_confidence_count": low_conf_count, + } + } + + # ================================================================= + # SCORES + # ================================================================= + + # Group spans by primitive + primitive_spans = {} + for s in content_spans: + prim = s["primitive"] + if prim not in primitive_spans: + primitive_spans[prim] = [] + primitive_spans[prim].append(s) + + # Compute primitive scores + primitive_scores = {} + for prim, prim_spans in primitive_spans.items(): + domain = DOMAIN_MAP.get(prim, "meta") + + valence_counts = {"+": 0, "-": 0, "0": 0, "±": 0} + intensities = [] + details = [] + confidences_prim = [] + entities = {} + + for s in prim_spans: + v = s.get("valence", "0") + if v in valence_counts: + valence_counts[v] += 1 + if s.get("intensity"): + intensities.append(s["intensity"]) + if s.get("detail"): + details.append(s["detail"]) + if s.get("confidence"): + confidences_prim.append(float(s["confidence"])) + if s.get("entity"): + key = (s["entity"], s.get("entity_type", "unknown")) + entities[key] = entities.get(key, 0) + 1 + + # Top entities + top_entities = [ + {"entity": e[0], "entity_type": e[1], "count": c} + for e, c in sorted(entities.items(), key=lambda x: -x[1])[:5] + ] + + primitive_scores[prim] = { + "domain": domain, + "enabled": True, + "score": round(compute_primitive_score(prim_spans), 1), + "volume": 
len(prim_spans), + "valence_counts": valence_counts, + "intensity_avg": round(sum(intensities) / len(intensities), 2) if intensities else 1.0, + "detail_avg": round(sum(details) / len(details), 2) if details else 1.0, + "confidence_avg": round(sum(confidences_prim) / len(confidences_prim), 3) if confidences_prim else 0.5, + "top_entities": top_entities, + } + + # Compute domain scores + domain_scores = {} + for domain_code in ["O", "P", "J", "E", "V"]: + domain_prims = [p for p, d in DOMAIN_MAP.items() if d == domain_code and p in primitive_scores] + if domain_prims: + total_volume = sum(primitive_scores[p]["volume"] for p in domain_prims) + weighted_score = sum( + primitive_scores[p]["score"] * primitive_scores[p]["volume"] + for p in domain_prims + ) + domain_scores[domain_code] = { + "score": round(weighted_score / total_volume, 1) if total_volume > 0 else 0, + "volume": total_volume, + } + else: + domain_scores[domain_code] = {"score": 0, "volume": 0} + + # Overall score - use same formula as PERIOD_SCORES_QUERY for consistency: + # 100 * SUM(valence * confidence * intensity) / SUM(confidence * intensity) + total_weight = 0.0 + weighted_valence_sum = 0.0 + for s in content_spans: + conf = float(s.get("confidence", 0.5)) + intensity = float(s.get("intensity", 1)) + weight = conf * intensity + total_weight += weight + valence = s.get("valence", "0") + if valence == "+": + weighted_valence_sum += weight + elif valence == "-": + weighted_valence_sum -= weight + # "0" and "±" contribute 0 + + overall_score = (100 * weighted_valence_sum / total_weight) if total_weight > 0 else 0 + + # Domain-weighted score (kept for domain breakdown, but not used as primary) + total_domain_volume = sum(d["volume"] for d in domain_scores.values()) + domain_weighted_score = sum( + d["score"] * d["volume"] for d in domain_scores.values() + ) / total_domain_volume if total_domain_volume > 0 else 0 + + # Valence shares + all_valences = [s.get("valence", "0") for s in content_spans] + 
valence_total = len(all_valences) + + scores = { + "overall": { + "score": round(overall_score, 1), + "score_domain_weighted": round(domain_weighted_score, 1), # Alternative metric + "positive_share": round(all_valences.count("+") / valence_total, 3) if valence_total > 0 else 0, + "negative_share": round(all_valences.count("-") / valence_total, 3) if valence_total > 0 else 0, + "mixed_share": round(all_valences.count("±") / valence_total, 3) if valence_total > 0 else 0, + "neutral_share": round(all_valences.count("0") / valence_total, 3) if valence_total > 0 else 0, + }, + "domains": domain_scores, + "primitives": primitive_scores, + } + + # ================================================================= + # DRIVERS + # ================================================================= + + def compute_impact(prim_spans: list[dict], valence_filter: str) -> float: + """Compute impact as share of total weight for a valence.""" + filtered = [s for s in prim_spans if s.get("valence") == valence_filter] + if not filtered: + return 0.0 + + filtered_weight = sum( + compute_span_weight(s.get("confidence", 0.5), s.get("detail", 1), s.get("intensity", 1)) + for s in filtered + ) + total_weight = sum( + compute_span_weight(s.get("confidence", 0.5), s.get("detail", 1), s.get("intensity", 1)) + for s in content_spans + ) + return filtered_weight / total_weight if total_weight > 0 else 0 + + def select_evidence(prim_spans: list[dict], valence: str, max_count: int = 3) -> list[dict]: + """Select top evidence spans by weight, one per review_id.""" + filtered = [s for s in prim_spans if s.get("valence") == valence] + weighted = [ + (s, compute_span_weight(s.get("confidence", 0.5), s.get("detail", 1), s.get("intensity", 1))) + for s in filtered + ] + weighted.sort(key=lambda x: -x[1]) + + selected = [] + seen_reviews = set() + for s, w in weighted: + if s["review_id"] not in seen_reviews and len(selected) < max_count: + text = s.get("span_text", "")[:240] # Cap at 240 chars + 
selected.append({ + "review_id": s["review_id"], + "language": s.get("language", "unknown"), + "span_text": text, + "valence": s.get("valence", "0"), + "intensity": s.get("intensity", 1), + "detail": s.get("detail", 1), + "confidence": round(float(s.get("confidence", 0.5)), 3), + }) + seen_reviews.add(s["review_id"]) + return selected + + # Compute drivers + positive_drivers = [] + negative_drivers = [] + + for prim, prim_spans in primitive_spans.items(): + if prim in ("UNMAPPED", "NON_INFORMATIVE"): + continue + + pos_impact = compute_impact(prim_spans, "+") + neg_impact = compute_impact(prim_spans, "-") + + if pos_impact > 0.02: # Threshold for significance + positive_drivers.append({ + "primitive": prim, + "impact": round(pos_impact, 3), + "summary": f"Positive {DOMAIN_MAP.get(prim, 'meta')}/{prim} mentions.", + "evidence": select_evidence(prim_spans, "+"), + }) + + if neg_impact > 0.02: + negative_drivers.append({ + "primitive": prim, + "impact": round(neg_impact, 3), + "summary": f"Negative {DOMAIN_MAP.get(prim, 'meta')}/{prim} mentions.", + "evidence": select_evidence(prim_spans, "-"), + }) + + # Sort by impact and take top 5 + positive_drivers.sort(key=lambda x: -x["impact"]) + negative_drivers.sort(key=lambda x: -x["impact"]) + + drivers = { + "positives": positive_drivers[:5], + "negatives": negative_drivers[:5], + } + + # ================================================================= + # ALERTS + # ================================================================= + + alerts = [] + + # UNMAPPED rate alert + unmapped_rate = population["unmapped_rate_content"] + if unmapped_rate > ALERT_THRESHOLDS["unmapped_rate_content"]["critical"]: + alerts.append({ + "severity": "critical", + "type": "unmapped_high", + "message": f"Content-adjusted UNMAPPED rate ({unmapped_rate:.1%}) exceeds critical threshold.", + "metric": {"name": "unmapped_rate_content", "value": unmapped_rate, "threshold": 0.15}, + "scope": {"primitive": None, "language": None}, + }) + elif 
unmapped_rate > ALERT_THRESHOLDS["unmapped_rate_content"]["warn"]: + alerts.append({ + "severity": "warn", + "type": "unmapped_high", + "message": f"Content-adjusted UNMAPPED rate ({unmapped_rate:.1%}) above target.", + "metric": {"name": "unmapped_rate_content", "value": unmapped_rate, "threshold": 0.10}, + "scope": {"primitive": None, "language": None}, + }) + + # SAFETY alert + if "SAFETY" in primitive_scores: + safety_neg = primitive_scores["SAFETY"]["valence_counts"].get("-", 0) + safety_total = primitive_scores["SAFETY"]["volume"] + safety_neg_share = safety_neg / safety_total if safety_total > 0 else 0 + + if safety_neg_share > ALERT_THRESHOLDS["safety_negative_share"]["critical"]: + alerts.append({ + "severity": "critical", + "type": "safety_spike", + "message": f"SAFETY negative share ({safety_neg_share:.1%}) exceeds critical threshold.", + "metric": {"name": "safety_negative_share", "value": safety_neg_share, "threshold": 0.10}, + "scope": {"primitive": "SAFETY", "language": None}, + }) + elif safety_neg_share > ALERT_THRESHOLDS["safety_negative_share"]["warn"]: + alerts.append({ + "severity": "warn", + "type": "safety_spike", + "message": f"SAFETY negative share ({safety_neg_share:.1%}) above warning threshold.", + "metric": {"name": "safety_negative_share", "value": safety_neg_share, "threshold": 0.05}, + "scope": {"primitive": "SAFETY", "language": None}, + }) + + # Low confidence alert + if avg_conf < ALERT_THRESHOLDS["avg_confidence"]["warn"]: + alerts.append({ + "severity": "warn", + "type": "low_confidence_high", + "message": f"Average confidence ({avg_conf:.2f}) below threshold.", + "metric": {"name": "avg_confidence", "value": avg_conf, "threshold": 0.70}, + "scope": {"primitive": None, "language": None}, + }) + + # ================================================================= + # RECOMMENDATIONS (templated based on negative drivers) + # ================================================================= + + recommendations = [] + + # Generate 
recommendations from top negative drivers + playbook_templates = { + "CLEANLINESS": { + "title": "Improve cleanliness protocols", + "actions": [ + "Increase cleaning frequency during peak hours", + "Implement visible cleaning checklists", + "Train staff on cleanliness standards" + ], + "owner": "ops", + "time_to_impact": 14, + }, + "SPEED": { + "title": "Reduce wait times", + "actions": [ + "Analyze peak hour staffing", + "Optimize queue management", + "Set wait time targets and monitor" + ], + "owner": "ops", + "time_to_impact": 21, + }, + "MANNER": { + "title": "Enhance staff friendliness", + "actions": [ + "Conduct hospitality training", + "Implement customer feedback program", + "Recognize staff for positive mentions" + ], + "owner": "service", + "time_to_impact": 30, + }, + "PRICE_FAIRNESS": { + "title": "Review pricing perception", + "actions": [ + "Audit pricing vs. competitor benchmarks", + "Improve value communication", + "Consider value-add bundles" + ], + "owner": "pricing", + "time_to_impact": 45, + }, + "SAFETY": { + "title": "Address safety concerns immediately", + "actions": [ + "Conduct safety audit", + "Review and update safety protocols", + "Increase visible safety measures" + ], + "owner": "safety", + "time_to_impact": 7, + }, + } + + priority_map = {"critical": "P0", "high": "P1", "medium": "P2"} + + for i, driver in enumerate(negative_drivers[:3]): + prim = driver["primitive"] + template = playbook_templates.get(prim, { + "title": f"Improve {prim.replace('_', ' ').title()}", + "actions": [ + f"Investigate root causes of negative {prim} feedback", + "Develop improvement action plan", + "Monitor {prim} score weekly" + ], + "owner": "ops", + "time_to_impact": 30, + }) + + priority = "P0" if i == 0 and driver["impact"] > 0.10 else ("P1" if i < 2 else "P2") + + recommendations.append({ + "priority": priority, + "primitive": prim, + "title": template["title"], + "rationale": driver["summary"], + "playbook": { + "actions": template["actions"], + 
"owner": template["owner"], + "expected_time_to_impact_days": template["time_to_impact"], + "measurement": [f"{prim} score improvement", f"Negative {prim} share reduction"], + }, + }) + + # If SAFETY alert triggered, ensure SAFETY is P0 recommendation + safety_alert_triggered = any(a["type"] == "safety_spike" for a in alerts) + + if safety_alert_triggered: + # Check if SAFETY already in recommendations + safety_rec_idx = next( + (i for i, r in enumerate(recommendations) if r["primitive"] == "SAFETY"), + None + ) + + if safety_rec_idx is not None: + # Upgrade existing SAFETY recommendation to P0 and move to front + safety_rec = recommendations.pop(safety_rec_idx) + safety_rec["priority"] = "P0" + safety_rec["rationale"] = "SAFETY alert triggered - immediate action required." + recommendations.insert(0, safety_rec) + else: + # Add new SAFETY P0 recommendation at front + safety_template = playbook_templates["SAFETY"] + recommendations.insert(0, { + "priority": "P0", + "primitive": "SAFETY", + "title": safety_template["title"], + "rationale": "SAFETY alert triggered - immediate action required.", + "playbook": { + "actions": safety_template["actions"], + "owner": safety_template["owner"], + "expected_time_to_impact_days": safety_template["time_to_impact"], + "measurement": ["SAFETY score improvement", "Negative SAFETY share reduction"], + }, + }) + + # ================================================================= + # TIME COMPARISON (previous period) + # ================================================================= + + comparisons = { + "previous_window": None, + "sector_benchmark": None, + } + timeline_points = [] + timeline_granularity = "week" # Default granularity + + # Use CLI-provided window for time comparison + current_start = start + current_end = end + + # Check review_time coverage within the requested window + coverage_row = await pool.fetchrow( + REVIEW_TIME_COVERAGE_QUERY, + canonical_id, + current_start, + current_end + ) + + review_time_coverage 
= 0.0 + review_time_window_utc = None + + if coverage_row and coverage_row["spans_reviews"] > 0: + review_time_coverage = (coverage_row["facts_reviews"] or 0) / coverage_row["spans_reviews"] + if coverage_row["min_review_time"] and coverage_row["max_review_time"]: + review_time_window_utc = { + "start": iso_z(coverage_row["min_review_time"]), + "end": iso_z(coverage_row["max_review_time"]), + } + + # Add to population + population["review_time_coverage_pct"] = round(review_time_coverage * 100, 1) + population["review_time_window_utc"] = review_time_window_utc + + # Only compute time comparison if we have sufficient coverage + if current_start and current_end and review_time_coverage >= MIN_COVERAGE_FOR_COMPARISON: + # Compute window duration + window_duration = current_end - current_start + days_in_window = max(window_duration.days, 1) + + # Previous period: same duration, immediately preceding current + prev_end = current_start + prev_start = prev_end - window_duration + + # Get current period scores + current_row = await pool.fetchrow( + PERIOD_SCORES_QUERY, + canonical_id, + current_start, + current_end + ) + + # Get previous period scores + prev_row = await pool.fetchrow( + PERIOD_SCORES_QUERY, + canonical_id, + prev_start, + prev_end + ) + + current_reviews = current_row["review_count"] if current_row else 0 + prev_reviews = prev_row["review_count"] if prev_row else 0 + + # Check minimum sample size thresholds + if current_reviews >= MIN_REVIEWS_FOR_COMPARISON: + current_score = float(current_row["overall_score"] or 0) + current_rating = float(current_row["avg_rating"] or 0) + current_pos_share = (current_row["positive_count"] or 0) / max(current_row["span_count"], 1) + current_neg_share = (current_row["negative_count"] or 0) / max(current_row["span_count"], 1) + + prev_score = 0.0 + prev_rating = 0.0 + prev_pos_share = 0.0 + prev_neg_share = 0.0 + + if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON: + prev_score = float(prev_row["overall_score"] or 0) + 
prev_rating = float(prev_row["avg_rating"] or 0) + prev_pos_share = (prev_row["positive_count"] or 0) / max(prev_row["span_count"], 1) + prev_neg_share = (prev_row["negative_count"] or 0) / max(prev_row["span_count"], 1) + + # Compute delta and trend + delta = current_score - prev_score + if prev_reviews < MIN_REVIEWS_FOR_COMPARISON: + trend = "insufficient_data" + else: + trend = compute_trend_label(delta) + + comparisons["previous_window"] = { + "window": { + "start": iso_z(prev_start), + "end": iso_z(prev_end), + }, + "scores": { + "overall": { + "current": round(current_score, 1), + "previous": round(prev_score, 1) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + "delta": round(delta, 1) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + "trend": trend, + }, + "avg_rating": { + "current": round(current_rating, 2), + "previous": round(prev_rating, 2) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + "delta": round(current_rating - prev_rating, 2) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + }, + "positive_share": { + "current": round(current_pos_share, 3), + "previous": round(prev_pos_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + "delta": round(current_pos_share - prev_pos_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + }, + "negative_share": { + "current": round(current_neg_share, 3), + "previous": round(prev_neg_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + "delta": round(current_neg_share - prev_neg_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None, + }, + }, + "reviews": { + "current": current_reviews, + "previous": prev_reviews, + }, + } + + # Generate timeline for charts (over current window only) + timeline_granularity = select_granularity(days_in_window) + if timeline_granularity not in ("day", "week", "month"): + timeline_granularity = "week" + + timeline_rows = await pool.fetch( + TIMESERIES_QUERY, + canonical_id, + current_start, 
+ current_end, + timeline_granularity + ) + + for row in timeline_rows: + if row["bucket"]: + timeline_points.append({ + "bucket_start_utc": iso_z(row["bucket"]), + "review_count": row["review_count"], + "span_count": row["span_count"], + "positive_count": row["positive_count"], + "negative_count": row["negative_count"], + "avg_rating": float(row["avg_rating"]) if row["avg_rating"] else None, + "strength_score": float(row["strength_score"]) if row["strength_score"] else 0, + }) + + # ================================================================= + # SECTOR BENCHMARK (always a dict with status field) + # ================================================================= + + sector_code = config_row["sector_code"] + if not sector_code: + comparisons["sector_benchmark"] = { + "status": "missing_sector_code", + "sector_code": None, + "window": None, + "overall": None, + "sample_size": None, + } + else: + sector_row = await pool.fetchrow( + SECTOR_BENCHMARK_QUERY, + sector_code, + current_start, + current_end + ) + + if sector_row: + sector_spans = sector_row["sector_span_count"] or 0 + sector_businesses = sector_row["sector_business_count"] or 0 + sector_score = float(sector_row["sector_overall_score"] or 0) + + # Only include benchmark if sufficient sector data + if sector_spans >= 500 and sector_businesses >= 3: + delta_vs_sector = current_score - sector_score + comparisons["sector_benchmark"] = { + "status": "ok", + "sector_code": sector_code, + "window": { + "start": iso_z(current_start), + "end": iso_z(current_end), + }, + "overall": { + "business": round(current_score, 1), + "sector_avg": round(sector_score, 1), + "delta_vs_sector": round(delta_vs_sector, 1), + }, + "sample_size": { + "sector_spans": sector_spans, + "sector_businesses": sector_businesses, + }, + } + else: + comparisons["sector_benchmark"] = { + "status": "insufficient_data", + "sector_code": sector_code, + "window": { + "start": iso_z(current_start), + "end": iso_z(current_end), + }, + 
"overall": None, + "sample_size": { + "sector_spans": sector_spans, + "sector_businesses": sector_businesses, + }, + } + else: + comparisons["sector_benchmark"] = { + "status": "no_sector_data", + "sector_code": sector_code, + "window": { + "start": iso_z(current_start), + "end": iso_z(current_end), + }, + "overall": None, + "sample_size": None, + } + + # Add alert if coverage is too low for comparisons + if review_time_coverage < MIN_COVERAGE_FOR_COMPARISON and current_start and current_end: + alerts.append({ + "severity": "warn", + "type": "insufficient_trend_data", + "message": f"Review time coverage ({review_time_coverage:.0%}) below threshold for trend analysis.", + "metric": {"name": "review_time_coverage_pct", "value": review_time_coverage, "threshold": MIN_COVERAGE_FOR_COMPARISON}, + "scope": {"primitive": None, "language": None}, + }) + + # ================================================================= + # INVARIANT CHECK: scores.overall must match comparisons current score + # ================================================================= + + if window_mode == "time_window" and comparisons.get("previous_window"): + comparison_current = comparisons["previous_window"]["scores"]["overall"].get("current") + if comparison_current is not None: + delta = abs(overall_score - comparison_current) + if delta > 1.0: # Allow small rounding differences + alerts.append({ + "severity": "error", + "type": "internal_inconsistency", + "message": f"INTERNAL ERROR: scores.overall ({overall_score:.1f}) != comparisons.current ({comparison_current:.1f}). 
Report data is inconsistent.", + "metric": {"name": "score_delta", "value": delta, "threshold": 1.0}, + "scope": {"primitive": None, "language": None}, + }) + + # ================================================================= + # BUILD REPORT + # ================================================================= + + report = { + "schema_version": SCHEMA_VERSION, + "report_id": str(uuid.uuid4()), + "primary_run_id": primary_run_id, + "generated_at": iso_z(datetime.now(timezone.utc)), + "window": { + "start": iso_z(start), + "end": iso_z(end), + "timezone": tz_name, + "mode": window_mode, + }, + "business": { + "business_id": canonical_id, + "sector_code": config_row["sector_code"], + "gbp_path": config_row["gbp_path"], + "config_version": "1.0", # TODO: Get from spans + "l2_applied": False, # TODO: Get from config + }, + "population": population, + "scores": scores, + "drivers": drivers, + "alerts": alerts, + "recommendations": recommendations, + "comparisons": comparisons, + # Timeline: always a dict with points: [] (never null in time_window mode) + "timeline": { + "granularity": timeline_granularity, + "points": timeline_points, + } if window_mode == "time_window" else None, + "executive_summary": None, # Placeholder, filled below + "executive_summary_meta": None, # Placeholder, filled below + } + + # ================================================================= + # EXECUTIVE SUMMARY (LLM-generated with fallback) + # ================================================================= + + summary_input = build_summary_input(report) + summary_text, summary_meta = await generate_executive_summary( + summary_input, + model=summary_model, + enabled=summary_enabled, + ) + + # If LLM failed but summary was enabled, use deterministic fallback + if summary_enabled and not summary_text: + summary_text = generate_fallback_summary(report) + summary_meta["fallback_used"] = True + else: + summary_meta["fallback_used"] = False + + report["executive_summary"] = summary_text 
def _parse_cli_datetime(value):
    """Parse an ISO-8601 CLI value, treating naive timestamps as UTC."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


async def main_async(args):
    """Resolve the report window from CLI args, generate the report, and emit it.

    Window resolution:
      * ``--start`` / ``--end`` are parsed as ISO-8601; naive values are
        taken as UTC.
      * A missing ``--end`` defaults to the current UTC time.
      * A missing ``--start`` defaults to ``end`` minus ``--days``
        (30 days when ``--days`` is unset or zero).

    Previously a lone ``--start`` or ``--end`` was silently discarded and the
    window fell back to ``--days``/30 days; each bound is now honoured on its
    own.

    Args:
        args: Parsed ``argparse`` namespace produced by ``main``.

    Raises:
        SystemExit: code 1 when the generated report carries an ``error``
            key; code 2 when ``--require-summary`` is set and the summary
            metadata does not report ``generated``.
        ValueError: propagated from ``datetime.fromisoformat`` when
            ``--start`` / ``--end`` are not valid ISO-8601.
    """
    default_days = 30
    lookback = timedelta(days=args.days or default_days)

    # Resolve each bound independently so a single explicit bound is respected.
    end = _parse_cli_datetime(args.end) if args.end else datetime.now(timezone.utc)
    start = _parse_cli_datetime(args.start) if args.start else end - lookback

    # Determine summary settings
    summary_enabled = not args.no_summary

    # Generate report
    report = await generate_report(
        business_id=args.business,
        start=start,
        end=end,
        run_id=args.run_id,
        tz_name=args.timezone,
        summary_enabled=summary_enabled,
        summary_model=args.summary_model,
    )

    if "error" in report:
        print(f"Error: {report['error']}")
        raise SystemExit(1)

    # Check --require-summary: if LLM summary required but not generated,
    # fail before writing anything.
    if args.require_summary:
        # The key may be present with a None value, so `.get(key, {})` alone
        # is not enough to guarantee a dict.
        meta = report.get("executive_summary_meta") or {}
        if not meta.get("generated", False):
            error_msg = meta.get("error") or "unknown error"
            print(f"Error: --require-summary specified but LLM summary failed: {error_msg}")
            raise SystemExit(2)

    # Output
    if args.output:
        # Explicit encoding keeps the file independent of the platform locale.
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)
        print(f"Report written to {args.output}")
    else:
        print(json.dumps(report, indent=2, default=str))

    # Print human-readable summary
    if not args.quiet:
        overall = report["scores"]["overall"]
        print("\n" + "=" * 60)
        print(f"REPUTATION REPORT: {report['business']['business_id']}")
        print("=" * 60)
        print(f"Window: {report['window']['start']} - {report['window']['end']}")
        print(f"Reviews: {report['population']['reviews_processed']}")
        print(f"Content spans: {report['population']['spans_content']}")
        print(f"Overall score: {overall['score']}")
        print(f"Positive share: {overall['positive_share']:.1%}")
        print(f"Negative share: {overall['negative_share']:.1%}")
        print("\nTop positive drivers:")
        for d in report["drivers"]["positives"][:3]:
            print(f" {d['primitive']}: {d['impact']:.1%} impact")
        print("\nTop negative drivers:")
        for d in report["drivers"]["negatives"][:3]:
            print(f" {d['primitive']}: {d['impact']:.1%} impact")
        if report["alerts"]:
            print(f"\nAlerts ({len(report['alerts'])}):")
            for a in report["alerts"]:
                print(f" [{a['severity']}] {a['message']}")
        print("=" * 60)
def determine_watch_item(
    negatives: list[dict],
    timeline_points: list[dict],
    window_end: datetime,
    negative_share: float = 0.0,
) -> dict:
    """
    Determine what weakness/watch-item should be cited based on narrative guardrails.

    Priority:
    1. If negatives non-empty → top negative driver (first element)
    2. If negatives empty but qualifying dip exists → most recent qualifying dip
    3. Otherwise → no watch item

    Qualifying dip:
    - bucket start is no earlier than 90 days before window_end
    - review_count >= 3
    - strength_score < 0 OR avg_rating < 3.0

    Null handling: a point without ``bucket_start_utc`` is skipped; a null
    ``review_count`` or ``strength_score`` is treated as 0; a null
    ``avg_rating`` never qualifies on its own.

    Args:
        negatives: Negative drivers; the first entry is taken as the top one.
        timeline_points: Timeline buckets whose ``bucket_start_utc`` is an
            ISO-8601 string (optionally ``Z``-suffixed) or a datetime.
        window_end: End of the reporting window.
        negative_share: Accepted for interface compatibility; not used.

    Returns:
        dict with keys: type, data, should_cite_recent_dip
    """
    # Priority 1: Negative driver
    if negatives:
        return {
            "type": "negative_driver",
            "data": negatives[0],  # Top by impact (caller supplies sorted list)
            "should_cite_recent_dip": False,
        }

    # Priority 2: Qualifying recent dip
    qualifying_dips = []
    cutoff = window_end - timedelta(days=90)

    for point in timeline_points:
        bucket_start = point.get("bucket_start_utc")
        if not bucket_start:
            # No timestamp: cannot be placed relative to the cutoff.
            continue

        if isinstance(bucket_start, str):
            # fromisoformat only accepts a trailing "Z" on Python 3.11+.
            bucket_dt = datetime.fromisoformat(bucket_start.replace("Z", "+00:00"))
        else:
            bucket_dt = bucket_start

        # Check if within 90 days
        if bucket_dt < cutoff:
            continue

        # Check minimum volume (null counts as zero reviews)
        if (point.get("review_count") or 0) < 3:
            continue

        # Check if it's actually a dip (negative strength or low rating).
        # Both metrics may be null in a bucket; neither null may raise.
        strength = point.get("strength_score") or 0
        avg_rating = point.get("avg_rating")
        if strength < 0 or (avg_rating is not None and avg_rating < 3.0):
            qualifying_dips.append({
                "bucket_dt": bucket_dt,
                "point": point,
            })

    if qualifying_dips:
        # Most recent qualifying dip
        most_recent = max(qualifying_dips, key=lambda x: x["bucket_dt"])
        return {
            "type": "recent_dip",
            "data": most_recent["point"],
            "should_cite_recent_dip": True,
        }

    # Priority 3: No watch item
    return {
        "type": "none",
        "data": None,
        "should_cite_recent_dip": False,
    }
@pytest.fixture
def base_report(window_end):
    """Minimal report dict shared by the summary tests.

    The window covers the 365 days ending at ``window_end``; there is one
    positive driver, no negative drivers, a declining previous-window
    comparison, and an empty monthly timeline.
    """

    def as_z(moment):
        # Same "Z"-suffixed rendering the tests use for timeline buckets.
        return moment.isoformat().replace("+00:00", "Z")

    window = {
        "start": as_z(window_end - timedelta(days=365)),
        "end": as_z(window_end),
    }
    overall_scores = {
        "score": 75.0,
        "positive_share": 0.70,
        "negative_share": 0.20,
    }
    top_positive = {
        "primitive": "VALUE_FOR_MONEY",
        "impact": 0.15,
        "summary": "Good value",
    }
    overall_trend = {
        "current": 75.0,
        "previous": 80.0,
        "delta": -5.0,
        "trend": "declining",
    }

    report = {}
    report["schema_version"] = "1.0"
    report["window"] = window
    report["business"] = {"business_id": "Test Business", "sector_code": "TEST"}
    report["scores"] = {"overall": overall_scores}
    report["population"] = {"reviews_processed": 100}
    report["drivers"] = {"positives": [top_positive], "negatives": []}
    report["comparisons"] = {
        "previous_window": {"scores": {"overall": overall_trend}},
    }
    report["timeline"] = {"granularity": "month", "points": []}
    return report
class TestQualifyingRecentDip:
    """With no negative drivers, a qualifying recent dip becomes the watch item."""

    @staticmethod
    def _bucket(window_end, days_ago):
        """Return the Z-suffixed ISO timestamp ``days_ago`` days before ``window_end``."""
        moment = window_end - timedelta(days=days_ago)
        return moment.isoformat().replace("+00:00", "Z")

    def test_watch_item_selects_most_recent_qualifying_dip(self, window_end):
        """Of two qualifying dips, the later bucket is the one selected."""
        december_dip = {
            "bucket_start_utc": self._bucket(window_end, 60),
            "review_count": 8,
            "strength_score": -32.6,
            "avg_rating": 2.88,
        }
        november_dip = {
            "bucket_start_utc": self._bucket(window_end, 90),
            "review_count": 5,
            "strength_score": -15.0,
            "avg_rating": 3.2,
        }

        outcome = determine_watch_item([], [december_dip, november_dip], window_end)

        assert outcome["type"] == "recent_dip"
        assert outcome["should_cite_recent_dip"] is True
        # The 60-days-ago bucket wins over the 90-days-ago one.
        assert outcome["data"]["strength_score"] == -32.6

    def test_qualifying_dip_must_have_min_volume(self, window_end):
        """A bucket with fewer than 3 reviews never qualifies."""
        thin_bucket = {
            "bucket_start_utc": self._bucket(window_end, 30),
            "review_count": 2,  # Below threshold
            "strength_score": -50.0,
            "avg_rating": 1.5,
        }

        outcome = determine_watch_item([], [thin_bucket], window_end)

        assert outcome["type"] == "none"
        assert outcome["should_cite_recent_dip"] is False

    def test_qualifying_dip_must_be_within_90_days(self, window_end):
        """A bucket 120 days before the window end is too old to be 'recent'."""
        stale_bucket = {
            "bucket_start_utc": self._bucket(window_end, 120),
            "review_count": 10,
            "strength_score": -40.0,
            "avg_rating": 2.0,
        }

        outcome = determine_watch_item([], [stale_bucket], window_end)

        assert outcome["type"] == "none"
        assert outcome["should_cite_recent_dip"] is False

    def test_summary_should_not_say_no_major_issues_with_dip(self, base_report, window_end):
        """A summary citing a dip and 'no major issues' is detected as contradictory."""
        # Deliberately contradictory text: the extraction helper must expose
        # both phrases so the combination can be rejected.
        contradictory = "Score is 75. Recent dip in December. No major issues identified."
        flags = extract_dip_info_from_summary(contradictory)

        assert flags["has_recent_dip"] is True
        assert flags["has_no_major_issues"] is True

        cites_dip_and_denies_issues = flags["has_recent_dip"] and flags["has_no_major_issues"]
        is_valid = not cites_dip_and_denies_issues
        assert is_valid is False, "Summary violates guardrail: cites dip AND says no major issues"

    def test_valid_dip_summary_structure(self):
        """A well-formed dip summary names the month and a metric without contradiction."""
        well_formed = (
            "Business has a score of 75. There was a recent dip in December 2025 "
            "(avg_rating 2.88). No dominant negative driver identified overall. "
            "Investigate operational changes during that month."
        )

        flags = extract_dip_info_from_summary(well_formed)

        assert flags["has_recent_dip"] is True
        assert "december" in flags["mentioned_months"]
        assert flags["extracted_rating"] == 2.88
        assert flags["has_no_major_issues"] is False
+ ) + + info = extract_dip_info_from_summary(good_summary) + + assert info["has_recent_dip"] is False + assert info["has_no_persistent_weaknesses"] is True + + def test_limited_data_framing_is_valid(self): + """If old/sparse dip is mentioned, it should be framed as 'limited data'.""" + limited_data_summary = ( + "Business has a score of 70. A dip was observed in March (limited data). " + "Value for money is the top strength." + ) + + info = extract_dip_info_from_summary(limited_data_summary) + + assert info["has_dip_mention"] is True + assert info["has_recent_dip"] is False # Not "recent dip" + assert info["has_limited_data"] is True + + def test_fallback_summary_no_negatives_no_dip(self, base_report): + """Fallback summary with no negatives should mention positive driver.""" + base_report["drivers"]["negatives"] = [] + + summary = rr.generate_fallback_summary(base_report) + + # Should mention the positive driver + assert "value for money" in summary.lower() or "value" in summary.lower() + # Should not mention dip (fallback doesn't analyze timeline) + + +# ============================================================================= +# TEST: Prompt input construction +# ============================================================================= + +class TestPromptInputConstruction: + """Test that build_summary_input correctly prepares data for LLM.""" + + def test_summary_input_includes_timeline_last_3(self, base_report, window_end): + """build_summary_input should include only the last 3 timeline points.""" + base_report["timeline"]["points"] = [ + {"bucket_start_utc": "2025-09-01T00:00:00Z", "review_count": 5}, + {"bucket_start_utc": "2025-10-01T00:00:00Z", "review_count": 6}, + {"bucket_start_utc": "2025-11-01T00:00:00Z", "review_count": 7}, + {"bucket_start_utc": "2025-12-01T00:00:00Z", "review_count": 8}, + ] + + summary_input = rr.build_summary_input(base_report) + + assert len(summary_input["timeline"]["points"]) == 3 + # Should be the last 3 (Oct, Nov, 
Dec) + assert summary_input["timeline"]["points"][0]["review_count"] == 6 + assert summary_input["timeline"]["points"][2]["review_count"] == 8 + + def test_summary_input_includes_scores(self, base_report): + """build_summary_input should include overall score.""" + summary_input = rr.build_summary_input(base_report) + + assert "scores" in summary_input + assert "overall" in summary_input["scores"] + assert summary_input["scores"]["overall"] == base_report["scores"]["overall"] + + def test_summary_input_includes_comparisons(self, base_report): + """build_summary_input should include comparisons for trend.""" + summary_input = rr.build_summary_input(base_report) + + assert "comparisons" in summary_input + assert summary_input["comparisons"] == base_report["comparisons"] + + +# ============================================================================= +# RUN TESTS +# ============================================================================= + +if __name__ == "__main__": + pytest.main([__file__, "-v"])