From 3da243be7933bde97cb4ede827564e25bddbdbc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 11:21:33 +0000 Subject: [PATCH] Add ReviewIQ pipeline spec and metadata extraction test - reviewiq-pipeline-v1-final.md: Earlier pipeline specification - test_metadata_extraction.py: Test script for metadata extraction Co-Authored-By: Claude Opus 4.5 --- .artifacts/reviewiq-pipeline-v1-final.md | 992 +++++++++++++++++++++++ test_metadata_extraction.py | 398 +++++++++ 2 files changed, 1390 insertions(+) create mode 100644 .artifacts/reviewiq-pipeline-v1-final.md create mode 100644 test_metadata_extraction.py diff --git a/.artifacts/reviewiq-pipeline-v1-final.md b/.artifacts/reviewiq-pipeline-v1-final.md new file mode 100644 index 0000000..b2a8030 --- /dev/null +++ b/.artifacts/reviewiq-pipeline-v1-final.md @@ -0,0 +1,992 @@ +# ReviewIQ Pipeline v1 — Final Architecture + +**Design principle**: Minimum state, defensible stats, multilingual, robust to messy mobile text, 1 LLM call per report, <$0.30/report. + +**Core decision**: Do not persist topics. Persist only enriched spans. Build topics at report time via clustering and match across periods for trends. + +--- + +## A. Architecture Overview + +``` + INGEST (continuous, stateless, ~$0.00) +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Raw Review │────▶│ Span │────▶│ Embed + │────▶│ Store │ +│ (text,rating,│ │ Splitter │ │ Sentiment │ │ Enriched │ +│ date, lang) │ │ │ │ + NER │ │ Spans │ +└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ + +No topic assignment at ingest. Just store enriched spans. 
+ + REPORT (per request, ~$0.20) +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Fetch │────▶│ Cluster │────▶│ Stats + │────▶│ LLM │ +│ Spans │ │ (HDBSCAN) │ │ Labels + │ │ Narrate │ +│ │ │ │ │ Quotes │ │ (1 call) │ +└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ + +Topics are ephemeral. They exist only for this report. +Trends are computed by matching clusters across periods via centroid similarity. +``` + +### Cost Model + +| Stage | When | Cost | Notes | +|-------|------|------|-------| +| Span splitting | Per review ingested | $0.00 | Regex only | +| Embedding | Per span ingested | $0.00 | Local model, batched | +| Sentiment | Per span ingested | $0.00 | Embedding math (EN/ES/DE multi-anchor) | +| NER (staff) | Per span ingested | $0.00 | spaCy, guarded | +| Clustering | Per report | $0.00 | HDBSCAN <4k spans, PCA+KMeans fallback | +| Stats + labels | Per report | $0.00 | Python/SQL | +| LLM narration | Per report | ~$0.15-0.25 | Single API call | + +**Total: ~$0.20/report** (dominated by LLM) + +--- + +## B. Data Model (Only What Persists) + +### 1. Raw Reviews + +```sql +CREATE TABLE reviews ( + review_id TEXT PRIMARY KEY, + business_id TEXT NOT NULL, + text TEXT NOT NULL, + rating INT NOT NULL, + date TIMESTAMP, + source TEXT DEFAULT 'google', + ingested_at TIMESTAMP DEFAULT NOW() +); +``` + +### 2. 
Enriched Spans (The Only ML Artifact) + +```sql +CREATE TABLE spans ( + span_id TEXT PRIMARY KEY, + review_id TEXT REFERENCES reviews(review_id), + business_id TEXT NOT NULL, + span_index INT NOT NULL, + text TEXT NOT NULL, + embedding VECTOR(384), + sentiment TEXT, -- 'positive', 'negative', 'neutral' + sentiment_score FLOAT, + staff_mentions TEXT[], -- guarded extraction + date TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW() +); + +CREATE INDEX idx_spans_business_date ON spans(business_id, date); + +-- Embedding index: prefer HNSW if available (pgvector 0.5+), otherwise ivfflat +-- HNSW: no training required, better query performance +CREATE INDEX idx_spans_embedding ON spans USING hnsw (embedding vector_cosine_ops) + WITH (m = 16, ef_construction = 64); + +-- Alternative for older pgvector (requires ANALYZE after bulk inserts): +-- CREATE INDEX idx_spans_embedding ON spans USING ivfflat (embedding vector_cosine_ops) +-- WITH (lists = 100); +-- ANALYZE spans; -- Required after bulk insert for ivfflat to work correctly +``` + +### 3. Review-Topic Presence (Computed at Report Time, Not Stored) + +Topics are ephemeral. Presence is computed per report, not persisted. + +--- + +## C. Ingest Pipeline + +### Step 1: Span Splitting + +Split on punctuation. Fallback split on contrast markers. Merge tiny fragments. 
CONTRAST_RE = re.compile(
    r'\b(?:but|pero|aber|aunque|however|though|although|yet|still|sin embargo)\b',
    re.IGNORECASE
)


def split_spans(text: str) -> list[str]:
    """Break a review into clause-sized spans.

    The text is first cut on sentence/clause punctuation (``. ! ? ; : ,``)
    and on runs of two or more whitespace characters. Pieces shorter than
    12 characters are discarded. Any surviving piece that contains a
    contrast marker (see ``CONTRAST_RE``) is cut again at the marker; a
    short clause that follows a longer one is glued back onto it, and
    whatever is still shorter than 12 characters afterwards is discarded.

    Args:
        text: Raw review text.

    Returns:
        The spans in reading order; empty when nothing reaches 12 characters.
    """
    min_len = 12
    result = []

    for raw_piece in re.split(r'[.!?;:,]\s*|\s{2,}', text):
        chunk = raw_piece.strip()
        if len(chunk) < min_len:
            continue

        # No contrast marker: the punctuation-level piece is the span.
        if CONTRAST_RE.search(chunk) is None:
            result.append(chunk)
            continue

        # Contrast marker present: cut at the marker (the marker itself is
        # dropped) and fold short trailing clauses into their predecessor.
        clauses = []
        for raw_clause in CONTRAST_RE.split(chunk):
            clause = raw_clause.strip()
            if not clause:
                continue
            if clauses and len(clause) < min_len:
                clauses[-1] = clauses[-1] + ' ' + clause
            else:
                clauses.append(clause)

        result.extend(clause for clause in clauses if len(clause) >= min_len)

    return result
ROLE_WORDS = {'server', 'waiter', 'waitress', 'manager', 'chef', 'doctor',
              'nurse', 'receptionist', 'mesero', 'gerente', 'kellner'}

# Entity labels that denote a person. The multilingual WikiNER pipeline
# ('xx_ent_wiki_sm') tags people as 'PER'; OntoNotes-trained pipelines use
# 'PERSON'. Accept both so the guard works whichever pipeline is loaded.
_PERSON_LABELS = frozenset({'PER', 'PERSON'})

# Substrings that mark a "thank you" context around a name.
_THANKS_MARKERS = ('thank', 'gracias', 'danke', 'shout out', 'kudos')


def extract_staff(text: str, business_history: dict = None,
                  nlp_pipeline=None) -> list[str]:
    """Return the staff names mentioned in ``text``, guarded against noise.

    A person entity is only reported when at least one guard holds:

    1. a role word from ``ROLE_WORDS`` occurs within 30 characters of it,
    2. a thanks marker occurs within 30 characters of it, or
    3. ``business_history`` records the normalized name at least 3 times.

    Args:
        text: Span text to scan.
        business_history: Optional mapping of normalized name to mention
            count; used for guard 3 only.
        nlp_pipeline: Optional callable that takes the text and returns an
            object with ``ents`` (each having ``label_``, ``text``,
            ``start_char``, ``end_char``). Defaults to the module-level
            ``nlp`` pipeline.

    Returns:
        Normalized names, de-duplicated, in order of first appearance.
    """
    pipeline = nlp if nlp_pipeline is None else nlp_pipeline
    doc = pipeline(text)
    staff = []

    for ent in doc.ents:
        if ent.label_ not in _PERSON_LABELS:
            continue

        # Normalize early so the history lookup and the output agree.
        normalized = normalize_name(ent.text)
        window_start = max(0, ent.start_char - 30)
        context = text[window_start:ent.end_char + 30].lower()

        near_role = any(role in context for role in ROLE_WORDS)
        thanked = any(marker in context for marker in _THANKS_MARKERS)
        frequent = bool(business_history) and business_history.get(normalized, 0) >= 3

        if near_role or thanked or frequent:
            staff.append(normalized)

    # dict.fromkeys de-duplicates while keeping a deterministic order
    # (set() ordering of strings varies between interpreter runs).
    return list(dict.fromkeys(staff))


def normalize_name(name: str) -> str:
    """Title-case ``name`` and collapse internal whitespace to single spaces."""
    return ' '.join(name.strip().title().split())
MAX_SPANS_FOR_HDBSCAN = 4000  # Beyond this, O(n²) distance matrix is too expensive


def cluster_spans(spans: list[dict], min_cluster_size: int = 10,
                  min_samples: int = 5) -> tuple[list[dict], list[dict]]:
    """Group spans into ephemeral topics by embedding similarity.

    Uses HDBSCAN over a precomputed cosine-distance matrix. Inputs larger
    than ``MAX_SPANS_FOR_HDBSCAN`` are delegated to
    ``_cluster_spans_fallback``.

    Args:
        spans: Span dicts; each must carry ``'embedding'`` and
            ``'sentiment_score'``.
        min_cluster_size: Smallest group HDBSCAN may report as a cluster.
        min_samples: HDBSCAN ``min_samples`` setting.

    Returns:
        ``(topics, noise_spans)``. Each topic is a dict with ``cluster_id``,
        ``spans``, ``embeddings`` (array of the member embeddings) and a
        unit-length ``centroid``. ``noise_spans`` holds the unclustered
        spans whose ``|sentiment_score|`` exceeds 0.5.
    """
    if len(spans) > MAX_SPANS_FOR_HDBSCAN:
        return _cluster_spans_fallback(spans)

    # With fewer spans than one cluster needs, nothing can be clustered:
    # every span is noise. Returning here also keeps tiny or empty periods
    # from reaching the matrix maths and HDBSCAN at all.
    if len(spans) < min_cluster_size:
        return [], [s for s in spans if abs(s['sentiment_score']) > 0.5]

    # float64: embeddings are typically float32, and the precomputed
    # distance matrix handed to hdbscan should be double precision.
    embeddings = np.asarray([s['embedding'] for s in spans], dtype=np.float64)

    # L2-normalize (leaving all-zero rows untouched instead of dividing by 0)
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    normed = embeddings / np.where(norms > 0, norms, 1.0)

    # Cosine distance; clip removes the tiny negatives rounding can produce.
    dist_matrix = np.clip(1.0 - normed @ normed.T, 0.0, 2.0)
    np.fill_diagonal(dist_matrix, 0.0)

    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,  # Aligned with publish gate
        min_samples=min_samples,
        metric='precomputed',
    )
    labels = clusterer.fit_predict(dist_matrix)

    # Group spans by cluster; label -1 is HDBSCAN's noise bucket.
    members_by_label = {}
    noise_spans = []
    for span, label in zip(spans, labels):
        if label == -1:
            # Keep high-confidence noise for quotes
            if abs(span['sentiment_score']) > 0.5:
                noise_spans.append(span)
            continue
        members_by_label.setdefault(int(label), []).append(span)

    # Compute one unit-length centroid per cluster.
    topics = []
    for label, members in members_by_label.items():
        embs = np.array([s['embedding'] for s in members])
        centroid = embs.mean(axis=0)
        centroid_norm = np.linalg.norm(centroid)
        if centroid_norm > 0:
            centroid = centroid / centroid_norm

        topics.append({
            'cluster_id': label,
            'spans': members,
            'embeddings': embs,
            'centroid': centroid,
        })

    return topics, noise_spans
+ """ + + embeddings = np.array([s['embedding'] for s in spans]) + + # Reduce dimensionality + pca = PCA(n_components=50) + reduced = pca.fit_transform(embeddings) + + # Estimate k (heuristic: sqrt(n/10), clamped) + k = max(5, min(50, int(np.sqrt(len(spans) / 10)))) + + kmeans = MiniBatchKMeans(n_clusters=k, batch_size=256, n_init=3) + labels = kmeans.fit_predict(reduced) + + # Group spans by cluster + topics = {} + for span, emb, label in zip(spans, embeddings, labels): + if label not in topics: + topics[label] = {'spans': [], 'embeddings': []} + topics[label]['spans'].append(span) + topics[label]['embeddings'].append(emb) + + # Compute centroids and identify pseudo-noise (bottom 3% by similarity) + result = [] + all_distances = [] # (distance, span) tuples for pseudo-noise selection + + for label, data in topics.items(): + embs = np.array(data['embeddings']) + centroid = embs.mean(axis=0) + centroid = centroid / np.linalg.norm(centroid) + + # Compute similarities to centroid + normed_embs = embs / np.linalg.norm(embs, axis=1, keepdims=True) + sims = normed_embs @ centroid + + # Track distances for pseudo-noise + for span, sim in zip(data['spans'], sims): + all_distances.append((1 - sim, span)) + + result.append({ + 'cluster_id': label, + 'spans': data['spans'], + 'embeddings': embs, + 'centroid': centroid, + }) + + # Pseudo-noise: bottom 3% by similarity (farthest from any centroid) + # Only include high-confidence sentiment spans (same as HDBSCAN noise handling) + all_distances.sort(key=lambda x: x[0], reverse=True) + noise_cutoff = int(len(all_distances) * 0.03) + pseudo_noise = [ + span for _, span in all_distances[:noise_cutoff] + if abs(span['sentiment_score']) > 0.5 + ] + + return result, pseudo_noise +``` + +### Step 3: Compute Review-Level Stats + +Stats are review-level presence (not span counts). This is critical for defensible claims. 
EMAIL_RE = re.compile(r'\b\S+@\S+\.\S+\b')
# No trailing \b: it made the match stop before a final '/', leaving it behind.
URL_RE = re.compile(r'\b(?:https?://|www\.)\S+', re.I)
# Lookbehind instead of a leading \b: \b can never sit in front of '+',
# so international prefixes were left in the label as a stray '+'.
PHONE_RE = re.compile(r'(?<!\w)[+(]?\d[\d .()-]{7,}\d\b')
LONGDIG_RE = re.compile(r'\b\d{8,}\b')


