# ReviewIQ Pipeline v1 — Final Architecture

**Design principle**: Minimum state, defensible stats, multilingual, robust to messy mobile text, 1 LLM call per report, <$0.30/report.

**Core decision**: Do not persist topics. Persist only enriched spans. Build topics at report time via clustering and match across periods for trends.

---

## A. Architecture Overview

```
INGEST (continuous, stateless, ~$0.00)
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Raw Review  │────▶│     Span     │────▶│   Embed +    │────▶│    Store     │
│ (text,rating,│     │   Splitter   │     │  Sentiment   │     │   Enriched   │
│  date, lang) │     │              │     │    + NER     │     │    Spans     │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘

No topic assignment at ingest. Just store enriched spans.

REPORT (per request, ~$0.20)
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Fetch     │────▶│   Cluster    │────▶│   Stats +    │────▶│     LLM      │
│    Spans     │     │  (HDBSCAN)   │     │   Labels +   │     │   Narrate    │
│              │     │              │     │   Quotes     │     │   (1 call)   │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘

Topics are ephemeral. They exist only for this report.
Trends are computed by matching clusters across periods via centroid similarity.
```

### Cost Model

| Stage | When | Cost | Notes |
|-------|------|------|-------|
| Span splitting | Per review ingested | $0.00 | Regex only |
| Embedding | Per span ingested | $0.00 | Local model, batched |
| Sentiment | Per span ingested | $0.00 | Embedding math (EN/ES/DE multi-anchor) |
| NER (staff) | Per span ingested | $0.00 | spaCy, guarded |
| Clustering | Per report | $0.00 | HDBSCAN up to 4k spans, PCA+KMeans fallback above that |
| Stats + labels | Per report | $0.00 | Python/SQL |
| LLM narration | Per report | ~$0.15-0.25 | Single API call |

**Total: ~$0.20/report** (dominated by LLM)

---

## B. Data Model (Only What Persists)

### 1. Raw Reviews

```sql
CREATE TABLE reviews (
    review_id   TEXT PRIMARY KEY,
    business_id TEXT NOT NULL,
    text        TEXT NOT NULL,
    rating      INT NOT NULL,
    date        TIMESTAMP,
    source      TEXT DEFAULT 'google',
    ingested_at TIMESTAMP DEFAULT NOW()
);
```

### 2. Enriched Spans (The Only ML Artifact)

```sql
-- Requires the pgvector extension: CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE spans (
    span_id         TEXT PRIMARY KEY,
    review_id       TEXT REFERENCES reviews(review_id),
    business_id     TEXT NOT NULL,
    span_index      INT NOT NULL,
    text            TEXT NOT NULL,
    embedding       VECTOR(384),
    sentiment       TEXT,    -- 'positive', 'negative', 'neutral'
    sentiment_score FLOAT,
    staff_mentions  TEXT[],  -- guarded extraction
    date            TIMESTAMP,
    created_at      TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_spans_business_date ON spans(business_id, date);

-- Embedding index: prefer HNSW if available (pgvector 0.5+), otherwise ivfflat
-- HNSW: no training required, better query performance
CREATE INDEX idx_spans_embedding ON spans
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- Alternative for older pgvector. Build the ivfflat index AFTER the bulk load,
-- so its list centroids reflect real data, then refresh planner statistics:
-- CREATE INDEX idx_spans_embedding ON spans
--     USING ivfflat (embedding vector_cosine_ops)
--     WITH (lists = 100);
-- ANALYZE spans;
```

### 3. Review-Topic Presence (Computed at Report Time, Not Stored)

Topics are ephemeral. Presence is computed per report, not persisted.

---

## C. Ingest Pipeline

### Step 1: Span Splitting

Split on punctuation. Fallback split on contrast markers. Merge tiny fragments.
```python
import re

CONTRAST_RE = re.compile(
    r'\b(?:but|pero|aber|aunque|however|though|although|yet|still|sin embargo)\b',
    re.IGNORECASE
)

def split_spans(text: str) -> list[str]:
    # Split on punctuation (good enough for most text, with contrast fallback)
    parts = re.split(r'[.!?;:,]\s*|\s{2,}', text)
    parts = [p.strip() for p in parts if len(p.strip()) >= 12]

    # Fallback split on contrast markers
    refined = []
    for p in parts:
        if CONTRAST_RE.search(p):
            sub = [s.strip() for s in CONTRAST_RE.split(p)]
            # Merge tiny fragments back into the preceding fragment
            merged = []
            for s in sub:
                if not s:
                    continue
                if len(s) < 12 and merged:
                    merged[-1] = merged[-1] + ' ' + s
                else:
                    merged.append(s)
            refined.extend([m for m in merged if len(m) >= 12])
        else:
            refined.append(p)
    return refined
```

**Note**: Do NOT split on "and/y/und" by default, because these often connect positive qualities ("friendly and fast").

### Step 2: Embedding

Use a multilingual model. No translation needed.

```python
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('intfloat/multilingual-e5-small')

def embed_spans(spans: list[str]) -> np.ndarray:
    # NOTE: the e5 model card recommends prefixing inputs ("query: " / "passage: ").
    # If a prefix is adopted, apply the same one to the sentiment anchors.
    return model.encode(spans, normalize_embeddings=True)
```

### Step 3: Sentiment (Anchor-Based)

Score sentiment via embedding distance to polar anchors. Because the embedding model is multilingual, the same anchors apply to every language it covers.

**Note**: Encode multiple short anchors separately, normalize, then average. This gives better multilingual alignment than a single "bag sentence".
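A minimal sketch of why each anchor is normalized before averaging, using toy 2-D vectors in place of real embeddings. The helpers `normalize` and `average_anchor` are hypothetical names for illustration only; the pipeline's own version operates on model embeddings.

```python
import math

def normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit L2 length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def average_anchor(vectors: list[list[float]]) -> list[float]:
    """Normalize each vector, average them, then re-normalize the mean."""
    unit = [normalize(v) for v in vectors]
    dim = len(unit[0])
    mean = [sum(v[i] for v in unit) / len(unit) for i in range(dim)]
    return normalize(mean)

# Toy anchors with very different lengths (made-up values).
toy_anchors = [[3.0, 0.0], [0.0, 0.5]]
anchor = average_anchor(toy_anchors)
print(anchor)
```

Without the per-vector normalization the long vector would dominate the mean; with it, both toy anchors contribute equally and the result points halfway between them.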
```python
# Multiple short anchors for better multilingual alignment
# Include ES/DE anchors for improved cross-language recall
POSITIVE_WORDS = [
    # English
    "excellent", "wonderful", "amazing", "great", "fantastic",
    "delicious", "friendly", "helpful", "perfect", "outstanding",
    # Spanish
    "excelente", "increíble", "delicioso", "amable", "rápido",
    # German
    "toll", "lecker", "freundlich", "schnell", "perfekt",
]

NEGATIVE_WORDS = [
    # English
    "terrible", "awful", "horrible", "bad", "disgusting",
    "rude", "slow", "dirty", "broken", "disappointing",
    # Spanish
    "horrible", "sucio", "lento", "grosero", "caro",
    # German
    "schlecht", "langsam", "unhöflich", "dreckig", "teuer",
]

def _compute_anchor(words: list[str]) -> np.ndarray:
    """Encode multiple anchors, normalize each, then average.

    Deduplicates words to avoid implicit weighting.
    """
    unique_words = list(dict.fromkeys(words))  # Preserve order, remove dupes
    embeddings = model.encode(unique_words, normalize_embeddings=True)
    avg = embeddings.mean(axis=0)
    return avg / np.linalg.norm(avg)  # Re-normalize the average

POSITIVE_ANCHOR = _compute_anchor(POSITIVE_WORDS)
NEGATIVE_ANCHOR = _compute_anchor(NEGATIVE_WORDS)

def score_sentiment(embedding: np.ndarray) -> tuple[str, float]:
    pos_sim = embedding @ POSITIVE_ANCHOR
    neg_sim = embedding @ NEGATIVE_ANCHOR
    score = (pos_sim - neg_sim) / (pos_sim + neg_sim + 1e-6)

    # NOTE: the 0.15 cutoffs are starting values. e5-style embeddings tend to
    # produce cosine similarities in a narrow, high range, so calibrate the
    # thresholds on labeled spans before relying on them.
    if score > 0.15:
        return ('positive', float(score))
    elif score < -0.15:
        return ('negative', float(abs(score)))
    else:
        return ('neutral', 0.0)
```

### Step 4: Staff Extraction (Guarded)

Use spaCy NER, but count a name as staff only when at least one guard passes:

```python
import spacy

nlp = spacy.load('xx_ent_wiki_sm')  # multilingual

ROLE_WORDS = {'server', 'waiter', 'waitress', 'manager', 'chef', 'doctor',
              'nurse', 'receptionist', 'mesero', 'gerente', 'kellner'}

def extract_staff(text: str, business_history: dict = None) -> list[str]:
    doc = nlp(text)
    staff = []

    for ent in doc.ents:
        # xx_ent_wiki_sm tags people as 'PER'; other spaCy models use 'PERSON'
        if ent.label_ not in ('PER', 'PERSON'):
            continue

        name = ent.text.strip()
        normalized = normalize_name(name)  # Normalize early for consistent lookup
        context = text[max(0, ent.start_char-30):ent.end_char+30].lower()

        # Guard 1: Near role word
        if any(role in context for role in ROLE_WORDS):
            staff.append(normalized)
            continue

        # Guard 2: Appears in thanks pattern
        if any(p in context for p in ['thank', 'gracias', 'danke', 'shout out', 'kudos']):
            staff.append(normalized)
            continue

        # Guard 3: Frequent across reviews (if history available)
        # Use normalized name for lookup (history keys are also normalized)
        if business_history and business_history.get(normalized, 0) >= 3:
            staff.append(normalized)

    return list(set(staff))

def normalize_name(name: str) -> str:
    return ' '.join(name.strip().title().split())
```

### Full Ingest Function

```python
def ingest_review(review: dict) -> list[dict]:
    spans = split_spans(review['text'])
    if not spans:
        return []

    embeddings = embed_spans(spans)

    enriched = []
    for i, (text, emb) in enumerate(zip(spans, embeddings)):
        sentiment, confidence = score_sentiment(emb)
        staff = extract_staff(text)

        enriched.append({
            'span_id': f"{review['review_id']}_{i}",
            'review_id': review['review_id'],
            'business_id': review['business_id'],
            'span_index': i,
            'text': text,
            'embedding': emb,
            'sentiment': sentiment,
            'sentiment_score': confidence,
            'staff_mentions': staff if staff else None,
            'date': review['date'],
        })

    return enriched
```

---

## D. Report Generation

### Step 1: Fetch Spans

```python
from datetime import date

def fetch_spans(business_id: str, start: date, end: date) -> list[dict]:
    # Half-open window: start is inclusive, end is exclusive
    return db.query("""
        SELECT span_id, review_id, text, embedding, sentiment,
               sentiment_score, staff_mentions, date
        FROM spans
        WHERE business_id = %s AND date >= %s AND date < %s
    """, [business_id, start, end])
```

### Step 2: Cluster Spans (Ephemeral Topics)

Cluster ALL spans together (not pos/neg separately). Compute sentiment breakdown within each cluster.

**Scalability note**: The full pairwise distance matrix costs O(n²) memory and time. Above 4,000 spans, we fall back to PCA + MiniBatchKMeans.
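As a rough check on the O(n²) claim, the sketch below estimates the size of a dense pairwise distance matrix. It assumes 8-byte float64 entries and ignores the clusterer's own working memory; `distance_matrix_bytes` is a hypothetical helper, not part of the pipeline.

```python
def distance_matrix_bytes(n_spans: int, bytes_per_value: int = 8) -> int:
    """Bytes needed to hold a dense n x n pairwise distance matrix."""
    return n_spans * n_spans * bytes_per_value

# Compare a small report, the HDBSCAN cutoff, and a large report.
for n in (1_000, 4_000, 10_000):
    mib = distance_matrix_bytes(n) / 2**20
    print(f"{n:>6} spans -> {mib:,.0f} MiB")
```

Under these assumptions the matrix at the 4,000-span cutoff is 128 MB, and it grows to 800 MB at 10,000 spans, which is why the cutoff sits where it does.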
```python
import hdbscan
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import MiniBatchKMeans

MAX_SPANS_FOR_HDBSCAN = 4000  # Beyond this, O(n²) distance matrix is too expensive

def cluster_spans(spans: list[dict]) -> tuple[list[dict], list[dict]]:
    """Returns (topics, noise_spans)

    Uses HDBSCAN for small datasets, falls back to PCA+KMeans for large ones.
    """
    if len(spans) > MAX_SPANS_FOR_HDBSCAN:
        return _cluster_spans_fallback(spans)

    # float64: hdbscan's precomputed metric expects a double-precision matrix
    embeddings = np.array([s['embedding'] for s in spans], dtype=np.float64)

    # L2-normalize and compute cosine distance matrix
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    dist_matrix = np.clip(1 - (normed @ normed.T), 0.0, 2.0)  # clip rounding error
    np.fill_diagonal(dist_matrix, 0)

    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=10,  # Aligned with publish gate
        min_samples=5,
        metric='precomputed'
    )
    labels = clusterer.fit_predict(dist_matrix)

    # Group spans by cluster
    topics = {}
    noise_spans = []

    for span, label in zip(spans, labels):
        if label == -1:
            # Keep high-confidence noise for quotes
            if abs(span['sentiment_score']) > 0.5:
                noise_spans.append(span)
            continue

        if label not in topics:
            topics[label] = {'spans': [], 'embeddings': []}
        topics[label]['spans'].append(span)
        topics[label]['embeddings'].append(span['embedding'])

    # Compute centroids
    result = []
    for label, data in topics.items():
        embs = np.array(data['embeddings'])
        centroid = embs.mean(axis=0)
        centroid = centroid / np.linalg.norm(centroid)

        result.append({
            'cluster_id': int(label),
            'spans': data['spans'],
            'embeddings': embs,
            'centroid': centroid,
        })

    return result, noise_spans

def _cluster_spans_fallback(spans: list[dict]) -> tuple[list[dict], list[dict]]:
    """Fallback clustering for large datasets using PCA + MiniBatchKMeans.

    Trades cluster quality for O(n) scalability.
    Generates pseudo-noise from spans far from their cluster centroid.

    Requires: Each span must have 'embedding' and 'sentiment_score' populated.
    """
    embeddings = np.array([s['embedding'] for s in spans])

    # Reduce dimensionality
    pca = PCA(n_components=50)
    reduced = pca.fit_transform(embeddings)

    # Estimate k (heuristic: sqrt(n/10), clamped)
    k = max(5, min(50, int(np.sqrt(len(spans) / 10))))

    kmeans = MiniBatchKMeans(n_clusters=k, batch_size=256, n_init=3)
    labels = kmeans.fit_predict(reduced)

    # Group spans by cluster
    topics = {}
    for span, emb, label in zip(spans, embeddings, labels):
        if label not in topics:
            topics[label] = {'spans': [], 'embeddings': []}
        topics[label]['spans'].append(span)
        topics[label]['embeddings'].append(emb)

    # Compute centroids and identify pseudo-noise (bottom 3% by similarity)
    result = []
    all_distances = []  # (distance, span) tuples for pseudo-noise selection

    for label, data in topics.items():
        embs = np.array(data['embeddings'])
        centroid = embs.mean(axis=0)
        centroid = centroid / np.linalg.norm(centroid)

        # Compute similarities to centroid
        normed_embs = embs / np.linalg.norm(embs, axis=1, keepdims=True)
        sims = normed_embs @ centroid

        # Track distances for pseudo-noise
        for span, sim in zip(data['spans'], sims):
            all_distances.append((1 - sim, span))

        result.append({
            'cluster_id': label,
            'spans': data['spans'],
            'embeddings': embs,
            'centroid': centroid,
        })

    # Pseudo-noise: bottom 3% by similarity (farthest from their own centroid)
    # Only include high-confidence sentiment spans (same as HDBSCAN noise handling)
    all_distances.sort(key=lambda x: x[0], reverse=True)
    noise_cutoff = int(len(all_distances) * 0.03)
    pseudo_noise = [
        span for _, span in all_distances[:noise_cutoff]
        if abs(span['sentiment_score']) > 0.5
    ]

    return result, pseudo_noise
```

### Step 3: Compute Review-Level Stats

Stats are review-level presence (not span counts). This is critical for defensible claims.
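A toy comparison, using made-up data, of why the two counting methods diverge. The span dicts and review IDs below are hypothetical: one long review complains about the same topic in three separate spans.

```python
# Hypothetical negative spans for one topic; review r1 contributes three.
negative_spans = [
    {"review_id": "r1"},
    {"review_id": "r1"},
    {"review_id": "r1"},
    {"review_id": "r2"},
]
total_reviews = 10  # all reviews in the period, with or without this topic

# Span-count rate: every span counts, so one verbose review is over-weighted.
span_rate = len(negative_spans) / total_reviews

# Review-level rate: each review counts at most once per topic.
review_rate = len({s["review_id"] for s in negative_spans}) / total_reviews

print(f"span-count rate:   {span_rate:.0%}")
print(f"review-level rate: {review_rate:.0%}")
```

Here the span-count rate is 40% while only 20% of reviews actually mention the problem, and only the second figure supports a statement of the form "X% of customers".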
```python
def compute_topic_stats(topic: dict, all_review_ids: set) -> dict:
    """Compute review-level presence stats."""
    spans = topic['spans']
    n = len(all_review_ids)

    # Review-level presence
    reviews_any = set(s['review_id'] for s in spans)
    reviews_neg = set(s['review_id'] for s in spans if s['sentiment'] == 'negative')
    reviews_pos = set(s['review_id'] for s in spans if s['sentiment'] == 'positive')

    k_neg = len(reviews_neg)
    k_pos = len(reviews_pos)

    return {
        'k_any': len(reviews_any),
        'k_neg': k_neg,
        'k_pos': k_pos,
        'n': n,
        'rate_neg': k_neg / n if n > 0 else 0,
        'rate_pos': k_pos / n if n > 0 else 0,
        'ci_neg': wilson_interval(k_neg, n),
        'ci_pos': wilson_interval(k_pos, n),
    }

def wilson_interval(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    if n == 0:
        return (0.0, 1.0)
    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2*n)) / denom
    margin = (z / denom) * np.sqrt(p*(1-p)/n + z**2/(4*n**2))
    return (max(0, center - margin), min(1, center + margin))
```

### Step 4: Label Topics (Representative Spans, No Stopwords)

Topic identity = centroid (for matching). Display label = cleaned representative span (for UI).

```python
import re

EMAIL_RE = re.compile(r'\b\S+@\S+\.\S+\b')
URL_RE = re.compile(r'\b(?:https?://|www\.)\S+\b', re.I)
PHONE_RE = re.compile(r'\b(?:\+?\d[\d .()-]{7,}\d)\b')
LONGDIG_RE = re.compile(r'\b\d{8,}\b')

def beautify_label(text: str) -> str:
    """Clean PII and noise from label text."""
    text = ' '.join(text.split())
    text = EMAIL_RE.sub('', text)
    text = URL_RE.sub('', text)
    text = PHONE_RE.sub('', text)
    text = LONGDIG_RE.sub('', text)
    text = re.sub(r'([!?.]){2,}', r'\1', text)
    return text.strip()

def norm_for_dedup(text: str) -> str:
    """Normalize for near-duplicate detection.
    Unicode-safe for multilingual."""
    import unicodedata
    # Casefold (stronger than lower() for Unicode)
    t = text.casefold()
    # Normalize Unicode (NFC form)
    t = unicodedata.normalize('NFC', t)
    # Replace digits with placeholder
    t = re.sub(r'\d+', '#', t)
    # Remove punctuation but keep letters from any alphabet (\w includes Unicode letters)
    t = re.sub(r'[^\w\s#]+', ' ', t, flags=re.UNICODE)
    # Collapse whitespace
    t = ' '.join(t.split())
    return t

def select_label(topic: dict, used_labels: set) -> str:
    """Select clean, unique display label from representative spans."""
    spans = topic['spans']
    embeddings = np.array(topic['embeddings'])
    centroid = topic['centroid']

    # Rank by similarity to centroid
    sims = embeddings @ centroid
    ranked = np.argsort(sims)[::-1]

    for idx in ranked[:15]:
        cleaned = beautify_label(spans[idx]['text'])
        if not (15 <= len(cleaned) <= 80):
            continue
        key = norm_for_dedup(cleaned)
        if key in used_labels:
            continue
        used_labels.add(key)
        return cleaned

    # Fallback: truncate best match
    best = beautify_label(spans[ranked[0]]['text'])
    return best[:60].rstrip() + ("..." if len(best) > 60 else "")
```

### Step 5: Trend Matching (Centroid-Based)

Match current topics to prior topics by centroid similarity. Never use label text for matching.

**v1 decision**: Compute separate trends for negative and positive rates. This ensures strengths get correct trend values (not reusing negative-only logic).

```python
def match_trends(current_topics: list, prior_topics: list,
                 threshold: float = 0.70, margin: float = 0.05,
                 min_k: int = 8, min_n: int = 20):
    """Match topics across periods for trend computation.

    Computes both trend_neg and trend_pos separately.
    """
    for curr in current_topics:
        stats = curr['stats']
        curr['trend_neg'] = None
        curr['trend_pos'] = None
        curr['trend_match_sim'] = None

        if not prior_topics:
            continue

        # Find best and second-best match by centroid similarity
        sims = [(p, float(curr['centroid'] @ p['centroid'])) for p in prior_topics]
        sims.sort(key=lambda x: x[1], reverse=True)

        best, best_sim = sims[0]
        second_sim = sims[1][1] if len(sims) > 1 else 0

        # Gate: match must be confident AND clearly better than alternatives
        if best_sim < threshold or (best_sim - second_sim) < margin:
            continue

        curr['trend_match_sim'] = best_sim

        # Compute trend for negatives (if both periods have enough data)
        if (stats['k_neg'] >= min_k and stats['n'] >= min_n and
                best['stats']['k_neg'] >= min_k and best['stats']['n'] >= min_n):
            curr['trend_neg'] = stats['rate_neg'] - best['stats']['rate_neg']

        # Compute trend for positives (if both periods have enough data)
        if (stats['k_pos'] >= min_k and stats['n'] >= min_n and
                best['stats']['k_pos'] >= min_k and best['stats']['n'] >= min_n):
            curr['trend_pos'] = stats['rate_pos'] - best['stats']['rate_pos']
```

### Step 6: Quote Selection

Pick representative + sharp quotes. Include high-confidence noise spans.

- **Representative**: closest span to centroid (within topic, matching sentiment)
- **Sharp**: highest |sentiment_score| among topic spans + high-confidence noise

```python
def pick_quotes(topic: dict, noise_spans: list, sentiment_filter: str,
                k: int = 2) -> list[dict]:
    """Select diverse, high-quality quotes: 1 representative + 1 sharp."""
    topic_spans = [s for s in topic['spans'] if s['sentiment'] == sentiment_filter]
    centroid = topic['centroid']
    quotes = []
    seen_reviews = set()

    # 1. Representative: closest to centroid
    if topic_spans:
        embeddings = np.array([s['embedding'] for s in topic_spans])
        sims = embeddings @ centroid
        ranked_idx = np.argsort(sims)[::-1]

        for idx in ranked_idx:
            span = topic_spans[idx]
            if span['review_id'] in seen_reviews:
                continue
            if len(span['text']) > 200:
                continue
            quotes.append({
                'text': span['text'],
                'sentiment': span['sentiment'],
                'date': span['date'],
                'type': 'representative',
            })
            seen_reviews.add(span['review_id'])
            break

    # 2. Sharp: highest confidence from topic + noise
    sharp_candidates = topic_spans + [
        s for s in noise_spans
        if s['sentiment'] == sentiment_filter and abs(s['sentiment_score']) > 0.5
    ]
    sharp_candidates.sort(key=lambda s: abs(s['sentiment_score']), reverse=True)

    for span in sharp_candidates:
        if span['review_id'] in seen_reviews:
            continue
        if len(span['text']) > 200:
            continue
        quotes.append({
            'text': span['text'],
            'sentiment': span['sentiment'],
            'date': span['date'],
            'type': 'sharp',
        })
        seen_reviews.add(span['review_id'])
        if len(quotes) >= k:
            break

    return quotes
```

### Step 7: Staff Aggregation

```python
def aggregate_staff(spans: list[dict], all_review_ids: set) -> dict:
    """Aggregate staff mentions with review-level presence."""
    staff_data = {}

    for span in spans:
        if not span['staff_mentions']:
            continue
        for name in span['staff_mentions']:
            if name not in staff_data:
                staff_data[name] = {'pos_reviews': set(), 'neg_reviews': set(), 'quotes': []}

            if span['sentiment'] == 'positive':
                staff_data[name]['pos_reviews'].add(span['review_id'])
                staff_data[name]['quotes'].append(span['text'])
            elif span['sentiment'] == 'negative':
                staff_data[name]['neg_reviews'].add(span['review_id'])
                staff_data[name]['quotes'].append(span['text'])

    # Build heroes and concerns
    heroes, concerns = [], []
    for name, data in staff_data.items():
        pos = len(data['pos_reviews'])
        neg = len(data['neg_reviews'])
        total = pos + neg

        if total < 3:  # Minimum mentions
            continue

        entry = {
            'name': name,
            'positive': pos,
            'negative': neg,
            'total': total,
            'quote':
                data['quotes'][0] if data['quotes'] else None,
        }

        if pos > neg and pos >= 3:
            heroes.append(entry)
        elif neg > pos and neg >= 3:
            concerns.append(entry)

    heroes.sort(key=lambda x: x['positive'], reverse=True)
    concerns.sort(key=lambda x: x['negative'], reverse=True)

    return {'heroes': heroes[:3], 'concerns': concerns[:3]}
```

### Step 8: Build LLM Payload

```python
def build_payload(business_id: str, current_period: tuple,
                  topics: list, noise_spans: list,
                  staff: dict, review_count: int) -> dict:
    """Build structured payload for LLM narration.

    Args:
        noise_spans: High-confidence spans not assigned to any cluster.
            Used for quote selection.
    """
    issues = []
    strengths = []

    for topic in topics:
        stats = topic['stats']

        # Issue: significant negative presence
        if stats['k_neg'] >= 8 and stats['n'] >= 20:
            ci = stats['ci_neg']
            if ci[1] - ci[0] <= 0.30:  # CI not too wide
                trend = topic.get('trend_neg')
                issues.append({
                    'label': topic['label'],
                    'rate': round(stats['rate_neg'], 3),
                    'ci': [round(ci[0], 3), round(ci[1], 3)],
                    'n': stats['k_neg'],
                    # "is not None" so a flat trend (0.0) is reported, not dropped
                    'trend': round(trend, 3) if trend is not None else None,
                    'quotes': pick_quotes(topic, noise_spans, 'negative', k=2),
                })

        # Strength: significant positive presence
        if stats['k_pos'] >= 8 and stats['n'] >= 20:
            ci = stats['ci_pos']
            if ci[1] - ci[0] <= 0.30:
                trend = topic.get('trend_pos')
                strengths.append({
                    'label': topic['label'],
                    'rate': round(stats['rate_pos'], 3),
                    'ci': [round(ci[0], 3), round(ci[1], 3)],
                    'n': stats['k_pos'],
                    'trend': round(trend, 3) if trend is not None else None,
                    'quotes': pick_quotes(topic, noise_spans, 'positive', k=2),
                })

    # Sort by rate
    issues.sort(key=lambda x: x['rate'], reverse=True)
    strengths.sort(key=lambda x: x['rate'], reverse=True)

    return {
        'business_id': business_id,
        'period': f"{current_period[0]} to {current_period[1]}",
        'total_reviews': review_count,
        'issues': issues[:5],
        'strengths': strengths[:5],
        'staff': staff,
    }
```

### Step 9: LLM Narration (Single Call)

```python
import json

SYSTEM_PROMPT = """You are a business consultant analyzing customer review
data. Write a clear, actionable report for a small business owner.

RULES:
1. Use ONLY the statistics provided. Never invent numbers.
2. Include confidence intervals when stating percentages.
3. Be direct and actionable. The owner is busy.
4. Prioritize issues by frequency and trend direction.
5. Each recommendation must reference a specific issue from the data."""

def generate_report(payload: dict) -> str:
    # default=str: quote dates are datetime objects, which json cannot encode natively
    user_prompt = f"""Based on this review analysis, write a consultant report.

DATA:
{json.dumps(payload, indent=2, default=str)}

SECTIONS:
1. Executive Summary (3 sentences max)
2. Top Strengths (what's working, with stats)
3. Critical Issues (what needs attention, with stats and trends)
4. Staff Performance (heroes and concerns if present)
5. Recommended Actions (3-5 specific steps, prioritized)

Keep total length under 600 words."""

    # llm_client is the project's chat-completion wrapper (not defined in this document)
    response = llm_client.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt}
        ],
        max_tokens=1500
    )

    return response.content
```

### Full Report Generation Function

```python
from datetime import date

def generate_full_report(business_id: str,
                         current_start: date, current_end: date,
                         prior_start: date, prior_end: date) -> str:
    """Generate complete report for a business."""

    # Fetch spans
    current_spans = fetch_spans(business_id, current_start, current_end)
    prior_spans = fetch_spans(business_id, prior_start, prior_end)

    if not current_spans:
        return "Insufficient data for report."

    # Get unique review IDs
    current_reviews = set(s['review_id'] for s in current_spans)
    prior_reviews = set(s['review_id'] for s in prior_spans)

    # Cluster current period
    current_topics, noise_spans = cluster_spans(current_spans)

    # Compute stats for current topics
    for topic in current_topics:
        topic['stats'] = compute_topic_stats(topic, current_reviews)

    # Label topics (with deduplication)
    used_labels = set()
    for topic in current_topics:
        topic['label'] = select_label(topic, used_labels)

    # Cluster and compute stats for prior period
    prior_topics = []
    if prior_spans:
        prior_topics, _ = cluster_spans(prior_spans)
        for topic in prior_topics:
            topic['stats'] = compute_topic_stats(topic, prior_reviews)

    # Match trends
    match_trends(current_topics, prior_topics)

    # Aggregate staff
    staff = aggregate_staff(current_spans, current_reviews)

    # Build payload (include noise_spans for quote selection)
    payload = build_payload(
        business_id,
        (current_start, current_end),
        current_topics,
        noise_spans,  # Pass noise spans for quote selection
        staff,
        len(current_reviews)
    )

    # Generate report
    return generate_report(payload)
```

---

## E. Summary of Design Decisions

### What We Do

| Decision | Rationale |
|----------|-----------|
| Ephemeral topics (no persistent catalog) | Eliminates drift, merge logic, thresholds |
| Cluster all spans together | One topic can have pos/neg breakdown; avoids duplicates |
| Fallback clustering for large datasets | PCA + KMeans when >4000 spans (O(n) vs O(n²)) |
| Review-level presence for stats | Defensible claims ("X% of customers") |
| Wilson intervals + publish gates | Statistical rigor |
| Centroid-based trend matching | Stable identity regardless of label changes |
| Separate trend_neg/trend_pos | Correct trends for both issues and strengths |
| Representative + sharp quotes | Best of both: centroid-closest + highest confidence |
| Representative span labels | Human-readable, no stopwords/NLP needed |
| Unicode-safe label dedup | Works for Spanish, German, etc. |
| Multi-anchor sentiment | Better multilingual alignment than bag sentence |
| Guarded staff extraction | Reduces false positives |
| Single LLM call | Cost control |

### What We Don't Do

| Avoided | Why |
|---------|-----|
| Persistent topic catalog | Adds state, drift, merge complexity |
| Topic assignment at ingest | Unnecessary; cluster at report time |
| Span-count stats | Inflates rates; review-level is correct |
| TF-IDF with stopwords | Brittle; representative spans are better |
| Split on "and/y/und" | Over-splits positive phrases |
| POS tagging for labels | Heavy dependency; regex cleanup is sufficient |
| Translation | Multilingual embeddings + multi-language anchors handle it |
| Sentiment classifier | Multi-anchor approach works across languages |

### Statistical Gates

| Gate | Threshold | Purpose |
|------|-----------|---------|
| Minimum k | 8 | Topic must have enough mentions |
| Minimum n | 20 | Period must have enough reviews |
| CI width | ≤ 0.30 | Reject imprecise estimates |
| Trend match sim | ≥ 0.70 | Confident topic match |
| Trend margin | ≥ 0.05 | Clear winner vs alternatives |
| Both periods min | k≥8, n≥20 | Trend requires data on both sides |

### Trend Handling

- **Accurate when**: Topic structure is stable (most real issues)
- **Omitted when**: Match confidence is low
- **Separate trends**: `trend_neg` and `trend_pos` computed independently
- **Never**: Show confidently wrong trends

---

## F. Implementation Plan

| Day | Deliverable |
|-----|-------------|
| 1-2 | Span splitter + embedding service |
| 3-4 | Sentiment scoring + staff extraction |
| 5-6 | Database schema + ingest pipeline |
| 7-8 | Clustering + stats + labeling |
| 9-10 | Trend matching + quote selection |
| 11-12 | LLM integration + end-to-end testing |

**Total: ~12 days for a competent engineer**

---

## G. What's NOT in v1

| Feature | Rationale | v2 Trigger |
|---------|-----------|------------|
| Token-window segmentation | Punctuation split is good enough | Run-on reviews cause quality issues |
| Many-to-many trend matching | Best-match is good enough | Trend accuracy complaints |
| Owner-driven topic editing | Not needed yet | Users want to rename/merge topics |
| Multi-location rollup | Different product | Chain restaurants sign up |
| Anomaly detection | Different product | Fraud complaints |
| Response templates | Low value | User requests |

---

## H. Known Limitations / Future Improvements

| Limitation | Impact | v2 Consideration |
|------------|--------|------------------|
| Sentiment anchors cover EN/ES/DE only | Other languages (FR, PT, IT, etc.) rely on multilingual-e5 alignment | Add 5-10 anchors per new language as user base grows |
| KMeans fallback uses pseudo-noise heuristic | Sharp quotes may be slightly less sharp for >4k span reports | Consider HDBSCAN with approximate nearest neighbors (pynndescent) |
| No streaming for very large reports | Memory pressure if report spans exceed 10k | Paginate or sample spans for extreme cases |

---

## I. Final Checklist Before Ship

- [ ] Span splitter handles mobile text (no punctuation edge case)
- [ ] Embeddings are L2-normalized before clustering
- [ ] HDBSCAN uses precomputed cosine distance matrix
- [ ] Clustering has fallback for >4000 spans (PCA + KMeans)
- [ ] KMeans fallback generates pseudo-noise (bottom 3% by centroid distance)
- [ ] Stats are review-level presence (not span counts)
- [ ] Labels are deduplicated across topics (Unicode-safe)
- [ ] Trends computed separately for neg/pos (trend_neg, trend_pos)
- [ ] Trends require min support in BOTH periods
- [ ] Sentiment anchors are multi-word averaged (not bag sentence)
- [ ] Sentiment anchors include EN/ES/DE words
- [ ] Staff history lookup uses normalized names
- [ ] noise_spans passed to quote selection
- [ ] pgvector index uses HNSW (or ivfflat with ANALYZE documented)
- [ ] LLM prompt enforces "only use provided numbers"
- [ ] Cost per report < $0.30

---

**Document version**: v1-final-reviewed

**Status**: Ready for implementation (with reviewer fixes applied)