Files
whyrating-engine-legacy/packages/reviewiq-pipeline/scripts/reputation_report.py
Alejandro Gutiérrez 5324542e72 feat(reputation-report): Add production-grade reputation report generator v8
Production fixes:
- Cross-business join safety: all queries join on (review_id, business_id)
- Timestamp normalization: iso_z() for all output timestamps
- Score formula alignment: matches PERIOD_SCORES_QUERY for consistency
- Invariant check: fails if scores.overall != comparisons.current
- primary_run_id: uses max(created_at) in time_window mode
- Language normalization: auto/auto-detect -> unknown
- Review language: majority voting over spans per review

Executive summary guardrails:
- Weakness priority: negative driver > qualifying dip > none
- Dip qualification: within 90 days AND review_count >= 3
- Most recent dip selection when multiple qualify
- No contradiction: "dip" cannot pair with "no major issues"
- Action grounding: must tie to cited weakness or top positive driver

CLI options:
- --no-summary: disable executive summary
- --require-summary: exit code 2 if LLM fails
- --summary-model: configurable model (default gpt-4o-mini)

Includes unit test suite (16 tests) for narrative guardrails.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 23:10:25 +00:00

1601 lines
61 KiB
Python

#!/usr/bin/env python3
"""
Reputation Report Generator v1.0
Generates business-facing, time-windowed, language-aware reputation reports
from detected_spans_v2 data.
Usage:
python reputation_report.py --business "Pura Vida Hostel" --days 30
python reputation_report.py --business "gokarts" --start 2026-01-01 --end 2026-01-31
python reputation_report.py --business "gokarts" --run-id <uuid>
Backfill run_id for legacy rows (idempotent, only fills NULL run_ids):
WITH job_groups AS (
SELECT business_id, job_id, gen_random_uuid() AS new_run_id
FROM pipeline.detected_spans_v2
WHERE job_id IS NOT NULL AND run_id IS NULL
GROUP BY business_id, job_id
)
UPDATE pipeline.detected_spans_v2 s
SET run_id = g.new_run_id
FROM job_groups g
WHERE s.business_id = g.business_id
AND s.job_id = g.job_id
AND s.run_id IS NULL;
"""
import argparse
import asyncio
import json
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
import asyncpg
# Optional dependency: the OpenAI client is only needed for the executive
# summary. OPENAI_AVAILABLE records whether the import succeeded so callers
# can skip summary generation instead of failing at import time.
try:
    from openai import AsyncOpenAI
except ImportError:
    OPENAI_AVAILABLE = False
else:
    OPENAI_AVAILABLE = True
# Database connection URL, taken from the DATABASE_URL environment variable
# when set.
# NOTE(review): the fallback below embeds a username and password in source.
# Presumably a local development default — confirm it is never a real
# credential and consider requiring DATABASE_URL explicitly.
DB_URL = os.environ.get(
"DATABASE_URL",
"postgresql://scraper:scraper123@localhost:5437/scraper"
)
# Schema version (consumers of this constant are outside the visible part of
# the file; presumably stamped into the generated report — TODO confirm).
SCHEMA_VERSION = "1.0"
# =============================================================================
# DOMAIN MAPPING (static)
# =============================================================================
# Maps each primitive code (the `primitive` value of a span) to the code of
# the domain it belongs to: O, P, J, E, V, or "meta". Lookups elsewhere in this
# file use DOMAIN_MAP.get(prim, "meta"), so unknown primitives fall into "meta".
# Key order matters: domain scoring iterates this dict to collect primitives.
DOMAIN_MAP = {
# O - Output/Product Quality
"TASTE": "O",
"CRAFT": "O",
"FRESHNESS": "O",
"TEMPERATURE": "O",
"EFFECTIVENESS": "O",
"ACCURACY": "O",
"CONDITION": "O",
"CONSISTENCY": "O",
# P - People/Service
"MANNER": "P",
"COMPETENCE": "P",
"ATTENTIVENESS": "P",
"COMMUNICATION": "P",
# J - Journey/Process
"SPEED": "J",
"FRICTION": "J",
"RELIABILITY": "J",
"AVAILABILITY": "J",
# E - Environment
"CLEANLINESS": "E",
"COMFORT": "E",
"SAFETY": "E",
"AMBIANCE": "E",
"ACCESSIBILITY": "E",
"DIGITAL_UX": "E",
# V - Value
"PRICE_LEVEL": "V",
"PRICE_FAIRNESS": "V",
"PRICE_TRANSPARENCY": "V",
"VALUE_FOR_MONEY": "V",
# Meta primitives (not part of the five scored domains)
"HONESTY": "meta",
"ETHICS": "meta",
"PROMISES": "meta",
"ACKNOWLEDGMENT": "meta",
"RESPONSE_QUALITY": "meta",
"RECOVERY": "meta",
"RETURN_INTENT": "meta",
"RECOMMEND": "meta",
"RECOGNITION": "meta",
"UNMAPPED": "meta",
"NON_INFORMATIVE": "meta",
}
# Human-readable label for each domain code that appears as a value in
# DOMAIN_MAP.
DOMAIN_NAMES = {
"O": "Output/Product",
"P": "People/Service",
"J": "Journey/Process",
"E": "Environment",
"V": "Value",
"meta": "Meta",
}
# Numeric value of each valence symbol, used by compute_primitive_score:
# positive counts as +1, negative as -1, neutral and mixed as 0.
VALENCE_NUM = {
"+": 1.0,
"-": -1.0,
"0": 0.0,
"±": 0.0, # Mixed = neutral for scoring
}
# Alert thresholds (v1 defaults), keyed by metric name. Each entry maps a
# severity level ("warn", optionally "critical") to the threshold value.
# NOTE(review): the code that evaluates these lies outside the visible part of
# the file; the comparison direction per metric should be confirmed there.
ALERT_THRESHOLDS = {
"unmapped_rate_content": {"warn": 0.10, "critical": 0.15},
"safety_negative_share": {"warn": 0.05, "critical": 0.10},
"avg_confidence": {"warn": 0.70}, # Below this triggers warn
"low_confidence_share": {"warn": 0.10},
}
# Executive summary prompts sent to the chat model by
# generate_executive_summary.
# EXEC_SUMMARY_SYSTEM is the system message.
EXEC_SUMMARY_SYSTEM = """You are a business analyst writing short performance summaries for business owners."""
# EXEC_SUMMARY_USER is a str.format template; {json_data} is replaced with the
# JSON dump of the summary input. It must not contain any other brace pairs.
# NOTE(review): the rules below describe the score as a 0-100 scale, but the
# overall score computed in generate_report is 100 * (signed weight / total
# weight), which can be negative — confirm the intended scale.
EXEC_SUMMARY_USER = """Here is the performance data for a business:
{json_data}
Write a 3-5 sentence executive summary in clear business language.
Rules:
- Mention current performance score and what it means (0-100 scale; 80+ strong, 60-80 moderate, below 60 needs attention)
- Mention trend vs previous period if available (use comparisons.previous_window)
- Mention strongest positive driver from drivers.positives (highest impact) - what customers love
- For weaknesses:
- If drivers.negatives is non-empty: cite the top negative driver as the main weakness
- If drivers.negatives is empty but timeline shows a recent dip: cite the dip as the "main watch item" with month and metric (avg_rating or strength_score)
- Only cite a "recent dip" if that month is within 90 days of window.end AND review_count >= 3 for that bucket
- If multiple qualifying dips exist, cite the most recent one (closest to window.end)
- If the dip month is older or has low volume (<3 reviews), either skip or phrase as "a dip was observed in [month] (limited data)"
- If you mention a "dip", do NOT also say "no major issues identified" - instead say "no dominant negative driver identified overall"
- If no qualifying dip and no negative drivers: say "no persistent weaknesses surfaced"
- Suggested action must tie directly to (a) the cited weakness/dip (investigate or fix), or (b) the top positive driver (amplify it)
- For a dip with unknown cause, suggest investigating what changed that month (staffing, operations, etc.)
Do not mention:
- spans, primitives, confidence, models, algorithms
- technical implementation details
- "increase in negative feedback" unless drivers.negatives is non-empty
Tone: neutral, professional, actionable."""
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def to_utc_naive(dt: datetime | None) -> datetime | None:
"""Convert a datetime to UTC-naive for DB compatibility."""
if not dt:
return None
if dt.tzinfo is None:
return dt # assume already UTC-naive
return dt.astimezone(timezone.utc).replace(tzinfo=None)
def compute_span_weight(confidence: float, detail: int, intensity: int) -> float:
    """
    Weight a span by its confidence, level of detail and intensity.

    w = confidence * (0.75 + 0.25*(detail-1)) * (0.8 + 0.2*(intensity-1))

    Falsy inputs fall back to defaults: ``detail`` and ``intensity`` to 1 and
    ``confidence`` to 0.5 (so a confidence of exactly 0 is treated as 0.5).
    """
    conf_value = float(confidence) if confidence else 0.5
    detail_level = detail if detail else 1
    intensity_level = intensity if intensity else 1
    detail_factor = 0.75 + 0.25 * (detail_level - 1)
    intensity_factor = 0.8 + 0.2 * (intensity_level - 1)
    # Same left-to-right multiplication order as the formula above.
    return conf_value * detail_factor * intensity_factor