def beautify_label(text: str) -> str:
    """Clean PII and noise from label text.

    Removes e-mail addresses, URLs, phone-like digit groups and digit runs
    of 8+ characters, reduces repeated ``!``/``?``/``.`` to a single
    character, and collapses whitespace.

    Args:
        text: Candidate label text (a span from a review).

    Returns:
        The cleaned text with single spaces and no leading/trailing
        whitespace; may be empty.
    """
    # Collapse first so the phone pattern (which allows single spaces
    # between digit groups) sees canonical spacing.
    text = ' '.join(text.split())

    for pattern in (EMAIL_RE, URL_RE, PHONE_RE, LONGDIG_RE):
        text = pattern.sub('', text)

    text = re.sub(r'([!?.]){2,}', r'\1', text)

    # Collapse again: every removal above leaves a double space behind.
    return ' '.join(text.split())
def select_label(topic: dict, used_labels: set) -> str:
    """Pick a display label for a topic from its most central spans.

    Spans are ranked by dot product with the topic centroid. The first of
    the top 15 whose cleaned text is 15-80 characters long and whose
    dedup key is not yet in ``used_labels`` becomes the label, and its key
    is added to ``used_labels``. If none qualifies, the cleaned text of the
    top-ranked span is returned, cut to 60 characters with "..." appended
    when it was longer.

    Args:
        topic: Topic dict with ``'spans'``, ``'embeddings'`` and ``'centroid'``.
        used_labels: Dedup keys already taken by other topics; mutated in place.

    Returns:
        The chosen label text.
    """
    candidates = topic['spans']
    similarity = np.array(topic['embeddings']) @ topic['centroid']
    order = np.argsort(similarity)[::-1]  # most central first

    for position in order[:15]:
        label = beautify_label(candidates[position]['text'])
        if len(label) < 15 or len(label) > 80:
            continue

        dedup_key = norm_for_dedup(label)
        if dedup_key not in used_labels:
            used_labels.add(dedup_key)
            return label

    # Fallback: truncate best match
    fallback = beautify_label(candidates[order[0]]['text'])
    suffix = "..." if len(fallback) > 60 else ""
    return fallback[:60].rstrip() + suffix
