""" Unit tests for executive summary narrative guardrails. Tests the three critical summary selection scenarios: 1. Negative driver present → cite top negative driver, no "dip" 2. No negatives + qualifying recent dip → cite most recent qualifying dip 3. No negatives + only non-qualifying dips → no "recent dip" """ import json import pytest from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, patch # Import the functions we're testing import sys from pathlib import Path # Add scripts directory to path scripts_dir = Path(__file__).parent.parent / "scripts" sys.path.insert(0, str(scripts_dir)) import reputation_report as rr # ============================================================================= # HELPER: Determine watch item based on narrative guardrails # ============================================================================= def determine_watch_item( negatives: list[dict], timeline_points: list[dict], window_end: datetime, negative_share: float = 0.0, ) -> dict: """ Determine what weakness/watch-item should be cited based on narrative guardrails. Priority: 1. If negatives non-empty → top negative driver 2. If negatives empty but qualifying dip exists → most recent qualifying dip 3. 
Otherwise → no watch item Qualifying dip: - Within 90 days of window_end - review_count >= 3 - strength_score < 0 OR avg_rating < 3.0 Returns: dict with keys: type, data, should_cite_recent_dip """ # Priority 1: Negative driver if negatives: return { "type": "negative_driver", "data": negatives[0], # Top by impact "should_cite_recent_dip": False, } # Priority 2: Qualifying recent dip qualifying_dips = [] cutoff = window_end - timedelta(days=90) for point in timeline_points: bucket_start = point.get("bucket_start_utc") if isinstance(bucket_start, str): # Parse ISO string bucket_dt = datetime.fromisoformat(bucket_start.replace("Z", "+00:00")) else: bucket_dt = bucket_start # Check if within 90 days if bucket_dt < cutoff: continue # Check minimum volume review_count = point.get("review_count", 0) if review_count < 3: continue # Check if it's actually a dip (negative strength or low rating) strength = point.get("strength_score", 0) avg_rating = point.get("avg_rating", 5.0) if strength < 0 or (avg_rating is not None and avg_rating < 3.0): qualifying_dips.append({ "bucket_dt": bucket_dt, "point": point, }) if qualifying_dips: # Most recent qualifying dip most_recent = max(qualifying_dips, key=lambda x: x["bucket_dt"]) return { "type": "recent_dip", "data": most_recent["point"], "should_cite_recent_dip": True, } # Priority 3: No watch item return { "type": "none", "data": None, "should_cite_recent_dip": False, } def extract_dip_info_from_summary(summary: str) -> dict: """Extract dip-related information from summary text.""" import re summary_lower = summary.lower() has_recent_dip = "recent dip" in summary_lower has_dip_mention = "dip" in summary_lower has_no_major_issues = "no major issues" in summary_lower has_no_persistent_weaknesses = "no persistent weakness" in summary_lower has_limited_data = "limited data" in summary_lower # Extract month mentions months = ["january", "february", "march", "april", "may", "june", "july", "august", "september", "october", "november", 
"december"] mentioned_months = [m for m in months if m in summary_lower] # Extract metrics rating_match = re.search(r'(?:avg[_\s]?rating|average rating)[:\s]+(\d+\.?\d*)', summary_lower) strength_match = re.search(r'strength[_\s]?score[:\s]+([-]?\d+\.?\d*)', summary_lower) return { "has_recent_dip": has_recent_dip, "has_dip_mention": has_dip_mention, "has_no_major_issues": has_no_major_issues, "has_no_persistent_weaknesses": has_no_persistent_weaknesses, "has_limited_data": has_limited_data, "mentioned_months": mentioned_months, "extracted_rating": float(rating_match.group(1)) if rating_match else None, "extracted_strength": float(strength_match.group(1)) if strength_match else None, } # ============================================================================= # TEST FIXTURES # ============================================================================= @pytest.fixture def window_end(): """Fixed window end for deterministic tests.""" return datetime(2026, 1, 31, tzinfo=timezone.utc) @pytest.fixture def base_report(window_end): """Base report structure for tests.""" return { "schema_version": "1.0", "window": { "start": (window_end - timedelta(days=365)).isoformat().replace("+00:00", "Z"), "end": window_end.isoformat().replace("+00:00", "Z"), }, "business": { "business_id": "Test Business", "sector_code": "TEST", }, "scores": { "overall": { "score": 75.0, "positive_share": 0.70, "negative_share": 0.20, }, }, "population": { "reviews_processed": 100, }, "drivers": { "positives": [ {"primitive": "VALUE_FOR_MONEY", "impact": 0.15, "summary": "Good value"}, ], "negatives": [], }, "comparisons": { "previous_window": { "scores": { "overall": { "current": 75.0, "previous": 80.0, "delta": -5.0, "trend": "declining", } } } }, "timeline": { "granularity": "month", "points": [], }, } # ============================================================================= # TEST 1: Negative driver present ⇒ cite top negative driver, no "dip" # 
# =============================================================================


class TestNegativeDriverPresent:
    """When a negative driver exists it is the watch item; dips are not cited."""

    def test_watch_item_selection_with_negative_driver(self, window_end):
        """The top negative driver is chosen even if a qualifying dip is present."""
        drivers = [
            {"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
            {"primitive": "SPEED", "impact": 0.05, "summary": "Slow service"},
        ]
        # This bucket meets every dip criterion but must be ignored here.
        dip_start = window_end - timedelta(days=30)
        points = [
            {
                "bucket_start_utc": dip_start.isoformat().replace("+00:00", "Z"),
                "review_count": 10,
                "strength_score": -25.0,
                "avg_rating": 2.5,
            },
        ]

        outcome = determine_watch_item(drivers, points, window_end)

        assert outcome["type"] == "negative_driver"
        assert outcome["data"]["primitive"] == "RELIABILITY"
        assert outcome["should_cite_recent_dip"] is False

    def test_summary_input_includes_negative_driver(self, base_report):
        """build_summary_input passes the negative driver through."""
        base_report["drivers"]["negatives"] = [
            {"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
        ]

        prepared = rr.build_summary_input(base_report)

        assert len(prepared["drivers"]["negatives"]) == 1
        assert prepared["drivers"]["negatives"][0]["primitive"] == "RELIABILITY"

    def test_fallback_summary_mentions_negative_driver(self, base_report):
        """The fallback summary text names the negative driver."""
        base_report["drivers"]["negatives"] = [
            {"primitive": "RELIABILITY", "impact": 0.08, "summary": "Reliability issues"},
        ]

        text = rr.generate_fallback_summary(base_report)

        assert "reliability" in text.lower()


# =============================================================================
# TEST 2: No negatives + qualifying recent dip ⇒ cite most recent qualifying dip
# =============================================================================


class TestQualifyingRecentDip:
    """Without negative drivers, a qualifying recent dip becomes the watch item."""

    def test_watch_item_selects_most_recent_qualifying_dip(self, window_end):
        """With several qualifying dips, the latest bucket wins."""
        december_start = window_end - timedelta(days=60)
        november_start = window_end - timedelta(days=90)
        points = [
            {
                "bucket_start_utc": december_start.isoformat().replace("+00:00", "Z"),
                "review_count": 8,
                "strength_score": -32.6,
                "avg_rating": 2.88,
            },
            {
                "bucket_start_utc": november_start.isoformat().replace("+00:00", "Z"),
                "review_count": 5,
                "strength_score": -15.0,
                "avg_rating": 3.2,
            },
        ]

        outcome = determine_watch_item([], points, window_end)

        assert outcome["type"] == "recent_dip"
        assert outcome["should_cite_recent_dip"] is True
        # December (60 days back) must beat November (90 days back).
        assert outcome["data"]["strength_score"] == -32.6

    def test_qualifying_dip_must_have_min_volume(self, window_end):
        """A bucket with fewer than 3 reviews never qualifies."""
        bucket_start = window_end - timedelta(days=30)
        points = [
            {
                "bucket_start_utc": bucket_start.isoformat().replace("+00:00", "Z"),
                "review_count": 2,  # one short of the threshold
                "strength_score": -50.0,
                "avg_rating": 1.5,
            },
        ]

        outcome = determine_watch_item([], points, window_end)

        assert outcome["type"] == "none"
        assert outcome["should_cite_recent_dip"] is False

    def test_qualifying_dip_must_be_within_90_days(self, window_end):
        """A bucket more than 90 days before window_end is not 'recent'."""
        bucket_start = window_end - timedelta(days=120)
        points = [
            {
                "bucket_start_utc": bucket_start.isoformat().replace("+00:00", "Z"),
                "review_count": 10,
                "strength_score": -40.0,
                "avg_rating": 2.0,
            },
        ]

        outcome = determine_watch_item([], points, window_end)

        assert outcome["type"] == "none"
        assert outcome["should_cite_recent_dip"] is False

    def test_summary_should_not_say_no_major_issues_with_dip(self, base_report, window_end):
        """A summary citing a dip must not also claim 'no major issues identified'."""
        # The rule is enforced in the prompt; here the extraction helper is
        # used to show that a violating summary is detected.
        violating = "Score is 75. Recent dip in December. No major issues identified."

        flags = extract_dip_info_from_summary(violating)

        # Both phrases are present, which the guardrails forbid.
        assert flags["has_recent_dip"] is True
        assert flags["has_no_major_issues"] is True

        # The validity check must report the violation.
        is_valid = (not flags["has_recent_dip"]) or (not flags["has_no_major_issues"])
        assert is_valid is False, "Summary violates guardrail: cites dip AND says no major issues"

    def test_valid_dip_summary_structure(self):
        """A well-formed dip summary carries month and metric without contradiction."""
        compliant = (
            "Business has a score of 75. There was a recent dip in December 2025 "
            "(avg_rating 2.88). No dominant negative driver identified overall. "
            "Investigate operational changes during that month."
        )

        flags = extract_dip_info_from_summary(compliant)

        assert flags["has_recent_dip"] is True
        assert "december" in flags["mentioned_months"]
        assert flags["extracted_rating"] == 2.88
        assert flags["has_no_major_issues"] is False


# =============================================================================
# TEST 3: No negatives + only non-qualifying dips ⇒ no "recent dip"
# =============================================================================


class TestNonQualifyingDips:
    """Dips that fail the recency or volume criteria are never cited as 'recent'."""

    def test_old_dip_not_cited_as_recent(self, window_end):
        """A dip 180 days back yields no watch item."""
        bucket_start = window_end - timedelta(days=180)
        points = [
            {
                "bucket_start_utc": bucket_start.isoformat().replace("+00:00", "Z"),
                "review_count": 15,
                "strength_score": -45.0,
                "avg_rating": 2.0,
            },
        ]

        outcome = determine_watch_item([], points, window_end)

        assert outcome["type"] == "none"
        assert outcome["should_cite_recent_dip"] is False

    def test_sparse_dip_not_cited_as_recent(self, window_end):
        """A recent dip backed by a single review yields no watch item."""
        bucket_start = window_end - timedelta(days=15)
        points = [
            {
                "bucket_start_utc": bucket_start.isoformat().replace("+00:00", "Z"),
                "review_count": 1,
                "strength_score": -80.0,
                "avg_rating": 1.0,
            },
        ]

        outcome = determine_watch_item([], points, window_end)

        assert outcome["type"] == "none"

    def test_valid_no_weakness_summary(self):
        """With nothing to flag, the summary says 'no persistent weaknesses'."""
        compliant = (
            "Business has a strong score of 85. Customers love the value for money. "
            "No persistent weaknesses surfaced. Continue amplifying value messaging."
        )

        flags = extract_dip_info_from_summary(compliant)

        assert flags["has_recent_dip"] is False
        assert flags["has_no_persistent_weaknesses"] is True

    def test_limited_data_framing_is_valid(self):
        """An old/sparse dip may be mentioned only with 'limited data' framing."""
        hedged = (
            "Business has a score of 70. A dip was observed in March (limited data). "
            "Value for money is the top strength."
        )

        flags = extract_dip_info_from_summary(hedged)

        assert flags["has_dip_mention"] is True
        assert flags["has_recent_dip"] is False  # mentioned, but not as a "recent dip"
        assert flags["has_limited_data"] is True

    def test_fallback_summary_no_negatives_no_dip(self, base_report):
        """With no negatives, the fallback summary mentions the positive driver."""
        base_report["drivers"]["negatives"] = []

        text = rr.generate_fallback_summary(base_report)
        lowered = text.lower()

        # Should mention the positive driver
        assert "value for money" in lowered or "value" in lowered
        # Should not mention dip (fallback doesn't analyze timeline)


# =============================================================================
# TEST: Prompt input construction
# =============================================================================


class TestPromptInputConstruction:
    """Checks on the data build_summary_input prepares for the LLM."""

    def test_summary_input_includes_timeline_last_3(self, base_report, window_end):
        """Only the last 3 timeline points are forwarded."""
        monthly_counts = [
            ("2025-09-01T00:00:00Z", 5),
            ("2025-10-01T00:00:00Z", 6),
            ("2025-11-01T00:00:00Z", 7),
            ("2025-12-01T00:00:00Z", 8),
        ]
        base_report["timeline"]["points"] = [
            {"bucket_start_utc": start, "review_count": count}
            for start, count in monthly_counts
        ]

        prepared = rr.build_summary_input(base_report)

        assert len(prepared["timeline"]["points"]) == 3
        # Expect Oct, Nov, Dec: September is dropped.
        assert prepared["timeline"]["points"][0]["review_count"] == 6
        assert prepared["timeline"]["points"][2]["review_count"] == 8

    def test_summary_input_includes_scores(self, base_report):
        """The overall score block is carried over unchanged."""
        prepared = rr.build_summary_input(base_report)

        assert "scores" in prepared
        assert "overall" in prepared["scores"]
        assert prepared["scores"]["overall"] == base_report["scores"]["overall"]

    def test_summary_input_includes_comparisons(self, base_report):
        """Comparisons (used for trend) are carried over unchanged."""
        prepared = rr.build_summary_input(base_report)

        assert "comparisons" in prepared
        assert prepared["comparisons"] == base_report["comparisons"]


# =============================================================================
# RUN TESTS
# =============================================================================

if __name__ == "__main__":
    pytest.main([__file__, "-v"])