Add ReviewIQ pipeline spec and metadata extraction test

- reviewiq-pipeline-v1-final.md: Earlier pipeline specification
- test_metadata_extraction.py: Test script for metadata extraction

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 11:21:33 +00:00
parent 59368a5bd5
commit 3da243be79
2 changed files with 1390 additions and 0 deletions

View File

@@ -0,0 +1,992 @@
# ReviewIQ Pipeline v1 — Final Architecture
**Design principle**: Minimum state, defensible stats, multilingual, robust to messy mobile text, 1 LLM call per report, <$0.30/report.
**Core decision**: Do not persist topics. Persist only enriched spans. Build topics at report time via clustering and match across periods for trends.
---
## A. Architecture Overview
```
INGEST (continuous, stateless, ~$0.00)
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Raw Review │────▶│ Span │────▶│ Embed + │────▶│ Store │
│ (text,rating,│ │ Splitter │ │ Sentiment │ │ Enriched │
│ date, lang) │ │ │ │ + NER │ │ Spans │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
No topic assignment at ingest. Just store enriched spans.
REPORT (per request, ~$0.20)
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Fetch │────▶│ Cluster │────▶│ Stats + │────▶│ LLM │
│ Spans │ │ (HDBSCAN) │ │ Labels + │ │ Narrate │
│ │ │ │ │ Quotes │ │ (1 call) │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
Topics are ephemeral. They exist only for this report.
Trends are computed by matching clusters across periods via centroid similarity.
```
### Cost Model
| Stage | When | Cost | Notes |
|-------|------|------|-------|
| Span splitting | Per review ingested | $0.00 | Regex only |
| Embedding | Per span ingested | $0.00 | Local model, batched |
| Sentiment | Per span ingested | $0.00 | Embedding math (EN/ES/DE multi-anchor) |
| NER (staff) | Per span ingested | $0.00 | spaCy, guarded |
| Clustering | Per report | $0.00 | HDBSCAN <4k spans, PCA+KMeans fallback |
| Stats + labels | Per report | $0.00 | Python/SQL |
| LLM narration | Per report | ~$0.15-0.25 | Single API call |
**Total: ~$0.20/report** (dominated by LLM)
---
## B. Data Model (Only What Persists)
### 1. Raw Reviews
```sql
CREATE TABLE reviews (
review_id TEXT PRIMARY KEY,
business_id TEXT NOT NULL,
text TEXT NOT NULL,
rating INT NOT NULL,
date TIMESTAMP,
source TEXT DEFAULT 'google',
ingested_at TIMESTAMP DEFAULT NOW()
);
```
### 2. Enriched Spans (The Only ML Artifact)
```sql
CREATE TABLE spans (
span_id TEXT PRIMARY KEY,
review_id TEXT REFERENCES reviews(review_id),
business_id TEXT NOT NULL,
span_index INT NOT NULL,
text TEXT NOT NULL,
embedding VECTOR(384),
sentiment TEXT, -- 'positive', 'negative', 'neutral'
sentiment_score FLOAT,
staff_mentions TEXT[], -- guarded extraction
date TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_spans_business_date ON spans(business_id, date);
-- Embedding index: prefer HNSW if available (pgvector 0.5+), otherwise ivfflat
-- HNSW: no training required, better query performance
CREATE INDEX idx_spans_embedding ON spans USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Alternative for older pgvector (requires ANALYZE after bulk inserts):
-- CREATE INDEX idx_spans_embedding ON spans USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);
-- ANALYZE spans; -- Required after bulk insert for ivfflat to work correctly
```
### 3. Review-Topic Presence (Computed at Report Time, Not Stored)
Topics are ephemeral. Presence is computed per report, not persisted.
---
## C. Ingest Pipeline
### Step 1: Span Splitting
Split on punctuation. Fallback split on contrast markers. Merge tiny fragments.
```python
import re
CONTRAST_RE = re.compile(
r'\b(?:but|pero|aber|aunque|however|though|although|yet|still|sin embargo)\b',
re.IGNORECASE
)
def split_spans(text: str) -> list[str]:
# Split on punctuation (good enough for most text, with contrast fallback)
parts = re.split(r'[.!?;:,]\s*|\s{2,}', text)
parts = [p.strip() for p in parts if len(p.strip()) >= 12]
# Fallback split on contrast markers
refined = []
for p in parts:
if CONTRAST_RE.search(p):
sub = [s.strip() for s in CONTRAST_RE.split(p)]
# Merge tiny fragments back
merged = []
for s in sub:
if not s:
continue
if len(s) < 12 and merged:
merged[-1] = merged[-1] + ' ' + s
else:
merged.append(s)
refined.extend([m for m in merged if len(m) >= 12])
else:
refined.append(p)
return refined
```
**Note**: Do NOT split on "and/y/und" by default — these often connect positive qualities ("friendly and fast").
### Step 2: Embedding
Use multilingual model. No translation needed.
```python
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('intfloat/multilingual-e5-small')
def embed_spans(spans: list[str]) -> np.ndarray:
return model.encode(spans, normalize_embeddings=True)
```
### Step 3: Sentiment (Anchor-Based)
Score sentiment via embedding distance to polar anchors. Works across all languages.
**Note**: Encode multiple short anchors separately, normalize, then average. This gives
better multilingual alignment than a single "bag sentence".
```python
# Multiple short anchors for better multilingual alignment
# Include ES/DE anchors for improved cross-language recall
POSITIVE_WORDS = [
# English
"excellent", "wonderful", "amazing", "great", "fantastic",
"delicious", "friendly", "helpful", "perfect", "outstanding",
# Spanish
"excelente", "increíble", "delicioso", "amable", "rápido",
# German
"toll", "lecker", "freundlich", "schnell", "perfekt",
]
NEGATIVE_WORDS = [
# English
"terrible", "awful", "horrible", "bad", "disgusting",
"rude", "slow", "dirty", "broken", "disappointing",
# Spanish
"horrible", "sucio", "lento", "grosero", "caro",
# German
"schlecht", "langsam", "unhöflich", "dreckig", "teuer",
]
def _compute_anchor(words: list[str]) -> np.ndarray:
"""Encode multiple anchors, normalize each, then average.
Deduplicates words to avoid implicit weighting.
"""
unique_words = list(dict.fromkeys(words)) # Preserve order, remove dupes
embeddings = model.encode(unique_words, normalize_embeddings=True)
avg = embeddings.mean(axis=0)
return avg / np.linalg.norm(avg) # Re-normalize the average
POSITIVE_ANCHOR = _compute_anchor(POSITIVE_WORDS)
NEGATIVE_ANCHOR = _compute_anchor(NEGATIVE_WORDS)
def score_sentiment(embedding: np.ndarray) -> tuple[str, float]:
pos_sim = embedding @ POSITIVE_ANCHOR
neg_sim = embedding @ NEGATIVE_ANCHOR
score = (pos_sim - neg_sim) / (pos_sim + neg_sim + 1e-6)
if score > 0.15:
return ('positive', float(score))
elif score < -0.15:
return ('negative', float(abs(score)))
else:
return ('neutral', 0.0)
```
### Step 4: Staff Extraction (Guarded)
Use spaCy NER, but only count as staff when guarded:
```python
import spacy
nlp = spacy.load('xx_ent_wiki_sm') # multilingual
ROLE_WORDS = {'server', 'waiter', 'waitress', 'manager', 'chef', 'doctor',
'nurse', 'receptionist', 'mesero', 'gerente', 'doctor', 'kellner'}
def extract_staff(text: str, business_history: dict = None) -> list[str]:
doc = nlp(text)
staff = []
for ent in doc.ents:
if ent.label_ != 'PERSON':
continue
name = ent.text.strip()
normalized = normalize_name(name) # Normalize early for consistent lookup
context = text[max(0, ent.start_char-30):ent.end_char+30].lower()
# Guard 1: Near role word
if any(role in context for role in ROLE_WORDS):
staff.append(normalized)
continue
# Guard 2: Appears in thanks pattern
if any(p in context for p in ['thank', 'gracias', 'danke', 'shout out', 'kudos']):
staff.append(normalized)
continue
# Guard 3: Frequent across reviews (if history available)
# Use normalized name for lookup (history keys are also normalized)
if business_history and business_history.get(normalized, 0) >= 3:
staff.append(normalized)
return list(set(staff))
def normalize_name(name: str) -> str:
return ' '.join(name.strip().title().split())
```
### Full Ingest Function
```python
def ingest_review(review: dict) -> list[dict]:
spans = split_spans(review['text'])
if not spans:
return []
embeddings = embed_spans(spans)
enriched = []
for i, (text, emb) in enumerate(zip(spans, embeddings)):
sentiment, confidence = score_sentiment(emb)
staff = extract_staff(text)
enriched.append({
'span_id': f"{review['review_id']}_{i}",
'review_id': review['review_id'],
'business_id': review['business_id'],
'span_index': i,
'text': text,
'embedding': emb,
'sentiment': sentiment,
'sentiment_score': confidence,
'staff_mentions': staff if staff else None,
'date': review['date'],
})
return enriched
```
---
## D. Report Generation
### Step 1: Fetch Spans
```python
def fetch_spans(business_id: str, start: date, end: date) -> list[dict]:
return db.query("""
SELECT span_id, review_id, text, embedding, sentiment,
sentiment_score, staff_mentions, date
FROM spans
WHERE business_id = %s AND date >= %s AND date < %s
""", [business_id, start, end])
```
### Step 2: Cluster Spans (Ephemeral Topics)
Cluster ALL spans together (not pos/neg separately). Compute sentiment breakdown within each cluster.
**Scalability note**: Full distance matrix is O(n²) memory/time. For large span counts,
we fall back to PCA + MiniBatchKMeans.
```python
import hdbscan
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import MiniBatchKMeans
MAX_SPANS_FOR_HDBSCAN = 4000 # Beyond this, O(n²) distance matrix is too expensive
def cluster_spans(spans: list[dict]) -> tuple[list[dict], list[dict]]:
"""Returns (topics, noise_spans)
Uses HDBSCAN for small datasets, falls back to PCA+KMeans for large ones.
"""
if len(spans) > MAX_SPANS_FOR_HDBSCAN:
return _cluster_spans_fallback(spans)
embeddings = np.array([s['embedding'] for s in spans])
# L2-normalize and compute distance matrix
normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
dist_matrix = 1 - (normed @ normed.T)
np.fill_diagonal(dist_matrix, 0)
clusterer = hdbscan.HDBSCAN(
min_cluster_size=10, # Aligned with publish gate
min_samples=5,
metric='precomputed'
)
labels = clusterer.fit_predict(dist_matrix)
# Group spans by cluster
topics = {}
noise_spans = []
for span, label in zip(spans, labels):
if label == -1:
# Keep high-confidence noise for quotes
if abs(span['sentiment_score']) > 0.5:
noise_spans.append(span)
continue
if label not in topics:
topics[label] = {'spans': [], 'embeddings': []}
topics[label]['spans'].append(span)
topics[label]['embeddings'].append(span['embedding'])
# Compute centroids
result = []
for label, data in topics.items():
embs = np.array(data['embeddings'])
centroid = embs.mean(axis=0)
centroid = centroid / np.linalg.norm(centroid)
result.append({
'cluster_id': label,
'spans': data['spans'],
'embeddings': embs,
'centroid': centroid,
})
return result, noise_spans
def _cluster_spans_fallback(spans: list[dict]) -> tuple[list[dict], list[dict]]:
"""Fallback clustering for large datasets using PCA + MiniBatchKMeans.
Trades cluster quality for O(n) scalability.
Generates pseudo-noise from spans far from their cluster centroid.
Requires: Each span must have 'embedding' and 'sentiment_score' populated.
"""
embeddings = np.array([s['embedding'] for s in spans])
# Reduce dimensionality
pca = PCA(n_components=50)
reduced = pca.fit_transform(embeddings)
# Estimate k (heuristic: sqrt(n/10), clamped)
k = max(5, min(50, int(np.sqrt(len(spans) / 10))))
kmeans = MiniBatchKMeans(n_clusters=k, batch_size=256, n_init=3)
labels = kmeans.fit_predict(reduced)
# Group spans by cluster
topics = {}
for span, emb, label in zip(spans, embeddings, labels):
if label not in topics:
topics[label] = {'spans': [], 'embeddings': []}
topics[label]['spans'].append(span)
topics[label]['embeddings'].append(emb)
# Compute centroids and identify pseudo-noise (bottom 3% by similarity)
result = []
all_distances = [] # (distance, span) tuples for pseudo-noise selection
for label, data in topics.items():
embs = np.array(data['embeddings'])
centroid = embs.mean(axis=0)
centroid = centroid / np.linalg.norm(centroid)
# Compute similarities to centroid
normed_embs = embs / np.linalg.norm(embs, axis=1, keepdims=True)
sims = normed_embs @ centroid
# Track distances for pseudo-noise
for span, sim in zip(data['spans'], sims):
all_distances.append((1 - sim, span))
result.append({
'cluster_id': label,
'spans': data['spans'],
'embeddings': embs,
'centroid': centroid,
})
# Pseudo-noise: bottom 3% by similarity (farthest from any centroid)
# Only include high-confidence sentiment spans (same as HDBSCAN noise handling)
all_distances.sort(key=lambda x: x[0], reverse=True)
noise_cutoff = int(len(all_distances) * 0.03)
pseudo_noise = [
span for _, span in all_distances[:noise_cutoff]
if abs(span['sentiment_score']) > 0.5
]
return result, pseudo_noise
```
### Step 3: Compute Review-Level Stats
Stats are review-level presence (not span counts). This is critical for defensible claims.
```python
def compute_topic_stats(topic: dict, all_review_ids: set) -> dict:
"""Compute review-level presence stats."""
spans = topic['spans']
n = len(all_review_ids)
# Review-level presence
reviews_any = set(s['review_id'] for s in spans)
reviews_neg = set(s['review_id'] for s in spans if s['sentiment'] == 'negative')
reviews_pos = set(s['review_id'] for s in spans if s['sentiment'] == 'positive')
k_neg = len(reviews_neg)
k_pos = len(reviews_pos)
return {
'k_any': len(reviews_any),
'k_neg': k_neg,
'k_pos': k_pos,
'n': n,
'rate_neg': k_neg / n if n > 0 else 0,
'rate_pos': k_pos / n if n > 0 else 0,
'ci_neg': wilson_interval(k_neg, n),
'ci_pos': wilson_interval(k_pos, n),
}
def wilson_interval(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
if n == 0:
return (0.0, 1.0)
p = k / n
denom = 1 + z**2 / n
center = (p + z**2 / (2*n)) / denom
margin = (z / denom) * np.sqrt(p*(1-p)/n + z**2/(4*n**2))
return (max(0, center - margin), min(1, center + margin))
```
### Step 4: Label Topics (Representative Spans, No Stopwords)
Topic identity = centroid (for matching). Display label = cleaned representative span (for UI).
```python
import re
EMAIL_RE = re.compile(r'\b\S+@\S+\.\S+\b')
URL_RE = re.compile(r'\b(?:https?://|www\.)\S+\b', re.I)
PHONE_RE = re.compile(r'\b(?:\+?\d[\d .()-]{7,}\d)\b')
LONGDIG_RE = re.compile(r'\b\d{8,}\b')
def beautify_label(text: str) -> str:
"""Clean PII and noise from label text."""
text = ' '.join(text.split())
text = EMAIL_RE.sub('', text)
text = URL_RE.sub('', text)
text = PHONE_RE.sub('', text)
text = LONGDIG_RE.sub('', text)
text = re.sub(r'([!?.]){2,}', r'\1', text)
return text.strip()
def norm_for_dedup(text: str) -> str:
"""Normalize for near-duplicate detection. Unicode-safe for multilingual."""
import unicodedata
# Casefold (stronger than lower() for Unicode)
t = text.casefold()
# Normalize Unicode (NFC form)
t = unicodedata.normalize('NFC', t)
# Replace digits with placeholder
t = re.sub(r'\d+', '#', t)
# Remove punctuation but keep letters from any alphabet (\w includes Unicode letters)
t = re.sub(r'[^\w\s#]+', ' ', t, flags=re.UNICODE)
# Collapse whitespace
t = ' '.join(t.split())
return t
def select_label(topic: dict, used_labels: set) -> str:
"""Select clean, unique display label from representative spans."""
spans = topic['spans']
embeddings = np.array(topic['embeddings'])
centroid = topic['centroid']
# Rank by similarity to centroid
sims = embeddings @ centroid
ranked = np.argsort(sims)[::-1]
for idx in ranked[:15]:
cleaned = beautify_label(spans[idx]['text'])
if not (15 <= len(cleaned) <= 80):
continue
key = norm_for_dedup(cleaned)
if key in used_labels:
continue
used_labels.add(key)
return cleaned
# Fallback: truncate best match
best = beautify_label(spans[ranked[0]]['text'])
return best[:60].rstrip() + ("..." if len(best) > 60 else "")
```
### Step 5: Trend Matching (Centroid-Based)
Match current topics to prior topics by centroid similarity. Never use label text for matching.
**v1 decision**: Compute separate trends for negative and positive rates. This ensures strengths
get correct trend values (not reusing negative-only logic).
```python
def match_trends(current_topics: list, prior_topics: list,
threshold: float = 0.70, margin: float = 0.05,
min_k: int = 8, min_n: int = 20):
"""Match topics across periods for trend computation.
Computes both trend_neg and trend_pos separately.
"""
for curr in current_topics:
stats = curr['stats']
curr['trend_neg'] = None
curr['trend_pos'] = None
curr['trend_match_sim'] = None
if not prior_topics:
continue
# Find best and second-best match by centroid similarity
sims = [(p, float(curr['centroid'] @ p['centroid'])) for p in prior_topics]
sims.sort(key=lambda x: x[1], reverse=True)
best, best_sim = sims[0]
second_sim = sims[1][1] if len(sims) > 1 else 0
# Gate: match must be confident AND clearly better than alternatives
if best_sim < threshold or (best_sim - second_sim) < margin:
continue
curr['trend_match_sim'] = best_sim
# Compute trend for negatives (if both periods have enough data)
if (stats['k_neg'] >= min_k and stats['n'] >= min_n and
best['stats']['k_neg'] >= min_k and best['stats']['n'] >= min_n):
curr['trend_neg'] = stats['rate_neg'] - best['stats']['rate_neg']
# Compute trend for positives (if both periods have enough data)
if (stats['k_pos'] >= min_k and stats['n'] >= min_n and
best['stats']['k_pos'] >= min_k and best['stats']['n'] >= min_n):
curr['trend_pos'] = stats['rate_pos'] - best['stats']['rate_pos']
```
### Step 6: Quote Selection
Pick representative + sharp quotes. Include high-confidence noise spans.
- **Representative**: closest span to centroid (within topic, matching sentiment)
- **Sharp**: highest |sentiment_score| among topic spans + high-confidence noise
```python
def pick_quotes(topic: dict, noise_spans: list, sentiment_filter: str,
k: int = 2) -> list[dict]:
"""Select diverse, high-quality quotes: 1 representative + 1 sharp."""
topic_spans = [s for s in topic['spans'] if s['sentiment'] == sentiment_filter]
centroid = topic['centroid']
quotes = []
seen_reviews = set()
# 1. Representative: closest to centroid
if topic_spans:
embeddings = np.array([s['embedding'] for s in topic_spans])
sims = embeddings @ centroid
ranked_idx = np.argsort(sims)[::-1]
for idx in ranked_idx:
span = topic_spans[idx]
if span['review_id'] in seen_reviews:
continue
if len(span['text']) > 200:
continue
quotes.append({
'text': span['text'],
'sentiment': span['sentiment'],
'date': span['date'],
'type': 'representative',
})
seen_reviews.add(span['review_id'])
break
# 2. Sharp: highest confidence from topic + noise
sharp_candidates = topic_spans + [s for s in noise_spans
if s['sentiment'] == sentiment_filter
and abs(s['sentiment_score']) > 0.5]
sharp_candidates.sort(key=lambda s: abs(s['sentiment_score']), reverse=True)
for span in sharp_candidates:
if span['review_id'] in seen_reviews:
continue
if len(span['text']) > 200:
continue
quotes.append({
'text': span['text'],
'sentiment': span['sentiment'],
'date': span['date'],
'type': 'sharp',
})
seen_reviews.add(span['review_id'])
if len(quotes) >= k:
break
return quotes
```
### Step 7: Staff Aggregation
```python
def aggregate_staff(spans: list[dict], all_review_ids: set) -> dict:
"""Aggregate staff mentions with review-level presence."""
staff_data = {}
for span in spans:
if not span['staff_mentions']:
continue
for name in span['staff_mentions']:
if name not in staff_data:
staff_data[name] = {'pos_reviews': set(), 'neg_reviews': set(), 'quotes': []}
if span['sentiment'] == 'positive':
staff_data[name]['pos_reviews'].add(span['review_id'])
staff_data[name]['quotes'].append(span['text'])
elif span['sentiment'] == 'negative':
staff_data[name]['neg_reviews'].add(span['review_id'])
staff_data[name]['quotes'].append(span['text'])
# Build heroes and concerns
heroes, concerns = [], []
for name, data in staff_data.items():
pos = len(data['pos_reviews'])
neg = len(data['neg_reviews'])
total = pos + neg
if total < 3: # Minimum mentions
continue
entry = {
'name': name,
'positive': pos,
'negative': neg,
'total': total,
'quote': data['quotes'][0] if data['quotes'] else None,
}
if pos > neg and pos >= 3:
heroes.append(entry)
elif neg > pos and neg >= 3:
concerns.append(entry)
heroes.sort(key=lambda x: x['positive'], reverse=True)
concerns.sort(key=lambda x: x['negative'], reverse=True)
return {'heroes': heroes[:3], 'concerns': concerns[:3]}
```
### Step 8: Build LLM Payload
```python
def build_payload(business_id: str, current_period: tuple,
topics: list, noise_spans: list, staff: dict,
review_count: int) -> dict:
"""Build structured payload for LLM narration.
Args:
noise_spans: High-confidence spans not assigned to any cluster.
Used for quote selection.
"""
issues = []
strengths = []
for topic in topics:
stats = topic['stats']
# Issue: significant negative presence
if stats['k_neg'] >= 8 and stats['n'] >= 20:
ci = stats['ci_neg']
if ci[1] - ci[0] <= 0.30: # CI not too wide
issues.append({
'label': topic['label'],
'rate': round(stats['rate_neg'], 3),
'ci': [round(ci[0], 3), round(ci[1], 3)],
'n': stats['k_neg'],
'trend': round(topic['trend_neg'], 3) if topic.get('trend_neg') else None,
'quotes': pick_quotes(topic, noise_spans, 'negative', k=2),
})
# Strength: significant positive presence
if stats['k_pos'] >= 8 and stats['n'] >= 20:
ci = stats['ci_pos']
if ci[1] - ci[0] <= 0.30:
strengths.append({
'label': topic['label'],
'rate': round(stats['rate_pos'], 3),
'ci': [round(ci[0], 3), round(ci[1], 3)],
'n': stats['k_pos'],
'trend': round(topic['trend_pos'], 3) if topic.get('trend_pos') else None,
'quotes': pick_quotes(topic, noise_spans, 'positive', k=2),
})
# Sort by rate
issues.sort(key=lambda x: x['rate'], reverse=True)
strengths.sort(key=lambda x: x['rate'], reverse=True)
return {
'business_id': business_id,
'period': f"{current_period[0]} to {current_period[1]}",
'total_reviews': review_count,
'issues': issues[:5],
'strengths': strengths[:5],
'staff': staff,
}
```
### Step 9: LLM Narration (Single Call)
```python
SYSTEM_PROMPT = """You are a business consultant analyzing customer review data.
Write a clear, actionable report for a small business owner.
RULES:
1. Use ONLY the statistics provided. Never invent numbers.
2. Include confidence intervals when stating percentages.
3. Be direct and actionable. The owner is busy.
4. Prioritize issues by frequency and trend direction.
5. Each recommendation must reference a specific issue from the data."""
def generate_report(payload: dict) -> str:
user_prompt = f"""Based on this review analysis, write a consultant report.
DATA:
{json.dumps(payload, indent=2)}
SECTIONS:
1. Executive Summary (3 sentences max)
2. Top Strengths (what's working, with stats)
3. Critical Issues (what needs attention, with stats and trends)
4. Staff Performance (heroes and concerns if present)
5. Recommended Actions (3-5 specific steps, prioritized)
Keep total length under 600 words."""
response = llm_client.chat(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
max_tokens=1500
)
return response.content
```
### Full Report Generation Function
```python
def generate_full_report(business_id: str,
current_start: date, current_end: date,
prior_start: date, prior_end: date) -> str:
"""Generate complete report for a business."""
# Fetch spans
current_spans = fetch_spans(business_id, current_start, current_end)
prior_spans = fetch_spans(business_id, prior_start, prior_end)
if not current_spans:
return "Insufficient data for report."
# Get unique review IDs
current_reviews = set(s['review_id'] for s in current_spans)
prior_reviews = set(s['review_id'] for s in prior_spans)
# Cluster current period
current_topics, noise_spans = cluster_spans(current_spans)
# Compute stats for current topics
for topic in current_topics:
topic['stats'] = compute_topic_stats(topic, current_reviews)
# Label topics (with deduplication)
used_labels = set()
for topic in current_topics:
topic['label'] = select_label(topic, used_labels)
# Cluster and compute stats for prior period
prior_topics = []
if prior_spans:
prior_topics, _ = cluster_spans(prior_spans)
for topic in prior_topics:
topic['stats'] = compute_topic_stats(topic, prior_reviews)
# Match trends
match_trends(current_topics, prior_topics)
# Aggregate staff
staff = aggregate_staff(current_spans, current_reviews)
# Build payload (include noise_spans for quote selection)
payload = build_payload(
business_id,
(current_start, current_end),
current_topics,
noise_spans, # Pass noise spans for quote selection
staff,
len(current_reviews)
)
# Generate report
return generate_report(payload)
```
---
## E. Summary of Design Decisions
### What We Do
| Decision | Rationale |
|----------|-----------|
| Ephemeral topics (no persistent catalog) | Eliminates drift, merge logic, thresholds |
| Cluster all spans together | One topic can have pos/neg breakdown; avoids duplicates |
| Fallback clustering for large datasets | PCA + KMeans when >4000 spans (O(n) vs O(n²)) |
| Review-level presence for stats | Defensible claims ("X% of customers") |
| Wilson intervals + publish gates | Statistical rigor |
| Centroid-based trend matching | Stable identity regardless of label changes |
| Separate trend_neg/trend_pos | Correct trends for both issues and strengths |
| Representative + sharp quotes | Best of both: centroid-closest + highest confidence |
| Representative span labels | Human-readable, no stopwords/NLP needed |
| Unicode-safe label dedup | Works for Spanish, German, etc. |
| Multi-anchor sentiment | Better multilingual alignment than bag sentence |
| Guarded staff extraction | Reduces false positives |
| Single LLM call | Cost control |
### What We Don't Do
| Avoided | Why |
|---------|-----|
| Persistent topic catalog | Adds state, drift, merge complexity |
| Topic assignment at ingest | Unnecessary; cluster at report time |
| Span-count stats | Inflates rates; review-level is correct |
| TF-IDF with stopwords | Brittle; representative spans are better |
| Split on "and/y/und" | Over-splits positive phrases |
| POS tagging for labels | Heavy dependency; regex cleanup is sufficient |
| Translation | Multilingual embeddings + multi-language anchors handle it |
| Sentiment classifier | Multi-anchor approach works across languages |
### Statistical Gates
| Gate | Threshold | Purpose |
|------|-----------|---------|
| Minimum k | 8 | Topic must have enough mentions |
| Minimum n | 20 | Period must have enough reviews |
| CI width | ≤ 0.30 | Reject imprecise estimates |
| Trend match sim | ≥ 0.70 | Confident topic match |
| Trend margin | ≥ 0.05 | Clear winner vs alternatives |
| Both periods min | k≥8, n≥20 | Trend requires data on both sides |
### Trend Handling
- **Accurate when**: Topic structure is stable (most real issues)
- **Omitted when**: Match confidence is low
- **Separate trends**: `trend_neg` and `trend_pos` computed independently
- **Never**: Show confidently wrong trends
---
## F. Implementation Plan
| Day | Deliverable |
|-----|-------------|
| 1-2 | Span splitter + embedding service |
| 3-4 | Sentiment scoring + staff extraction |
| 5-6 | Database schema + ingest pipeline |
| 7-8 | Clustering + stats + labeling |
| 9-10 | Trend matching + quote selection |
| 11-12 | LLM integration + end-to-end testing |
**Total: ~12 days for a competent engineer**
---
## G. What's NOT in v1
| Feature | Rationale | v2 Trigger |
|---------|-----------|------------|
| Token-window segmentation | Punctuation split is good enough | Run-on reviews cause quality issues |
| Many-to-many trend matching | Best-match is good enough | Trend accuracy complaints |
| Owner-driven topic editing | Not needed yet | Users want to rename/merge topics |
| Multi-location rollup | Different product | Chain restaurants sign up |
| Anomaly detection | Different product | Fraud complaints |
| Response templates | Low value | User requests |
---
## H. Known Limitations / Future Improvements
| Limitation | Impact | v2 Consideration |
|------------|--------|------------------|
| Sentiment anchors cover EN/ES/DE only | Other languages (FR, PT, IT, etc.) rely on multilingual-e5 alignment | Add 5-10 anchors per new language as user base grows |
| KMeans fallback uses pseudo-noise heuristic | Sharp quotes may be slightly less sharp for >4k span reports | Consider HDBSCAN with approximate nearest neighbors (pynndescent) |
| No streaming for very large reports | Memory pressure if report spans exceed 10k | Paginate or sample spans for extreme cases |
---
## I. Final Checklist Before Ship
- [ ] Span splitter handles mobile text (no punctuation edge case)
- [ ] Embeddings are L2-normalized before clustering
- [ ] HDBSCAN uses precomputed cosine distance matrix
- [ ] Clustering has fallback for >4000 spans (PCA + KMeans)
- [ ] KMeans fallback generates pseudo-noise (bottom 3% by centroid distance)
- [ ] Stats are review-level presence (not span counts)
- [ ] Labels are deduplicated across topics (Unicode-safe)
- [ ] Trends computed separately for neg/pos (trend_neg, trend_pos)
- [ ] Trends require min support in BOTH periods
- [ ] Sentiment anchors are multi-word averaged (not bag sentence)
- [ ] Sentiment anchors include EN/ES/DE words
- [ ] Staff history lookup uses normalized names
- [ ] noise_spans passed to quote selection
- [ ] pgvector index uses HNSW (or ivfflat with ANALYZE documented)
- [ ] LLM prompt enforces "only use provided numbers"
- [ ] Cost per report < $0.30
---
**Document version**: v1-final-reviewed
**Status**: Ready for implementation (with reviewer fixes applied)