def _as_quote(span: dict, quote_type: str) -> dict:
    """Project a span onto the quote fields used in the report payload."""
    return {
        'text': span['text'],
        'sentiment': span['sentiment'],
        'date': span['date'],
        'type': quote_type,
    }


def pick_quotes(topic: dict, noise_spans: list, sentiment_filter: str,
                k: int = 2) -> list[dict]:
    """Select up to ``k`` quotes for a topic: one representative, then sharp ones.

    * Representative: the topic span (matching ``sentiment_filter``) with the
      highest dot product against the topic centroid.
    * Sharp: highest ``|sentiment_score|`` among the topic's matching spans
      plus matching noise spans scoring above 0.5.

    Spans longer than 200 characters are skipped, and at most one quote is
    taken per review.

    Args:
        topic: Topic dict with ``'spans'`` and ``'centroid'``.
        noise_spans: Unclustered spans that may also serve as sharp quotes.
        sentiment_filter: Sentiment value the quotes must carry.
        k: Maximum number of quotes to return.

    Returns:
        Quote dicts with ``text``, ``sentiment``, ``date`` and ``type``
        (``'representative'`` or ``'sharp'``); never more than ``k``.
    """
    if k <= 0:
        return []

    topic_spans = [s for s in topic['spans'] if s['sentiment'] == sentiment_filter]
    quotes = []
    seen_reviews = set()

    # 1. Representative: closest to centroid
    if topic_spans:
        sims = np.array([s['embedding'] for s in topic_spans]) @ topic['centroid']
        for idx in np.argsort(sims)[::-1]:
            span = topic_spans[idx]
            if len(span['text']) > 200:
                continue
            quotes.append(_as_quote(span, 'representative'))
            seen_reviews.add(span['review_id'])
            break

    # 2. Sharp: highest confidence from topic + noise
    # NOTE(review): noise spans are not tied to this topic, so the same
    # noise quote can surface under several topics — confirm this is intended.
    sharp_candidates = topic_spans + [
        s for s in noise_spans
        if s['sentiment'] == sentiment_filter and abs(s['sentiment_score']) > 0.5
    ]
    sharp_candidates.sort(key=lambda s: abs(s['sentiment_score']), reverse=True)

    for span in sharp_candidates:
        # Check the budget before appending so k=1 yields exactly one quote.
        if len(quotes) >= k:
            break
        if span['review_id'] in seen_reviews or len(span['text']) > 200:
            continue
        quotes.append(_as_quote(span, 'sharp'))
        seen_reviews.add(span['review_id'])

    return quotes
