Files
Alejandro Gutiérrez 5324542e72 feat(reputation-report): Add production-grade reputation report generator v8
Production fixes:
- Cross-business join safety: all queries join on (review_id, business_id)
- Timestamp normalization: iso_z() for all output timestamps
- Score formula alignment: matches PERIOD_SCORES_QUERY for consistency
- Invariant check: fails if scores.overall != comparisons.current
- primary_run_id: uses max(created_at) in time_window mode
- Language normalization: auto/auto-detect -> unknown
- Review language: majority voting over spans per review

Executive summary guardrails:
- Weakness priority: negative driver > qualifying dip > none
- Dip qualification: within 90 days AND review_count >= 3
- Most recent dip selection when multiple qualify
- No contradiction: "dip" cannot pair with "no major issues"
- Action grounding: must tie to cited weakness or top positive driver

CLI options:
- --no-summary: disable executive summary
- --require-summary: exit code 2 if LLM fails
- --summary-model: configurable model (default gpt-4o-mini)

Includes unit test suite (16 tests) for narrative guardrails.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 23:10:25 +00:00

474 lines
17 KiB
Python

"""
Unit tests for executive summary narrative guardrails.
Tests the three critical summary selection scenarios:
1. Negative driver present → cite top negative driver, no "dip"
2. No negatives + qualifying recent dip → cite most recent qualifying dip
3. No negatives + only non-qualifying dips → no "recent dip"
"""
import json
import pytest
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
# Import the functions we're testing
import sys
from pathlib import Path
# Add scripts directory to path
scripts_dir = Path(__file__).parent.parent / "scripts"
sys.path.insert(0, str(scripts_dir))
import reputation_report as rr
# =============================================================================
# HELPER: Determine watch item based on narrative guardrails
# =============================================================================
def determine_watch_item(
negatives: list[dict],
timeline_points: list[dict],
window_end: datetime,
negative_share: float = 0.0,
) -> dict:
"""
Determine what weakness/watch-item should be cited based on narrative guardrails.
Priority:
1. If negatives non-empty → top negative driver
2. If negatives empty but qualifying dip exists → most recent qualifying dip
3. Otherwise → no watch item
Qualifying dip:
- Within 90 days of window_end
- review_count >= 3
- strength_score < 0 OR avg_rating < 3.0
Returns:
dict with keys: type, data, should_cite_recent_dip
"""
# Priority 1: Negative driver
if negatives:
return {
"type": "negative_driver",
"data": negatives[0], # Top by impact
"should_cite_recent_dip": False,
}
# Priority 2: Qualifying recent dip
qualifying_dips = []
cutoff = window_end - timedelta(days=90)
for point in timeline_points:
bucket_start = point.get("bucket_start_utc")
if isinstance(bucket_start, str):
# Parse ISO string
bucket_dt = datetime.fromisoformat(bucket_start.replace("Z", "+00:00"))
else:
bucket_dt = bucket_start
# Check if within 90 days
if bucket_dt < cutoff:
continue
# Check minimum volume
review_count = point.get("review_count", 0)
if review_count < 3:
continue
# Check if it's actually a dip (negative strength or low rating)
strength = point.get("strength_score", 0)
avg_rating = point.get("avg_rating", 5.0)
if strength < 0 or (avg_rating is not None and avg_rating < 3.0):
qualifying_dips.append({
"bucket_dt": bucket_dt,
"point": point,
})
if qualifying_dips:
# Most recent qualifying dip
most_recent = max(qualifying_dips, key=lambda x: x["bucket_dt"])
return {
"type": "recent_dip",
"data": most_recent["point"],
"should_cite_recent_dip": True,
}
# Priority 3: No watch item
return {
"type": "none",
"data": None,
"should_cite_recent_dip": False,
}
def extract_dip_info_from_summary(summary: str) -> dict:
"""Extract dip-related information from summary text."""
import re
summary_lower = summary.lower()
has_recent_dip = "recent dip" in summary_lower
has_dip_mention = "dip" in summary_lower
has_no_major_issues = "no major issues" in summary_lower
has_no_persistent_weaknesses = "no persistent weakness" in summary_lower
has_limited_data = "limited data" in summary_lower
# Extract month mentions
months = ["january", "february", "march", "april", "may", "june",
"july", "august", "september", "october", "november", "december"]
mentioned_months = [m for m in months if m in summary_lower]
# Extract metrics
rating_match = re.search(r'(?:avg[_\s]?rating|average rating)[:\s]+(\d+\.?\d*)', summary_lower)
strength_match = re.search(r'strength[_\s]?score[:\s]+([-]?\d+\.?\d*)', summary_lower)
return {
"has_recent_dip": has_recent_dip,
"has_dip_mention": has_dip_mention,
"has_no_major_issues": has_no_major_issues,
"has_no_persistent_weaknesses": has_no_persistent_weaknesses,
"has_limited_data": has_limited_data,
"mentioned_months": mentioned_months,
"extracted_rating": float(rating_match.group(1)) if rating_match else None,
"extracted_strength": float(strength_match.group(1)) if strength_match else None,
}
# =============================================================================
# TEST FIXTURES
# =============================================================================
@pytest.fixture
def window_end():
"""Fixed window end for deterministic tests."""
return datetime(2026, 1, 31, tzinfo=timezone.utc)
@pytest.fixture
def base_report(window_end):
"""Base report structure for tests."""
return {
"schema_version": "1.0",
"window": {
"start": (window_end - timedelta(days=365)).isoformat().replace("+00:00", "Z"),
"end": window_end.isoformat().replace("+00:00", "Z"),
},
"business": {
"business_id": "Test Business",
"sector_code": "TEST",
},
"scores": {
"overall": {
"score": 75.0,
"positive_share": 0.70,
"negative_share": 0.20,
},
},
"population": {
"reviews_processed": 100,
},
"drivers": {
"positives": [
{"primitive": "VALUE_FOR_MONEY", "impact": 0.15, "summary": "Good value"},
],
"negatives": [],
},
"comparisons": {
"previous_window": {
"scores": {
"overall": {
"current": 75.0,
"previous": 80.0,
"delta": -5.0,
"trend": "declining",
}
}
}
},
"timeline": {
"granularity": "month",
"points": [],
},
}
# =============================================================================
# TEST 1: Negative driver present ⇒ cite top negative driver, no "dip"
# =============================================================================
class TestNegativeDriverPresent:
"""Test that when negative drivers exist, they are cited instead of dips."""
def test_watch_item_selection_with_negative_driver(self, window_end):
"""Given negative drivers, watch item should be the top negative driver."""
negatives = [
{"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
{"primitive": "SPEED", "impact": 0.05, "summary": "Slow service"},
]
# Include qualifying dips that should be ignored
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=30)).isoformat().replace("+00:00", "Z"),
"review_count": 10,
"strength_score": -25.0,
"avg_rating": 2.5,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "negative_driver"
assert result["data"]["primitive"] == "RELIABILITY"
assert result["should_cite_recent_dip"] is False
def test_summary_input_includes_negative_driver(self, base_report):
"""build_summary_input should include the negative driver."""
base_report["drivers"]["negatives"] = [
{"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
]
summary_input = rr.build_summary_input(base_report)
assert len(summary_input["drivers"]["negatives"]) == 1
assert summary_input["drivers"]["negatives"][0]["primitive"] == "RELIABILITY"
def test_fallback_summary_mentions_negative_driver(self, base_report):
"""Fallback summary should mention the negative driver."""
base_report["drivers"]["negatives"] = [
{"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
]
summary = rr.generate_fallback_summary(base_report)
assert "reliability" in summary.lower()
# =============================================================================
# TEST 2: No negatives + qualifying recent dip ⇒ cite most recent qualifying dip
# =============================================================================
class TestQualifyingRecentDip:
"""Test that qualifying recent dips are cited when no negative drivers exist."""
def test_watch_item_selects_most_recent_qualifying_dip(self, window_end):
"""Given multiple qualifying dips, should select the most recent one."""
negatives = []
# Two qualifying dips - Dec and Nov
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=60)).isoformat().replace("+00:00", "Z"), # Dec
"review_count": 8,
"strength_score": -32.6,
"avg_rating": 2.88,
},
{
"bucket_start_utc": (window_end - timedelta(days=90)).isoformat().replace("+00:00", "Z"), # Nov
"review_count": 5,
"strength_score": -15.0,
"avg_rating": 3.2,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "recent_dip"
assert result["should_cite_recent_dip"] is True
# Should be December (60 days ago), not November (90 days ago)
assert result["data"]["strength_score"] == -32.6
def test_qualifying_dip_must_have_min_volume(self, window_end):
"""Dips with review_count < 3 should not qualify."""
negatives = []
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=30)).isoformat().replace("+00:00", "Z"),
"review_count": 2, # Below threshold
"strength_score": -50.0,
"avg_rating": 1.5,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "none"
assert result["should_cite_recent_dip"] is False
def test_qualifying_dip_must_be_within_90_days(self, window_end):
"""Dips older than 90 days should not qualify as 'recent'."""
negatives = []
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=120)).isoformat().replace("+00:00", "Z"),
"review_count": 10,
"strength_score": -40.0,
"avg_rating": 2.0,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "none"
assert result["should_cite_recent_dip"] is False
def test_summary_should_not_say_no_major_issues_with_dip(self, base_report, window_end):
"""When citing a dip, summary should not say 'no major issues identified'."""
# This is a prompt constraint - test via extraction helper
# Example summary that violates the rule
bad_summary = "Score is 75. Recent dip in December. No major issues identified."
info = extract_dip_info_from_summary(bad_summary)
# This combination is invalid per our guardrails
assert info["has_recent_dip"] is True
assert info["has_no_major_issues"] is True
# Test should flag this as a violation
is_valid = not (info["has_recent_dip"] and info["has_no_major_issues"])
assert is_valid is False, "Summary violates guardrail: cites dip AND says no major issues"
def test_valid_dip_summary_structure(self):
"""Valid summary with dip should include month and metric, no contradiction."""
good_summary = (
"Business has a score of 75. There was a recent dip in December 2025 "
"(avg_rating 2.88). No dominant negative driver identified overall. "
"Investigate operational changes during that month."
)
info = extract_dip_info_from_summary(good_summary)
assert info["has_recent_dip"] is True
assert "december" in info["mentioned_months"]
assert info["extracted_rating"] == 2.88
assert info["has_no_major_issues"] is False
# =============================================================================
# TEST 3: No negatives + only non-qualifying dips ⇒ no "recent dip"
# =============================================================================
class TestNonQualifyingDips:
"""Test that non-qualifying dips are not cited as 'recent'."""
def test_old_dip_not_cited_as_recent(self, window_end):
"""Dips older than 90 days should not be watch items."""
negatives = []
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=180)).isoformat().replace("+00:00", "Z"),
"review_count": 15,
"strength_score": -45.0,
"avg_rating": 2.0,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "none"
assert result["should_cite_recent_dip"] is False
def test_sparse_dip_not_cited_as_recent(self, window_end):
"""Dips with < 3 reviews should not be watch items."""
negatives = []
timeline_points = [
{
"bucket_start_utc": (window_end - timedelta(days=15)).isoformat().replace("+00:00", "Z"),
"review_count": 1,
"strength_score": -80.0,
"avg_rating": 1.0,
},
]
result = determine_watch_item(negatives, timeline_points, window_end)
assert result["type"] == "none"
def test_valid_no_weakness_summary(self):
"""Valid summary with no qualifying weakness should say 'no persistent weaknesses'."""
good_summary = (
"Business has a strong score of 85. Customers love the value for money. "
"No persistent weaknesses surfaced. Continue amplifying value messaging."
)
info = extract_dip_info_from_summary(good_summary)
assert info["has_recent_dip"] is False
assert info["has_no_persistent_weaknesses"] is True
def test_limited_data_framing_is_valid(self):
"""If old/sparse dip is mentioned, it should be framed as 'limited data'."""
limited_data_summary = (
"Business has a score of 70. A dip was observed in March (limited data). "
"Value for money is the top strength."
)
info = extract_dip_info_from_summary(limited_data_summary)
assert info["has_dip_mention"] is True
assert info["has_recent_dip"] is False # Not "recent dip"
assert info["has_limited_data"] is True
def test_fallback_summary_no_negatives_no_dip(self, base_report):
"""Fallback summary with no negatives should mention positive driver."""
base_report["drivers"]["negatives"] = []
summary = rr.generate_fallback_summary(base_report)
# Should mention the positive driver
assert "value for money" in summary.lower() or "value" in summary.lower()
# Should not mention dip (fallback doesn't analyze timeline)
# =============================================================================
# TEST: Prompt input construction
# =============================================================================
class TestPromptInputConstruction:
"""Test that build_summary_input correctly prepares data for LLM."""
def test_summary_input_includes_timeline_last_3(self, base_report, window_end):
"""build_summary_input should include only the last 3 timeline points."""
base_report["timeline"]["points"] = [
{"bucket_start_utc": "2025-09-01T00:00:00Z", "review_count": 5},
{"bucket_start_utc": "2025-10-01T00:00:00Z", "review_count": 6},
{"bucket_start_utc": "2025-11-01T00:00:00Z", "review_count": 7},
{"bucket_start_utc": "2025-12-01T00:00:00Z", "review_count": 8},
]
summary_input = rr.build_summary_input(base_report)
assert len(summary_input["timeline"]["points"]) == 3
# Should be the last 3 (Oct, Nov, Dec)
assert summary_input["timeline"]["points"][0]["review_count"] == 6
assert summary_input["timeline"]["points"][2]["review_count"] == 8
def test_summary_input_includes_scores(self, base_report):
"""build_summary_input should include overall score."""
summary_input = rr.build_summary_input(base_report)
assert "scores" in summary_input
assert "overall" in summary_input["scores"]
assert summary_input["scores"]["overall"] == base_report["scores"]["overall"]
def test_summary_input_includes_comparisons(self, base_report):
"""build_summary_input should include comparisons for trend."""
summary_input = rr.build_summary_input(base_report)
assert "comparisons" in summary_input
assert summary_input["comparisons"] == base_report["comparisons"]
# =============================================================================
# RUN TESTS
# =============================================================================
if __name__ == "__main__":
pytest.main([__file__, "-v"])