whyrating-engine-legacy/.artifacts/reviewiq-pipeline-v1-final.md
Alejandro Gutiérrez 3da243be79 Add ReviewIQ pipeline spec and metadata extraction test
- reviewiq-pipeline-v1-final.md: Earlier pipeline specification
- test_metadata_extraction.py: Test script for metadata extraction

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 11:21:33 +00:00

ReviewIQ Pipeline v1 — Final Architecture

Design principle: Minimum state, defensible stats, multilingual, robust to messy mobile text, 1 LLM call per report, <$0.30/report.

Core decision: Do not persist topics. Persist only enriched spans. Build topics at report time via clustering and match across periods for trends.


A. Architecture Overview

                            INGEST (continuous, stateless, ~$0.00)
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Raw Review   │────▶│ Span         │────▶│ Embed +      │────▶│ Store        │
│ (text,rating,│     │ Splitter     │     │ Sentiment    │     │ Enriched     │
│  date, lang) │     │              │     │ + NER        │     │ Spans        │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘

No topic assignment at ingest. Just store enriched spans.

                            REPORT (per request, ~$0.20)
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Fetch        │────▶│ Cluster      │────▶│ Stats +      │────▶│ LLM          │
│ Spans        │     │ (HDBSCAN)    │     │ Labels +     │     │ Narrate      │
│              │     │              │     │ Quotes       │     │ (1 call)     │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘

Topics are ephemeral. They exist only for this report.
Trends are computed by matching clusters across periods via centroid similarity.

Cost Model

| Stage | When | Cost | Notes |
|---|---|---|---|
| Span splitting | Per review ingested | $0.00 | Regex only |
| Embedding | Per span ingested | $0.00 | Local model, batched |
| Sentiment | Per span ingested | $0.00 | Embedding math (EN/ES/DE multi-anchor) |
| NER (staff) | Per span ingested | $0.00 | spaCy, guarded |
| Clustering | Per report | $0.00 | HDBSCAN <4k spans, PCA+KMeans fallback |
| Stats + labels | Per report | $0.00 | Python/SQL |
| LLM narration | Per report | ~$0.15-0.25 | Single API call |

Total: ~$0.20/report (dominated by LLM)


B. Data Model (Only What Persists)

1. Raw Reviews

CREATE TABLE reviews (
    review_id       TEXT PRIMARY KEY,
    business_id     TEXT NOT NULL,
    text            TEXT NOT NULL,
    rating          INT NOT NULL,
    date            TIMESTAMP,
    source          TEXT DEFAULT 'google',
    ingested_at     TIMESTAMP DEFAULT NOW()
);

2. Enriched Spans (The Only ML Artifact)

CREATE TABLE spans (
    span_id         TEXT PRIMARY KEY,
    review_id       TEXT REFERENCES reviews(review_id),
    business_id     TEXT NOT NULL,
    span_index      INT NOT NULL,
    text            TEXT NOT NULL,
    embedding       VECTOR(384),
    sentiment       TEXT,          -- 'positive', 'negative', 'neutral'
    sentiment_score FLOAT,
    staff_mentions  TEXT[],        -- guarded extraction
    date            TIMESTAMP,
    created_at      TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_spans_business_date ON spans(business_id, date);

-- Embedding index: prefer HNSW if available (pgvector 0.5+), otherwise ivfflat
-- HNSW: no training required, better query performance
CREATE INDEX idx_spans_embedding ON spans USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Alternative for older pgvector. ivfflat computes its list centroids when the
-- index is built, so create the index after the table already holds data:
-- CREATE INDEX idx_spans_embedding ON spans USING ivfflat (embedding vector_cosine_ops)
--     WITH (lists = 100);
-- ANALYZE spans;  -- Refresh planner statistics after bulk inserts

3. Review-Topic Presence (Computed at Report Time, Not Stored)

Topics are ephemeral. Presence is computed per report, not persisted.
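
To make "computed per report" concrete, here is a minimal sketch of deriving review-level presence on the fly from spans that carry an ephemeral cluster assignment. The `cluster` field, the helper name `review_presence`, and the toy data are assumptions for illustration only; they are not part of the schema above.

```python
from collections import defaultdict

# Toy spans (hypothetical data). "cluster" stands in for the ephemeral topic
# assignment produced at report time; it is never written to the database.
spans = [
    {"review_id": "r1", "cluster": 0, "sentiment": "negative"},
    {"review_id": "r1", "cluster": 0, "sentiment": "negative"},
    {"review_id": "r1", "cluster": 0, "sentiment": "negative"},
    {"review_id": "r2", "cluster": 0, "sentiment": "negative"},
    {"review_id": "r3", "cluster": 1, "sentiment": "positive"},
    {"review_id": "r4", "cluster": 1, "sentiment": "positive"},
]


def review_presence(spans, sentiment):
    """Map cluster -> set of review_ids with at least one span of that sentiment."""
    presence = defaultdict(set)
    for s in spans:
        if s["sentiment"] == sentiment:
            presence[s["cluster"]].add(s["review_id"])
    return presence


all_reviews = {s["review_id"] for s in spans}
neg = review_presence(spans, "negative")

# Review-level rate: 2 of 4 reviews mention cluster 0 negatively.
review_rate = len(neg[0]) / len(all_reviews)

# Span-level share: 4 of 6 spans, inflated by one long review (r1).
neg_spans_in_0 = sum(
    1 for s in spans if s["cluster"] == 0 and s["sentiment"] == "negative"
)
span_share = neg_spans_in_0 / len(spans)

print(f"review-level rate: {review_rate:.2f}, span-level share: {span_share:.2f}")
```

In this toy case one verbose reviewer pushes the span-level share well above the review-level rate, which is why presence is counted per review.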


C. Ingest Pipeline

Step 1: Span Splitting

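Split on punctuation, then split again on contrast markers. A fragment shorter than 12 characters is merged into the preceding contrast fragment when one exists and dropped otherwise.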

import re

CONTRAST_RE = re.compile(
    r'\b(?:but|pero|aber|aunque|however|though|although|yet|still|sin embargo)\b',
    re.IGNORECASE
)

def split_spans(text: str) -> list[str]:
    # Split on punctuation (good enough for most text, with contrast fallback)
    parts = re.split(r'[.!?;:,]\s*|\s{2,}', text)
    parts = [p.strip() for p in parts if len(p.strip()) >= 12]

    # Fallback split on contrast markers
    refined = []
    for p in parts:
        if CONTRAST_RE.search(p):
            sub = [s.strip() for s in CONTRAST_RE.split(p)]
            # Merge tiny fragments back
            merged = []
            for s in sub:
                if not s:
                    continue
                if len(s) < 12 and merged:
                    merged[-1] = merged[-1] + ' ' + s
                else:
                    merged.append(s)
            refined.extend([m for m in merged if len(m) >= 12])
        else:
            refined.append(p)

    return refined

Note: Do NOT split on "and/y/und" by default — these often connect positive qualities ("friendly and fast").
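
The behaviour of the splitter is easiest to see on sample input. The sketch below is a condensed copy of the logic above under a different name (`split_spans_demo`, hypothetical) so it can run on its own; the two sample reviews are made up for illustration.

```python
import re

CONTRAST_RE = re.compile(
    r'\b(?:but|pero|aber|aunque|however|though|although|yet|still|sin embargo)\b',
    re.IGNORECASE,
)
MIN_LEN = 12  # same minimum fragment length as in split_spans


def split_spans_demo(text):
    """Condensed restatement of split_spans for demonstration purposes."""
    parts = [p.strip() for p in re.split(r'[.!?;:,]\s*|\s{2,}', text)]
    parts = [p for p in parts if len(p) >= MIN_LEN]

    refined = []
    for p in parts:
        merged = []
        # With no contrast marker, split() returns [p] and p passes through.
        for s in (s.strip() for s in CONTRAST_RE.split(p)):
            if not s:
                continue
            if len(s) < MIN_LEN and merged:
                merged[-1] += ' ' + s  # fold a tiny fragment into the previous one
            else:
                merged.append(s)
        refined.extend(m for m in merged if len(m) >= MIN_LEN)
    return refined


punctuated = ("The tacos were amazing but the wait was far too long. "
              "Friendly and fast service!")
mobile = "comida rica pero el servicio muy lento"

print(split_spans_demo(punctuated))
print(split_spans_demo(mobile))
```

The first review yields three spans, and "Friendly and fast service" stays intact because "and" is not a split marker. In the second, the clause "comida rica" is 11 characters long, has no preceding fragment to merge into, and is therefore dropped; only the negative clause survives. Whether that loss is acceptable for short mobile reviews is worth checking against real data.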

Step 2: Embedding

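Use a multilingual embedding model so that no translation step is needed.

Note: the E5 model card recommends prefixing each input with "query: " or "passage: ". The snippet below omits the prefix, so verify its effect on clustering and sentiment quality before relying on it.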

import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('intfloat/multilingual-e5-small')

def embed_spans(spans: list[str]) -> np.ndarray:
    return model.encode(spans, normalize_embeddings=True)

Step 3: Sentiment (Anchor-Based)

Score sentiment via embedding distance to polar anchors. Works across all languages.

Note: Encode multiple short anchors separately, normalize, then average. This gives better multilingual alignment than a single "bag sentence".

# Multiple short anchors for better multilingual alignment
# Include ES/DE anchors for improved cross-language recall
POSITIVE_WORDS = [
    # English
    "excellent", "wonderful", "amazing", "great", "fantastic",
    "delicious", "friendly", "helpful", "perfect", "outstanding",
    # Spanish
    "excelente", "increíble", "delicioso", "amable", "rápido",
    # German
    "toll", "lecker", "freundlich", "schnell", "perfekt",
]
NEGATIVE_WORDS = [
    # English
    "terrible", "awful", "horrible", "bad", "disgusting",
    "rude", "slow", "dirty", "broken", "disappointing",
    # Spanish
    "horrible", "sucio", "lento", "grosero", "caro",
    # German
    "schlecht", "langsam", "unhöflich", "dreckig", "teuer",
]

def _compute_anchor(words: list[str]) -> np.ndarray:
    """Encode multiple anchors, normalize each, then average.

    Deduplicates words to avoid implicit weighting.
    """
    unique_words = list(dict.fromkeys(words))  # Preserve order, remove dupes
    embeddings = model.encode(unique_words, normalize_embeddings=True)
    avg = embeddings.mean(axis=0)
    return avg / np.linalg.norm(avg)  # Re-normalize the average

POSITIVE_ANCHOR = _compute_anchor(POSITIVE_WORDS)
NEGATIVE_ANCHOR = _compute_anchor(NEGATIVE_WORDS)

def score_sentiment(embedding: np.ndarray) -> tuple[str, float]:
    pos_sim = embedding @ POSITIVE_ANCHOR
    neg_sim = embedding @ NEGATIVE_ANCHOR

    score = (pos_sim - neg_sim) / (pos_sim + neg_sim + 1e-6)

    if score > 0.15:
        return ('positive', float(score))
    elif score < -0.15:
        return ('negative', float(abs(score)))
    else:
        return ('neutral', 0.0)
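
The scoring rule can be exercised without loading a model by feeding it similarity values directly. This is a minimal sketch: `sentiment_from_sims` is a hypothetical helper that mirrors the arithmetic of `score_sentiment`, and the similarity pairs are invented inputs, not measurements from any model.

```python
def sentiment_from_sims(pos_sim, neg_sim, threshold=0.15):
    """Apply the normalized-difference rule to two precomputed similarities."""
    score = (pos_sim - neg_sim) / (pos_sim + neg_sim + 1e-6)
    if score > threshold:
        return ("positive", score)
    if score < -threshold:
        return ("negative", abs(score))
    return ("neutral", 0.0)


# Hypothetical (pos_sim, neg_sim) pairs chosen to show each branch.
examples = {
    "clear positive": (0.60, 0.20),
    "clear negative": (0.20, 0.60),
    "both similarities high": (0.86, 0.80),
}

results = {name: sentiment_from_sims(*sims) for name, sims in examples.items()}
for name, (label, score) in results.items():
    print(f"{name}: {label} ({score:.3f})")
```

The third case shows a property of the formula: when both similarities are large, the normalized difference shrinks (here about 0.036) and the span lands in the neutral band. If the chosen embedding model tends to produce uniformly high cosine similarities, the 0.15 and 0.5 thresholds may need calibrating on labelled spans.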

Step 4: Staff Extraction (Guarded)

Use spaCy NER, but count a detected person as staff only when at least one guard matches:

import spacy

nlp = spacy.load('xx_ent_wiki_sm')  # multilingual

# 'doctor' is shared by English and Spanish, so it is listed once
ROLE_WORDS = {'server', 'waiter', 'waitress', 'manager', 'chef', 'doctor',
              'nurse', 'receptionist', 'mesero', 'gerente', 'kellner'}

def extract_staff(text: str, business_history: dict = None) -> list[str]:
    doc = nlp(text)
    staff = []

    for ent in doc.ents:
        # xx_ent_wiki_sm labels people as 'PER'; English pipelines use 'PERSON'
        if ent.label_ not in ('PER', 'PERSON'):
            continue

        name = ent.text.strip()
        normalized = normalize_name(name)  # Normalize early for consistent lookup
        context = text[max(0, ent.start_char-30):ent.end_char+30].lower()

        # Guard 1: Near role word
        if any(role in context for role in ROLE_WORDS):
            staff.append(normalized)
            continue

        # Guard 2: Appears in thanks pattern
        if any(p in context for p in ['thank', 'gracias', 'danke', 'shout out', 'kudos']):
            staff.append(normalized)
            continue

        # Guard 3: Frequent across reviews (if history available)
        # Use normalized name for lookup (history keys are also normalized)
        if business_history and business_history.get(normalized, 0) >= 3:
            staff.append(normalized)

    return list(set(staff))

def normalize_name(name: str) -> str:
    return ' '.join(name.strip().title().split())
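
The first two guards are plain substring checks on a 30-character window around the detected name, which can be tried without spaCy. In the sketch below, `passes_guard` is a hypothetical helper that locates the name with `str.index` instead of NER offsets, and the sentences are invented.

```python
ROLE_WORDS = {'server', 'waiter', 'waitress', 'manager', 'chef', 'doctor',
              'nurse', 'receptionist', 'mesero', 'gerente', 'kellner'}
THANKS_WORDS = ('thank', 'gracias', 'danke', 'shout out', 'kudos')


def passes_guard(text, name, window=30):
    """Return which guard fires for `name` in `text`, or None if neither does."""
    start = text.index(name)  # stand-in for ent.start_char
    end = start + len(name)   # stand-in for ent.end_char
    context = text[max(0, start - window):end + window].lower()

    if any(role in context for role in ROLE_WORDS):
        return 'role'
    if any(word in context for word in THANKS_WORDS):
        return 'thanks'
    return None


cases = [
    "Our waiter Carlos was fantastic",
    "Muchas gracias a Carlos por todo",
    "I went with Carlos on Friday night",
    "Carlos was a keen observer of the menu",
]
outcomes = [passes_guard(text, "Carlos") for text in cases]
for text, outcome in zip(cases, outcomes):
    print(f"{outcome!s:>6}  {text}")
```

The last case fires the role guard because "observer" contains "server". Matching on word boundaries instead of substrings would avoid this kind of false positive, at the cost of missing inflected forms.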

Full Ingest Function

def ingest_review(review: dict) -> list[dict]:
    spans = split_spans(review['text'])
    if not spans:
        return []

    embeddings = embed_spans(spans)

    enriched = []
    for i, (text, emb) in enumerate(zip(spans, embeddings)):
        sentiment, confidence = score_sentiment(emb)
        staff = extract_staff(text)

        enriched.append({
            'span_id': f"{review['review_id']}_{i}",
            'review_id': review['review_id'],
            'business_id': review['business_id'],
            'span_index': i,
            'text': text,
            'embedding': emb,
            'sentiment': sentiment,
            'sentiment_score': confidence,
            'staff_mentions': staff if staff else None,
            'date': review['date'],
        })

    return enriched

D. Report Generation

Step 1: Fetch Spans

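from datetime import date

# `db` is assumed to be an application-level database handle with a query() method
def fetch_spans(business_id: str, start: date, end: date) -> list[dict]: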
    return db.query("""
        SELECT span_id, review_id, text, embedding, sentiment,
               sentiment_score, staff_mentions, date
        FROM spans
        WHERE business_id = %s AND date >= %s AND date < %s
    """, [business_id, start, end])

Step 2: Cluster Spans (Ephemeral Topics)

Cluster ALL spans together (not pos/neg separately). Compute sentiment breakdown within each cluster.

Scalability note: Full distance matrix is O(n²) memory/time. For large span counts, we fall back to PCA + MiniBatchKMeans.

import hdbscan
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import MiniBatchKMeans

MAX_SPANS_FOR_HDBSCAN = 4000  # Beyond this, O(n²) distance matrix is too expensive

def cluster_spans(spans: list[dict]) -> tuple[list[dict], list[dict]]:
    """Returns (topics, noise_spans)

    Uses HDBSCAN for small datasets, falls back to PCA+KMeans for large ones.
    """

    if len(spans) > MAX_SPANS_FOR_HDBSCAN:
        return _cluster_spans_fallback(spans)

    embeddings = np.array([s['embedding'] for s in spans])

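    # L2-normalize and compute the cosine distance matrix.
    # hdbscan's precomputed metric expects float64 input, and rounding can push
    # distances slightly below zero, so clip and cast before clustering.
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    dist_matrix = 1 - (normed @ normed.T)
    dist_matrix = np.clip(dist_matrix, 0, 2).astype(np.float64)
    np.fill_diagonal(dist_matrix, 0)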

    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=10,  # Aligned with publish gate
        min_samples=5,
        metric='precomputed'
    )
    labels = clusterer.fit_predict(dist_matrix)

    # Group spans by cluster
    topics = {}
    noise_spans = []

    for span, label in zip(spans, labels):
        if label == -1:
            # Keep high-confidence noise for quotes
            if abs(span['sentiment_score']) > 0.5:
                noise_spans.append(span)
            continue

        if label not in topics:
            topics[label] = {'spans': [], 'embeddings': []}
        topics[label]['spans'].append(span)
        topics[label]['embeddings'].append(span['embedding'])

    # Compute centroids
    result = []
    for label, data in topics.items():
        embs = np.array(data['embeddings'])
        centroid = embs.mean(axis=0)
        centroid = centroid / np.linalg.norm(centroid)

        result.append({
            'cluster_id': label,
            'spans': data['spans'],
            'embeddings': embs,
            'centroid': centroid,
        })

    return result, noise_spans


def _cluster_spans_fallback(spans: list[dict]) -> tuple[list[dict], list[dict]]:
    """Fallback clustering for large datasets using PCA + MiniBatchKMeans.

    Trades cluster quality for O(n) scalability.
    Generates pseudo-noise from spans far from their cluster centroid.

    Requires: Each span must have 'embedding' and 'sentiment_score' populated.
    """

    embeddings = np.array([s['embedding'] for s in spans])

    # Reduce dimensionality
    pca = PCA(n_components=50)
    reduced = pca.fit_transform(embeddings)

    # Estimate k (heuristic: sqrt(n/10), clamped)
    k = max(5, min(50, int(np.sqrt(len(spans) / 10))))

    kmeans = MiniBatchKMeans(n_clusters=k, batch_size=256, n_init=3)
    labels = kmeans.fit_predict(reduced)

    # Group spans by cluster
    topics = {}
    for span, emb, label in zip(spans, embeddings, labels):
        if label not in topics:
            topics[label] = {'spans': [], 'embeddings': []}
        topics[label]['spans'].append(span)
        topics[label]['embeddings'].append(emb)

    # Compute centroids and identify pseudo-noise (bottom 3% by similarity)
    result = []
    all_distances = []  # (distance, span) tuples for pseudo-noise selection

    for label, data in topics.items():
        embs = np.array(data['embeddings'])
        centroid = embs.mean(axis=0)
        centroid = centroid / np.linalg.norm(centroid)

        # Compute similarities to centroid
        normed_embs = embs / np.linalg.norm(embs, axis=1, keepdims=True)
        sims = normed_embs @ centroid

        # Track distances for pseudo-noise
        for span, sim in zip(data['spans'], sims):
            all_distances.append((1 - sim, span))

        result.append({
            'cluster_id': label,
            'spans': data['spans'],
            'embeddings': embs,
            'centroid': centroid,
        })

    # Pseudo-noise: bottom 3% by similarity (farthest from any centroid)
    # Only include high-confidence sentiment spans (same as HDBSCAN noise handling)
    all_distances.sort(key=lambda x: x[0], reverse=True)
    noise_cutoff = int(len(all_distances) * 0.03)
    pseudo_noise = [
        span for _, span in all_distances[:noise_cutoff]
        if abs(span['sentiment_score']) > 0.5
    ]

    return result, pseudo_noise
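
The cluster-count heuristic in the fallback is simple arithmetic, so its range can be checked directly. This sketch restates the formula in a standalone helper (`fallback_k`, a hypothetical name) using only the standard library.

```python
import math


def fallback_k(n_spans):
    """k = sqrt(n / 10), truncated to an int and clamped to [5, 50]."""
    return max(5, min(50, int(math.sqrt(n_spans / 10))))


sizes = [100, 4001, 10_000, 25_000, 250_000]
ks = {n: fallback_k(n) for n in sizes}
for n, k in ks.items():
    print(f"{n:>7} spans -> k = {k}")

# Pseudo-noise budget for the smallest fallback input: 3% of 4001 spans.
noise_budget = int(4001 * 0.03)
print("pseudo-noise candidates at 4001 spans:", noise_budget)
```

Because the fallback only runs above 4000 spans, k starts at 20 there and reaches the cap of 50 at 25,000 spans; the lower clamp of 5 only matters if the function is called on small inputs.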

Step 3: Compute Review-Level Stats

Stats are review-level presence (not span counts). This is critical for defensible claims.

def compute_topic_stats(topic: dict, all_review_ids: set) -> dict:
    """Compute review-level presence stats."""

    spans = topic['spans']
    n = len(all_review_ids)

    # Review-level presence
    reviews_any = set(s['review_id'] for s in spans)
    reviews_neg = set(s['review_id'] for s in spans if s['sentiment'] == 'negative')
    reviews_pos = set(s['review_id'] for s in spans if s['sentiment'] == 'positive')

    k_neg = len(reviews_neg)
    k_pos = len(reviews_pos)

    return {
        'k_any': len(reviews_any),
        'k_neg': k_neg,
        'k_pos': k_pos,
        'n': n,
        'rate_neg': k_neg / n if n > 0 else 0,
        'rate_pos': k_pos / n if n > 0 else 0,
        'ci_neg': wilson_interval(k_neg, n),
        'ci_pos': wilson_interval(k_pos, n),
    }

def wilson_interval(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    if n == 0:
        return (0.0, 1.0)

    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2*n)) / denom
    margin = (z / denom) * np.sqrt(p*(1-p)/n + z**2/(4*n**2))

    return (max(0, center - margin), min(1, center + margin))
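
A worked example shows how the interval interacts with the publish gates. The sketch below restates the Wilson formula with `math` only and adds a hypothetical `passes_publish_gate` helper that combines the minimum-k, minimum-n, and CI-width thresholds used later in the payload step.

```python
import math


def wilson(k, n, z=1.96):
    """Wilson score interval for k successes out of n trials."""
    if n == 0:
        return (0.0, 1.0)
    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2 * n)) / denom
    margin = (z / denom) * math.sqrt(p * (1 - p) / n + z**2 / (4 * n**2))
    return (max(0.0, center - margin), min(1.0, center + margin))


def passes_publish_gate(k, n, min_k=8, min_n=20, max_width=0.30):
    """True when the count, sample size, and interval width all clear the gates."""
    lo, hi = wilson(k, n)
    return k >= min_k and n >= min_n and (hi - lo) <= max_width


for k, n in [(8, 20), (8, 100)]:
    lo, hi = wilson(k, n)
    print(f"k={k}, n={n}: CI=({lo:.3f}, {hi:.3f}), width={hi - lo:.3f}, "
          f"publish={passes_publish_gate(k, n)}")
```

With exactly the minimum counts (k=8, n=20) the interval is roughly (0.219, 0.613), about 0.39 wide, so the CI-width gate rejects it even though the count gates pass. At k=8, n=100 the interval narrows to roughly (0.041, 0.150) and the topic is publishable. In practice the width gate is the binding constraint for small periods.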

Step 4: Label Topics (Representative Spans, No Stopwords)

Topic identity = centroid (for matching). Display label = cleaned representative span (for UI).

import re

EMAIL_RE = re.compile(r'\b\S+@\S+\.\S+\b')
URL_RE = re.compile(r'\b(?:https?://|www\.)\S+\b', re.I)
PHONE_RE = re.compile(r'\b(?:\+?\d[\d .()-]{7,}\d)\b')
LONGDIG_RE = re.compile(r'\b\d{8,}\b')

def beautify_label(text: str) -> str:
    """Clean PII and noise from label text."""
    text = ' '.join(text.split())
    text = EMAIL_RE.sub('', text)
    text = URL_RE.sub('', text)
    text = PHONE_RE.sub('', text)
    text = LONGDIG_RE.sub('', text)
    text = re.sub(r'([!?.]){2,}', r'\1', text)
    return text.strip()

def norm_for_dedup(text: str) -> str:
    """Normalize for near-duplicate detection. Unicode-safe for multilingual."""
    import unicodedata

    # Casefold (stronger than lower() for Unicode)
    t = text.casefold()

    # Normalize Unicode (NFC form)
    t = unicodedata.normalize('NFC', t)

    # Replace digits with placeholder
    t = re.sub(r'\d+', '#', t)

    # Remove punctuation but keep letters from any alphabet (\w includes Unicode letters)
    t = re.sub(r'[^\w\s#]+', ' ', t, flags=re.UNICODE)

    # Collapse whitespace
    t = ' '.join(t.split())

    return t

def select_label(topic: dict, used_labels: set) -> str:
    """Select clean, unique display label from representative spans."""

    spans = topic['spans']
    embeddings = np.array(topic['embeddings'])
    centroid = topic['centroid']

    # Rank by similarity to centroid
    sims = embeddings @ centroid
    ranked = np.argsort(sims)[::-1]

    for idx in ranked[:15]:
        cleaned = beautify_label(spans[idx]['text'])

        if not (15 <= len(cleaned) <= 80):
            continue

        key = norm_for_dedup(cleaned)
        if key in used_labels:
            continue

        used_labels.add(key)
        return cleaned

    # Fallback: truncate best match
    best = beautify_label(spans[ranked[0]]['text'])
    return best[:60].rstrip() + ("..." if len(best) > 60 else "")
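
To illustrate what the dedup key treats as "the same label", the sketch below reproduces `norm_for_dedup` in compact form and compares a few made-up label pairs.

```python
import re
import unicodedata


def norm_for_dedup(text):
    """Casefold, NFC-normalize, mask digits, strip punctuation, collapse spaces."""
    t = unicodedata.normalize('NFC', text.casefold())
    t = re.sub(r'\d+', '#', t)
    t = re.sub(r'[^\w\s#]+', ' ', t)
    return ' '.join(t.split())


pairs = [
    ("Großartiger Service!!", "grossartiger service"),  # casefold maps ß to ss
    ("Wait was 45 minutes.", "wait was 30 minutes"),    # digits become '#'
    ("Servicio rápido", "Servicio lento"),              # genuinely different
]
same = [norm_for_dedup(a) == norm_for_dedup(b) for a, b in pairs]
for (a, b), is_same in zip(pairs, same):
    print(f"{a!r} vs {b!r}: {'duplicate' if is_same else 'distinct'}")
```

The first two pairs collapse to the same key, so only one of each would be used as a display label; the third pair stays distinct because accented letters are preserved by `\w`.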

Step 5: Trend Matching (Centroid-Based)

Match current topics to prior topics by centroid similarity. Never use label text for matching.

v1 decision: Compute separate trends for negative and positive rates. This ensures strengths get correct trend values (not reusing negative-only logic).

def match_trends(current_topics: list, prior_topics: list,
                 threshold: float = 0.70, margin: float = 0.05,
                 min_k: int = 8, min_n: int = 20):
    """Match topics across periods for trend computation.

    Computes both trend_neg and trend_pos separately.
    """

    for curr in current_topics:
        stats = curr['stats']
        curr['trend_neg'] = None
        curr['trend_pos'] = None
        curr['trend_match_sim'] = None

        if not prior_topics:
            continue

        # Find best and second-best match by centroid similarity
        sims = [(p, float(curr['centroid'] @ p['centroid'])) for p in prior_topics]
        sims.sort(key=lambda x: x[1], reverse=True)

        best, best_sim = sims[0]
        second_sim = sims[1][1] if len(sims) > 1 else 0

        # Gate: match must be confident AND clearly better than alternatives
        if best_sim < threshold or (best_sim - second_sim) < margin:
            continue

        curr['trend_match_sim'] = best_sim

        # Compute trend for negatives (if both periods have enough data)
        if (stats['k_neg'] >= min_k and stats['n'] >= min_n and
            best['stats']['k_neg'] >= min_k and best['stats']['n'] >= min_n):
            curr['trend_neg'] = stats['rate_neg'] - best['stats']['rate_neg']

        # Compute trend for positives (if both periods have enough data)
        if (stats['k_pos'] >= min_k and stats['n'] >= min_n and
            best['stats']['k_pos'] >= min_k and best['stats']['n'] >= min_n):
            curr['trend_pos'] = stats['rate_pos'] - best['stats']['rate_pos']
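
The two-part match gate (absolute similarity plus margin over the runner-up) can be seen on toy 2-D unit vectors. `best_prior_match` below is a hypothetical helper that isolates just the gate from `match_trends`; the centroids are invented and use plain tuples instead of NumPy arrays.

```python
def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def best_prior_match(curr, priors, threshold=0.70, margin=0.05):
    """Return (index, similarity) of the accepted prior centroid, or None."""
    if not priors:
        return None
    sims = sorted(((dot(curr, p), i) for i, p in enumerate(priors)), reverse=True)
    best_sim, best_idx = sims[0]
    second_sim = sims[1][0] if len(sims) > 1 else 0.0

    # Reject weak matches and matches that do not clearly beat the runner-up.
    if best_sim < threshold or (best_sim - second_sim) < margin:
        return None
    return best_idx, best_sim


curr = (1.0, 0.0)

clear = best_prior_match(curr, [(0.96, 0.28), (0.6, 0.8)])
ambiguous = best_prior_match(curr, [(0.96, 0.28), (0.96, -0.28)])
weak = best_prior_match(curr, [(0.6, 0.8)])

print("clear:", clear)
print("ambiguous:", ambiguous)
print("weak:", weak)
```

Only the first case yields a match. In the second, two prior topics are equally close, so no trend is reported rather than picking one arbitrarily; in the third, the best candidate falls below the 0.70 threshold.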

Step 6: Quote Selection

Pick representative + sharp quotes. Include high-confidence noise spans.

  • Representative: closest span to centroid (within topic, matching sentiment)
  • Sharp: highest |sentiment_score| among topic spans + high-confidence noise

def pick_quotes(topic: dict, noise_spans: list, sentiment_filter: str,
                k: int = 2) -> list[dict]:
    """Select diverse, high-quality quotes: 1 representative + 1 sharp."""

    topic_spans = [s for s in topic['spans'] if s['sentiment'] == sentiment_filter]
    centroid = topic['centroid']

    quotes = []
    seen_reviews = set()

    # 1. Representative: closest to centroid
    if topic_spans:
        embeddings = np.array([s['embedding'] for s in topic_spans])
        sims = embeddings @ centroid
        ranked_idx = np.argsort(sims)[::-1]

        for idx in ranked_idx:
            span = topic_spans[idx]
            if span['review_id'] in seen_reviews:
                continue
            if len(span['text']) > 200:
                continue

            quotes.append({
                'text': span['text'],
                'sentiment': span['sentiment'],
                'date': span['date'],
                'type': 'representative',
            })
            seen_reviews.add(span['review_id'])
            break

    # 2. Sharp: highest confidence from topic + noise
    sharp_candidates = topic_spans + [s for s in noise_spans
                                       if s['sentiment'] == sentiment_filter
                                       and abs(s['sentiment_score']) > 0.5]
    sharp_candidates.sort(key=lambda s: abs(s['sentiment_score']), reverse=True)

    for span in sharp_candidates:
        # Stop before appending so the result never exceeds k quotes
        if len(quotes) >= k:
            break
        if span['review_id'] in seen_reviews:
            continue
        if len(span['text']) > 200:
            continue

        quotes.append({
            'text': span['text'],
            'sentiment': span['sentiment'],
            'date': span['date'],
            'type': 'sharp',
        })
        seen_reviews.add(span['review_id'])

    return quotes

Step 7: Staff Aggregation

def aggregate_staff(spans: list[dict], all_review_ids: set) -> dict:
    """Aggregate staff mentions with review-level presence."""

    staff_data = {}

    for span in spans:
        if not span['staff_mentions']:
            continue

        for name in span['staff_mentions']:
            if name not in staff_data:
                staff_data[name] = {'pos_reviews': set(), 'neg_reviews': set(), 'quotes': []}

            if span['sentiment'] == 'positive':
                staff_data[name]['pos_reviews'].add(span['review_id'])
                staff_data[name]['quotes'].append(span['text'])
            elif span['sentiment'] == 'negative':
                staff_data[name]['neg_reviews'].add(span['review_id'])
                staff_data[name]['quotes'].append(span['text'])

    # Build heroes and concerns
    heroes, concerns = [], []

    for name, data in staff_data.items():
        pos = len(data['pos_reviews'])
        neg = len(data['neg_reviews'])
        total = pos + neg

        if total < 3:  # Minimum mentions
            continue

        entry = {
            'name': name,
            'positive': pos,
            'negative': neg,
            'total': total,
            'quote': data['quotes'][0] if data['quotes'] else None,
        }

        if pos > neg and pos >= 3:
            heroes.append(entry)
        elif neg > pos and neg >= 3:
            concerns.append(entry)

    heroes.sort(key=lambda x: x['positive'], reverse=True)
    concerns.sort(key=lambda x: x['negative'], reverse=True)

    return {'heroes': heroes[:3], 'concerns': concerns[:3]}

Step 8: Build LLM Payload

def build_payload(business_id: str, current_period: tuple,
                  topics: list, noise_spans: list, staff: dict,
                  review_count: int) -> dict:
    """Build structured payload for LLM narration.

    Args:
        noise_spans: High-confidence spans not assigned to any cluster.
                     Used for quote selection.
    """

    issues = []
    strengths = []

    for topic in topics:
        stats = topic['stats']

        # Issue: significant negative presence
        if stats['k_neg'] >= 8 and stats['n'] >= 20:
            ci = stats['ci_neg']
            if ci[1] - ci[0] <= 0.30:  # CI not too wide
                issues.append({
                    'label': topic['label'],
                    'rate': round(stats['rate_neg'], 3),
                    'ci': [round(ci[0], 3), round(ci[1], 3)],
                    'n': stats['k_neg'],
                    # Compare against None so a trend of exactly 0.0 is kept
                    'trend': round(topic['trend_neg'], 3) if topic.get('trend_neg') is not None else None,
                    'quotes': pick_quotes(topic, noise_spans, 'negative', k=2),
                })

        # Strength: significant positive presence
        if stats['k_pos'] >= 8 and stats['n'] >= 20:
            ci = stats['ci_pos']
            if ci[1] - ci[0] <= 0.30:
                strengths.append({
                    'label': topic['label'],
                    'rate': round(stats['rate_pos'], 3),
                    'ci': [round(ci[0], 3), round(ci[1], 3)],
                    'n': stats['k_pos'],
                    # Compare against None so a trend of exactly 0.0 is kept
                    'trend': round(topic['trend_pos'], 3) if topic.get('trend_pos') is not None else None,
                    'quotes': pick_quotes(topic, noise_spans, 'positive', k=2),
                })

    # Sort by rate
    issues.sort(key=lambda x: x['rate'], reverse=True)
    strengths.sort(key=lambda x: x['rate'], reverse=True)

    return {
        'business_id': business_id,
        'period': f"{current_period[0]} to {current_period[1]}",
        'total_reviews': review_count,
        'issues': issues[:5],
        'strengths': strengths[:5],
        'staff': staff,
    }

Step 9: LLM Narration (Single Call)

import json

SYSTEM_PROMPT = """You are a business consultant analyzing customer review data.
Write a clear, actionable report for a small business owner.

RULES:
1. Use ONLY the statistics provided. Never invent numbers.
2. Include confidence intervals when stating percentages.
3. Be direct and actionable. The owner is busy.
4. Prioritize issues by frequency and trend direction.
5. Each recommendation must reference a specific issue from the data."""

def generate_report(payload: dict) -> str:
    user_prompt = f"""Based on this review analysis, write a consultant report.

DATA:
{json.dumps(payload, indent=2)}

SECTIONS:
1. Executive Summary (3 sentences max)
2. Top Strengths (what's working, with stats)
3. Critical Issues (what needs attention, with stats and trends)
4. Staff Performance (heroes and concerns if present)
5. Recommended Actions (3-5 specific steps, prioritized)

Keep total length under 600 words."""

    # llm_client is a placeholder for whichever chat-completion client is configured
    response = llm_client.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt}
        ],
        max_tokens=1500
    )
    return response.content

Full Report Generation Function

def generate_full_report(business_id: str,
                         current_start: date, current_end: date,
                         prior_start: date, prior_end: date) -> str:
    """Generate complete report for a business."""

    # Fetch spans
    current_spans = fetch_spans(business_id, current_start, current_end)
    prior_spans = fetch_spans(business_id, prior_start, prior_end)

    if not current_spans:
        return "Insufficient data for report."

    # Get unique review IDs
    current_reviews = set(s['review_id'] for s in current_spans)
    prior_reviews = set(s['review_id'] for s in prior_spans)

    # Cluster current period
    current_topics, noise_spans = cluster_spans(current_spans)

    # Compute stats for current topics
    for topic in current_topics:
        topic['stats'] = compute_topic_stats(topic, current_reviews)

    # Label topics (with deduplication)
    used_labels = set()
    for topic in current_topics:
        topic['label'] = select_label(topic, used_labels)

    # Cluster and compute stats for prior period
    prior_topics = []
    if prior_spans:
        prior_topics, _ = cluster_spans(prior_spans)
        for topic in prior_topics:
            topic['stats'] = compute_topic_stats(topic, prior_reviews)

    # Match trends
    match_trends(current_topics, prior_topics)

    # Aggregate staff
    staff = aggregate_staff(current_spans, current_reviews)

    # Build payload (include noise_spans for quote selection)
    payload = build_payload(
        business_id,
        (current_start, current_end),
        current_topics,
        noise_spans,  # Pass noise spans for quote selection
        staff,
        len(current_reviews)
    )

    # Generate report
    return generate_report(payload)

E. Summary of Design Decisions

What We Do

| Decision | Rationale |
|---|---|
| Ephemeral topics (no persistent catalog) | Eliminates drift, merge logic, thresholds |
| Cluster all spans together | One topic can have pos/neg breakdown; avoids duplicates |
| Fallback clustering for large datasets | PCA + KMeans when >4000 spans (O(n) vs O(n²)) |
| Review-level presence for stats | Defensible claims ("X% of customers") |
| Wilson intervals + publish gates | Statistical rigor |
| Centroid-based trend matching | Stable identity regardless of label changes |
| Separate trend_neg/trend_pos | Correct trends for both issues and strengths |
| Representative + sharp quotes | Best of both: centroid-closest + highest confidence |
| Representative span labels | Human-readable, no stopwords/NLP needed |
| Unicode-safe label dedup | Works for Spanish, German, etc. |
| Multi-anchor sentiment | Better multilingual alignment than bag sentence |
| Guarded staff extraction | Reduces false positives |
| Single LLM call | Cost control |

What We Don't Do

| Avoided | Why |
|---|---|
| Persistent topic catalog | Adds state, drift, merge complexity |
| Topic assignment at ingest | Unnecessary; cluster at report time |
| Span-count stats | Inflates rates; review-level is correct |
| TF-IDF with stopwords | Brittle; representative spans are better |
| Split on "and/y/und" | Over-splits positive phrases |
| POS tagging for labels | Heavy dependency; regex cleanup is sufficient |
| Translation | Multilingual embeddings + multi-language anchors handle it |
| Sentiment classifier | Multi-anchor approach works across languages |

Statistical Gates

| Gate | Threshold | Purpose |
|---|---|---|
| Minimum k | 8 | Topic must have enough mentions |
| Minimum n | 20 | Period must have enough reviews |
| CI width | ≤ 0.30 | Reject imprecise estimates |
| Trend match sim | ≥ 0.70 | Confident topic match |
| Trend margin | ≥ 0.05 | Clear winner vs alternatives |
| Both periods min | k≥8, n≥20 | Trend requires data on both sides |

Trend Handling

  • Accurate when: Topic structure is stable (most real issues)
  • Omitted when: Match confidence is low
  • Separate trends: trend_neg and trend_pos computed independently
  • Never: Show confidently wrong trends

F. Implementation Plan

| Day | Deliverable |
|---|---|
| 1-2 | Span splitter + embedding service |
| 3-4 | Sentiment scoring + staff extraction |
| 5-6 | Database schema + ingest pipeline |
| 7-8 | Clustering + stats + labeling |
| 9-10 | Trend matching + quote selection |
| 11-12 | LLM integration + end-to-end testing |

Total: ~12 days for a competent engineer


G. What's NOT in v1

| Feature | Rationale | v2 Trigger |
|---|---|---|
| Token-window segmentation | Punctuation split is good enough | Run-on reviews cause quality issues |
| Many-to-many trend matching | Best-match is good enough | Trend accuracy complaints |
| Owner-driven topic editing | Not needed yet | Users want to rename/merge topics |
| Multi-location rollup | Different product | Chain restaurants sign up |
| Anomaly detection | Different product | Fraud complaints |
| Response templates | Low value | User requests |

H. Known Limitations / Future Improvements

| Limitation | Impact | v2 Consideration |
|---|---|---|
| Sentiment anchors cover EN/ES/DE only | Other languages (FR, PT, IT, etc.) rely on multilingual-e5 alignment | Add 5-10 anchors per new language as user base grows |
| KMeans fallback uses pseudo-noise heuristic | Sharp quotes may be slightly less sharp for >4k span reports | Consider HDBSCAN with approximate nearest neighbors (pynndescent) |
| No streaming for very large reports | Memory pressure if report spans exceed 10k | Paginate or sample spans for extreme cases |

I. Final Checklist Before Ship

  • Span splitter handles mobile text (no punctuation edge case)
  • Embeddings are L2-normalized before clustering
  • HDBSCAN uses precomputed cosine distance matrix
  • Clustering has fallback for >4000 spans (PCA + KMeans)
  • KMeans fallback generates pseudo-noise (bottom 3% by centroid distance)
  • Stats are review-level presence (not span counts)
  • Labels are deduplicated across topics (Unicode-safe)
  • Trends computed separately for neg/pos (trend_neg, trend_pos)
  • Trends require min support in BOTH periods
  • Sentiment anchors are multi-word averaged (not bag sentence)
  • Sentiment anchors include EN/ES/DE words
  • Staff history lookup uses normalized names
  • noise_spans passed to quote selection
  • pgvector index uses HNSW (or ivfflat with ANALYZE documented)
  • LLM prompt enforces "only use provided numbers"
  • Cost per report < $0.30

Document version: v1-final-reviewed
Status: Ready for implementation (with reviewer fixes applied)