def _jsonable_quote(quote: dict) -> dict:
    """Return ``quote`` with a date/datetime ``'date'`` rendered as ISO text.

    The payload is later passed to ``json.dumps``, which rejects
    date/datetime objects. Values without ``isoformat`` are left untouched.
    """
    when = quote.get('date')
    if hasattr(when, 'isoformat'):
        return {**quote, 'date': when.isoformat()}
    return quote


def _rank_topic_side(topics: list, noise_spans: list, suffix: str,
                     sentiment: str, min_k: int, min_n: int,
                     max_ci_width: float, top_n: int) -> list[dict]:
    """Build the gated, rate-sorted, truncated entries for one polarity.

    ``suffix`` is ``'neg'`` or ``'pos'`` and selects the ``k_*``, ``rate_*``,
    ``ci_*`` and ``trend_*`` fields; ``sentiment`` is the matching span
    sentiment passed to ``pick_quotes``.
    """
    gated = []
    for topic in topics:
        stats = topic['stats']
        k = stats[f'k_{suffix}']
        if not (k >= min_k and stats['n'] >= min_n):
            continue

        low, high = stats[f'ci_{suffix}']
        if not (high - low <= max_ci_width):  # CI not too wide
            continue

        # Compare against None: a trend of exactly 0.0 is a real result
        # ("no change"), not a missing one.
        trend = topic.get(f'trend_{suffix}')
        gated.append((topic, {
            'label': topic['label'],
            'rate': round(stats[f'rate_{suffix}'], 3),
            'ci': [round(low, 3), round(high, 3)],
            'n': k,  # reviews mentioning the topic with this sentiment
            'trend': None if trend is None else round(trend, 3),
        }))

    # Sort by rate (stable, so ties keep topic order).
    gated.sort(key=lambda pair: pair[1]['rate'], reverse=True)

    # Quotes are picked only for the entries that survive truncation.
    ranked = []
    for topic, entry in gated[:top_n]:
        entry['quotes'] = [
            _jsonable_quote(q)
            for q in pick_quotes(topic, noise_spans, sentiment, k=2)
        ]
        ranked.append(entry)
    return ranked