398
test_metadata_extraction.py Normal file
View File

@@ -0,0 +1,398 @@
#!/usr/bin/env python3
"""
Test metadata extraction: category, review topics, about info.
Uses robust selectors (aria-labels, roles, jsaction) to avoid breakage.
"""
import time
import json
from seleniumbase import Driver
from selenium.webdriver.common.by import By
# Expected values for validation
EXPECTED = {
"name": "R. Fleitas Peluqueros",
"category": "Barber shop",
"review_topics": ["hair salon", "cutting", "price", "siblings", "beard"],
"about_sections": ["Accessibility", "Amenities", "Planning", "Payments", "Children"]
}
def extract_metadata(driver, url: str) -> dict:
"""Extract all business metadata from Google Maps."""
# Force English
if 'hl=' not in url:
url = f"{url}{'&' if '?' in url else '?'}hl=en&gl=us"
print(f" Loading URL: {url[:70]}...")
driver.get(url)
# Handle consent popup - poll with 10ms sleep (same as production scraper)
start = time.time()
while time.time() - start < 5:
if "consent.google" in driver.current_url:
print(" 🍪 Consent page detected, clicking accept...")
try:
for btn in driver.find_elements(By.CSS_SELECTOR, "button"):
txt = btn.text.lower()
if "accept" in txt or "aceptar" in txt or "alle akzeptieren" in txt:
btn.click()
print(f" ✅ Clicked: '{btn.text}', reloading...")
driver.get(url)
break
except:
pass
break
if "maps/place" in driver.current_url or ("maps" in driver.current_url and "consent" not in driver.current_url):
break
time.sleep(0.01) # 10ms polling
# Wait for page to stabilize
time.sleep(1)
result = {
"name": None,
"category": None,
"rating": None,
"total_reviews": None,
"review_topics": [],
"about": {}
}
# ========== OVERVIEW TAB (default) ==========
print("\n📍 Extracting from OVERVIEW tab...")
overview_data = driver.execute_script("""
var data = {name: null, category: null, rating: null, total_reviews: null};
// Business name - h1 is stable
var h1 = document.querySelector('h1');
if (h1) data.name = h1.textContent.trim();
// Category - use jsaction attribute (more stable than class)
var catBtn = document.querySelector('button[jsaction*="category"]');
if (catBtn) data.category = catBtn.textContent.trim();
// Fallback: look for button after rating that's not a link
if (!data.category) {
var buttons = document.querySelectorAll('button');
for (var btn of buttons) {
var text = btn.textContent.trim();
// Categories are short words, no numbers, not navigation
if (text && text.length < 50 && !text.match(/^[0-9]/) &&
!text.match(/review|star|direction|save|share|photo/i)) {
// Check if it's near the rating area
var parent = btn.closest('.LBgpqf, .skqShb, .fontBodyMedium');
if (parent) {
data.category = text;
break;
}
}
}
}
// Rating and reviews from aria-labels (stable)
var spans = document.querySelectorAll('span[role="img"]');
for (var span of spans) {
var label = span.getAttribute('aria-label') || '';
// Rating: "4.8 stars"
var rMatch = label.match(/^([\\d,.]+)\\s*star/i);
if (rMatch && !data.rating) {
data.rating = parseFloat(rMatch[1].replace(',', '.'));
}
// Reviews: "79 reviews"
var revMatch = label.match(/^([\\d,]+)\\s*review/i);
if (revMatch && !data.total_reviews) {
data.total_reviews = parseInt(revMatch[1].replace(/,/g, ''));
}
}
return data;
""")
result.update(overview_data)
print(f" Name: {result['name']}")
print(f" Category: {result['category']}")
print(f" Rating: {result['rating']}")
print(f" Reviews: {result['total_reviews']}")
# ========== REVIEWS TAB ==========
print("\n📝 Clicking REVIEWS tab...")
# Click reviews tab using aria-label or role (robust)
clicked = driver.execute_script("""
// Try multiple selectors for reviews tab
var selectors = [
'button[aria-label*="Review"]',
'button[data-tab-index="1"]',
'div[role="tablist"] button:nth-child(2)',
'button[jsaction*="review"]'
];
for (var sel of selectors) {
var btn = document.querySelector(sel);
if (btn && btn.textContent.toLowerCase().includes('review')) {
btn.click();
return true;
}
}
// Fallback: find by text content
var buttons = document.querySelectorAll('button');
for (var btn of buttons) {
if (btn.textContent.trim().toLowerCase() === 'reviews') {
btn.click();
return true;
}
}
return false;
""")
if clicked:
time.sleep(1.5) # Wait for tab to load
# Extract review topics from radiogroup (very stable selector)
topics = driver.execute_script("""
var topics = [];
// Primary: use role="radiogroup" with aria-label="Refine reviews"
var container = document.querySelector('div[role="radiogroup"][aria-label*="Refine"], div[role="radiogroup"][aria-label*="refine"]');
if (!container) {
// Fallback: any radiogroup in the reviews area
container = document.querySelector('div[role="radiogroup"]');
}
if (container) {
var buttons = container.querySelectorAll('button[role="radio"]');
for (var btn of buttons) {
var label = btn.getAttribute('aria-label') || '';
// Parse "hair salon, mentioned in 4 reviews" or just get the topic name
var match = label.match(/^([^,]+),\\s*mentioned in (\\d+)/i);
if (match) {
topics.push({
topic: match[1].trim(),
count: parseInt(match[2])
});
} else if (label && !label.toLowerCase().includes('all review')) {
// Might be in different format
var countSpan = btn.querySelector('.bC3Nkc, .fontBodySmall');
var nameSpan = btn.querySelector('.uEubGf, span:first-child');
if (nameSpan) {
var name = nameSpan.textContent.trim();
var count = countSpan ? parseInt(countSpan.textContent) : 0;
if (name && name.toLowerCase() !== 'all') {
topics.push({topic: name, count: count});
}
}
}
}
}
return topics;
""")
result['review_topics'] = topics
print(f" Found {len(topics)} review topics:")
for t in topics:
print(f" - {t['topic']}: {t['count']} mentions")
else:
print(" ⚠️ Could not click Reviews tab")
# ========== ABOUT TAB ==========
print("\n📋 Clicking ABOUT tab...")
clicked = driver.execute_script("""
// Try multiple selectors for about tab
var selectors = [
'button[aria-label*="About"]',
'button[data-tab-index="2"]',
'div[role="tablist"] button:nth-child(3)',
'button[jsaction*="about"]'
];
for (var sel of selectors) {
var btn = document.querySelector(sel);
if (btn && btn.textContent.toLowerCase().includes('about')) {
btn.click();
return true;
}
}
// Fallback: find by text content
var buttons = document.querySelectorAll('button');
for (var btn of buttons) {
if (btn.textContent.trim().toLowerCase() === 'about') {
btn.click();
return true;
}
}
return false;
""")
if clicked:
time.sleep(1.5) # Wait for tab to load
# Extract about sections using aria-label and role (stable)
about = driver.execute_script("""
var about = {};
// Find the about region by aria-label or role
var container = document.querySelector('div[role="region"][aria-label*="About"]');
if (!container) {
// Fallback: look for the scrollable area with sections
container = document.querySelector('.m6QErb[aria-label*="About"]');
}
if (!container) {
// Last resort: find sections by h2 headers
container = document;
}
// Find all section headers (h2 elements)
var sections = container.querySelectorAll('h2');
for (var h2 of sections) {
var sectionName = h2.textContent.trim();
var items = [];
// Find the ul list following this h2
var parent = h2.closest('.iP2t7d, div');
if (parent) {
var listItems = parent.querySelectorAll('li span[aria-label]');
for (var li of listItems) {
var label = li.getAttribute('aria-label');
if (label) {
// Parse "Has toilet" or "No wheelchair-accessible car park"
var hasFeature = !label.toLowerCase().startsWith('no ');
var featureName = label.replace(/^(Has |No )/i, '');
items.push({
feature: featureName,
available: hasFeature
});
}
}
}
if (sectionName && items.length > 0) {
about[sectionName] = items;
}
}
return about;
""")
result['about'] = about
print(f" Found {len(about)} about sections:")
for section, items in about.items():
print(f" {section}:")
for item in items:
status = "" if item['available'] else ""
print(f" {status} {item['feature']}")
else:
print(" ⚠️ Could not click About tab")
return result
def validate_results(result: dict) -> bool:
"""Validate extracted data against expected values."""
print("\n" + "="*60)
print("🔍 VALIDATION:")
print("="*60)
all_passed = True
# Check name
if result['name'] == EXPECTED['name']:
print(f" ✅ Name: {result['name']}")
else:
print(f" ❌ Name: got '{result['name']}', expected '{EXPECTED['name']}'")
all_passed = False
# Check category
if result['category'] == EXPECTED['category']:
print(f" ✅ Category: {result['category']}")
else:
print(f" ❌ Category: got '{result['category']}', expected '{EXPECTED['category']}'")
all_passed = False
# Check review topics (at least some should match)
extracted_topics = [t['topic'].lower() for t in result.get('review_topics', [])]
expected_topics = [t.lower() for t in EXPECTED['review_topics']]
matching = [t for t in expected_topics if t in extracted_topics]
if len(matching) >= 3: # At least 3 topics should match
print(f" ✅ Review topics: {len(matching)}/{len(expected_topics)} matched")
else:
print(f" ❌ Review topics: only {len(matching)}/{len(expected_topics)} matched")
print(f" Expected: {expected_topics}")
print(f" Got: {extracted_topics}")
all_passed = False
# Check about sections (at least some should be present)
about_sections = list(result.get('about', {}).keys())
expected_sections = EXPECTED['about_sections']
matching_sections = [s for s in expected_sections if s in about_sections]
if len(matching_sections) >= 3:
print(f" ✅ About sections: {len(matching_sections)}/{len(expected_sections)} matched")
else:
print(f" ❌ About sections: only {len(matching_sections)}/{len(expected_sections)} matched")
print(f" Expected: {expected_sections}")
print(f" Got: {about_sections}")
all_passed = False
return all_passed
def main():
url = "https://www.google.com/maps/search/?api=1&query=R.+Fleitas+Peluqueros+Gran+Canaria"
print("🚀 Starting metadata extraction test...")
print(f" URL: {url[:60]}...")
driver = Driver(uc=True, headless=False)
try:
# Set geolocation
try:
driver.execute_cdp_cmd('Emulation.setGeolocationOverride', {
'latitude': 42.3601, 'longitude': -71.0589, 'accuracy': 100
})
except:
pass
result = extract_metadata(driver, url)
print("\n" + "="*60)
print("📊 FULL RESULT:")
print("="*60)
print(json.dumps(result, indent=2, ensure_ascii=False))
passed = validate_results(result)
print("\n" + "="*60)
if passed:
print("🎉 ALL VALIDATIONS PASSED!")
else:
print("⚠️ SOME VALIDATIONS FAILED")
print("="*60)
print("\n👀 Browser stays open for 15 seconds...")
time.sleep(15)
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
time.sleep(10)
finally:
driver.quit()
print("🔒 Browser closed")
if __name__ == "__main__":
main()