ReviewIQ: Review Intelligence Pipeline
Version: 2.0 · Status: Architecture Specification · Date: 2026-01-24
Executive Summary
ReviewIQ transforms customer reviews into actionable business intelligence through a three-stage pipeline:
- Ingest — LLM-powered URT classification with semantic embeddings
- Analyze — Issue lifecycle management with sub-pattern discovery
- Report — Statistically rigorous insights with trend detection
Design Principles:
- Accuracy over heuristics: LLM classification at ingest (~$0.0002/review)
- Taxonomy as structure: URT provides stable, interpretable categories
- Local ML for depth: Sub-clustering reveals actionable patterns within categories
- Feedback loop: CR (Comparative Reference) signals verify resolution effectiveness
Part 1: System Architecture
```text
                             REVIEWIQ PIPELINE

┌─────────────┐     ┌──────────────────────────────────────────────────────┐
│   Reviews   │     │                     INGEST LAYER                     │
│   (Input)   │────▶│  ┌─────────┐    ┌──────────┐    ┌────────────────┐   │
└─────────────┘     │  │  Embed  │───▶│   LLM    │───▶│     Store      │   │
                    │  │ Review  │    │ Classify │    │  (PostgreSQL)  │   │
                    │  └─────────┘    └──────────┘    └────────────────┘   │
                    │    ~$0.00         ~$0.0002                           │
                    │    (local)       per review                          │
                    └───────────────────────────┬──────────────────────────┘
                                                ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                            ISSUE AGGREGATION                             │
│                                                                          │
│  V- classified reviews ───▶ Match or Create Issue ───▶ Track State       │
│                                                                          │
│  Rules: Same URT code + entity + location + time window = same issue     │
│  States: DETECTED → ACKNOWLEDGED → IN_PROGRESS → RESOLVED → VERIFIED     │
│                                                                          │
└────────────────────────────────────┬─────────────────────────────────────┘
                                     ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                            REPORT GENERATION                             │
│                                                                          │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐       │
│  │ Aggregate  │   │ Sub-Cluster│   │   Trend    │   │    LLM     │       │
│  │ by URT Code│──▶│Within Codes│──▶│  Analysis  │──▶│  Narrate   │       │
│  │   (SQL)    │   │ (HDBSCAN)  │   │(CR + Rate) │   │            │       │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘       │
│      $0.00            $0.00            $0.00            ~$0.15           │
│                                                                          │
└────────────────────────────────────┬─────────────────────────────────────┘
                                     ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                                  OUTPUT                                  │
│                                                                          │
│  • Executive Summary with statistically defensible claims                │
│  • Issues ranked by priority with sub-pattern breakdown                  │
│  • Strengths with trend signals                                          │
│  • Staff performance insights                                            │
│  • Actionable recommendations                                            │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
```
Part 2: Ingest Layer
2.1 Design Philosophy
The review is the atomic unit. We do not split reviews into fragments — this preserves context and enables accurate classification. A single review may contain multiple topics; URT's multi-coding (primary + up to 2 secondary codes) handles this naturally.
2.2 Dual Processing
Each review undergoes two parallel operations:
```python
import asyncio


async def ingest_review(review: dict) -> dict:
    """Ingest a single review: embed + classify."""
    text = review['text'].strip()

    # Run both operations concurrently. embed_review is synchronous (CPU-bound),
    # so it runs in a worker thread; classify_review_llm is a coroutine.
    embedding, classification = await asyncio.gather(
        asyncio.to_thread(embed_review, text),
        classify_review_llm(text),
    )

    return {
        'review_id': review['review_id'],
        'business_id': review['business_id'],
        'text': text,
        'embedding': embedding,
        'date': review['date'],
        'rating': review.get('rating'),
        **classification,  # URT codes, valence, intensity, etc.
    }
```
2.3 Embedding
Local multilingual embeddings for semantic capabilities:
```python
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('intfloat/multilingual-e5-small')


def embed_review(text: str) -> np.ndarray:
    """Generate normalized embedding for semantic search and clustering."""
    # e5 models are trained with "query: " / "passage: " prefixes;
    # stored reviews use the passage prefix
    embedding = model.encode(
        f"passage: {text}",
        normalize_embeddings=True,
    )
    return embedding  # 384 dimensions
```
Why embed if we have URT codes?
- Sub-clustering within URT codes (pattern discovery)
- Semantic quote selection (centroid-closest)
- Similarity search for emerging patterns
- Backup for low-confidence classifications
2.4 LLM Classification
A single LLM call returns the complete URT classification:
```python
import json

CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).

Analyze the review and return JSON with:
{
  "urt_primary": "X1.23",          // Main URT subcode
  "urt_secondary": ["Y2.34"],      // 0-2 additional codes (different domains only)
  "valence": "V-",                 // V+, V-, V0, V±
  "intensity": "I2",               // I1 (mild), I2 (moderate), I3 (strong)
  "comparative": "CR-N",           // CR-N (none), CR-B (better), CR-W (worse), CR-S (same)
  "staff_mentions": ["Mike"],      // Employee names mentioned
  "quotes": {                      // Key phrase for each code
    "X1.23": "exact phrase from review",
    "Y2.34": "another phrase"
  }
}

URT DOMAINS (choose primary from most impactful):
- O (Offering): Product/service quality, function, completeness, fit
- P (People): Staff attitude, competence, responsiveness, communication
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance, safety
- A (Access): Availability, accessibility, inclusivity, convenience
- V (Value): Price, transparency, effort, worth
- R (Relationship): Trust, dependability, recovery, loyalty

RULES:
1. Primary = what customer is MOST affected by
2. Secondary must be DIFFERENT domains (P1.02 + P3.01 is invalid)
3. V± only when genuinely mixed (positive AND negative on different topics)
4. CR-B/W/S only for explicit self-comparison ("better than last time", "still broken")
5. quotes must be EXACT phrases from the review text

Return valid JSON only."""


async def classify_review_llm(text: str) -> dict:
    """Complete URT classification via LLM."""
    # `llm` is the project's async chat client (not shown here)
    response = await llm.chat(
        model="gpt-4o-mini",  # ~$0.0002 per review
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,  # Low temperature for consistency
    )
    return json.loads(response.content)
```
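Because `response_format` only guarantees syntactically valid JSON, the prompt's RULES still need to be checked before a result is stored. The sketch below is a hypothetical `validate_classification` helper, not part of the spec. It assumes subcodes have the shape of the examples (`J1.01`: domain letter, digit, dot, two digits), and it reads rule 2 as "every assigned code comes from a distinct domain".

```python
import re

VALID_VALENCE = {"V+", "V-", "V0", "V±"}
VALID_INTENSITY = {"I1", "I2", "I3"}
VALID_COMPARATIVE = {"CR-N", "CR-B", "CR-W", "CR-S"}
# Assumption: subcodes look like "J1.01" (domain letter, digit, dot, two digits)
SUBCODE_RE = re.compile(r"^[OPJEAVR]\d\.\d{2}$")


def validate_classification(result: dict, review_text: str) -> list[str]:
    """Check an LLM classification against the prompt's RULES.

    Returns human-readable violations; an empty list means the result passed.
    """
    errors = []
    primary = result.get("urt_primary", "")
    secondary = result.get("urt_secondary", [])
    codes = [primary, *secondary]

    for code in codes:
        if not SUBCODE_RE.match(code):
            errors.append(f"malformed code: {code!r}")
    if len(secondary) > 2:
        errors.append("more than 2 secondary codes")
    # Rule 2, read here as: every assigned code comes from a distinct domain
    domains = [code[:1] for code in codes]
    if len(set(domains)) != len(domains):
        errors.append("codes must come from different domains")

    if result.get("valence") not in VALID_VALENCE:
        errors.append(f"bad valence: {result.get('valence')!r}")
    if result.get("intensity") not in VALID_INTENSITY:
        errors.append(f"bad intensity: {result.get('intensity')!r}")
    if result.get("comparative", "CR-N") not in VALID_COMPARATIVE:
        errors.append(f"bad comparative: {result.get('comparative')!r}")

    # Rule 5: quotes are keyed by assigned codes and appear verbatim in the review
    for code, phrase in result.get("quotes", {}).items():
        if code not in codes:
            errors.append(f"quote for unassigned code {code}")
        if phrase not in review_text:
            errors.append(f"quote not found verbatim: {phrase!r}")
    return errors
```

A result that fails validation could be retried once or flagged for manual review. Either policy is a design choice the spec leaves open.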
2.5 Batch Processing for Efficiency
For bulk ingestion, batch multiple reviews per LLM call:
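```python
async def classify_batch(reviews: list[dict], batch_size: int = 10) -> list[dict]:
    """Process reviews in batches (estimated ~40% cost reduction: the shared
    instructions are sent once per batch instead of once per review)."""
    # BATCH_CLASSIFICATION_PROMPT is a batch variant of CLASSIFICATION_PROMPT
    # (not shown) that asks for {"classifications": [...]} in input order.
    results = []
    for i in range(0, len(reviews), batch_size):
        batch = reviews[i:i + batch_size]
        prompt = BATCH_CLASSIFICATION_PROMPT + "\n\nREVIEWS:\n"
        for j, review in enumerate(batch):
            prompt += f"\n[{j}] {review['text']}\n---\n"

        response = await llm.chat(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": prompt}],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        batch_results = json.loads(response.content)["classifications"]
        if len(batch_results) != len(batch):
            # Misaligned output would attach codes to the wrong reviews
            raise ValueError(f"expected {len(batch)} classifications, got {len(batch_results)}")
        results.extend(batch_results)
    return results
```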
2.6 Data Model
```sql
-- Core review storage with URT classification
CREATE TABLE reviews (
    review_id TEXT PRIMARY KEY,
    business_id TEXT NOT NULL,
    text TEXT NOT NULL,
    embedding VECTOR(384),
    date TIMESTAMP NOT NULL,
    rating SMALLINT,

    -- URT Classification (from LLM)
    urt_primary TEXT NOT NULL,            -- 'J1.01', 'P1.02', etc.
    urt_secondary TEXT[] DEFAULT '{}',    -- Max 2
    valence TEXT NOT NULL,                -- 'V+', 'V-', 'V0', 'V±'
    intensity TEXT NOT NULL,              -- 'I1', 'I2', 'I3'
    comparative TEXT DEFAULT 'CR-N',      -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'

    -- Extracted entities
    staff_mentions TEXT[] DEFAULT '{}',
    quotes JSONB,                         -- {"code": "phrase", ...}

    -- Metadata
    created_at TIMESTAMP DEFAULT NOW(),
    classification_model TEXT DEFAULT 'gpt-4o-mini'
);

-- Indexes for query patterns
CREATE INDEX idx_reviews_business_date ON reviews(business_id, date DESC);
CREATE INDEX idx_reviews_urt_primary ON reviews(business_id, urt_primary);
CREATE INDEX idx_reviews_valence ON reviews(business_id, valence, date);
CREATE INDEX idx_reviews_comparative ON reviews(comparative) WHERE comparative != 'CR-N';
CREATE INDEX idx_reviews_embedding ON reviews USING hnsw (embedding vector_cosine_ops);
```
Part 3: Issue Lifecycle Management
Following the URT Issue Lifecycle Framework (C1), negative feedback (V-, plus mixed V± reviews) generates trackable issues.
3.1 Issue Aggregation
Multiple reviews about the same problem aggregate into a single issue:
```python
from typing import Optional


def aggregate_to_issue(review: dict) -> Optional[str]:
    """Match review to existing issue or create new one."""
    if review['valence'] not in ('V-', 'V±'):
        return None  # Only negative (or mixed) feedback creates issues

    # Find matching open issues (db and the issue helpers are project code, not shown)
    matching = db.query("""
        SELECT issue_id, primary_subcode, entity, location
        FROM issues
        WHERE business_id = %s
          AND primary_subcode = %s
          AND state NOT IN ('VERIFIED', 'DECLINED')
          AND created_at > NOW() - INTERVAL '30 days'
    """, [review['business_id'], review['urt_primary']])

    for issue in matching:
        if is_same_issue(review, issue):
            # Aggregate to existing issue
            add_span_to_issue(issue['issue_id'], review)
            recalculate_priority(issue['issue_id'])
            return issue['issue_id']

    # Check intensity threshold for new issue creation
    if should_create_issue(review):
        return create_issue(review)

    return None  # Stored in buffer for future aggregation


def should_create_issue(review: dict) -> bool:
    """Intensity-based issue creation thresholds."""
    if review['intensity'] == 'I3':
        return True  # Critical = immediate issue

    # Check aggregation buffer for patterns
    similar_count = count_similar_in_buffer(review, window_days=30)

    if review['intensity'] == 'I2' and similar_count >= 2:
        return True  # Moderate + 2 others = issue
    if review['intensity'] == 'I1' and similar_count >= 4:
        return True  # Mild + 4 others = issue

    return False
```
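The spec does not define `count_similar_in_buffer`. Below is a minimal in-memory sketch of the aggregation buffer under two assumptions: "similar" means same business and same primary code, and the window is measured back from the incoming review's date. `AggregationBuffer` and `meets_creation_threshold` are hypothetical names. The thresholds mirror `should_create_issue`, with the buffer passed in explicitly.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class AggregationBuffer:
    """Hypothetical in-memory buffer of negative reviews not yet attached to an issue."""
    entries: list[dict] = field(default_factory=list)

    def add(self, review: dict) -> None:
        self.entries.append(review)

    def count_similar(self, review: dict, window_days: int = 30) -> int:
        """Count other buffered reviews with the same business and primary code in the window."""
        cutoff = review["date"] - timedelta(days=window_days)
        return sum(
            1
            for other in self.entries
            if other["review_id"] != review["review_id"]
            and other["business_id"] == review["business_id"]
            and other["urt_primary"] == review["urt_primary"]
            and other["date"] >= cutoff
        )


# Minimum number of *other* similar buffered reviews per intensity (I3 needs none)
THRESHOLDS = {"I3": 0, "I2": 2, "I1": 4}


def meets_creation_threshold(review: dict, buffer: AggregationBuffer) -> bool:
    return buffer.count_similar(review) >= THRESHOLDS[review["intensity"]]
```

Expressing the thresholds as a table keeps the I1/I2/I3 policy in one place, which is convenient if the thresholds are later tuned per business.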
3.2 Issue State Machine
```text
                 DETECTED
                    │
       ┌────────────┼────────────┐
       ▼            ▼            ▼
 ACKNOWLEDGED   DECLINED    (escalate)
       │                         │
       ▼                         │
  IN_PROGRESS                    │
       │                         │
       ▼                         │
   RESOLVED ◀────────────────────┘
       │
   ┌───┴─────┐
   ▼         ▼
VERIFIED REOPENED
             │
             └──▶ (back to IN_PROGRESS)
```
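One way to enforce the diagram is a transition table checked on every state change, with each change producing an `issue_events` row. The sketch below is hypothetical and makes two labeled assumptions. First, "(escalate)" is modeled as a flag rather than a state, so that branch is omitted. Second, VERIFIED → REOPENED is allowed, because `process_cr_signal` reopens verified issues that are still inside the verification window.

```python
# Transitions read off the diagram. Assumptions: "(escalate)" is a flag, not a
# state, and VERIFIED -> REOPENED is allowed (process_cr_signal reopens
# verified issues inside the verification window).
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "DETECTED": {"ACKNOWLEDGED", "DECLINED"},
    "ACKNOWLEDGED": {"IN_PROGRESS"},
    "IN_PROGRESS": {"RESOLVED"},
    "RESOLVED": {"VERIFIED", "REOPENED"},
    "VERIFIED": {"REOPENED"},
    "REOPENED": {"IN_PROGRESS"},
    "DECLINED": set(),
}


class InvalidTransition(ValueError):
    pass


def transition(issue: dict, to_state: str) -> dict:
    """Validate and apply a state change; return an issue_events-style record."""
    from_state = issue["state"]
    if to_state not in ALLOWED_TRANSITIONS.get(from_state, set()):
        raise InvalidTransition(f"{from_state} -> {to_state} not allowed")
    issue["state"] = to_state
    if to_state == "REOPENED":
        issue["reopen_count"] = issue.get("reopen_count", 0) + 1
    return {
        "issue_id": issue["issue_id"],
        "event_type": "state_change",
        "from_state": from_state,
        "to_state": to_state,
    }
```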
3.3 Priority Scoring
```python
import math
from datetime import datetime


def calculate_priority(issue: dict) -> float:
    """
    Priority combines intensity, volume, recency, and recurrence.

    P = I_weight × (1 + ln(span_count)) × decay(days) × recurrence_boost × trend_modifier
    """
    INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}
    i_weight = INTENSITY_WEIGHTS[issue['max_intensity']]

    volume_factor = 1 + math.log(issue['span_count'])  # span_count >= 1

    days_old = (datetime.now() - issue['created_at']).days
    decay = math.exp(-0.023 * days_old)  # Half-life = ln(2)/0.023 ≈ 30 days

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    # Trend modifier from CR signals
    if issue['recent_cr_w_count'] >= 2:
        trend_modifier = 1.3  # Worsening
    elif issue['recent_cr_b_count'] >= 2:
        trend_modifier = 0.7  # Improving
    else:
        trend_modifier = 1.0  # Stable

    return i_weight * volume_factor * decay * recurrence_boost * trend_modifier
```
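To show how the factors trade off, here is the same formula with plain scalar arguments instead of an issue record. This is a worked illustration only; the `priority` signature is hypothetical. A single fresh I3 review scores 4.0. Five I2 reviews that are 10 days old score about 2 × (1 + ln 5) × e^(−0.23) ≈ 4.15, which is roughly the same priority. One reopen multiplies the score by 1.5.

```python
import math

HALF_LIFE_DAYS = math.log(2) / 0.023  # ≈ 30.1 days


def priority(max_intensity: str, span_count: int, days_old: int,
             reopen_count: int = 0, trend_modifier: float = 1.0) -> float:
    """Scalar version of calculate_priority for worked examples."""
    weights = {"I1": 1.0, "I2": 2.0, "I3": 4.0}
    volume_factor = 1 + math.log(span_count)
    decay = math.exp(-0.023 * days_old)
    recurrence_boost = 1.0 + 0.5 * math.log2(reopen_count + 1)
    return weights[max_intensity] * volume_factor * decay * recurrence_boost * trend_modifier


# One fresh critical review vs. five moderate reviews from 10 days ago
fresh_critical = priority("I3", span_count=1, days_old=0)     # 4.0
aging_moderate = priority("I2", span_count=5, days_old=10)    # ≈ 4.15
```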
3.4 Resolution Verification via CR Signals
The Comparative Reference (CR) dimension enables automatic verification:
```python
def process_cr_signal(review: dict):
    """Handle CR-B/W/S signals for issue lifecycle."""
    if review['comparative'] == 'CR-N':
        return

    # Find resolved issues with matching code
    resolved_issues = db.query("""
        SELECT issue_id, state, resolved_at
        FROM issues
        WHERE business_id = %s
          AND primary_subcode = %s
          AND state IN ('RESOLVED', 'VERIFIED')
          AND resolved_at > NOW() - INTERVAL '60 days'
    """, [review['business_id'], review['urt_primary']])

    for issue in resolved_issues:
        if review['comparative'] == 'CR-B':
            # Improvement signal → verify resolution
            if issue['state'] == 'RESOLVED':
                verify_issue(issue['issue_id'], review['review_id'])
        elif review['comparative'] in ('CR-S', 'CR-W'):
            # Unchanged or worsening → reopen
            reopen_issue(issue['issue_id'], review['review_id'])
            if review['comparative'] == 'CR-W':
                escalate_issue(issue['issue_id'], reason='REGRESSION')
```
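The branching in `process_cr_signal` is easier to unit-test once the decision is separated from the database calls. The hypothetical `cr_action` below returns the actions for one CR signal against one RESOLVED or VERIFIED issue. It mirrors the logic above; the action strings are illustrative.

```python
def cr_action(comparative: str, state: str) -> list[str]:
    """Lifecycle actions for a CR signal against a RESOLVED or VERIFIED issue."""
    if comparative == "CR-B":
        # Improvement only verifies issues that are not yet verified
        return ["verify"] if state == "RESOLVED" else []
    if comparative == "CR-S":
        return ["reopen"]
    if comparative == "CR-W":
        return ["reopen", "escalate:REGRESSION"]
    return []  # CR-N carries no lifecycle signal
```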
3.5 Issue Data Model
```sql
CREATE TABLE issues (
    issue_id TEXT PRIMARY KEY,
    business_id TEXT NOT NULL,
    primary_subcode TEXT NOT NULL,
    domain TEXT NOT NULL,

    -- State
    state TEXT NOT NULL DEFAULT 'DETECTED',
    priority_score FLOAT NOT NULL,
    confidence_score FLOAT NOT NULL,

    -- Aggregation
    review_ids TEXT[] NOT NULL,
    span_count INT NOT NULL DEFAULT 1,
    max_intensity TEXT NOT NULL,

    -- Ownership
    owner_team TEXT,
    owner_individual TEXT,

    -- Timestamps
    created_at TIMESTAMP DEFAULT NOW(),
    acknowledged_at TIMESTAMP,
    resolved_at TIMESTAMP,
    verified_at TIMESTAMP,

    -- Resolution
    reopen_count INT DEFAULT 0,
    resolution_code TEXT,
    resolution_notes TEXT,
    decline_reason TEXT,

    -- Context
    entity TEXT,           -- Product, staff member, feature
    location TEXT,         -- Physical or logical location
    causal_codes TEXT[],   -- CD-O, MG-T, etc.

    -- Verification
    verification_window_days INT DEFAULT 60
);

CREATE TABLE issue_events (
    event_id SERIAL PRIMARY KEY,
    issue_id TEXT REFERENCES issues(issue_id),
    event_type TEXT NOT NULL,   -- 'state_change', 'span_added', 'priority_update'
    from_state TEXT,
    to_state TEXT,
    actor TEXT,
    notes TEXT,
    review_id TEXT,             -- Triggering review if applicable
    created_at TIMESTAMP DEFAULT NOW()
);
```
Part 4: Report Generation
4.1 Report Structure
```python
from datetime import date, datetime


async def generate_report(business_id: str, start: date, end: date) -> dict:
    """Generate comprehensive business intelligence report."""
    # 1. Aggregate statistics by URT code
    code_stats = compute_code_statistics(business_id, start, end)

    # 2. Deep analysis of top issues (sub-clustering)
    top_issues = analyze_top_issues(business_id, code_stats, start, end)

    # 3. Strength analysis
    strengths = analyze_strengths(business_id, code_stats, start, end)

    # 4. Trend analysis (vs prior period)
    trends = compute_trends(business_id, start, end)

    # 5. Staff insights
    staff = analyze_staff_mentions(business_id, start, end)

    # 6. Open issues summary
    open_issues = get_open_issues(business_id)

    # 7. Build payload
    payload = build_report_payload(
        business_id, start, end,
        top_issues, strengths, trends, staff, open_issues,
    )

    # 8. LLM narration (awaited, so this function must be async)
    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat(),
    }
```
4.2 Statistics Computation
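Statistics use review-level presence (a review counts once per code, whether that code is its primary or a secondary) and attach Wilson score confidence intervals: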
```python
import math
from datetime import date


def compute_code_statistics(business_id: str, start: date, end: date) -> list[dict]:
    """Aggregate statistics by URT code with confidence intervals."""
    stats = db.query("""
        WITH review_codes AS (
            SELECT
                review_id,
                urt_primary as code,
                valence,
                intensity
            FROM reviews
            WHERE business_id = %s AND date BETWEEN %s AND %s
            UNION ALL
            SELECT
                review_id,
                unnest(urt_secondary) as code,
                valence,
                intensity
            FROM reviews
            WHERE business_id = %s AND date BETWEEN %s AND %s
              AND array_length(urt_secondary, 1) > 0
        ),
        code_stats AS (
            SELECT
                code,
                COUNT(DISTINCT review_id) as k,
                COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V-') as k_neg,
                COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V+') as k_pos,
                MAX(CASE intensity
                    WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END) as max_intensity
            FROM review_codes
            GROUP BY code
        )
        SELECT
            cs.*,
            (SELECT COUNT(DISTINCT review_id) FROM reviews
             WHERE business_id = %s AND date BETWEEN %s AND %s) as n
        FROM code_stats cs
        WHERE k >= 3
        ORDER BY k_neg DESC
    """, [business_id, start, end] * 3)  # 9 placeholders = 3 × (business_id, start, end)

    results = []
    for row in stats:
        n = row['n']

        # Wilson confidence intervals
        ci_neg = wilson_ci(row['k_neg'], n) if row['k_neg'] > 0 else (0, 0)
        ci_pos = wilson_ci(row['k_pos'], n) if row['k_pos'] > 0 else (0, 0)

        results.append({
            'code': row['code'],
            'domain': row['code'][0],
            'name': URT_CODE_NAMES[row['code']],
            'k': row['k'],
            'k_neg': row['k_neg'],
            'k_pos': row['k_pos'],
            'n': n,
            'rate_neg': row['k_neg'] / n if n > 0 else 0,
            'rate_pos': row['k_pos'] / n if n > 0 else 0,
            'ci_neg': ci_neg,
            'ci_pos': ci_pos,
            'max_intensity': f"I{row['max_intensity']}",
        })

    return results


def wilson_ci(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for binomial proportion."""
    if n == 0:
        return (0.0, 0.0)
    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2*n)) / denom
    margin = z * math.sqrt((p*(1-p) + z**2/(4*n)) / n) / denom
    return (max(0, center - margin), min(1, center + margin))
```
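Why Wilson rather than the textbook Wald interval p ± z·√(p(1−p)/n)? Wald can extend below 0 when k is small, while Wilson always stays inside [0, 1]. The self-contained comparison below is illustrative; the `wald_ci` helper is not part of the pipeline. For the J1.01 numbers in Part 5 (47 of 234 reviews), Wilson gives roughly 15.5% to 25.7%. For 2 of 20 reviews, Wald produces a negative lower bound and Wilson does not.

```python
import math


def wilson_ci(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval (same formula as above)."""
    if n == 0:
        return (0.0, 0.0)
    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2 * n)) / denom
    margin = z * math.sqrt(p * (1 - p) / n + z**2 / (4 * n**2)) / denom
    return (max(0.0, center - margin), min(1.0, center + margin))


def wald_ci(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Normal-approximation interval, shown only for comparison."""
    p = k / n
    margin = z * math.sqrt(p * (1 - p) / n)
    return (p - margin, p + margin)


wait_time = wilson_ci(47, 234)             # ≈ (0.155, 0.257)
small_wald, small_wilson = wald_ci(2, 20), wilson_ci(2, 20)
```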
4.3 Sub-Pattern Discovery (Local ML)
The key insight: LLM gives categories, local ML reveals patterns within categories.
```python
def analyze_top_issues(business_id: str, code_stats: list,
                       start: date, end: date, top_k: int = 5) -> list[dict]:
    """Deep analysis of top negative codes with sub-clustering."""
    # Filter to significant negative codes
    issues_to_analyze = [
        cs for cs in code_stats
        if cs['k_neg'] >= 8 and cs['ci_neg'][1] - cs['ci_neg'][0] <= 0.30
    ][:top_k]

    results = []
    for code_stat in issues_to_analyze:
        code = code_stat['code']

        # Fetch all negative reviews for this code
        reviews = db.query("""
            SELECT review_id, text, embedding, intensity, quotes, date
            FROM reviews
            WHERE business_id = %s
              AND (urt_primary = %s OR %s = ANY(urt_secondary))
              AND valence IN ('V-', 'V±')
              AND date BETWEEN %s AND %s
        """, [business_id, code, code, start, end])

        # Sub-cluster to find patterns
        sub_patterns = discover_sub_patterns(reviews, code)

        results.append({
            'code': code,
            'name': code_stat['name'],
            'total_reviews': code_stat['k_neg'],
            'rate': code_stat['rate_neg'],
            'ci': code_stat['ci_neg'],
            'max_intensity': code_stat['max_intensity'],
            'sub_patterns': sub_patterns,
        })

    return results
```
```python
import hdbscan
import numpy as np
from sklearn.cluster import KMeans


def discover_sub_patterns(reviews: list[dict], code: str,
                          min_cluster_size: int = 3) -> list[dict]:
    """Cluster reviews within a URT code to find actionable sub-patterns."""

    def single_pattern() -> list[dict]:
        # Fallback: treat all reviews as one 'General' pattern
        return [{
            'label': 'General',
            'count': len(reviews),
            'percentage': 1.0,
            'representative_quote': select_representative(reviews, code),
            'sharpest_quote': select_sharpest(reviews, code),
        }]

    if len(reviews) < min_cluster_size * 2:
        return single_pattern()  # Too few for meaningful clustering

    embeddings = np.array([r['embedding'] for r in reviews])

    # HDBSCAN for small datasets, KMeans for larger
    if len(reviews) < 500:
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=2,
            metric='euclidean',  # On unit vectors this ranks neighbours like cosine
        )
        labels = clusterer.fit_predict(embeddings)
    else:
        k = min(8, max(3, int(np.sqrt(len(reviews) / 5))))
        kmeans = KMeans(n_clusters=k, n_init=3)
        labels = kmeans.fit_predict(embeddings)

    # Group by cluster (HDBSCAN labels noise as -1)
    clusters: dict[int, list[dict]] = {}
    for review, label in zip(reviews, labels):
        if label == -1:
            continue
        clusters.setdefault(label, []).append(review)

    # Build sub-pattern descriptions
    patterns = []
    for cluster_reviews in clusters.values():
        if len(cluster_reviews) < min_cluster_size:
            continue
        cluster_embeddings = np.array([r['embedding'] for r in cluster_reviews])
        centroid = cluster_embeddings.mean(axis=0)
        centroid /= np.linalg.norm(centroid)

        patterns.append({
            'label': extract_cluster_label(cluster_reviews, code),
            'count': len(cluster_reviews),
            'percentage': len(cluster_reviews) / len(reviews),
            'representative_quote': select_representative(cluster_reviews, code, centroid),
            'sharpest_quote': select_sharpest(cluster_reviews, code),
            'avg_intensity': np.mean([
                {'I1': 1, 'I2': 2, 'I3': 3}[r['intensity']]
                for r in cluster_reviews
            ]),
            'centroid': centroid,  # For trend matching; drop before JSON serialization
        })

    if not patterns:
        return single_pattern()  # Every review was noise or in an undersized cluster

    patterns.sort(key=lambda x: x['count'], reverse=True)
    return patterns[:4]  # Top 4 sub-patterns
```
```python
def select_representative(reviews: list, code: str,
                          centroid: np.ndarray = None) -> str:
    """Select quote closest to centroid (most representative)."""
    if centroid is None:
        embeddings = np.array([r['embedding'] for r in reviews])
        centroid = embeddings.mean(axis=0)
        centroid /= np.linalg.norm(centroid)

    best_review = max(reviews, key=lambda r: r['embedding'] @ centroid)

    # Return the extracted quote for this code, or truncated text
    if best_review.get('quotes') and code in best_review['quotes']:
        return best_review['quotes'][code]
    return best_review['text'][:150]


def select_sharpest(reviews: list, code: str) -> str:
    """Select highest intensity quote (sharpest criticism)."""
    intensity_order = {'I3': 3, 'I2': 2, 'I1': 1}
    best_review = max(reviews, key=lambda r: intensity_order.get(r['intensity'], 0))

    if best_review.get('quotes') and code in best_review['quotes']:
        return best_review['quotes'][code]
    return best_review['text'][:150]
```
```python
import re
from collections import Counter

STOPWORDS = {'the', 'and', 'was', 'were', 'for', 'that', 'this', 'with', 'but', 'have', 'had'}


def extract_cluster_label(reviews: list, code: str) -> str:
    """Generate a concise label for the cluster."""
    # Prefer the extracted quote for this code; fall back to the review opening
    texts = []
    for r in reviews:
        if r.get('quotes') and code in r['quotes']:
            texts.append(r['quotes'][code].lower())
        else:
            texts.append(r['text'][:100].lower())

    # Most frequent stopword-free bigram, counted per text so that no bigram
    # spans two reviews. The word pattern is ASCII-only; texts with no
    # matching words contribute nothing, and if none match we fall back below.
    counts = Counter()
    for text in texts:
        words = re.findall(r'\b[a-z]{3,}\b', text)
        counts.update(
            f"{a} {b}" for a, b in zip(words, words[1:])
            if a not in STOPWORDS and b not in STOPWORDS
        )

    if counts:
        return counts.most_common(1)[0][0].title()
    return URT_CODE_NAMES.get(code, "General")
```
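The centroid-closest rule in `select_representative` has a simple geometric meaning. With unit-length embeddings, the dot product with the normalized centroid is the cosine similarity, so the selected review is the one pointing most nearly in the cluster's average direction. The toy sketch below uses pure Python and 2-D vectors so that the result is easy to verify by hand. The helper names are illustrative, not part of the pipeline.

```python
import math


def normalize(v: list[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def centroid(vectors: list[list[float]]) -> list[float]:
    """Mean of the vectors, renormalized to unit length."""
    dims = len(vectors[0])
    mean = [sum(v[d] for v in vectors) / len(vectors) for d in range(dims)]
    return normalize(mean)


def most_representative(texts: list[str], vectors: list[list[float]]) -> str:
    """Text whose unit embedding has the highest cosine similarity to the centroid."""
    c = centroid(vectors)
    scores = [sum(a * b for a, b in zip(v, c)) for v in vectors]
    return texts[max(range(len(texts)), key=scores.__getitem__)]


texts = ["seating wait", "waited for table and food", "food slow"]
vectors = [normalize([1.0, 0.0]), normalize([1.0, 1.0]), normalize([0.0, 1.0])]
```

With these vectors, the middle review sits on the centroid direction (cosine 1.0), while the other two each score about 0.71. The middle review therefore wins.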
4.4 Trend Analysis
Combine rate comparison with CR signals:
```python
from datetime import date, timedelta


def compute_trends(business_id: str, current_start: date,
                   current_end: date) -> dict:
    """Compute trends vs prior period using rates and CR signals."""
    # Prior period of equal length, ending where the current period starts
    period_length = (current_end - current_start).days
    prior_start = current_start - timedelta(days=period_length)
    prior_end = current_start

    # Current period stats
    current_stats = compute_code_statistics(business_id, current_start, current_end)
    current_map = {cs['code']: cs for cs in current_stats}

    # Prior period stats
    prior_stats = compute_code_statistics(business_id, prior_start, prior_end)
    prior_map = {cs['code']: cs for cs in prior_stats}

    # CR signals in current period
    cr_signals = db.query("""
        SELECT urt_primary as code, comparative, COUNT(*) as count
        FROM reviews
        WHERE business_id = %s
          AND date BETWEEN %s AND %s
          AND comparative != 'CR-N'
        GROUP BY urt_primary, comparative
    """, [business_id, current_start, current_end])

    cr_map = {}
    for row in cr_signals:
        cr_map.setdefault(row['code'], {'CR-B': 0, 'CR-W': 0, 'CR-S': 0})
        cr_map[row['code']][row['comparative']] = row['count']

    # Compute trends
    trends = {}
    for code, current in current_map.items():
        prior = prior_map.get(code, {'rate_neg': 0, 'rate_pos': 0, 'k_neg': 0, 'k_pos': 0})
        cr = cr_map.get(code, {'CR-B': 0, 'CR-W': 0, 'CR-S': 0})

        # Rate-based trend (issues)
        rate_trend_neg = current['rate_neg'] - prior['rate_neg']

        # CR signals take precedence over the rate change
        if cr['CR-W'] >= 2:
            trend_signal = 'worsening'
        elif cr['CR-B'] >= 2:
            trend_signal = 'improving'
        elif cr['CR-S'] >= 2:
            trend_signal = 'persistent'
        elif rate_trend_neg > 0.05:
            trend_signal = 'worsening'
        elif rate_trend_neg < -0.05:
            trend_signal = 'improving'
        else:
            trend_signal = 'stable'

        trends[code] = {
            'rate_change_neg': rate_trend_neg,
            'rate_change_pos': current['rate_pos'] - prior['rate_pos'],
            'signal': trend_signal,
            'cr_better': cr['CR-B'],
            'cr_worse': cr['CR-W'],
            'cr_same': cr['CR-S'],
        }

    return trends
```
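The signal ladder inside `compute_trends` can be pulled out as a pure function and tested on its own. The hypothetical `trend_signal` below keeps the same thresholds: two or more CR signals of one kind, otherwise a ±5-point change in the negative rate. It shows that explicit customer comparisons outrank rate movement. In the Part 5 example, J1.01's rate rose by only 4.2 points, below the 5-point threshold, so its "worsening" label comes from the three CR-W reviews.

```python
def trend_signal(cr: dict, rate_change_neg: float,
                 min_cr: int = 2, threshold: float = 0.05) -> str:
    """CR evidence first, then the change in negative-mention rate."""
    if cr.get("CR-W", 0) >= min_cr:
        return "worsening"
    if cr.get("CR-B", 0) >= min_cr:
        return "improving"
    if cr.get("CR-S", 0) >= min_cr:
        return "persistent"
    if rate_change_neg > threshold:
        return "worsening"
    if rate_change_neg < -threshold:
        return "improving"
    return "stable"
```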
4.5 Staff Analysis
```python
from collections import Counter
from datetime import date


def analyze_staff_mentions(business_id: str, start: date, end: date) -> dict:
    """Aggregate staff performance from mentions."""
    staff_data = db.query("""
        SELECT
            unnest(staff_mentions) as staff_name,
            valence,
            intensity,
            urt_primary,
            quotes
        FROM reviews
        WHERE business_id = %s
          AND date BETWEEN %s AND %s
          AND array_length(staff_mentions, 1) > 0
    """, [business_id, start, end])

    staff_map = {}
    for row in staff_data:
        entry = staff_map.setdefault(row['staff_name'], {
            'positive': [],
            'negative': [],
            'codes': Counter(),
        })
        # V0 and V± mentions count toward codes but not toward positive/negative totals
        if row['valence'] == 'V+':
            entry['positive'].append(row)
        elif row['valence'] == 'V-':
            entry['negative'].append(row)
        entry['codes'][row['urt_primary']] += 1

    # Build summary
    staff_summary = []
    for name, data in staff_map.items():
        total = len(data['positive']) + len(data['negative'])
        if total < 2:
            continue  # Need multiple mentions

        staff_summary.append({
            'name': name,
            'total_mentions': total,
            'positive': len(data['positive']),
            'negative': len(data['negative']),
            'sentiment_ratio': len(data['positive']) / total,
            'top_codes': data['codes'].most_common(3),
            'sample_praise': data['positive'][0]['quotes'] if data['positive'] else None,
            'sample_criticism': data['negative'][0]['quotes'] if data['negative'] else None,
        })

    staff_summary.sort(key=lambda x: x['total_mentions'], reverse=True)

    return {
        'staff': staff_summary,
        'top_performer': max(staff_summary, key=lambda x: x['sentiment_ratio']) if staff_summary else None,
        'needs_attention': [s for s in staff_summary if s['sentiment_ratio'] < 0.5],
    }
```
4.6 LLM Narrative Generation
```python
import json

NARRATIVE_PROMPT = """You are a business intelligence analyst writing an executive summary of customer feedback.

You MUST follow these rules:
1. Only state claims supported by the provided data
2. Include specific numbers (percentages, counts) for every claim
3. Do not invent or hallucinate any statistics
4. Be direct and actionable, not vague
5. Highlight the most impactful findings first

The report payload follows. Write a concise executive summary (~300 words) covering:
- Top issues with their sub-patterns and severity
- Notable strengths
- Trend signals (improving/worsening/persistent)
- Staff highlights
- 2-3 prioritized recommendations

REPORT DATA:
{payload}"""


async def generate_narrative(payload: dict) -> str:
    """Generate executive narrative from structured payload."""
    response = await llm.chat(
        model="gpt-4o",  # Use stronger model for narrative
        messages=[
            {"role": "system", "content": NARRATIVE_PROMPT.format(
                # default=str serializes dates and other non-JSON values as strings
                payload=json.dumps(payload, indent=2, default=str)
            )}
        ],
        temperature=0.3,
    )
    return response.content
```
Part 5: Report Output Example
5.1 Structured Payload
```json
{
  "business_id": "rest_12345",
  "period": "2026-01-01 to 2026-01-31",
  "total_reviews": 234,
  "issues": [
    {
      "code": "J1.01",
      "name": "Wait Time",
      "total_reviews": 47,
      "rate": 0.201,
      "ci": [0.155, 0.257],
      "max_intensity": "I3",
      "trend": {
        "signal": "worsening",
        "cr_worse": 3,
        "rate_change": 0.042
      },
      "sub_patterns": [
        {
          "label": "Table Seating",
          "count": 20,
          "percentage": 0.426,
          "representative_quote": "Waited 45 minutes for a table even with reservation",
          "sharpest_quote": "HOUR wait on a Tuesday. Unacceptable."
        },
        {
          "label": "Food After Ordering",
          "count": 15,
          "percentage": 0.319,
          "representative_quote": "Food took 40 minutes after we ordered",
          "sharpest_quote": "Over an hour for cold pasta"
        },
        {
          "label": "Check Payment",
          "count": 12,
          "percentage": 0.255,
          "representative_quote": "Had to flag someone down just to pay",
          "sharpest_quote": "20 minutes for the check, ridiculous"
        }
      ]
    }
  ],
  "strengths": [
    {
      "code": "O2.02",
      "name": "Craftsmanship",
      "total_reviews": 89,
      "rate": 0.380,
      "ci": [0.321, 0.444],
      "trend": {"signal": "stable"},
      "representative_quote": "The pasta is clearly made fresh, incredible quality"
    }
  ],
  "staff": {
    "top_performer": {
      "name": "Maria",
      "mentions": 12,
      "sentiment_ratio": 0.917
    },
    "needs_attention": [
      {
        "name": "Tom",
        "mentions": 8,
        "sentiment_ratio": 0.375,
        "top_issues": ["P1.02", "P3.01"]
      }
    ]
  },
  "open_issues": [
    {
      "issue_id": "ISSUE-2026-0142",
      "code": "J1.01",
      "state": "IN_PROGRESS",
      "priority": 7.45,
      "days_open": 12
    }
  ]
}
```
5.2 Generated Narrative
Executive Summary: January 2026
Analysis of 234 reviews identifies wait times as the critical issue: 20.1% of reviews raise it negatively (95% CI: 15.5%-25.7%), and the trend is worsening, with 3 explicit "worse than before" signals this month.
Wait Time Breakdown:
- Seating delays (43%): Customers report 30-60 minute waits despite reservations. "Waited 45 minutes for a table even with reservation."
- Kitchen delays (32%): Food taking 40+ minutes after ordering. "Over an hour for cold pasta."
- Checkout friction (26%): Difficulty getting the check. "20 minutes for the check, ridiculous."
Strengths: Food craftsmanship is praised in 38% of reviews, stable month-over-month. "The pasta is clearly made fresh, incredible quality."
Staff Notes: Maria received 12 mentions with 92% positive sentiment. Tom (8 mentions, 38% positive) shows patterns in P1.02 (Respect) and P3.01 (Attentiveness); a coaching session is recommended.
Prioritized Recommendations:
- Immediate: Audit the reservation system, since seating is the primary wait bottleneck
- This Week: Review kitchen workflow for food delivery timing
- This Month: Implement checkout process training (e.g., table check-in rotation)
One high-priority issue (ISSUE-2026-0142) is in progress and has been open for 12 days.
Part 6: Cost Model
| Stage | When | Cost | Notes |
|---|---|---|---|
| Embedding | Per review ingested | $0.00 | Local model, ~50ms/review |
| LLM Classification | Per review ingested | ~$0.0002 | GPT-4o-mini, batched |
| Issue Aggregation | Per V- review | $0.00 | SQL queries |
| Sub-Clustering | Per report | $0.00 | HDBSCAN/KMeans, <1s |
| Trend Analysis | Per report | $0.00 | SQL + computation |
| LLM Narrative | Per report | ~$0.15 | GPT-4o, single call |
Total Monthly Costs:
| Reviews/month | Ingest | Reports (10/month) | Total |
|---|---|---|---|
| 1K reviews | $0.20 | $1.50 | $1.70 |
| 10K reviews | $2.00 | $1.50 | $3.50 |
| 100K reviews | $20.00 | $1.50 | $21.50 |
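The totals follow from a two-term model: only the LLM stages cost money, because embedding, aggregation, clustering and trend analysis run locally. The sketch below reproduces the table from the per-unit estimates above. The constant names are illustrative.

```python
CLASSIFY_COST_PER_REVIEW = 0.0002  # GPT-4o-mini estimate from the stage table
NARRATIVE_COST_PER_REPORT = 0.15   # GPT-4o estimate from the stage table


def monthly_cost(reviews_per_month: int, reports_per_month: int = 10) -> float:
    """Monthly LLM spend in dollars; all other stages are local and free."""
    ingest = reviews_per_month * CLASSIFY_COST_PER_REVIEW
    reports = reports_per_month * NARRATIVE_COST_PER_REPORT
    return round(ingest + reports, 2)
```

Report cost is fixed at $1.50 per month, so ingest dominates once volume passes 7,500 reviews per month ($1.50 / $0.0002).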
Part 7: Implementation Checklist
Phase 1: Core Pipeline
- Set up PostgreSQL with pgvector extension
- Implement embedding generation (multilingual-e5-small)
- Build LLM classification module with batching
- Create review ingestion pipeline
- Implement URT code reference data
Phase 2: Issue Lifecycle
- Implement issue aggregation logic
- Build state machine with transitions
- Create priority scoring function
- Add CR signal processing for verification
- Set up issue event logging
Phase 3: Report Generation
- Build statistics aggregation queries
- Implement sub-pattern clustering
- Add trend analysis with CR integration
- Create staff analysis module
- Build narrative generation prompt
Phase 4: Integration
- API endpoints for ingestion
- Report generation endpoint
- Issue management endpoints
- Dashboard queries
- Alert/notification hooks
Part 8: Key Innovations
| Innovation | Benefit |
|---|---|
| LLM at ingest, not report | Accurate classification amortized across all reports |
| URT as structure | Stable, interpretable categories; no clustering drift |
| Multi-coding | Handle complex reviews without fragmentation |
| Sub-clustering within codes | Actionable patterns beyond category level |
| CR for verification | Automatic resolution validation from customer feedback |
| Review as unit | Preserve context; avoid embedding quality loss |
| Issue lifecycle | Operational tracking with statistical rigor |
Document Control
| Field | Value |
|---|---|
| Document | ReviewIQ Architecture v2.0 |
| Status | Specification Complete |
| Date | 2026-01-24 |
| Dependencies | URT Specification v5.1, Issue Lifecycle Framework C1 |
| Cost Target | <$25/month at 100K reviews |
| Accuracy Target | >90% URT classification, >85% sub-pattern relevance |
End of ReviewIQ Architecture v2.0