def build_payload(business_id: str, current_period: tuple,
                  topics: list, noise_spans: list, staff: dict,
                  review_count: int, *, min_k: int = 8, min_n: int = 20,
                  max_ci_width: float = 0.30, top_n: int = 5) -> dict:
    """Build structured payload for LLM narration.

    A topic becomes an issue (negative side) or a strength (positive side)
    when that side has at least ``min_k`` reviews, the period has at least
    ``min_n`` reviews, and the confidence interval is no wider than
    ``max_ci_width``. Each list is sorted by rate, highest first, and cut
    to ``top_n`` entries.

    Args:
        business_id: Identifier copied into the payload.
        current_period: ``(start, end)``; rendered as ``"start to end"``.
        topics: Topic dicts carrying ``'stats'``, ``'label'`` and, optionally,
            ``'trend_neg'`` / ``'trend_pos'``.
        noise_spans: High-confidence spans not assigned to any cluster.
            Used for quote selection.
        staff: Staff aggregation, passed through unchanged.
        review_count: Total reviews in the period.
        min_k: Minimum reviews on a side for it to be reported.
        min_n: Minimum reviews in the period.
        max_ci_width: Maximum accepted confidence-interval width.
        top_n: Maximum issues and maximum strengths to include.

    Returns:
        Dict with ``business_id``, ``period``, ``total_reviews``, ``issues``,
        ``strengths`` and ``staff``.
    """
    issues = _rank_topic_side(topics, noise_spans, 'neg', 'negative',
                              min_k, min_n, max_ci_width, top_n)
    strengths = _rank_topic_side(topics, noise_spans, 'pos', 'positive',
                                 min_k, min_n, max_ci_width, top_n)

    return {
        'business_id': business_id,
        'period': f"{current_period[0]} to {current_period[1]}",
        'total_reviews': review_count,
        'issues': issues,
        'strengths': strengths,
        'staff': staff,
    }
+ + # Get unique review IDs + current_reviews = set(s['review_id'] for s in current_spans) + prior_reviews = set(s['review_id'] for s in prior_spans) + + # Cluster current period + current_topics, noise_spans = cluster_spans(current_spans) + + # Compute stats for current topics + for topic in current_topics: + topic['stats'] = compute_topic_stats(topic, current_reviews) + + # Label topics (with deduplication) + used_labels = set() + for topic in current_topics: + topic['label'] = select_label(topic, used_labels) + + # Cluster and compute stats for prior period + prior_topics = [] + if prior_spans: + prior_topics, _ = cluster_spans(prior_spans) + for topic in prior_topics: + topic['stats'] = compute_topic_stats(topic, prior_reviews) + + # Match trends + match_trends(current_topics, prior_topics) + + # Aggregate staff + staff = aggregate_staff(current_spans, current_reviews) + + # Build payload (include noise_spans for quote selection) + payload = build_payload( + business_id, + (current_start, current_end), + current_topics, + noise_spans, # Pass noise spans for quote selection + staff, + len(current_reviews) + ) + + # Generate report + return generate_report(payload) +``` + +--- + +## E. 
Summary of Design Decisions + +### What We Do + +| Decision | Rationale | +|----------|-----------| +| Ephemeral topics (no persistent catalog) | Eliminates drift, merge logic, thresholds | +| Cluster all spans together | One topic can have pos/neg breakdown; avoids duplicates | +| Fallback clustering for large datasets | PCA + KMeans when >4000 spans (O(n) vs O(n²)) | +| Review-level presence for stats | Defensible claims ("X% of customers") | +| Wilson intervals + publish gates | Statistical rigor | +| Centroid-based trend matching | Stable identity regardless of label changes | +| Separate trend_neg/trend_pos | Correct trends for both issues and strengths | +| Representative + sharp quotes | Best of both: centroid-closest + highest confidence | +| Representative span labels | Human-readable, no stopwords/NLP needed | +| Unicode-safe label dedup | Works for Spanish, German, etc. | +| Multi-anchor sentiment | Better multilingual alignment than bag sentence | +| Guarded staff extraction | Reduces false positives | +| Single LLM call | Cost control | + +### What We Don't Do + +| Avoided | Why | +|---------|-----| +| Persistent topic catalog | Adds state, drift, merge complexity | +| Topic assignment at ingest | Unnecessary; cluster at report time | +| Span-count stats | Inflates rates; review-level is correct | +| TF-IDF with stopwords | Brittle; representative spans are better | +| Split on "and/y/und" | Over-splits positive phrases | +| POS tagging for labels | Heavy dependency; regex cleanup is sufficient | +| Translation | Multilingual embeddings + multi-language anchors handle it | +| Sentiment classifier | Multi-anchor approach works across languages | + +### Statistical Gates + +| Gate | Threshold | Purpose | +|------|-----------|---------| +| Minimum k | 8 | Topic must have enough mentions | +| Minimum n | 20 | Period must have enough reviews | +| CI width | ≤ 0.30 | Reject imprecise estimates | +| Trend match sim | ≥ 0.70 | Confident topic match | +| Trend 
margin | ≥ 0.05 | Clear winner vs alternatives | +| Both periods min | k≥8, n≥20 | Trend requires data on both sides | + +### Trend Handling + +- **Accurate when**: Topic structure is stable (most real issues) +- **Omitted when**: Match confidence is low +- **Separate trends**: `trend_neg` and `trend_pos` computed independently +- **Never**: Show confidently wrong trends + +--- + +## F. Implementation Plan + +| Day | Deliverable | +|-----|-------------| +| 1-2 | Span splitter + embedding service | +| 3-4 | Sentiment scoring + staff extraction | +| 5-6 | Database schema + ingest pipeline | +| 7-8 | Clustering + stats + labeling | +| 9-10 | Trend matching + quote selection | +| 11-12 | LLM integration + end-to-end testing | + +**Total: ~12 days for a competent engineer** + +--- + +## G. What's NOT in v1 + +| Feature | Rationale | v2 Trigger | +|---------|-----------|------------| +| Token-window segmentation | Punctuation split is good enough | Run-on reviews cause quality issues | +| Many-to-many trend matching | Best-match is good enough | Trend accuracy complaints | +| Owner-driven topic editing | Not needed yet | Users want to rename/merge topics | +| Multi-location rollup | Different product | Chain restaurants sign up | +| Anomaly detection | Different product | Fraud complaints | +| Response templates | Low value | User requests | + +--- + +## H. Known Limitations / Future Improvements + +| Limitation | Impact | v2 Consideration | +|------------|--------|------------------| +| Sentiment anchors cover EN/ES/DE only | Other languages (FR, PT, IT, etc.) rely on multilingual-e5 alignment | Add 5-10 anchors per new language as user base grows | +| KMeans fallback uses pseudo-noise heuristic | Sharp quotes may be slightly less sharp for >4k span reports | Consider HDBSCAN with approximate nearest neighbors (pynndescent) | +| No streaming for very large reports | Memory pressure if report spans exceed 10k | Paginate or sample spans for extreme cases | + +--- + +## I. 
#!/usr/bin/env python3
"""
Test metadata extraction: category, review topics, about info.
Uses robust selectors (aria-labels, roles, jsaction) to avoid breakage.

The browser dependencies (seleniumbase / selenium) are imported inside the
functions that need them, so the pure validation logic can be imported
without a browser stack installed.
"""
import json
import sys
import time
import traceback
from urllib.parse import parse_qs, urlsplit, urlunsplit

