Files
2026-02-02 18:19:00 +00:00

458 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Wave 1 L1 Config Validation Script
Validates L1 primitive configs against real review data by analyzing:
1. Coverage: % of spans mapped to enabled primitives
2. Top primitives by frequency
3. Disabled primitives appearing (potential misconfig)
4. Weight effectiveness
Usage:
    python validate_l1_configs.py --sector ENTERTAINMENT --job-url "gokarts"
    python validate_l1_configs.py --sector AUTOMOTIVE --job-url "clickrent"
    python validate_l1_configs.py --all
"""
import argparse
import asyncio
import json
import os
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import asyncpg
# Filesystem locations, all derived from the "data" directory that sits
# next to this script's parent directory.
DATA_DIR = Path(__file__).parent.parent.joinpath("data")
# One "<sector>_config.json" file per sector.
CONFIGS_DIR = DATA_DIR.joinpath("primitive_configs", "l1")
# One "<sector>_brief.json" file per sector.
BRIEFS_DIR = DATA_DIR.joinpath("sector_briefs")
# Primitive -> URT domain letter.
# URT domain letters: O=Offering, P=People, J=Journey, E=Environment,
# A=Access, V=Value, R=Relationship. Only O, P, J, E and V are assigned to
# primitives here; no primitive in this table belongs to A or R.
_PRIMITIVES_BY_DOMAIN = {
    # Quality -> Offering (O)
    "O": (
        "TASTE", "CRAFT", "FRESHNESS", "TEMPERATURE",
        "EFFECTIVENESS", "ACCURACY", "CONDITION", "CONSISTENCY",
    ),
    # Service -> People (P)
    "P": ("MANNER", "COMPETENCE", "ATTENTIVENESS", "COMMUNICATION"),
    # Process -> Journey (J)
    "J": ("SPEED", "FRICTION", "RELIABILITY", "AVAILABILITY"),
    # Environment -> Environment (E)
    "E": (
        "CLEANLINESS", "COMFORT", "SAFETY", "AMBIANCE",
        "ACCESSIBILITY", "DIGITAL_UX",
    ),
    # Value -> Value (V)
    "V": (
        "PRICE_LEVEL", "PRICE_FAIRNESS", "PRICE_TRANSPARENCY", "VALUE_FOR_MONEY",
    ),
}
# Flatten the grouping into the primitive-keyed lookup the rest of the
# script uses.
PRIMITIVE_TO_DOMAIN = {
    primitive: domain
    for domain, primitives in _PRIMITIVES_BY_DOMAIN.items()
    for primitive in primitives
}
# URT code -> closest primitive (a simplified, many-to-one mapping).
# Keys are "<domain letter><group>.<item>". Codes that are absent from this
# table are reported as unmapped by the analysis.
URT_TO_PRIMITIVE = {
    # --- Offering (O) ---
    "O1.01": "CONSISTENCY", "O1.02": "CRAFT", "O1.03": "FRESHNESS",
    "O1.04": "EFFECTIVENESS", "O1.05": "TASTE", "O1.06": "CONDITION",
    "O2.01": "ACCURACY", "O2.02": "EFFECTIVENESS", "O2.03": "CRAFT",
    "O3.01": "ACCURACY", "O3.02": "CONSISTENCY", "O3.03": "EFFECTIVENESS",

    # --- People (P) ---
    "P1.01": "MANNER", "P1.02": "MANNER", "P1.03": "ATTENTIVENESS",
    "P1.04": "COMMUNICATION", "P1.05": "ATTENTIVENESS",
    "P2.01": "COMPETENCE", "P2.02": "COMPETENCE", "P2.03": "COMPETENCE",
    "P3.01": "COMMUNICATION", "P3.02": "COMMUNICATION", "P3.03": "COMMUNICATION",

    # --- Journey (J) ---
    "J1.01": "SPEED", "J1.02": "RELIABILITY", "J1.03": "FRICTION",
    "J1.04": "SPEED", "J1.05": "RELIABILITY",
    "J2.01": "RELIABILITY", "J2.02": "RELIABILITY", "J2.03": "FRICTION",
    "J3.01": "FRICTION", "J3.02": "FRICTION", "J3.03": "FRICTION",

    # --- Environment (E) ---
    "E1.01": "CLEANLINESS", "E1.02": "COMFORT", "E1.03": "AMBIANCE",
    "E1.04": "AMBIANCE", "E1.05": "COMFORT",
    "E2.01": "AMBIANCE", "E2.02": "COMFORT", "E2.03": "COMFORT",
    "E2.04": "AMBIANCE", "E2.05": "DIGITAL_UX",
    "E3.01": "SAFETY", "E3.02": "SAFETY", "E3.03": "ACCESSIBILITY",
    "E4.01": "ACCESSIBILITY", "E4.02": "ACCESSIBILITY", "E4.03": "DIGITAL_UX",

    # --- Access (A) ---
    "A1.01": "AVAILABILITY", "A1.02": "AVAILABILITY", "A1.03": "AVAILABILITY",
    "A1.04": "ACCESSIBILITY", "A1.05": "ACCESSIBILITY",
    "A2.01": "ACCESSIBILITY", "A2.02": "ACCESSIBILITY", "A2.03": "DIGITAL_UX",
    "A3.01": "ACCESSIBILITY", "A3.02": "ACCESSIBILITY", "A3.03": "SPEED",
    "A4.01": "ACCESSIBILITY", "A4.02": "ACCESSIBILITY", "A4.03": "AVAILABILITY",

    # --- Value (V) ---
    "V1.01": "PRICE_LEVEL", "V1.02": "PRICE_FAIRNESS", "V1.03": "PRICE_TRANSPARENCY",
    "V2.01": "PRICE_FAIRNESS", "V2.02": "PRICE_TRANSPARENCY", "V2.03": "VALUE_FOR_MONEY",
    "V3.01": "VALUE_FOR_MONEY", "V3.02": "VALUE_FOR_MONEY", "V3.03": "PRICE_FAIRNESS",
    "V4.01": "VALUE_FOR_MONEY", "V4.02": "VALUE_FOR_MONEY", "V4.03": "VALUE_FOR_MONEY",

    # --- Relationship (R) ---
    "R1.01": "RELIABILITY", "R1.02": "RELIABILITY", "R1.03": "RELIABILITY",
    "R2.01": "RELIABILITY", "R2.02": "CONSISTENCY", "R2.03": "RELIABILITY",
    "R3.01": "MANNER", "R3.02": "MANNER", "R3.03": "COMMUNICATION",
    "R4.01": "CONSISTENCY", "R4.02": "RELIABILITY", "R4.03": "CONSISTENCY",
}
@dataclass
class ValidationResult:
    """Outcome of checking one sector's L1 config against a batch of spans.

    The fields fall into five groups: identification and volume, coverage
    metrics, distributions, most frequent URT codes, and the textual
    recommendations derived from the counts.
    """

    # --- Identification and volume ---
    sector_code: str
    job_count: int
    review_count: int
    span_count: int

    # --- Coverage ---
    enabled_coverage: float  # share of spans mapped to an enabled primitive
    disabled_hits: dict[str, int]  # disabled primitive -> times it appeared
    unmapped_count: int  # spans whose URT code has no primitive mapping

    # --- Distributions ---
    primitive_counts: dict[str, int]  # primitive -> span count
    domain_distribution: dict[str, int]  # keyed by O, P, J, E, A, V, R
    valence_distribution: dict[str, int]  # keyed by V+, V-, V0, V±

    # --- Most frequent URT codes as (code, count) pairs ---
    top_urt_codes: list[tuple[str, int]]

    # --- Human-readable suggestions for adjusting the config ---
    recommendations: list[str]
def load_l1_config(sector_code: str) -> dict[str, Any] | None:
"""Load L1 config for a sector."""
config_file = CONFIGS_DIR / f"{sector_code.lower()}_config.json"
if not config_file.exists():
return None
with open(config_file) as f:
return json.load(f)
def load_sector_brief(sector_code: str) -> dict[str, Any] | None:
"""Load sector brief for a sector."""
brief_file = BRIEFS_DIR / f"{sector_code.lower()}_brief.json"
if not brief_file.exists():
return None
with open(brief_file) as f:
return json.load(f)
def map_urt_to_primitive(urt_code: str) -> str | None:
    """Look up the primitive associated with a URT code.

    Returns the primitive name stored in ``URT_TO_PRIMITIVE``, or ``None``
    when the code has no entry in that table.
    """
    try:
        return URT_TO_PRIMITIVE[urt_code]
    except KeyError:
        return None
async def fetch_spans_for_jobs(pool: asyncpg.Pool, job_url_pattern: str) -> list[dict]:
    """Return the spans of every job whose URL contains the given text.

    The match is a case-insensitive ``LIKE '%<pattern>%'`` against the job
    URL; ``%`` and ``_`` inside *job_url_pattern* are passed through to
    ``LIKE`` unescaped. Rows are ordered by span creation time, newest
    first, and each row is converted to a plain ``dict``.
    """
    like_argument = "%" + job_url_pattern.lower() + "%"
    query = """