def compute_primitive_score(spans: list[dict]) -> float:
    """
    Weighted valence score for the spans of one primitive.

    score = 100 * (sum(w * valence_num) / sum(w)), clamped to [-100, +100]

    Each span's weight ``w`` comes from ``compute_span_weight`` and its numeric
    valence from ``VALENCE_NUM`` (unrecognised valences count as 0). Returns
    0.0 for an empty list or when the weights sum to zero.
    """
    if not spans:
        return 0.0

    weight_sum = 0.0
    signed_sum = 0.0
    for item in spans:
        weight = compute_span_weight(
            item.get("confidence", 0.5),
            item.get("detail", 1),
            item.get("intensity", 1),
        )
        direction = VALENCE_NUM.get(item.get("valence", "0"), 0.0)
        weight_sum += weight
        signed_sum += weight * direction

    if weight_sum == 0:
        return 0.0
    return max(-100, min(100, 100 * (signed_sum / weight_sum)))
def percentile(values: list[float], p: float) -> float:
    """Return the ``p``-th percentile (0-100) of ``values``.

    The values are sorted and the fractional rank ``(n - 1) * p / 100`` is
    linearly interpolated between its two neighbouring elements. An empty
    list yields 0.0.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = (len(ordered) - 1) * (p / 100)
    lower = int(rank)
    # Stay on the last element when there is no right-hand neighbour.
    upper = lower if lower + 1 >= len(ordered) else lower + 1
    fraction = rank - lower
    return ordered[lower] + fraction * (ordered[upper] - ordered[lower])
def compute_trend_label(delta: float, threshold: float = 5.0) -> str:
    """Classify a score change as "improving", "declining" or "stable".

    A delta strictly above ``threshold`` is improving, strictly below
    ``-threshold`` is declining, anything else is stable. Deciding whether
    there is enough data to compare at all is left to the caller.
    """
    if delta > threshold:
        return "improving"
    if delta < -threshold:
        return "declining"
    return "stable"
def iso_z(dt: datetime | None) -> str | None:
"""Convert datetime to ISO-8601 UTC with Z suffix."""
if not dt:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
# Minimum thresholds for trend comparisons between periods.
# NOTE(review): nothing in the visible part of the file reads these; they are
# presumably applied where comparisons are built — confirm there.
MIN_REVIEWS_FOR_COMPARISON = 10 # At least 10 reviews per period
MIN_COVERAGE_FOR_COMPARISON = 0.80 # At least 80% review_time coverage (as a fraction)
def build_summary_input(report: dict) -> dict:
    """Reduce a full report to the small payload given to the LLM summary.

    Keeps the business id (as ``name``) and sector, the window, the overall
    score block, the comparisons, only the first positive and first negative
    driver, the processed-review count and, when the report has timeline
    points, the granularity plus the last three points.
    """

    def top_only(items: list) -> list:
        # The LLM only needs the highest-impact driver, which is listed first.
        return [items[0]] if items else []

    business = report["business"]
    payload = {
        "business": {
            "name": business["business_id"],
            "sector": business["sector_code"],
        },
        "window": report["window"],
        "scores": {"overall": report["scores"]["overall"]},
        "comparisons": report["comparisons"],
        "drivers": {
            "positives": top_only(report["drivers"]["positives"]),
            "negatives": top_only(report["drivers"]["negatives"]),
        },
        "population": {
            "reviews_processed": report["population"]["reviews_processed"],
        },
    }

    timeline = report.get("timeline")
    if timeline and timeline.get("points"):
        payload["timeline"] = {
            "granularity": timeline["granularity"],
            "points": timeline["points"][-3:],
        }
    return payload
def generate_fallback_summary(report: dict) -> str:
    """Build a deterministic summary string from values already in the report.

    The text has up to three parts: the overall score with a band label
    (>= 80 strong, >= 60 moderate, >= 40 mixed, otherwise challenging), a trend
    sentence when ``comparisons.previous_window`` carries a recognised trend,
    and one sentence each for the first positive and first negative driver.
    """
    score = report["scores"]["overall"]["score"]

    # Part 1: score band.
    band = "challenging"
    for floor, label in ((80, "strong"), (60, "moderate"), (40, "mixed")):
        if score >= floor:
            band = label
            break
    parts = [f"This business has a {band} performance score of {score}."]

    # Part 2: trend against the previous period, if one was computed.
    trend_sentences = (
        ("improving", " Performance is trending upward compared to the previous period."),
        ("declining", " Performance has declined compared to the previous period."),
        ("stable", " Performance has remained stable compared to the previous period."),
    )
    previous = report.get("comparisons", {}).get("previous_window")
    if previous:
        trend = previous.get("scores", {}).get("overall", {}).get("trend")
        if trend:
            for label, sentence in trend_sentences:
                if trend == label:
                    parts.append(sentence)
                    break

    # Part 3: top positive and negative drivers.
    driver_block = report.get("drivers", {})
    positives = driver_block.get("positives", [])
    negatives = driver_block.get("negatives", [])
    if positives:
        top_pos = positives[0]["primitive"].replace("_", " ").title()
        parts.append(f" Customers respond positively to {top_pos}.")
    if negatives:
        top_neg = negatives[0]["primitive"].replace("_", " ").title()
        parts.append(f" {top_neg} is an area for improvement.")

    return "".join(parts).strip()
async def generate_executive_summary(
summary_input: dict,
model: str = "gpt-4o-mini",
enabled: bool = True,
) -> tuple[str | None, dict]:
"""
Generate executive summary using OpenAI.
Returns:
Tuple of (summary_text, meta_dict)
"""
meta = {
"enabled": enabled,
"generated": False,
"model": model,
"error": None,
"generated_at": None,
}
if not enabled:
meta["error"] = "summary generation disabled via --no-summary"
return None, meta
if not OPENAI_AVAILABLE:
meta["error"] = "openai package not installed"
return None, meta
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
meta["error"] = "OPENAI_API_KEY not set"
print("Warning: OPENAI_API_KEY not set, skipping executive summary generation.")
return None, meta
try:
client = AsyncOpenAI(api_key=api_key)
response = await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": EXEC_SUMMARY_SYSTEM},
{"role": "user", "content": EXEC_SUMMARY_USER.format(json_data=json.dumps(summary_input, indent=2))},
],
temperature=0.3,
max_tokens=300,
)
summary = response.choices[0].message.content.strip()
meta["generated"] = True
meta["generated_at"] = iso_z(datetime.now(timezone.utc))
return summary, meta
except Exception as e:
meta["error"] = str(e)
print(f"Warning: Executive summary generation failed: {e}")
return None, meta
def select_granularity(days: int) -> str:
    """Pick the time-bucket size for a window of ``days`` days.

    Up to 14 days buckets by day, up to 90 days by week, longer by month.
    """
    for limit, bucket in ((14, "day"), (90, "week")):
        if days <= limit:
            return bucket
    return "month"
async def resolve_business_id(pool: asyncpg.Pool, search_term: str) -> str | None:
    """Resolve a full or partial business name to its canonical business_id.

    An exact ``business_id`` match is tried first. Otherwise a series of
    case-insensitive ``LIKE`` patterns is tried in order (plain substring,
    spaces as wildcards, CamelCase boundaries, then every split point of an
    all-lowercase term); the first pattern with a match wins. When a pattern
    matches several businesses, the shortest id (ties broken alphabetically)
    is chosen so that the result is deterministic.

    Args:
        pool: Database pool used to query ``pipeline.business_taxonomy_map``.
        search_term: Business id or fragment of one.

    Returns:
        The matching ``business_id``, or ``None`` when the term is blank or
        nothing matches.
    """
    import re

    # A blank term would expand to "%%" below and silently resolve to an
    # arbitrary business, so reject it up front.
    if not search_term or not search_term.strip():
        return None

    # Try exact match first
    row = await pool.fetchrow("""
        SELECT business_id
        FROM pipeline.business_taxonomy_map
        WHERE business_id = $1
    """, search_term)
    if row:
        return row["business_id"]

    # Build search patterns, most specific first.
    # NOTE: "%" and "_" inside search_term are not escaped and therefore act
    # as LIKE wildcards.
    search_patterns = [
        f"%{search_term}%",
        f"%{search_term.replace(' ', '%')}%",
    ]
    # CamelCase pattern: allow anything between a lower->upper transition
    camel_pattern = re.sub(r'([a-z])([A-Z])', r'\1%\2', search_term)
    if camel_pattern != search_term:
        search_patterns.append(f"%{camel_pattern}%")
    # Lowercase concatenated words: try each interior split point
    if search_term.islower() and len(search_term) >= 4:
        for i in range(2, len(search_term) - 1):
            search_patterns.append(f"%{search_term[:i]}%{search_term[i:]}%")

    # Drop duplicate patterns (e.g. a term without spaces) while keeping order,
    # so the same query is not sent twice.
    for pattern in dict.fromkeys(search_patterns):
        row = await pool.fetchrow("""
            SELECT business_id
            FROM pipeline.business_taxonomy_map
            WHERE LOWER(business_id) LIKE LOWER($1)
            ORDER BY LENGTH(business_id), business_id
            LIMIT 1
        """, pattern)
        if row:
            return row["business_id"]
    return None
# =============================================================================
# SQL QUERIES
# =============================================================================
# For latest_run mode: all spans of business $1, restricted to run $2 when $2
# is not NULL (a NULL $2 disables the run filter), newest created_at first.
# review_time_utc and rating are returned as typed NULLs so the row has the
# same columns as SPANS_QUERY_TIME_WINDOW.
SPANS_QUERY_LATEST_RUN = """
SELECT
s.id,
s.review_id,
s.run_id,
s.job_id,
s.primitive,
s.valence,
s.intensity,
s.detail,
s.confidence,
s.span_text,
s.language,
s.entity,
s.entity_type,
s.created_at,
NULL::timestamptz AS review_time_utc,
NULL::smallint AS rating
FROM pipeline.detected_spans_v2 s
WHERE s.business_id = $1
AND ($2::uuid IS NULL OR s.run_id = $2)
ORDER BY s.created_at DESC
"""
# For time_window mode: spans of business $1 whose review has
# $2 <= review_time_utc < $3, joined with review_facts_v1 for the review time
# and rating, most recent review first.
# CRITICAL: Join on BOTH review_id AND business_id to prevent cross-business contamination
SPANS_QUERY_TIME_WINDOW = """
SELECT
s.id,
s.review_id,
s.run_id,
s.job_id,
s.primitive,
s.valence,
s.intensity,
s.detail,
s.confidence,
s.span_text,
s.language,
s.entity,
s.entity_type,
s.created_at,
f.review_time_utc,
f.rating
FROM pipeline.detected_spans_v2 s
JOIN pipeline.review_facts_v1 f
ON f.review_id = s.review_id
AND f.business_id = s.business_id
WHERE s.business_id = $1
AND f.review_time_utc >= $2
AND f.review_time_utc < $3
ORDER BY f.review_time_utc DESC
"""
# Fetches the taxonomy mapping row for business $1: its id, sector code and
# gbp_path (cast to text).
BUSINESS_CONFIG_QUERY = """
SELECT
business_id,
sector_code,
gbp_path::text as gbp_path
FROM pipeline.business_taxonomy_map
WHERE business_id = $1
"""
# Per-bucket time series for business $1 over $2 <= review_time_utc < $3.
# $4 is the date_trunc unit (e.g. the value chosen by select_granularity).
# NON_INFORMATIVE spans are excluded. strength_score uses the same
# confidence * intensity weighting as PERIOD_SCORES_QUERY.
# NOTE(review): AVG(f.rating) is taken over joined span rows, so a review
# contributes once per content span rather than once per review — confirm
# that this weighting is intended.
TIMESERIES_QUERY = """
SELECT
date_trunc($4, f.review_time_utc) AS bucket,
COUNT(DISTINCT s.review_id) AS review_count,
COUNT(*) AS span_count,
COUNT(*) FILTER (WHERE s.valence = '+') AS positive_count,
COUNT(*) FILTER (WHERE s.valence = '-') AS negative_count,
COUNT(*) FILTER (WHERE s.valence = '0') AS neutral_count,
COUNT(*) FILTER (WHERE s.valence = '±') AS mixed_count,
ROUND(AVG(f.rating)::numeric, 2) AS avg_rating,
-- Strength score scaled to [-100, 100]
CASE WHEN SUM(s.confidence * s.intensity) > 0 THEN
ROUND((100 * SUM(
CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END
* s.confidence * s.intensity
) / SUM(s.confidence * s.intensity))::numeric, 1)
ELSE 0 END AS strength_score
FROM pipeline.detected_spans_v2 s
JOIN pipeline.review_facts_v1 f
ON f.review_id = s.review_id
AND f.business_id = s.business_id
WHERE s.business_id = $1
AND f.review_time_utc >= $2
AND f.review_time_utc < $3
AND s.primitive <> 'NON_INFORMATIVE'
GROUP BY 1
ORDER BY 1
"""
# Aggregate counts and overall score for business $1 over
# $2 <= review_time_utc < $3, excluding NON_INFORMATIVE spans.
# overall_score = 100 * SUM(sign * confidence * intensity)
#                 / SUM(confidence * intensity), or 0 when the weight sum is
# not positive; the Python overall score in generate_report mirrors this.
# NOTE(review): AVG(f.rating) is taken over joined span rows, so a review
# contributes once per content span rather than once per review — confirm.
PERIOD_SCORES_QUERY = """
SELECT
COUNT(DISTINCT s.review_id) AS review_count,
COUNT(*) AS span_count,
COUNT(*) FILTER (WHERE s.valence = '+') AS positive_count,
COUNT(*) FILTER (WHERE s.valence = '-') AS negative_count,
ROUND(AVG(f.rating)::numeric, 2) AS avg_rating,
CASE WHEN SUM(s.confidence * s.intensity) > 0 THEN
ROUND((100 * SUM(
CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END
* s.confidence * s.intensity
) / SUM(s.confidence * s.intensity))::numeric, 1)
ELSE 0 END AS overall_score
FROM pipeline.detected_spans_v2 s
JOIN pipeline.review_facts_v1 f
ON f.review_id = s.review_id
AND f.business_id = s.business_id
WHERE s.business_id = $1
AND f.review_time_utc >= $2
AND f.review_time_utc < $3
AND s.primitive <> 'NON_INFORMATIVE'
"""
# Counts, for business $1, the distinct reviews that have content spans
# (spans_reviews) and those that also have a review_facts_v1 row
# (facts_reviews), plus the min/max review time. $2 and $3 are optional
# window bounds; NULL disables the corresponding bound.
# NOTE(review): when $2 or $3 is non-NULL, the WHERE predicates on
# f.review_time_utc discard the rows the LEFT JOIN left unmatched (NULL
# compares as unknown), so spans_reviews and facts_reviews then count the same
# reviews. Confirm whether coverage is only meant to be measured with both
# bounds NULL.
REVIEW_TIME_COVERAGE_QUERY = """
SELECT
COUNT(DISTINCT s.review_id) AS spans_reviews,
COUNT(DISTINCT f.review_id) AS facts_reviews,
MIN(f.review_time_utc) AS min_review_time,
MAX(f.review_time_utc) AS max_review_time
FROM pipeline.detected_spans_v2 s
LEFT JOIN pipeline.review_facts_v1 f
ON f.review_id = s.review_id
AND f.business_id = s.business_id
WHERE s.business_id = $1
AND s.primitive <> 'NON_INFORMATIVE'
AND ($2::timestamptz IS NULL OR f.review_time_utc >= $2)
AND ($3::timestamptz IS NULL OR f.review_time_utc < $3)
"""
# Sector-wide benchmark: the same confidence * intensity weighted score as
# PERIOD_SCORES_QUERY, computed over every span with sector_code $1 and
# $2 <= review_time_utc < $3 (NON_INFORMATIVE excluded). NULLIF makes the
# score NULL instead of dividing by zero when there is no weight.
# The query does not filter on business_id, so all businesses in the sector
# contribute, including the one being reported on.
SECTOR_BENCHMARK_QUERY = """
SELECT
ROUND((100 * SUM(
CASE s.valence WHEN '+' THEN 1 WHEN '-' THEN -1 ELSE 0 END
* s.confidence * s.intensity
) / NULLIF(SUM(s.confidence * s.intensity), 0))::numeric, 1) AS sector_overall_score,
COUNT(*) AS sector_span_count,
COUNT(DISTINCT s.business_id) AS sector_business_count
FROM pipeline.detected_spans_v2 s
JOIN pipeline.review_facts_v1 f
ON f.review_id = s.review_id
AND f.business_id = s.business_id
WHERE s.sector_code = $1
AND f.review_time_utc >= $2
AND f.review_time_utc < $3
AND s.primitive <> 'NON_INFORMATIVE'
"""
# Returns the run_id of the most recent run for business $1: runs are ranked
# by their newest span's created_at, with run_id as a deterministic
# tie-breaker. Rows without a run_id are ignored.
LATEST_RUN_QUERY = """
SELECT run_id
FROM pipeline.detected_spans_v2
WHERE business_id = $1 AND run_id IS NOT NULL
GROUP BY run_id
ORDER BY MAX(created_at) DESC, run_id DESC
LIMIT 1
"""
# =============================================================================
# REPORT GENERATOR
# =============================================================================
async def generate_report(
business_id: str,
start: datetime | None = None,
end: datetime | None = None,
run_id: str | None = None,
tz_name: str = "UTC",
pool: asyncpg.Pool | None = None,
summary_enabled: bool = True,
summary_model: str = "gpt-4o-mini",
) -> dict[str, Any]:
"""
Generate a reputation report for a business.
Args:
business_id: Business ID or search pattern
start: Window start (optional if run_id provided)
end: Window end (optional if run_id provided)
run_id: Specific run ID to use (overrides time window)
tz_name: IANA timezone for window
pool: Database pool (created if not provided)
Returns:
ReputationReport JSON object
"""
close_pool = False
if pool is None:
pool = await asyncpg.create_pool(DB_URL)
close_pool = True
try:
# Resolve business_id
canonical_id = await resolve_business_id(pool, business_id)
if not canonical_id:
return {"error": f"Business not found: {business_id}"}
# Get business config
config_row = await pool.fetchrow(BUSINESS_CONFIG_QUERY, canonical_id)
if not config_row:
return {"error": f"Business not mapped: {canonical_id}"}
# Determine window mode: time_window if start/end provided, otherwise latest_run
window_mode = "time_window" if start and end else "latest_run"
# Convert to UTC-naive for DB compatibility
start_naive = to_utc_naive(start)
end_naive = to_utc_naive(end)
# Fetch spans based on mode - SINGLE SOURCE OF TRUTH
run_uuid = None
if window_mode == "time_window":
# Time window mode: filter by review_time_utc
spans = await pool.fetch(
SPANS_QUERY_TIME_WINDOW,
canonical_id,
start_naive,
end_naive
)
else:
# Latest run mode: filter by run_id
if run_id:
run_uuid = uuid.UUID(run_id)
else:
latest = await pool.fetchrow(LATEST_RUN_QUERY, canonical_id)
if latest and latest["run_id"]:
run_uuid = latest["run_id"]
spans = await pool.fetch(
SPANS_QUERY_LATEST_RUN,
canonical_id,
run_uuid
)
if not spans:
return {"error": "No spans found for this business/window"}
spans = [dict(s) for s in spans]
# Determine actual window from data
if window_mode == "time_window":
# Use review_time_utc for time window mode
review_times = [s["review_time_utc"] for s in spans if s.get("review_time_utc")]
actual_start = min(review_times) if review_times else start
actual_end = max(review_times) if review_times else end
else:
# Use created_at for latest run mode
created_times = [s["created_at"] for s in spans if s["created_at"]]
actual_start = min(created_times) if created_times else datetime.now(timezone.utc)
actual_end = max(created_times) if created_times else datetime.now(timezone.utc)
# Get unique run_ids (handle legacy data without run_id), sorted for determinism
run_ids_set = set(str(s["run_id"]) for s in spans if s["run_id"])
runs_included = sorted(run_ids_set) if run_ids_set else ["legacy:run_id_missing"]
# Get unique job_ids for traceability (ingestion vs classification debugging)
job_ids_set = set(str(s["job_id"]) for s in spans if s.get("job_id"))
jobs_included = sorted(job_ids_set) if job_ids_set else []
# Primary run_id:
# - In latest_run mode: the specific run we queried
# - In time_window mode: the newest run by max(created_at)
if window_mode == "latest_run":
primary_run_id = str(run_uuid) if run_uuid else None
else:
# time_window mode: find run with max created_at
run_max_created: dict[str, datetime] = {}
for s in spans:
rid = s.get("run_id")
if not rid:
continue
t = s.get("created_at")
rid_str = str(rid)
if t and (rid_str not in run_max_created or t > run_max_created[rid_str]):
run_max_created[rid_str] = t
primary_run_id = max(run_max_created, key=run_max_created.get) if run_max_created else None
# Latest span timestamp for traceability (UTC)
latest_span_created_at_utc = actual_end
# Get unique review_ids
review_ids = set(s["review_id"] for s in spans)
# =================================================================
# POPULATION STATS
# =================================================================
total_spans = len(spans)
non_informative = [s for s in spans if s["primitive"] == "NON_INFORMATIVE"]
content_spans = [s for s in spans if s["primitive"] != "NON_INFORMATIVE"]
unmapped_spans = [s for s in spans if s["primitive"] == "UNMAPPED"]
# Normalize language codes: map internal markers to standard codes
def normalize_language(lang: str | None) -> str:
if not lang:
return "unknown"
lang = lang.lower().strip()
# Map internal markers to "unknown"
if lang in ("auto", "auto-detect", ""):
return "unknown"
return lang
# Language distribution (by reviews - use majority span language per review)
review_lang_counts: dict[str, dict[str, int]] = {}
for s in spans:
rid = s["review_id"]
lang = normalize_language(s.get("language"))
if rid not in review_lang_counts:
review_lang_counts[rid] = {}
review_lang_counts[rid][lang] = review_lang_counts[rid].get(lang, 0) + 1
# Pick majority language for each review
review_langs = {rid: max(counts, key=counts.get) for rid, counts in review_lang_counts.items()}
lang_review_counts = {}
for lang in review_langs.values():
lang_review_counts[lang] = lang_review_counts.get(lang, 0) + 1
total_reviews = len(review_langs)
lang_dist_reviews = {k: v / total_reviews for k, v in lang_review_counts.items()} if total_reviews > 0 else {}
# Language distribution (by spans)
lang_span_counts = {}
for s in content_spans:
lang = normalize_language(s.get("language"))
lang_span_counts[lang] = lang_span_counts.get(lang, 0) + 1
total_content = len(content_spans)
lang_dist_spans = {k: v / total_content for k, v in lang_span_counts.items()} if total_content > 0 else {}
# Confidence stats
confidences = [float(s["confidence"]) for s in content_spans if s["confidence"]]
avg_conf = sum(confidences) / len(confidences) if confidences else 0.0
low_conf_count = len([c for c in confidences if c < 0.5])
population = {
"runs_included": runs_included,
"jobs_included": jobs_included,
"latest_span_created_at_utc": iso_z(latest_span_created_at_utc),
"reviews_processed": total_reviews,
"spans_total": total_spans,
"spans_non_informative": len(non_informative),
"spans_content": total_content,
"unmapped_spans": len(unmapped_spans),
"unmapped_rate_raw": len(unmapped_spans) / total_spans if total_spans > 0 else 0,
"unmapped_rate_content": len(unmapped_spans) / total_content if total_content > 0 else 0,
"language_distribution_reviews": lang_dist_reviews,
"language_distribution_spans": lang_dist_spans,
"confidence": {
"avg": round(avg_conf, 3),
"p10": round(percentile(confidences, 10), 3) if confidences else 0,
"p50": round(percentile(confidences, 50), 3) if confidences else 0,
"p90": round(percentile(confidences, 90), 3) if confidences else 0,
"low_confidence_count": low_conf_count,
}
}
# =================================================================
# SCORES
# =================================================================
# Group spans by primitive
primitive_spans = {}
for s in content_spans:
prim = s["primitive"]
if prim not in primitive_spans:
primitive_spans[prim] = []
primitive_spans[prim].append(s)
# Compute primitive scores
primitive_scores = {}
for prim, prim_spans in primitive_spans.items():
domain = DOMAIN_MAP.get(prim, "meta")
valence_counts = {"+": 0, "-": 0, "0": 0, "±": 0}
intensities = []
details = []
confidences_prim = []
entities = {}
for s in prim_spans:
v = s.get("valence", "0")
if v in valence_counts:
valence_counts[v] += 1
if s.get("intensity"):
intensities.append(s["intensity"])
if s.get("detail"):
details.append(s["detail"])
if s.get("confidence"):
confidences_prim.append(float(s["confidence"]))
if s.get("entity"):
key = (s["entity"], s.get("entity_type", "unknown"))
entities[key] = entities.get(key, 0) + 1
# Top entities
top_entities = [
{"entity": e[0], "entity_type": e[1], "count": c}
for e, c in sorted(entities.items(), key=lambda x: -x[1])[:5]
]
primitive_scores[prim] = {
"domain": domain,
"enabled": True,
"score": round(compute_primitive_score(prim_spans), 1),
"volume": len(prim_spans),
"valence_counts": valence_counts,
"intensity_avg": round(sum(intensities) / len(intensities), 2) if intensities else 1.0,
"detail_avg": round(sum(details) / len(details), 2) if details else 1.0,
"confidence_avg": round(sum(confidences_prim) / len(confidences_prim), 3) if confidences_prim else 0.5,
"top_entities": top_entities,
}
# Compute domain scores
domain_scores = {}
for domain_code in ["O", "P", "J", "E", "V"]:
domain_prims = [p for p, d in DOMAIN_MAP.items() if d == domain_code and p in primitive_scores]
if domain_prims:
total_volume = sum(primitive_scores[p]["volume"] for p in domain_prims)
weighted_score = sum(
primitive_scores[p]["score"] * primitive_scores[p]["volume"]
for p in domain_prims
)
domain_scores[domain_code] = {
"score": round(weighted_score / total_volume, 1) if total_volume > 0 else 0,
"volume": total_volume,
}
else:
domain_scores[domain_code] = {"score": 0, "volume": 0}
# Overall score - use same formula as PERIOD_SCORES_QUERY for consistency:
# 100 * SUM(valence * confidence * intensity) / SUM(confidence * intensity)
total_weight = 0.0
weighted_valence_sum = 0.0
for s in content_spans:
conf = float(s.get("confidence", 0.5))
intensity = float(s.get("intensity", 1))
weight = conf * intensity
total_weight += weight
valence = s.get("valence", "0")
if valence == "+":
weighted_valence_sum += weight
elif valence == "-":
weighted_valence_sum -= weight
# "0" and "±" contribute 0
overall_score = (100 * weighted_valence_sum / total_weight) if total_weight > 0 else 0
# Domain-weighted score (kept for domain breakdown, but not used as primary)
total_domain_volume = sum(d["volume"] for d in domain_scores.values())
domain_weighted_score = sum(
d["score"] * d["volume"] for d in domain_scores.values()
) / total_domain_volume if total_domain_volume > 0 else 0
# Valence shares
all_valences = [s.get("valence", "0") for s in content_spans]
valence_total = len(all_valences)
scores = {
"overall": {
"score": round(overall_score, 1),
"score_domain_weighted": round(domain_weighted_score, 1), # Alternative metric
"positive_share": round(all_valences.count("+") / valence_total, 3) if valence_total > 0 else 0,
"negative_share": round(all_valences.count("-") / valence_total, 3) if valence_total > 0 else 0,
"mixed_share": round(all_valences.count("±") / valence_total, 3) if valence_total > 0 else 0,
"neutral_share": round(all_valences.count("0") / valence_total, 3) if valence_total > 0 else 0,
},
"domains": domain_scores,
"primitives": primitive_scores,
}
# =================================================================
# DRIVERS
# =================================================================
def compute_impact(prim_spans: list[dict], valence_filter: str) -> float:
"""Compute impact as share of total weight for a valence."""
filtered = [s for s in prim_spans if s.get("valence") == valence_filter]
if not filtered:
return 0.0
filtered_weight = sum(
compute_span_weight(s.get("confidence", 0.5), s.get("detail", 1), s.get("intensity", 1))
for s in filtered
)
total_weight = sum(
compute_span_weight(s.get("confidence", 0.5), s.get("detail", 1), s.get("intensity", 1))
for s in content_spans
)
return filtered_weight / total_weight if total_weight > 0 else 0
def select_evidence(prim_spans: list[dict], valence: str, max_count: int = 3) -> list[dict]:
    """Pick the heaviest spans of a given valence, at most one per review.

    Spans in ``prim_spans`` whose ``"valence"`` equals ``valence`` are ranked
    by ``compute_span_weight`` (heaviest first, ties keep input order). The
    first ``max_count`` spans with distinct ``review_id`` values are returned
    as trimmed evidence dicts: ``span_text`` is cut to 240 characters and
    ``confidence`` is rounded to three decimals.
    """
    def _neg_weight(span: dict):
        # Negated so an ascending (stable) sort yields heaviest-first.
        return -compute_span_weight(
            span.get("confidence", 0.5),
            span.get("detail", 1),
            span.get("intensity", 1),
        )

    candidates = [span for span in prim_spans if span.get("valence") == valence]
    ranked = sorted(candidates, key=_neg_weight)

    evidence: list[dict] = []
    used_review_ids = set()
    for span in ranked:
        review_id = span["review_id"]
        if review_id in used_review_ids or len(evidence) >= max_count:
            continue
        snippet = span.get("span_text", "")[:240]  # Cap at 240 chars
        evidence.append({
            "review_id": span["review_id"],
            "language": span.get("language", "unknown"),
            "span_text": snippet,
            "valence": span.get("valence", "0"),
            "intensity": span.get("intensity", 1),
            "detail": span.get("detail", 1),
            "confidence": round(float(span.get("confidence", 0.5)), 3),
        })
        used_review_ids.add(review_id)
    return evidence
# Compute drivers
positive_drivers = []
negative_drivers = []
for prim, prim_spans in primitive_spans.items():
if prim in ("UNMAPPED", "NON_INFORMATIVE"):
continue
pos_impact = compute_impact(prim_spans, "+")
neg_impact = compute_impact(prim_spans, "-")
if pos_impact > 0.02: # Threshold for significance
positive_drivers.append({
"primitive": prim,
"impact": round(pos_impact, 3),
"summary": f"Positive {DOMAIN_MAP.get(prim, 'meta')}/{prim} mentions.",
"evidence": select_evidence(prim_spans, "+"),
})
if neg_impact > 0.02:
negative_drivers.append({
"primitive": prim,
"impact": round(neg_impact, 3),
"summary": f"Negative {DOMAIN_MAP.get(prim, 'meta')}/{prim} mentions.",
"evidence": select_evidence(prim_spans, "-"),
})
# Sort by impact and take top 5
positive_drivers.sort(key=lambda x: -x["impact"])
negative_drivers.sort(key=lambda x: -x["impact"])
drivers = {
"positives": positive_drivers[:5],
"negatives": negative_drivers[:5],
}
# =================================================================
# ALERTS
# =================================================================
alerts = []
# UNMAPPED rate alert
unmapped_rate = population["unmapped_rate_content"]
if unmapped_rate > ALERT_THRESHOLDS["unmapped_rate_content"]["critical"]:
alerts.append({
"severity": "critical",
"type": "unmapped_high",
"message": f"Content-adjusted UNMAPPED rate ({unmapped_rate:.1%}) exceeds critical threshold.",
"metric": {"name": "unmapped_rate_content", "value": unmapped_rate, "threshold": 0.15},
"scope": {"primitive": None, "language": None},
})
elif unmapped_rate > ALERT_THRESHOLDS["unmapped_rate_content"]["warn"]:
alerts.append({
"severity": "warn",
"type": "unmapped_high",
"message": f"Content-adjusted UNMAPPED rate ({unmapped_rate:.1%}) above target.",
"metric": {"name": "unmapped_rate_content", "value": unmapped_rate, "threshold": 0.10},
"scope": {"primitive": None, "language": None},
})
# SAFETY alert
if "SAFETY" in primitive_scores:
safety_neg = primitive_scores["SAFETY"]["valence_counts"].get("-", 0)
safety_total = primitive_scores["SAFETY"]["volume"]
safety_neg_share = safety_neg / safety_total if safety_total > 0 else 0
if safety_neg_share > ALERT_THRESHOLDS["safety_negative_share"]["critical"]:
alerts.append({
"severity": "critical",
"type": "safety_spike",
"message": f"SAFETY negative share ({safety_neg_share:.1%}) exceeds critical threshold.",
"metric": {"name": "safety_negative_share", "value": safety_neg_share, "threshold": 0.10},
"scope": {"primitive": "SAFETY", "language": None},
})
elif safety_neg_share > ALERT_THRESHOLDS["safety_negative_share"]["warn"]:
alerts.append({
"severity": "warn",
"type": "safety_spike",
"message": f"SAFETY negative share ({safety_neg_share:.1%}) above warning threshold.",
"metric": {"name": "safety_negative_share", "value": safety_neg_share, "threshold": 0.05},
"scope": {"primitive": "SAFETY", "language": None},
})
# Low confidence alert
if avg_conf < ALERT_THRESHOLDS["avg_confidence"]["warn"]:
alerts.append({
"severity": "warn",
"type": "low_confidence_high",
"message": f"Average confidence ({avg_conf:.2f}) below threshold.",
"metric": {"name": "avg_confidence", "value": avg_conf, "threshold": 0.70},
"scope": {"primitive": None, "language": None},
})
# =================================================================
# RECOMMENDATIONS (templated based on negative drivers)
# =================================================================
recommendations = []
# Generate recommendations from top negative drivers
playbook_templates = {
"CLEANLINESS": {
"title": "Improve cleanliness protocols",
"actions": [
"Increase cleaning frequency during peak hours",
"Implement visible cleaning checklists",
"Train staff on cleanliness standards"
],
"owner": "ops",
"time_to_impact": 14,
},
"SPEED": {
"title": "Reduce wait times",
"actions": [
"Analyze peak hour staffing",
"Optimize queue management",
"Set wait time targets and monitor"
],
"owner": "ops",
"time_to_impact": 21,
},
"MANNER": {
"title": "Enhance staff friendliness",
"actions": [
"Conduct hospitality training",
"Implement customer feedback program",
"Recognize staff for positive mentions"
],
"owner": "service",
"time_to_impact": 30,
},
"PRICE_FAIRNESS": {
"title": "Review pricing perception",
"actions": [
"Audit pricing vs. competitor benchmarks",
"Improve value communication",
"Consider value-add bundles"
],
"owner": "pricing",
"time_to_impact": 45,
},
"SAFETY": {
"title": "Address safety concerns immediately",
"actions": [
"Conduct safety audit",
"Review and update safety protocols",
"Increase visible safety measures"
],
"owner": "safety",
"time_to_impact": 7,
},
}
priority_map = {"critical": "P0", "high": "P1", "medium": "P2"}
for i, driver in enumerate(negative_drivers[:3]):
prim = driver["primitive"]
template = playbook_templates.get(prim, {
"title": f"Improve {prim.replace('_', ' ').title()}",
"actions": [
f"Investigate root causes of negative {prim} feedback",
"Develop improvement action plan",
"Monitor {prim} score weekly"
],
"owner": "ops",
"time_to_impact": 30,
})
priority = "P0" if i == 0 and driver["impact"] > 0.10 else ("P1" if i < 2 else "P2")
recommendations.append({
"priority": priority,
"primitive": prim,
"title": template["title"],
"rationale": driver["summary"],
"playbook": {
"actions": template["actions"],
"owner": template["owner"],
"expected_time_to_impact_days": template["time_to_impact"],
"measurement": [f"{prim} score improvement", f"Negative {prim} share reduction"],
},
})
# If SAFETY alert triggered, ensure SAFETY is P0 recommendation
safety_alert_triggered = any(a["type"] == "safety_spike" for a in alerts)
if safety_alert_triggered:
# Check if SAFETY already in recommendations
safety_rec_idx = next(
(i for i, r in enumerate(recommendations) if r["primitive"] == "SAFETY"),
None
)
if safety_rec_idx is not None:
# Upgrade existing SAFETY recommendation to P0 and move to front
safety_rec = recommendations.pop(safety_rec_idx)
safety_rec["priority"] = "P0"
safety_rec["rationale"] = "SAFETY alert triggered - immediate action required."
recommendations.insert(0, safety_rec)
else:
# Add new SAFETY P0 recommendation at front
safety_template = playbook_templates["SAFETY"]
recommendations.insert(0, {
"priority": "P0",
"primitive": "SAFETY",
"title": safety_template["title"],
"rationale": "SAFETY alert triggered - immediate action required.",
"playbook": {
"actions": safety_template["actions"],
"owner": safety_template["owner"],
"expected_time_to_impact_days": safety_template["time_to_impact"],
"measurement": ["SAFETY score improvement", "Negative SAFETY share reduction"],
},
})
# =================================================================
# TIME COMPARISON (previous period)
# =================================================================
comparisons = {
"previous_window": None,
"sector_benchmark": None,
}
timeline_points = []
timeline_granularity = "week" # Default granularity
# Use CLI-provided window for time comparison
current_start = start
current_end = end
# Check review_time coverage within the requested window
coverage_row = await pool.fetchrow(
REVIEW_TIME_COVERAGE_QUERY,
canonical_id,
current_start,
current_end
)
review_time_coverage = 0.0
review_time_window_utc = None
if coverage_row and coverage_row["spans_reviews"] > 0:
review_time_coverage = (coverage_row["facts_reviews"] or 0) / coverage_row["spans_reviews"]
if coverage_row["min_review_time"] and coverage_row["max_review_time"]:
review_time_window_utc = {
"start": iso_z(coverage_row["min_review_time"]),
"end": iso_z(coverage_row["max_review_time"]),
}
# Add to population
population["review_time_coverage_pct"] = round(review_time_coverage * 100, 1)
population["review_time_window_utc"] = review_time_window_utc
# Only compute time comparison if we have sufficient coverage
if current_start and current_end and review_time_coverage >= MIN_COVERAGE_FOR_COMPARISON:
# Compute window duration
window_duration = current_end - current_start
days_in_window = max(window_duration.days, 1)
# Previous period: same duration, immediately preceding current
prev_end = current_start
prev_start = prev_end - window_duration
# Get current period scores
current_row = await pool.fetchrow(
PERIOD_SCORES_QUERY,
canonical_id,
current_start,
current_end
)
# Get previous period scores
prev_row = await pool.fetchrow(
PERIOD_SCORES_QUERY,
canonical_id,
prev_start,
prev_end
)
current_reviews = current_row["review_count"] if current_row else 0
prev_reviews = prev_row["review_count"] if prev_row else 0
# Check minimum sample size thresholds
if current_reviews >= MIN_REVIEWS_FOR_COMPARISON:
current_score = float(current_row["overall_score"] or 0)
current_rating = float(current_row["avg_rating"] or 0)
current_pos_share = (current_row["positive_count"] or 0) / max(current_row["span_count"], 1)
current_neg_share = (current_row["negative_count"] or 0) / max(current_row["span_count"], 1)
prev_score = 0.0
prev_rating = 0.0
prev_pos_share = 0.0
prev_neg_share = 0.0
if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON:
prev_score = float(prev_row["overall_score"] or 0)
prev_rating = float(prev_row["avg_rating"] or 0)
prev_pos_share = (prev_row["positive_count"] or 0) / max(prev_row["span_count"], 1)
prev_neg_share = (prev_row["negative_count"] or 0) / max(prev_row["span_count"], 1)
# Compute delta and trend
delta = current_score - prev_score
if prev_reviews < MIN_REVIEWS_FOR_COMPARISON:
trend = "insufficient_data"
else:
trend = compute_trend_label(delta)
comparisons["previous_window"] = {
"window": {
"start": iso_z(prev_start),
"end": iso_z(prev_end),
},
"scores": {
"overall": {
"current": round(current_score, 1),
"previous": round(prev_score, 1) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
"delta": round(delta, 1) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
"trend": trend,
},
"avg_rating": {
"current": round(current_rating, 2),
"previous": round(prev_rating, 2) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
"delta": round(current_rating - prev_rating, 2) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
},
"positive_share": {
"current": round(current_pos_share, 3),
"previous": round(prev_pos_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
"delta": round(current_pos_share - prev_pos_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
},
"negative_share": {
"current": round(current_neg_share, 3),
"previous": round(prev_neg_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
"delta": round(current_neg_share - prev_neg_share, 3) if prev_reviews >= MIN_REVIEWS_FOR_COMPARISON else None,
},
},
"reviews": {
"current": current_reviews,
"previous": prev_reviews,
},
}
# Generate timeline for charts (over current window only)
timeline_granularity = select_granularity(days_in_window)
if timeline_granularity not in ("day", "week", "month"):
timeline_granularity = "week"
timeline_rows = await pool.fetch(
TIMESERIES_QUERY,
canonical_id,
current_start,
current_end,
timeline_granularity
)
for row in timeline_rows:
if row["bucket"]:
timeline_points.append({
"bucket_start_utc": iso_z(row["bucket"]),
"review_count": row["review_count"],
"span_count": row["span_count"],
"positive_count": row["positive_count"],
"negative_count": row["negative_count"],
"avg_rating": float(row["avg_rating"]) if row["avg_rating"] else None,
"strength_score": float(row["strength_score"]) if row["strength_score"] else 0,
})
# =================================================================
# SECTOR BENCHMARK (always a dict with status field)
# =================================================================
sector_code = config_row["sector_code"]
if not sector_code:
comparisons["sector_benchmark"] = {
"status": "missing_sector_code",
"sector_code": None,
"window": None,
"overall": None,
"sample_size": None,
}
else:
sector_row = await pool.fetchrow(
SECTOR_BENCHMARK_QUERY,
sector_code,
current_start,
current_end
)
if sector_row:
sector_spans = sector_row["sector_span_count"] or 0
sector_businesses = sector_row["sector_business_count"] or 0
sector_score = float(sector_row["sector_overall_score"] or 0)
# Only include benchmark if sufficient sector data
if sector_spans >= 500 and sector_businesses >= 3:
delta_vs_sector = current_score - sector_score
comparisons["sector_benchmark"] = {
"status": "ok",
"sector_code": sector_code,
"window": {
"start": iso_z(current_start),
"end": iso_z(current_end),
},
"overall": {
"business": round(current_score, 1),
"sector_avg": round(sector_score, 1),
"delta_vs_sector": round(delta_vs_sector, 1),
},
"sample_size": {
"sector_spans": sector_spans,
"sector_businesses": sector_businesses,
},
}
else:
comparisons["sector_benchmark"] = {
"status": "insufficient_data",
"sector_code": sector_code,
"window": {
"start": iso_z(current_start),
"end": iso_z(current_end),
},
"overall": None,
"sample_size": {
"sector_spans": sector_spans,
"sector_businesses": sector_businesses,
},
}
else:
comparisons["sector_benchmark"] = {
"status": "no_sector_data",
"sector_code": sector_code,
"window": {
"start": iso_z(current_start),
"end": iso_z(current_end),
},
"overall": None,
"sample_size": None,
}
# Add alert if coverage is too low for comparisons
if review_time_coverage < MIN_COVERAGE_FOR_COMPARISON and current_start and current_end:
alerts.append({
"severity": "warn",
"type": "insufficient_trend_data",
"message": f"Review time coverage ({review_time_coverage:.0%}) below threshold for trend analysis.",
"metric": {"name": "review_time_coverage_pct", "value": review_time_coverage, "threshold": MIN_COVERAGE_FOR_COMPARISON},
"scope": {"primitive": None, "language": None},
})
# =================================================================
# INVARIANT CHECK: scores.overall must match comparisons current score
# =================================================================
if window_mode == "time_window" and comparisons.get("previous_window"):
comparison_current = comparisons["previous_window"]["scores"]["overall"].get("current")
if comparison_current is not None:
delta = abs(overall_score - comparison_current)
if delta > 1.0: # Allow small rounding differences
alerts.append({
"severity": "error",
"type": "internal_inconsistency",
"message": f"INTERNAL ERROR: scores.overall ({overall_score:.1f}) != comparisons.current ({comparison_current:.1f}). Report data is inconsistent.",
"metric": {"name": "score_delta", "value": delta, "threshold": 1.0},
"scope": {"primitive": None, "language": None},
})
# =================================================================
# BUILD REPORT
# =================================================================
report = {
"schema_version": SCHEMA_VERSION,
"report_id": str(uuid.uuid4()),
"primary_run_id": primary_run_id,
"generated_at": iso_z(datetime.now(timezone.utc)),
"window": {
"start": iso_z(start),
"end": iso_z(end),
"timezone": tz_name,
"mode": window_mode,
},
"business": {
"business_id": canonical_id,
"sector_code": config_row["sector_code"],
"gbp_path": config_row["gbp_path"],
"config_version": "1.0", # TODO: Get from spans
"l2_applied": False, # TODO: Get from config
},
"population": population,
"scores": scores,
"drivers": drivers,
"alerts": alerts,
"recommendations": recommendations,
"comparisons": comparisons,
# Timeline: always a dict with points: [] (never null in time_window mode)
"timeline": {
"granularity": timeline_granularity,
"points": timeline_points,
} if window_mode == "time_window" else None,
"executive_summary": None, # Placeholder, filled below
"executive_summary_meta": None, # Placeholder, filled below
}
# =================================================================
# EXECUTIVE SUMMARY (LLM-generated with fallback)
# =================================================================
summary_input = build_summary_input(report)
summary_text, summary_meta = await generate_executive_summary(
summary_input,
model=summary_model,
enabled=summary_enabled,
)
# If LLM failed but summary was enabled, use deterministic fallback
if summary_enabled and not summary_text:
summary_text = generate_fallback_summary(report)
summary_meta["fallback_used"] = True
else:
summary_meta["fallback_used"] = False
report["executive_summary"] = summary_text
report["executive_summary_meta"] = summary_meta
return report
finally:
if close_pool:
await pool.close()
# =============================================================================
# CLI
# =============================================================================
async def main_async(args):
    """Resolve the report window from CLI args, generate the report, emit it.

    Window resolution:
      * ``--end`` sets the window end; without it the end is the current
        UTC time.
      * ``--start`` sets the window start; without it the start is
        ``end - --days`` (30 days when ``--days`` is absent or 0).
      * ISO timestamps without a UTC offset are interpreted as UTC.

    A lone ``--start`` or lone ``--end`` is honoured rather than silently
    dropped in favour of the ``--days``/30-day default.

    Args:
        args: Parsed ``argparse`` namespace produced by ``main()``.

    Raises:
        SystemExit: code 1 when the report contains an ``"error"`` key;
            code 2 when ``--require-summary`` is set and the summary
            metadata does not report ``generated``.
        ValueError: when ``--start``/``--end`` is not valid ISO-8601
            (propagated from ``datetime.fromisoformat``).
    """
    def _parse_utc(raw: str) -> datetime:
        """Parse an ISO-8601 string, treating naive values as UTC."""
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    # Determine time window
    start = _parse_utc(args.start) if args.start else None
    end = _parse_utc(args.end) if args.end else datetime.now(timezone.utc)
    if start is None:
        # Default: last N days (30 when --days is not given)
        start = end - timedelta(days=args.days or 30)

    # Determine summary settings
    summary_enabled = not args.no_summary

    # Generate report
    report = await generate_report(
        business_id=args.business,
        start=start,
        end=end,
        run_id=args.run_id,
        tz_name=args.timezone,
        summary_enabled=summary_enabled,
        summary_model=args.summary_model,
    )
    if "error" in report:
        print(f"Error: {report['error']}")
        raise SystemExit(1)

    # Check --require-summary: if LLM summary required but not generated,
    # fail before writing. ``or {}`` also covers a key that is present but None.
    if args.require_summary:
        meta = report.get("executive_summary_meta") or {}
        if not meta.get("generated", False):
            error_msg = meta.get("error", "unknown error")
            print(f"Error: --require-summary specified but LLM summary failed: {error_msg}")
            raise SystemExit(2)

    # Output
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)
        print(f"Report written to {args.output}")
    else:
        print(json.dumps(report, indent=2, default=str))

    # Print summary
    if not args.quiet:
        overall = report['scores']['overall']
        print("\n" + "=" * 60)
        print(f"REPUTATION REPORT: {report['business']['business_id']}")
        print("=" * 60)
        print(f"Window: {report['window']['start']} - {report['window']['end']}")
        print(f"Reviews: {report['population']['reviews_processed']}")
        print(f"Content spans: {report['population']['spans_content']}")
        print(f"Overall score: {overall['score']}")
        print(f"Positive share: {overall['positive_share']:.1%}")
        print(f"Negative share: {overall['negative_share']:.1%}")
        print("\nTop positive drivers:")
        for d in report['drivers']['positives'][:3]:
            print(f" {d['primitive']}: {d['impact']:.1%} impact")
        print("\nTop negative drivers:")
        for d in report['drivers']['negatives'][:3]:
            print(f" {d['primitive']}: {d['impact']:.1%} impact")
        if report['alerts']:
            print(f"\nAlerts ({len(report['alerts'])}):")
            for a in report['alerts']:
                print(f" [{a['severity']}] {a['message']}")
        print("=" * 60)
def main():
    """Parse command-line options and run the async report entry point."""
    cli = argparse.ArgumentParser(description="Generate reputation report")

    # (flags, keyword options) in the order they should appear in --help.
    option_table = (
        (("--business",), {"required": True, "help": "Business ID or search pattern"}),
        (("--start",), {"help": "Window start (ISO-8601)"}),
        (("--end",), {"help": "Window end (ISO-8601)"}),
        (("--days",), {"type": int, "help": "Last N days (alternative to start/end)"}),
        (("--run-id",), {"help": "Specific run ID to use"}),
        (("--timezone",), {"default": "UTC", "help": "IANA timezone (default: UTC)"}),
        (("--output", "-o"), {"help": "Output file (default: stdout)"}),
        (("--quiet", "-q"), {"action": "store_true", "help": "Suppress summary output"}),
        # Executive summary options
        (("--no-summary",), {"action": "store_true", "help": "Disable executive summary generation"}),
        (("--require-summary",), {"action": "store_true", "help": "Fail with exit code 2 if LLM summary fails"}),
        (("--summary-model",), {"default": "gpt-4o-mini", "help": "Model for executive summary (default: gpt-4o-mini)"}),
    )
    for flags, options in option_table:
        cli.add_argument(*flags, **options)

    args = cli.parse_args()

    # Emit the note only when no window selector at all was supplied.
    if not any((args.start, args.end, args.days, args.run_id)):
        print("Note: No time window specified, using latest run.")

    asyncio.run(main_async(args))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()