# Expected values for validation
EXPECTED = {
    "name": "R. Fleitas Peluqueros",
    "category": "Barber shop",
    "review_topics": ["hair salon", "cutting", "price", "siblings", "beard"],
    "about_sections": ["Accessibility", "Amenities", "Planning", "Payments", "Children"]
}

# Substrings (lower-case) that identify the "accept" button on the consent page.
_CONSENT_ACCEPT_WORDS = ("accept", "aceptar", "alle akzeptieren")

# Reads name, category, rating and review count from the default (overview) tab.
_OVERVIEW_JS = """
    var data = {name: null, category: null, rating: null, total_reviews: null};

    // Business name - h1 is stable
    var h1 = document.querySelector('h1');
    if (h1) data.name = h1.textContent.trim();

    // Category - use jsaction attribute (more stable than class)
    var catBtn = document.querySelector('button[jsaction*="category"]');
    if (catBtn) data.category = catBtn.textContent.trim();

    // Fallback: look for button after rating that's not a link
    if (!data.category) {
        var buttons = document.querySelectorAll('button');
        for (var btn of buttons) {
            var text = btn.textContent.trim();
            // Categories are short words, no numbers, not navigation
            if (text && text.length < 50 && !text.match(/^[0-9]/) &&
                !text.match(/review|star|direction|save|share|photo/i)) {
                // Check if it's near the rating area
                var parent = btn.closest('.LBgpqf, .skqShb, .fontBodyMedium');
                if (parent) {
                    data.category = text;
                    break;
                }
            }
        }
    }

    // Rating and reviews from aria-labels (stable)
    var spans = document.querySelectorAll('span[role="img"]');
    for (var span of spans) {
        var label = span.getAttribute('aria-label') || '';

        // Rating: "4.8 stars"
        var rMatch = label.match(/^([\\d,.]+)\\s*star/i);
        if (rMatch && !data.rating) {
            data.rating = parseFloat(rMatch[1].replace(',', '.'));
        }

        // Reviews: "79 reviews"
        var revMatch = label.match(/^([\\d,]+)\\s*review/i);
        if (revMatch && !data.total_reviews) {
            data.total_reviews = parseInt(revMatch[1].replace(/,/g, ''), 10);
        }
    }

    return data;
"""

# Clicks a tab button. arguments: [0] aria-label fragment (e.g. "Review"),
# [1] zero-based tab index, [2] exact lower-case tab text used as last resort.
_CLICK_TAB_JS = """
    var ariaLabel = arguments[0];
    var tabIndex = arguments[1];
    var tabText = arguments[2];
    var keyword = ariaLabel.toLowerCase();

    // Try multiple selectors for the tab
    var selectors = [
        'button[aria-label*="' + ariaLabel + '"]',
        'button[data-tab-index="' + tabIndex + '"]',
        'div[role="tablist"] button:nth-child(' + (tabIndex + 1) + ')',
        'button[jsaction*="' + keyword + '"]'
    ];

    for (var sel of selectors) {
        var btn = document.querySelector(sel);
        if (btn && btn.textContent.toLowerCase().includes(keyword)) {
            btn.click();
            return true;
        }
    }

    // Fallback: find by text content
    var buttons = document.querySelectorAll('button');
    for (var btn of buttons) {
        if (btn.textContent.trim().toLowerCase() === tabText) {
            btn.click();
            return true;
        }
    }
    return false;
"""