SELECT
rs.urt_primary,
rs.valence,
rs.intensity,
rs.span_text,
j.url
FROM pipeline.review_spans rs
JOIN pipeline.reviews_raw rr ON rs.review_id = rr.review_id
JOIN public.jobs j ON rr.job_id = j.job_id
WHERE LOWER(j.url) LIKE $1
ORDER BY rs.created_at DESC
"""
    records = await pool.fetch(query, like_argument)
    return list(map(dict, records))
async def fetch_all_spans(pool: asyncpg.Pool) -> list[dict]:
    """Return every row of ``pipeline.review_spans`` as a plain ``dict``.

    Only the URT code, valence, intensity and span text columns are
    selected; rows are ordered by creation time, newest first.
    """
    query = """
SELECT
urt_primary,
valence,
intensity,
span_text
FROM pipeline.review_spans
ORDER BY created_at DESC
"""
    records = await pool.fetch(query)
    return list(map(dict, records))
def analyze_spans(
    spans: list[dict],
    config: dict[str, Any],
) -> ValidationResult:
    """Analyze spans against an L1 config.

    Each span's ``urt_primary`` code is mapped to a primitive and tallied by
    URT code, valence, primitive and domain. A mapped primitive counts as an
    enabled hit when it is listed in ``config["enabled"]``, otherwise as a
    disabled hit when it is listed in ``config["disabled"]``; primitives in
    neither list are tallied but contribute to neither figure.

    Args:
        spans: Span rows as dicts carrying ``urt_primary`` and ``valence``.
            A missing, ``None`` or empty ``urt_primary`` is counted as
            unmapped; a missing or ``None`` valence is counted as ``"V0"``.
        config: L1 config. Must contain ``sector_code``; may contain
            ``enabled``, ``disabled`` and ``weights``.

    Returns:
        A ``ValidationResult``. ``job_count`` is always 1 and
        ``review_count`` always 0 here because neither can be derived from
        the spans alone.

    Raises:
        KeyError: If ``config`` has no ``sector_code`` key.
    """
    sector_code = config["sector_code"]
    enabled = set(config.get("enabled", []))
    disabled = set(config.get("disabled", []))
    weights = config.get("weights", {})

    primitive_counts: Counter = Counter()
    domain_counts: Counter = Counter()
    valence_counts: Counter = Counter()
    urt_counts: Counter = Counter()
    disabled_hits: Counter = Counter()
    unmapped = 0
    enabled_hits = 0

    for span in spans:
        # Database NULLs arrive as None, so a plain .get() default is not
        # enough: normalise None as well as a missing key.
        urt_code = span.get("urt_primary")
        valence = span.get("valence") or "V0"

        urt_counts[urt_code] += 1
        valence_counts[valence] += 1

        primitive = map_urt_to_primitive(urt_code) if urt_code else None
        if primitive:
            primitive_counts[primitive] += 1
            # NOTE(review): the domain is taken from the primitive, not from
            # the code's own letter, so mapped A*/R* codes are tallied under
            # the domain of their primitive. Confirm this is intended.
            domain_counts[PRIMITIVE_TO_DOMAIN.get(primitive, urt_code[0])] += 1
            if primitive in enabled:
                enabled_hits += 1
            elif primitive in disabled:
                disabled_hits[primitive] += 1
        else:
            unmapped += 1
            # Still attribute the span to a domain when the code has a
            # leading letter to read it from.
            if urt_code:
                domain_counts[urt_code[0]] += 1

    total = len(spans)
    enabled_coverage = enabled_hits / total if total > 0 else 0.0

    recommendations = _build_recommendations(
        total, enabled, weights, primitive_counts, disabled_hits
    )

    return ValidationResult(
        sector_code=sector_code,
        job_count=1,  # Placeholder; not derivable from spans
        review_count=0,  # Not tracked at span level
        span_count=total,
        enabled_coverage=enabled_coverage,
        disabled_hits=dict(disabled_hits),
        unmapped_count=unmapped,
        primitive_counts=dict(primitive_counts),
        domain_distribution=dict(domain_counts),
        valence_distribution=dict(valence_counts),
        top_urt_codes=urt_counts.most_common(15),
        recommendations=recommendations,
    )


def _build_recommendations(
    total: int,
    enabled: set,
    weights: dict[str, Any],
    primitive_counts: Counter,
    disabled_hits: Counter,
) -> list[str]:
    """Derive ENABLE / CHECK / WEIGHT suggestions from the span tallies."""
    recommendations = []

    # Disabled primitives that still show up often: top 5, at least 10 hits.
    for prim, count in disabled_hits.most_common(5):
        if count >= 10:
            pct = count / total * 100
            recommendations.append(
                f"ENABLE {prim}: Disabled but appeared {count} times ({pct:.1f}%)"
            )

    # Enabled primitives that carry a weight but never appeared. Iterating
    # the dict (rather than a set of its keys) keeps the output order stable
    # from run to run.
    for prim, weight in weights.items():
        if prim in enabled and primitive_counts[prim] == 0:
            recommendations.append(
                f"CHECK {prim}: Weighted ({weight}x) but no appearances"
            )

    # Enabled, unweighted primitives covering at least 10% of all spans.
    for prim, count in primitive_counts.most_common(10):
        if prim in enabled and prim not in weights and count >= total * 0.1:
            pct = count / total * 100
            recommendations.append(
                f"WEIGHT {prim}: High frequency ({count}, {pct:.1f}%) but not weighted"
            )

    return recommendations
def print_validation_report(result: ValidationResult, config: dict, brief: dict | None) -> None:
    """Print a formatted validation report to stdout.

    Sections, in order: overview, config summary, domain distribution,
    valence distribution, top 12 primitives, top 10 URT codes, disabled
    primitives that appeared, recommendations, and (when a brief is given)
    the first five judgment areas from the brief.

    Args:
        result: Analysis outcome to render.
        config: The L1 config the analysis was run against; its ``enabled``,
            ``disabled`` and ``weights`` entries are read.
        brief: Optional sector brief. Its ``what_customers_judge`` entry may
            be a dict with an ``items`` list or a list directly.
    """
    span_count = result.span_count

    def pct_of(count: int) -> float:
        """Return *count* as a percentage of all spans (0 when there are none)."""
        return count / span_count * 100 if span_count > 0 else 0

    rule = "=" * 70
    print("\n" + rule)
    print(f"VALIDATION REPORT: {result.sector_code}")
    print(rule)

    # Overview
    print("\n📊 OVERVIEW")
    print(f" Spans analyzed: {span_count:,}")
    print(f" Enabled coverage: {result.enabled_coverage:.1%}")
    if span_count > 0:
        print(f" Unmapped spans: {result.unmapped_count} ({pct_of(result.unmapped_count):.1f}%)")
    else:
        print(" No spans")

    # Config summary
    print("\n⚙️ CONFIG SUMMARY")
    print(f" Enabled: {len(config.get('enabled', []))} primitives")
    print(f" Disabled: {len(config.get('disabled', []))} primitives")
    print(f" Weighted: {len(config.get('weights', {}))} primitives")

    # Domain distribution, with one bar cell per two percentage points
    print("\n📁 DOMAIN DISTRIBUTION")
    domain_names = {"O": "Offering", "P": "People", "J": "Journey",
                    "E": "Environment", "A": "Access", "V": "Value", "R": "Relationship"}
    for domain in "OPJEVRA":
        count = result.domain_distribution.get(domain, 0)
        pct = pct_of(count)
        bar = "█" * int(pct / 2)
        print(f" {domain} {domain_names.get(domain, '?'):12} {count:4} ({pct:5.1f}%) {bar}")

    # Valence distribution
    print("\n😊 VALENCE DISTRIBUTION")
    for val in ["V+", "V-", "V0", "V±"]:
        count = result.valence_distribution.get(val, 0)
        print(f" {val}: {count:4} ({pct_of(count):5.1f}%)")

    # Top primitives, marked by whether the config enables them
    print("\n🔝 TOP PRIMITIVES")
    enabled_set = set(config.get("enabled", []))
    weights = config.get("weights", {})
    ranked = sorted(result.primitive_counts.items(), key=lambda item: item[1], reverse=True)
    for prim, count in ranked[:12]:
        status = "✓" if prim in enabled_set else "✗"
        weight = f"({weights[prim]}x)" if prim in weights else ""
        print(f" {status} {prim:20} {count:4} ({pct_of(count):5.1f}%) {weight}")

    # Top URT codes
    print("\n📋 TOP URT CODES")
    for code, count in result.top_urt_codes[:10]:
        mapped = URT_TO_PRIMITIVE.get(code, "UNMAPPED")
        print(f" {code}: {count:4} ({pct_of(count):5.1f}%) → {mapped}")

    # Disabled but appearing
    if result.disabled_hits:
        print("\n⚠️ DISABLED BUT APPEARING")
        by_count = sorted(result.disabled_hits.items(), key=lambda item: item[1], reverse=True)
        for prim, count in by_count:
            print(f" {prim}: {count} ({pct_of(count):.1f}%)")

    # Recommendations
    if result.recommendations:
        print("\n💡 RECOMMENDATIONS")
        for rec in result.recommendations:
            print(f" • {rec}")

    # Brief signals check (if available)
    if brief:
        print("\n📝 BRIEF SIGNALS CHECK")
        what_customers_judge = brief.get("what_customers_judge", {})
        if isinstance(what_customers_judge, dict):
            items = what_customers_judge.get("items", [])
        elif isinstance(what_customers_judge, list):
            items = what_customers_judge
        else:
            items = []
        print(" Key judgment areas from brief:")
        for item in items[:5]:
            # Dict items are shown by their "area", falling back to the
            # whole item when that key is absent.
            label = item.get("area", item) if isinstance(item, dict) else item
            print(f" • {label}")

    print("\n" + rule)
async def run_validation(
    sector_code: str,
    job_url_pattern: str | None = None,
    db_url: str | None = None,
):
    """Validate one sector's L1 config and print the report.

    Loads the sector's config and brief, fetches spans (restricted to jobs
    whose URL matches *job_url_pattern* when one is given, otherwise all
    spans), analyzes them and prints the report.

    The database URL is taken from *db_url*, else from the ``DATABASE_URL``
    environment variable, else from a built-in local default.
    NOTE(review): that built-in default embeds a username and password.

    Returns:
        The ``ValidationResult``, or ``None`` when the sector has no config
        or when a job pattern was given and matched no spans.
    """
    config = load_l1_config(sector_code)
    if not config:
        print(f"❌ No L1 config found for {sector_code}")
        return None

    brief = load_sector_brief(sector_code)

    if not db_url:
        db_url = os.environ.get(
            "DATABASE_URL",
            "postgresql://scraper:scraper123@localhost:5437/scraper",
        )

    pool = await asyncpg.create_pool(db_url)
    try:
        if not job_url_pattern:
            spans = await fetch_all_spans(pool)
        else:
            spans = await fetch_spans_for_jobs(pool, job_url_pattern)
            if not spans:
                print(f"⚠️ No spans found for jobs matching '{job_url_pattern}'")
                return None

        outcome = analyze_spans(spans, config)
        print_validation_report(outcome, config, brief)
        return outcome
    finally:
        # Always release the pool, including on the early return above.
        await pool.close()
async def run_all_validations(db_url: str | None = None):
    """Validate every known (sector, job pattern) pair, then print a summary.

    The pairs are hard-coded below. Each pair is validated with
    ``run_validation``; pairs that yield no result are left out of the
    closing summary.
    """
    # Known jobs and their sectors
    jobs_by_sector = {
        "ENTERTAINMENT": ["gokarts", "soho"],
        "AUTOMOTIVE": ["clickrent"],
        "PERSONAL_SERVICES": ["fleitas"],
        "FOOD_DINING": ["fika"],
    }
    rule = "=" * 70

    completed = []
    for sector, job_patterns in jobs_by_sector.items():
        print(f"\n{rule}")
        print(f"Validating {sector}...")
        print(rule)
        for pattern in job_patterns:
            outcome = await run_validation(sector, pattern, db_url)
            if not outcome:
                continue
            completed.append((sector, pattern, outcome))

    # Summary
    print("\n" + rule)
    print("VALIDATION SUMMARY")
    print(rule)
    for sector, pattern, outcome in completed:
        print(f"\n{sector} ({pattern}):")
        print(f" Coverage: {outcome.enabled_coverage:.1%}")
        print(f" Spans: {outcome.span_count}")
        if outcome.disabled_hits:
            print(f" ⚠️ Disabled hits: {sum(outcome.disabled_hits.values())}")
        if outcome.recommendations:
            print(f" Recommendations: {len(outcome.recommendations)}")
def main():
    """Parse the command line and run the requested validation.

    ``--all`` runs every known validation and takes precedence over
    ``--sector``. With neither option the help text is printed and the
    process exits with status 1.
    """
    cli = argparse.ArgumentParser(description="Validate L1 primitive configs")
    cli.add_argument("--sector", help="Sector code (e.g., ENTERTAINMENT)")
    cli.add_argument("--job-url", help="Job URL pattern to filter (e.g., 'gokarts')")
    cli.add_argument("--all", action="store_true", help="Run all validations")
    cli.add_argument("--db-url", help="Database URL")
    opts = cli.parse_args()

    # Guard clause: nothing to do without --all or --sector.
    if not (opts.all or opts.sector):
        cli.print_help()
        sys.exit(1)

    if opts.all:
        job = run_all_validations(opts.db_url)
    else:
        job = run_validation(opts.sector, opts.job_url, opts.db_url)
    asyncio.run(job)


if __name__ == "__main__":
    main()