# Reads the "refine reviews" topic chips from the reviews tab.
_REVIEW_TOPICS_JS = """
    var topics = [];

    // Primary: use role="radiogroup" with aria-label="Refine reviews"
    var container = document.querySelector('div[role="radiogroup"][aria-label*="Refine"], div[role="radiogroup"][aria-label*="refine"]');

    if (!container) {
        // Fallback: any radiogroup in the reviews area
        container = document.querySelector('div[role="radiogroup"]');
    }

    if (container) {
        var buttons = container.querySelectorAll('button[role="radio"]');
        for (var btn of buttons) {
            var label = btn.getAttribute('aria-label') || '';
            // Parse "hair salon, mentioned in 4 reviews" or just get the topic name
            var match = label.match(/^([^,]+),\\s*mentioned in (\\d+)/i);
            if (match) {
                topics.push({
                    topic: match[1].trim(),
                    count: parseInt(match[2], 10)
                });
            } else if (label && !label.toLowerCase().includes('all review')) {
                // Might be in different format
                var countSpan = btn.querySelector('.bC3Nkc, .fontBodySmall');
                var nameSpan = btn.querySelector('.uEubGf, span:first-child');
                if (nameSpan) {
                    var name = nameSpan.textContent.trim();
                    // "|| 0" keeps a non-numeric count from becoming NaN
                    var count = countSpan ? (parseInt(countSpan.textContent, 10) || 0) : 0;
                    if (name && name.toLowerCase() !== 'all') {
                        topics.push({topic: name, count: count});
                    }
                }
            }
        }
    }

    return topics;
"""

# Reads the feature lists (grouped by h2 section header) from the about tab.
_ABOUT_JS = """
    var about = {};

    // Find the about region by aria-label or role
    var container = document.querySelector('div[role="region"][aria-label*="About"]');

    if (!container) {
        // Fallback: look for the scrollable area with sections
        container = document.querySelector('.m6QErb[aria-label*="About"]');
    }

    if (!container) {
        // Last resort: find sections by h2 headers
        container = document;
    }

    // Find all section headers (h2 elements)
    var sections = container.querySelectorAll('h2');

    for (var h2 of sections) {
        var sectionName = h2.textContent.trim();
        var items = [];

        // Find the ul list following this h2
        var parent = h2.closest('.iP2t7d, div');
        if (parent) {
            var listItems = parent.querySelectorAll('li span[aria-label]');
            for (var li of listItems) {
                var label = li.getAttribute('aria-label');
                if (label) {
                    // Parse "Has toilet" or "No wheelchair-accessible car park"
                    var hasFeature = !label.toLowerCase().startsWith('no ');
                    var featureName = label.replace(/^(Has |No )/i, '');
                    items.push({
                        feature: featureName,
                        available: hasFeature
                    });
                }
            }
        }

        if (sectionName && items.length > 0) {
            about[sectionName] = items;
        }
    }

    return about;
"""


def _force_english(url: str) -> str:
    """Return *url* with ``hl=en&gl=us`` appended unless it already has ``hl``.

    The query string is parsed rather than substring-matched, so a value that
    merely contains ``hl=`` does not count, and the parameters are inserted
    before any ``#fragment`` instead of after it.
    """
    parts = urlsplit(url)
    if "hl" in parse_qs(parts.query, keep_blank_values=True):
        return url
    query = f"{parts.query}&hl=en&gl=us" if parts.query else "hl=en&gl=us"
    return urlunsplit(parts._replace(query=query))


def _dismiss_consent(driver, url: str, timeout: float = 5.0) -> None:
    """Accept the Google consent page if it shows up within *timeout* seconds.

    Polls ``driver.current_url`` every 10ms. When a consent URL is seen, the
    first button whose text contains a known "accept" word is clicked and
    *url* is reloaded. Returns as soon as a maps URL is reached, one consent
    attempt has been made, or the timeout expires.
    """
    from selenium.webdriver.common.by import By

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        current = driver.current_url
        if "consent.google" in current:
            print(" 🍪 Consent page detected, clicking accept...")
            try:
                for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
                    # Read the text before clicking: the click navigates away
                    # and the element may be stale afterwards.
                    label = btn.text
                    lowered = label.lower()
                    if any(word in lowered for word in _CONSENT_ACCEPT_WORDS):
                        btn.click()
                        print(f" ✅ Clicked: '{label}', reloading...")
                        driver.get(url)
                        break
            except Exception as exc:
                # Best effort: report and carry on with whatever page loaded.
                print(f" ⚠️ Consent handling failed: {exc}")
            return
        if "maps/place" in current or ("maps" in current and "consent" not in current):
            return
        time.sleep(0.01)  # 10ms polling


def _click_tab(driver, aria_label: str, tab_index: int, tab_text: str) -> bool:
    """Click the tab identified by *aria_label* / *tab_index*; return success."""
    return bool(driver.execute_script(_CLICK_TAB_JS, aria_label, tab_index, tab_text))


def extract_metadata(driver, url: str) -> dict:
    """Extract all business metadata from Google Maps.

    Args:
        driver: A Selenium-compatible WebDriver (``get``, ``current_url``,
            ``find_elements`` and ``execute_script`` are used).
        url: Google Maps URL of the business; English locale parameters are
            added when no ``hl`` parameter is present.

    Returns:
        A dict with keys ``name``, ``category``, ``rating``,
        ``total_reviews``, ``review_topics`` (list of ``{topic, count}``) and
        ``about`` (section name -> list of ``{feature, available}``). Fields
        that could not be extracted keep their empty defaults.
    """
    # Force English so the text-based selectors below match.
    url = _force_english(url)

    print(f" Loading URL: {url[:70]}...")
    driver.get(url)

    _dismiss_consent(driver, url)

    # Wait for page to stabilize
    time.sleep(1)

    result = {
        "name": None,
        "category": None,
        "rating": None,
        "total_reviews": None,
        "review_topics": [],
        "about": {}
    }

    # ========== OVERVIEW TAB (default) ==========
    print("\n📍 Extracting from OVERVIEW tab...")

    # "or {}" guards against the script returning null.
    result.update(driver.execute_script(_OVERVIEW_JS) or {})
    print(f" Name: {result['name']}")
    print(f" Category: {result['category']}")
    print(f" Rating: {result['rating']}")
    print(f" Reviews: {result['total_reviews']}")

    # ========== REVIEWS TAB ==========
    print("\n📝 Clicking REVIEWS tab...")

    if _click_tab(driver, "Review", 1, "reviews"):
        time.sleep(1.5)  # Wait for tab to load

        topics = driver.execute_script(_REVIEW_TOPICS_JS) or []
        result['review_topics'] = topics
        print(f" Found {len(topics)} review topics:")
        for t in topics:
            print(f" - {t['topic']}: {t['count']} mentions")
    else:
        print(" ⚠️ Could not click Reviews tab")

    # ========== ABOUT TAB ==========
    print("\n📋 Clicking ABOUT tab...")

    if _click_tab(driver, "About", 2, "about"):
        time.sleep(1.5)  # Wait for tab to load

        about = driver.execute_script(_ABOUT_JS) or {}
        result['about'] = about
        print(f" Found {len(about)} about sections:")
        for section, items in about.items():
            print(f" {section}:")
            for item in items:
                status = "✓" if item['available'] else "✗"
                print(f" {status} {item['feature']}")
    else:
        print(" ⚠️ Could not click About tab")

    return result


def validate_results(result: dict) -> bool:
    """Validate extracted data against expected values.

    Name and category must equal ``EXPECTED`` exactly; at least 3 expected
    review topics (case-insensitive) and at least 3 expected about sections
    must be present. Missing or ``None`` fields count as failures instead of
    raising.

    Returns:
        True when every check passed, False otherwise.
    """
    print("\n" + "="*60)
    print("🔍 VALIDATION:")
    print("="*60)

    all_passed = True

    # Exact-match fields
    for field, label in (("name", "Name"), ("category", "Category")):
        actual = result.get(field)
        if actual == EXPECTED[field]:
            print(f" ✅ {label}: {actual}")
        else:
            print(f" ❌ {label}: got '{actual}', expected '{EXPECTED[field]}'")
            all_passed = False

    # Check review topics (at least some should match)
    extracted_topics = [
        (t.get('topic') or '').lower() for t in result.get('review_topics') or []
    ]
    expected_topics = [t.lower() for t in EXPECTED['review_topics']]
    matching = [t for t in expected_topics if t in extracted_topics]

    if len(matching) >= 3:  # At least 3 topics should match
        print(f" ✅ Review topics: {len(matching)}/{len(expected_topics)} matched")
    else:
        print(f" ❌ Review topics: only {len(matching)}/{len(expected_topics)} matched")
        print(f" Expected: {expected_topics}")
        print(f" Got: {extracted_topics}")
        all_passed = False

    # Check about sections (at least some should be present)
    about_sections = list(result.get('about') or {})
    expected_sections = EXPECTED['about_sections']
    matching_sections = [s for s in expected_sections if s in about_sections]

    if len(matching_sections) >= 3:
        print(f" ✅ About sections: {len(matching_sections)}/{len(expected_sections)} matched")
    else:
        print(f" ❌ About sections: only {len(matching_sections)}/{len(expected_sections)} matched")
        print(f" Expected: {expected_sections}")
        print(f" Got: {about_sections}")
        all_passed = False

    return all_passed


def main() -> bool:
    """Run the extraction against the sample business and validate it.

    Opens a visible (non-headless) browser, prints the extracted data and the
    validation report, then closes the browser.

    Returns:
        True when all validations passed, False on any failure or error.
    """
    from seleniumbase import Driver

    url = "https://www.google.com/maps/search/?api=1&query=R.+Fleitas+Peluqueros+Gran+Canaria"

    print("🚀 Starting metadata extraction test...")
    print(f" URL: {url[:60]}...")

    driver = Driver(uc=True, headless=False)
    passed = False

    try:
        # Set geolocation (coordinates are in Boston, MA). Optional: the
        # extraction still runs if the CDP command is unavailable.
        try:
            driver.execute_cdp_cmd('Emulation.setGeolocationOverride', {
                'latitude': 42.3601, 'longitude': -71.0589, 'accuracy': 100
            })
        except Exception as exc:
            print(f" ⚠️ Could not override geolocation: {exc}")

        result = extract_metadata(driver, url)

        print("\n" + "="*60)
        print("📊 FULL RESULT:")
        print("="*60)
        print(json.dumps(result, indent=2, ensure_ascii=False))

        passed = validate_results(result)

        print("\n" + "="*60)
        if passed:
            print("🎉 ALL VALIDATIONS PASSED!")
        else:
            print("⚠️ SOME VALIDATIONS FAILED")
        print("="*60)

        print("\n👀 Browser stays open for 15 seconds...")
        time.sleep(15)

    except Exception as e:
        # Top-level boundary: report, keep the browser up briefly for
        # inspection, then fall through to cleanup.
        print(f"\n❌ Error: {e}")
        traceback.print_exc()
        time.sleep(10)
    finally:
        driver.quit()
        print("🔒 Browser closed")

    return passed


if __name__ == "__main__":
    # Non-zero exit status lets callers/CI detect a failed run.
    sys.exit(0 if main() else 1)