whyrating-engine-legacy/.artifacts/ReviewIQ-Architecture-v3.1.md
Alejandro Gutiérrez f99827717f Final polish: v3.1.2 operational safety constraints
- Add chk_dedup_scoped constraint enforcing tenant-scoped dedup format
- Filter location_type='owned' in populate_facts() for 'ALL' rollup
- Document competitor exclusion from 'ALL' sentinel rollups
- Add explicit comments in aggregation code for maintainability

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 12:55:31 +00:00

# ReviewIQ: Review Intelligence Pipeline
**Version**: 3.1.2
**Status**: Architecture Specification (Reviewed)
**Date**: 2026-01-24

---
## Executive Summary
ReviewIQ v3.1 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version refactors the data model for multi-location support, replaces array-based aggregation with proper relational design, and introduces a unified analytics spine for fast reporting.

**What's New in v3.1**:
- **Relational issue tracking**: `issue_spans` table replaces `review_ids[]` arrays
- **Multi-location ready**: `place_id` as first-class key throughout
- **Split storage**: `reviews_raw` + `reviews_enriched` for audit and reprocessing
- **Unified analytics spine**: `fact_timeseries` with universal join keys
- **Quality controls**: `trust_score` and `dedup_group_id` for spam filtering
- **Competitor analysis**: Same pipeline, separate tracking table
- **KPI-ready hooks**: Nullable columns for future business metric integration

**Design Principles**:
- **Google Reviews only** (for now) — but schema is source-agnostic
- **Relational over arrays** — scales, queries, joins
- **Facts-first reporting** — pre-aggregated spine for fast dashboards
- **KPI-joinable** — `(business_id, place_id, period_date, bucket_type)` as universal key
- **Tenant-scoped locations** — same place_id can exist for multiple businesses
---
## Part 1: System Architecture
```
                        REVIEWIQ v3.1 PIPELINE
========================================================================

  Google Reviews (API)
          │
          ▼
  A) SOURCE & STORAGE
     google_connector ───▶ reviews_raw (immutable JSON + metadata)
          │
          ▼
  B) ENRICHMENT
     Normalize + Map ──▶ LLM Classify ──▶ Embed (local) ──▶ Trust Score
          │
          ▼
     reviews_enriched (versioned)
          │
          ├────────────────────────────────┐
          ▼                                ▼
  C) OPERATIONALIZATION            D) ANALYTICS SPINE
     reviews_enriched                 Daily/Weekly Jobs:
          │                           reviews_enriched
          ▼                                │
     Match/Create Issue                    ▼
          │                           fact_timeseries
          ▼                           (pre-aggregated metrics)
     issue_spans (link)               Keys:
          │                             • business_id
          ▼                             • place_id (or 'ALL')
     issues (update counters)           • subject_type/id
          │                             • period_date
          ▼                             • bucket_type
     issue_events (log)                    │
          │                                │
          └────────────────┬───────────────┘
                           ▼
  E) REPORTING
     fact_timeseries ──┬──▶ Statistics & Trends
     issues + spans ───┼──▶ Issue Rankings & Drill-Down
     embeddings ───────┼──▶ Sub-Pattern Clustering
     competitors ──────┴──▶ Benchmark Comparisons
                                   │
                                   ▼
                       LLM Narrative Generation
```
---
## Part 2: Data Model (SQL DDL)
### 2.1 Dimension Tables
```sql
-- Business locations (multi-tenant: same place_id can exist for multiple businesses)
-- Includes both owned locations and tracked competitor locations
CREATE TABLE locations (
business_id TEXT NOT NULL, -- Internal business identifier
place_id TEXT NOT NULL, -- Google Place ID
location_type TEXT NOT NULL DEFAULT 'owned'
CHECK (location_type IN ('owned', 'competitor')),
display_name TEXT NOT NULL,
address TEXT,
city TEXT,
state TEXT,
country TEXT,
timezone TEXT,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (business_id, place_id)
);
CREATE INDEX idx_locations_place ON locations(place_id);
CREATE INDEX idx_locations_owned ON locations(business_id)
WHERE location_type = 'owned';
-- URT code reference
CREATE TABLE urt_codes (
code TEXT PRIMARY KEY, -- 'J1.01', 'P1.02', etc.
domain CHAR(1) NOT NULL, -- O, P, J, E, A, V, R
category TEXT NOT NULL,
subcategory TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
keywords TEXT[] -- For search/matching
);
-- Competitor mapping (separate from locations - no fake business_ids)
CREATE TABLE competitors (
id SERIAL PRIMARY KEY,
business_id TEXT NOT NULL, -- Your business
competitor_place_id TEXT NOT NULL, -- Competitor's Google Place ID
competitor_name TEXT NOT NULL,
relationship TEXT DEFAULT 'direct', -- 'direct', 'indirect', 'aspirational'
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, competitor_place_id)
);
CREATE INDEX idx_competitors_business ON competitors(business_id);
```
### 2.2 Reviews Tables (Raw + Enriched)
```sql
-- Immutable raw review storage (audit + reprocessing)
CREATE TABLE reviews_raw (
id SERIAL PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Google review ID
place_id TEXT NOT NULL, -- Google Place ID
-- Raw payload
raw_payload JSONB NOT NULL, -- Complete API response
review_text TEXT, -- Extracted for indexing
rating SMALLINT,
review_time TIMESTAMP,
reviewer_name TEXT,
reviewer_id TEXT,
-- Versioning (Google reviews can be edited)
review_version INT DEFAULT 1,
-- Ingestion metadata
pulled_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source, review_id, review_version)
);
CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC);
CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id);
-- Enriched review with LLM classification + embeddings (versioned)
-- Supports edited reviews: each version is a separate row
CREATE TABLE reviews_enriched (
-- Versioned primary key (handles edited reviews)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Matches reviews_raw.review_id
review_version INT NOT NULL DEFAULT 1,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
-- Link to raw (specific version)
raw_id INT NOT NULL REFERENCES reviews_raw(id),
-- Identity
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
-- Core content
text TEXT NOT NULL,
text_normalized TEXT, -- Cleaned for processing
rating SMALLINT,
review_time TIMESTAMP NOT NULL,
language TEXT,
-- URT Classification (from LLM)
urt_primary TEXT NOT NULL, -- 'J1.01', 'P1.02', etc.
urt_secondary TEXT[] DEFAULT '{}', -- Max 2, different domains
valence TEXT NOT NULL, -- 'V+', 'V-', 'V0', 'V±'
intensity TEXT NOT NULL, -- 'I1', 'I2', 'I3'
comparative TEXT DEFAULT 'CR-N', -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'
-- Extracted entities (reserved for v3.2, nullable)
staff_mentions TEXT[] DEFAULT '{}',
quotes JSONB, -- {"code": "phrase", ...}
-- Embedding
embedding VECTOR(384),
-- Quality control
trust_score FLOAT DEFAULT 1.0, -- 0.0 to 1.0
dedup_group_id TEXT, -- Tenant-scoped: format "{business_id}:{hash}"
is_suspicious BOOLEAN DEFAULT FALSE,
-- Processing metadata
classification_model TEXT,
classification_confidence JSONB, -- Per-field confidence scores
processed_at TIMESTAMP DEFAULT NOW(),
model_version TEXT,
-- KPI-ready hooks (nullable, computed later)
kpi_impact_estimate FLOAT,
kpi_last_computed_at TIMESTAMP,
PRIMARY KEY (source, review_id, review_version)
);
-- Indexes for common query patterns
CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative)
WHERE comparative != 'CR-N' AND is_latest = TRUE;
CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score)
WHERE trust_score < 0.5 AND is_latest = TRUE;
CREATE INDEX idx_enriched_embedding ON reviews_enriched
USING hnsw (embedding vector_cosine_ops);
-- FK to locations (tenant-scoped)
ALTER TABLE reviews_enriched
ADD CONSTRAINT fk_enriched_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Enforce tenant-scoped dedup format
ALTER TABLE reviews_enriched
ADD CONSTRAINT chk_dedup_scoped
CHECK (dedup_group_id IS NULL OR dedup_group_id LIKE business_id || ':%');
```
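The `chk_dedup_scoped` constraint only checks the `{business_id}:` prefix, so the application has to build conforming IDs itself. A minimal sketch of how that might look follows; the helper names and the choice of a truncated SHA-256 over the normalized text are assumptions for illustration, not part of this spec.

```python
import hashlib
from typing import Optional


def make_dedup_group_id(business_id: str, normalized_text: str) -> str:
    """Build a tenant-scoped dedup ID in the "{business_id}:{hash}" format.

    Hypothetical helper: hashing the normalized text with SHA-256 is an
    assumption; the spec only fixes the "{business_id}:" prefix.
    """
    digest = hashlib.sha256(normalized_text.encode("utf-8")).hexdigest()[:16]
    return f"{business_id}:{digest}"


def is_tenant_scoped(business_id: str, dedup_group_id: Optional[str]) -> bool:
    """Mirror chk_dedup_scoped: NULL is allowed, otherwise the prefix must match."""
    return dedup_group_id is None or dedup_group_id.startswith(f"{business_id}:")
```

One difference from the SQL check is worth keeping in mind: `str.startswith` treats `%` and `_` literally, whereas `LIKE` treats them as wildcards, so the two only agree when `business_id` contains neither character.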
### 2.3 Issue Tables (Relational, No Arrays)
**Note**: In v3.1, the issue grouping key is `(business_id, place_id, primary_subcode)` only. The `entity` and `entity_normalized` fields are reserved for v3.2+ when entity extraction is implemented.
```sql
-- Issues (aggregated problems)
CREATE TABLE issues (
issue_id TEXT PRIMARY KEY,
-- Grouping keys (v3.1: code + place only)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
primary_subcode TEXT NOT NULL, -- URT code
domain CHAR(1) NOT NULL,
-- State machine
state TEXT NOT NULL DEFAULT 'DETECTED',
priority_score FLOAT NOT NULL,
confidence_score FLOAT NOT NULL,
-- Aggregated metrics (updated via triggers/jobs)
span_count INT NOT NULL DEFAULT 1,
max_intensity TEXT NOT NULL,
avg_trust_score FLOAT DEFAULT 1.0,
-- CR counters (rolling 30-day window)
cr_better_count INT DEFAULT 0,
cr_worse_count INT DEFAULT 0,
cr_same_count INT DEFAULT 0,
-- Star drag proxy (avg rating when this issue present vs absent)
star_drag_estimate FLOAT,
-- Ownership
owner_team TEXT,
owner_individual TEXT,
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
acknowledged_at TIMESTAMP,
resolved_at TIMESTAMP,
verified_at TIMESTAMP,
-- Resolution
reopen_count INT DEFAULT 0,
resolution_code TEXT,
resolution_notes TEXT,
decline_reason TEXT,
-- Context (RESERVED for v3.2 - entity extraction)
entity TEXT, -- Product, staff member, feature
entity_normalized TEXT,
-- KPI-ready hooks (nullable)
kpi_impact_estimate FLOAT,
kpi_impact_confidence FLOAT,
kpi_last_computed_at TIMESTAMP
);
CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC);
CREATE INDEX idx_issues_place ON issues(place_id, state);
CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode);
CREATE INDEX idx_issues_open ON issues(business_id)
WHERE state NOT IN ('VERIFIED', 'DECLINED');
-- FK to locations (tenant-scoped)
ALTER TABLE issues
ADD CONSTRAINT fk_issues_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Issue spans (link table: issue ↔ review, versioned)
CREATE TABLE issue_spans (
id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE,
-- Full review reference (versioned)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL,
review_version INT NOT NULL,
-- Span metadata
is_primary_match BOOLEAN DEFAULT TRUE, -- Primary vs secondary code match
weight FLOAT DEFAULT 1.0, -- For weighted aggregation
intensity TEXT NOT NULL, -- Copied from review for fast queries
-- Denormalized for timeline queries (avoids join)
review_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(issue_id, source, review_id, review_version)
);
-- FK to versioned review
ALTER TABLE issue_spans
ADD CONSTRAINT fk_span_review
FOREIGN KEY (source, review_id, review_version)
REFERENCES reviews_enriched(source, review_id, review_version);
CREATE INDEX idx_spans_issue ON issue_spans(issue_id);
CREATE INDEX idx_spans_review ON issue_spans(source, review_id, review_version);
CREATE INDEX idx_spans_issue_time ON issue_spans(issue_id, review_time DESC);
-- Issue events (audit log)
CREATE TABLE issue_events (
event_id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id),
event_type TEXT NOT NULL, -- 'state_change', 'span_added', 'priority_update'
from_state TEXT,
to_state TEXT,
actor TEXT, -- User or 'system'
notes TEXT,
-- Triggering review reference (versioned)
source TEXT DEFAULT 'google',
review_id TEXT,
review_version INT,
metadata JSONB, -- Additional context
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC);
CREATE INDEX idx_events_review ON issue_events(source, review_id, review_version)
WHERE review_id IS NOT NULL;
```
### 2.4 Unified Analytics Spine
**Design Decision**: Sentinel value conventions (do not normalize):
- `place_id = 'ALL'` — spatial rollup (all locations)
- `subject_id = 'all'` — semantic rollup (all subjects within type)
Case matters: `'ALL'` ≠ `'all'`. Using sentinel values rather than NULL avoids NULL handling while keeping the schema simple.
```sql
-- Fact table: pre-aggregated time-series metrics
CREATE TABLE fact_timeseries (
id SERIAL PRIMARY KEY,
-- Universal join keys (KPI-ready)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' = all locations rollup
period_date DATE NOT NULL,
bucket_type TEXT NOT NULL, -- 'day', 'week', 'month'
-- Subject (what we're measuring)
subject_type TEXT NOT NULL, -- 'urt_code', 'domain', 'overall'
subject_id TEXT NOT NULL, -- Code, domain letter, or 'all'
-- Volume metrics
review_count INT NOT NULL DEFAULT 0,
negative_count INT NOT NULL DEFAULT 0,
positive_count INT NOT NULL DEFAULT 0,
neutral_count INT NOT NULL DEFAULT 0,
mixed_count INT NOT NULL DEFAULT 0,
-- Strength metrics (intensity-weighted)
strength_score FLOAT NOT NULL DEFAULT 0,
negative_strength FLOAT NOT NULL DEFAULT 0,
positive_strength FLOAT NOT NULL DEFAULT 0,
-- Rating metrics
avg_rating FLOAT,
rating_count INT DEFAULT 0,
-- Intensity distribution
i1_count INT DEFAULT 0,
i2_count INT DEFAULT 0,
i3_count INT DEFAULT 0,
-- CR signals
cr_better INT DEFAULT 0,
cr_worse INT DEFAULT 0,
cr_same INT DEFAULT 0,
-- Trust-weighted variants (DEFERRED to v3.2 - columns reserved, not populated)
trust_weighted_strength FLOAT,
trust_weighted_negative FLOAT,
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id)
);
-- Validate 'ALL' sentinel
ALTER TABLE fact_timeseries
ADD CONSTRAINT chk_place_id_format
CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$');
-- Optimized indexes for reporting queries
CREATE INDEX idx_facts_lookup ON fact_timeseries(
business_id, place_id, subject_type, subject_id, period_date DESC
);
CREATE INDEX idx_facts_period ON fact_timeseries(
business_id, period_date, bucket_type
);
CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id)
WHERE subject_type = 'urt_code';
CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date)
WHERE place_id = 'ALL';
```
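Because the two sentinels differ only in case and meaning, it can help to define them once and build fact keys through a single function. The sketch below is one possible shape; `FactKey`, `fact_key`, and the constant names are hypothetical and do not appear elsewhere in this spec.

```python
from datetime import date
from typing import NamedTuple, Optional

ALL_PLACES = "ALL"    # spatial rollup sentinel (uppercase)
ALL_SUBJECTS = "all"  # semantic rollup sentinel (lowercase)
BUCKET_TYPES = ("day", "week", "month")


class FactKey(NamedTuple):
    """Mirrors the UNIQUE key of fact_timeseries."""
    business_id: str
    place_id: str
    period_date: date
    bucket_type: str
    subject_type: str
    subject_id: str


def fact_key(business_id: str, period_date: date, bucket_type: str,
             place_id: Optional[str] = None,
             subject_type: str = "overall",
             subject_id: Optional[str] = None) -> FactKey:
    """Fill in the sentinels so callers never pass None into the key."""
    if bucket_type not in BUCKET_TYPES:
        raise ValueError(f"unknown bucket_type: {bucket_type!r}")
    return FactKey(
        business_id,
        place_id or ALL_PLACES,
        period_date,
        bucket_type,
        subject_type,
        subject_id or ALL_SUBJECTS,
    )
```

Calling `fact_key("biz_1", date(2026, 1, 19), "week")` yields the business-wide overall key, with `place_id='ALL'` and `subject_id='all'`.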
**v3.1 Fact Population Scope**:
| subject_type | Populated | Notes |
|--------------|-----------|-------|
| `overall` | ✅ Mandatory | Business-wide + per-location |
| `urt_code` | ✅ Mandatory | Per URT code |
| `domain` | ⚡ Derived | Rollup from urt_code at query time |
| `issue` | 🔜 Optional | Recommended for issue timelines (v3.2) |

**v3.1 Rollup Rules**:
- `place_id='ALL'` includes **owned locations only** (not competitors)
- Competitor facts live at their `competitor_place_id`, never in `'ALL'` rollup
- Competitor comparison queries explicitly join on `competitor_place_id`

**v3.1 Trust Score Usage**:
- `trust_score` is applied to **issue priority scoring** and **filtering** (see §4.2)
- `trust_weighted_strength` / `trust_weighted_negative` columns are **reserved but not populated** in v3.1
- Trust-weighted fact aggregation (`SUM(trust_score * intensity_weight)`) deferred to v3.2
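The owned-only rollup rule above can be illustrated with a small in-memory sketch. The function name and the two input dictionaries are hypothetical; in the pipeline itself the filter is applied in SQL via `location_type = 'owned'`.

```python
def rollup_owned(per_place_counts: dict[str, int],
                 location_types: dict[str, str]) -> int:
    """Sum review counts for the 'ALL' rollup, skipping competitor places.

    Places missing from `location_types` are skipped as well, so an
    unregistered place_id can never leak into the rollup.
    """
    return sum(
        count
        for place_id, count in per_place_counts.items()
        if location_types.get(place_id) == "owned"
    )
```

With counts `{"p1": 10, "p2": 5, "c1": 40}` and `c1` registered as a competitor, the rollup is 15 and the competitor's 40 reviews stay at their own `place_id`.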
### 2.5 Sub-Patterns (Persistent Clustering Results)
```sql
-- Stored sub-pattern clustering results
CREATE TABLE subpatterns (
id SERIAL PRIMARY KEY,
-- Parent
subject_type TEXT NOT NULL, -- 'urt_code', 'issue'
subject_id TEXT NOT NULL,
business_id TEXT NOT NULL,
place_id TEXT, -- NULL = all locations (caveat: UNIQUE treats NULLs as distinct, so NULL rows are not deduplicated)
-- Period
period_start DATE NOT NULL,
period_end DATE NOT NULL,
-- Cluster info
cluster_id INT NOT NULL,
label TEXT NOT NULL,
-- Metrics
review_count INT NOT NULL,
percentage FLOAT NOT NULL,
avg_intensity FLOAT,
-- Representative content
representative_review_id TEXT,
representative_quote TEXT,
sharpest_review_id TEXT,
sharpest_quote TEXT,
-- Embedding (for trend matching)
centroid VECTOR(384),
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(subject_type, subject_id, business_id, place_id, period_start, period_end, cluster_id)
);
CREATE INDEX idx_subpatterns_lookup ON subpatterns(
subject_type, subject_id, business_id, period_end DESC
);
```
---
## Part 3: Ingest Layer
### 3.1 Google Connector
```python
import json
from datetime import datetime
from typing import Optional


async def pull_reviews(place_id: str, since: Optional[datetime] = None) -> list[dict]:
    """Fetch new/updated reviews from Google Places API."""
    reviews = await google_places_client.get_reviews(place_id, since=since)
    for review in reviews:
        await store_raw_review(place_id, review)
    return reviews


async def store_raw_review(place_id: str, review: dict) -> int:
    """Store immutable raw review payload."""
    # Fetch the latest stored version, including the fields needed to detect edits
    existing = await db.query_one("""
        SELECT id, review_version, review_text, rating FROM reviews_raw
        WHERE source = 'google' AND review_id = %s
        ORDER BY review_version DESC LIMIT 1
    """, [review['review_id']])

    version = 1
    if existing:
        if content_changed(existing, review):
            # Edited review: store as a new version, never overwrite
            version = existing['review_version'] + 1
        else:
            # Unchanged: reuse the existing row
            return existing['id']

    return await db.insert("""
        INSERT INTO reviews_raw (
            source, review_id, place_id, raw_payload,
            review_text, rating, review_time, reviewer_name, reviewer_id,
            review_version, pulled_at
        ) VALUES (
            'google', %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, NOW()
        )
        RETURNING id
    """, [
        review['review_id'], place_id, json.dumps(review),
        review.get('text'), review.get('rating'),
        review.get('time'), review.get('author_name'), review.get('author_id'),
        version
    ])
```
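`store_raw_review` relies on a `content_changed` helper that this section does not define. One possible sketch is shown here, under two assumptions: the stored row exposes `review_text` and `rating`, and only a change to the text or the rating counts as an edit worth a new version.

```python
def content_changed(existing: dict, review: dict) -> bool:
    """Return True when the incoming payload differs from the stored version.

    Assumptions (not confirmed by the spec): `existing` carries the
    `review_text` and `rating` columns of reviews_raw, and surrounding
    whitespace differences are not treated as edits.
    """
    old_text = (existing.get("review_text") or "").strip()
    new_text = (review.get("text") or "").strip()
    return old_text != new_text or existing.get("rating") != review.get("rating")
```

Comparing a hash of the raw payload would also work, but it would create new versions whenever unrelated payload fields change, which may or may not be desirable.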
### 3.2 Enrichment Pipeline
```python
async def enrich_review(raw_id: int, business_id: str) -> dict:
    """
    Full enrichment: normalize → classify → embed → trust score.

    Args:
        raw_id: ID from reviews_raw
        business_id: Tenant context (passed from ingest job, not looked up)
    """
    raw = await db.query_one(
        "SELECT * FROM reviews_raw WHERE id = %s", [raw_id]
    )

    # 1. Normalize
    text = normalize_text(raw['review_text'])

    # 2. Validate place_id exists under this tenant (owned or competitor)
    location = await db.query_one(
        "SELECT display_name, location_type FROM locations WHERE business_id = %s AND place_id = %s",
        [business_id, raw['place_id']]
    )
    if not location:
        raise ValueError(f"place_id {raw['place_id']} not registered for business {business_id}")

    # 3. Parallel: LLM classify + embed
    classify_task = asyncio.create_task(classify_review_llm(text))
    embed_task = asyncio.create_task(embed_review(text))
    classification = await classify_task
    embedding = await embed_task

    # 4. Trust score
    trust_score = compute_trust_score(raw, text, classification)

    # 5. Dedup check
    dedup_group_id = await find_dedup_group(embedding, raw['place_id'])

    # 6. Mark previous versions as not-latest
    await db.execute("""
        UPDATE reviews_enriched
        SET is_latest = FALSE
        WHERE source = 'google' AND review_id = %s AND is_latest = TRUE
    """, [raw['review_id']])

    # 7. Store enriched (versioned)
    enriched = {
        'source': 'google',
        'review_id': raw['review_id'],
        'review_version': raw['review_version'],
        'is_latest': True,
        'raw_id': raw_id,
        'business_id': business_id,  # Passed from ingest job (tenant context)
        'place_id': raw['place_id'],
        'text': raw['review_text'],
        'text_normalized': text,
        'rating': raw['rating'],
        'review_time': raw['review_time'],
        'language': detect_language(text),
        'embedding': embedding,
        'trust_score': trust_score,
        'dedup_group_id': dedup_group_id,
        **classification,
    }
    await upsert_enriched_review(enriched)
    return enriched
```
### 3.3 LLM Classification
```python
CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).
Analyze the review and return JSON:
{
"urt_primary": "X1.23",
"urt_secondary": ["Y2.34"],
"valence": "V-",
"intensity": "I2",
"comparative": "CR-N",
"staff_mentions": ["Mike"],
"quotes": {
"X1.23": "exact phrase",
"Y2.34": "another phrase"
},
"confidence": {
"urt_primary": 0.92,
"valence": 0.95,
"intensity": 0.88
}
}
URT DOMAINS:
- O (Offering): Product/service quality, function, completeness
- P (People): Staff attitude, competence, responsiveness
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance
- A (Access): Availability, accessibility, convenience
- V (Value): Price, transparency, worth
- R (Relationship): Trust, dependability, loyalty
RULES:
1. Primary = most impactful topic
2. Secondary must be DIFFERENT domains
3. V± only for genuinely mixed sentiment
4. CR-B/W/S only for explicit self-comparison
5. quotes = EXACT phrases from review
Return valid JSON only."""
async def classify_review_llm(text: str) -> dict:
    """LLM-powered URT classification."""
    response = await llm.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"},
        temperature=0.1
    )
    result = json.loads(response.content)
    result['classification_model'] = 'gpt-4o-mini'
    return result
```
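Since the prompt rules are only instructions to the model, it may be worth checking each response before it reaches `reviews_enriched`. The validator below is a sketch of such a check. The function name is hypothetical, the code pattern is inferred from examples such as `'J1.01'`, and rule 2 is read as "all codes come from distinct domains"; each of these is an assumption.

```python
import re

# Assumed code shape, inferred from examples such as 'J1.01' and 'P1.02'
URT_CODE = re.compile(r"^[OPJEAVR]\d+\.\d{2}$")
VALENCES = {"V+", "V-", "V0", "V±"}
INTENSITIES = {"I1", "I2", "I3"}
COMPARATIVES = {"CR-N", "CR-B", "CR-W", "CR-S"}


def validate_classification(result: dict) -> list[str]:
    """Return rule violations for one LLM payload; an empty list means it passes."""
    errors = []
    primary = result.get("urt_primary") or ""
    secondary = result.get("urt_secondary") or []

    for code in [primary, *secondary]:
        if not URT_CODE.match(code):
            errors.append(f"malformed URT code: {code!r}")
    if len(secondary) > 2:
        errors.append("more than 2 secondary codes")

    # Rule 2: every code must come from a different domain (first letter)
    domains = [code[:1] for code in [primary, *secondary]]
    if len(set(domains)) != len(domains):
        errors.append("secondary codes must use different domains")

    if result.get("valence") not in VALENCES:
        errors.append("invalid valence")
    if result.get("intensity") not in INTENSITIES:
        errors.append("invalid intensity")
    if result.get("comparative", "CR-N") not in COMPARATIVES:
        errors.append("invalid comparative")
    return errors
```

A payload that fails validation could be retried or routed to a review queue; which of the two fits better is left open here.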
### 3.4 Trust Score Computation
```python
import numpy as np


def compute_trust_score(raw: dict, text: str, classification: dict) -> float:
    """
    Compute trust score (0.0 to 1.0) based on review quality signals.
    Low trust = likely spam, fake, or low-quality.
    """
    score = 1.0

    # Length penalty
    word_count = len(text.split())
    if word_count < 5:
        score *= 0.5
    elif word_count > 500:
        score *= 0.8

    # Rating/sentiment mismatch
    rating = raw.get('rating')
    valence = classification.get('valence')
    if rating and valence:
        if rating >= 4 and valence == 'V-':
            score *= 0.7
        elif rating <= 2 and valence == 'V+':
            score *= 0.7

    # Generic text patterns
    if is_generic_review(text):
        score *= 0.6

    # LLM confidence
    confidence = classification.get('confidence', {})
    avg_confidence = np.mean(list(confidence.values())) if confidence else 1.0
    if avg_confidence < 0.7:
        score *= 0.9

    # Clamp to [0.1, 1.0] so a review is never weighted at zero
    return max(0.1, min(1.0, score))
```
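To make the multipliers concrete, the sketch below restates the penalties as a pure function of already-computed signals and works through two cases. The function name and its boolean inputs are hypothetical simplifications; the multipliers are copied from `compute_trust_score`.

```python
def trust_multiplier(word_count: int, rating_mismatch: bool,
                     generic: bool, avg_confidence: float) -> float:
    """Apply the same penalties as compute_trust_score to precomputed signals."""
    score = 1.0
    if word_count < 5:
        score *= 0.5      # very short review
    elif word_count > 500:
        score *= 0.8      # unusually long review
    if rating_mismatch:
        score *= 0.7      # star rating contradicts the classified valence
    if generic:
        score *= 0.6      # boilerplate wording
    if avg_confidence < 0.7:
        score *= 0.9      # classifier was unsure
    return max(0.1, min(1.0, score))


# A 3-word, generic, 5-star review classified as V-: 0.5 * 0.7 * 0.6 = 0.21
example = trust_multiplier(3, True, True, 0.9)

# Every penalty at once: 0.5 * 0.7 * 0.6 * 0.9 = 0.189
worst_case = trust_multiplier(3, True, True, 0.5)
```

With the multipliers as written, 0.189 is the lowest reachable score, so the 0.1 floor acts as a safety net rather than a value that occurs in practice.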
---
## Part 4: Issue Lifecycle Management
### 4.1 Issue Aggregation (Relational)
**v3.1 Issue Key**: `(business_id, place_id, primary_subcode)` — entity matching is deferred to v3.2.
```python
from typing import Optional


async def aggregate_to_issue(review: dict) -> Optional[str]:
    """Match review to existing issue or create new one."""
    # Only negative or mixed reviews feed issues
    if review['valence'] not in ('V-', 'V±'):
        return None

    # Find matching open issues using relational query
    # v3.1: match on (business_id, place_id, primary_subcode) only
    matching = await db.query("""
        SELECT i.issue_id, i.primary_subcode, i.span_count
        FROM issues i
        WHERE i.business_id = %s
          AND i.place_id = %s
          AND i.primary_subcode = %s
          AND i.state NOT IN ('VERIFIED', 'DECLINED')
          AND i.created_at > NOW() - INTERVAL '30 days'
        ORDER BY i.priority_score DESC
        LIMIT 1
    """, [review['business_id'], review['place_id'], review['urt_primary']])

    if matching:
        issue = matching[0]
        await add_span_to_issue(issue['issue_id'], review)
        return issue['issue_id']

    if should_create_issue(review):
        return await create_issue(review)
    return None
async def add_span_to_issue(issue_id: str, review: dict):
    """Add review span to issue and update counters."""
    # Insert span (versioned, with denormalized review_time for timeline queries)
    await db.execute("""
        INSERT INTO issue_spans (
            issue_id, source, review_id, review_version,
            is_primary_match, intensity, review_time
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (issue_id, source, review_id, review_version) DO NOTHING
    """, [
        issue_id, review['source'], review['review_id'], review['review_version'],
        True, review['intensity'], review['review_time']
    ])

    # Update issue counters
    await db.execute("""
        UPDATE issues SET
            span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s),
            max_intensity = (
                SELECT CASE MAX(CASE intensity
                    WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END)
                    WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END
                FROM issue_spans WHERE issue_id = %s
            ),
            updated_at = NOW()
        WHERE issue_id = %s
    """, [issue_id, issue_id, issue_id])

    await recalculate_priority(issue_id)
    await log_issue_event(
        issue_id, 'span_added',
        source=review['source'],
        review_id=review['review_id'],
        review_version=review['review_version']
    )
```
### 4.2 Priority Scoring (Trust-Weighted)
```python
import math
from datetime import datetime

INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}


async def recalculate_priority(issue_id: str):
    """
    Priority = intensity × volume × decay × recurrence × trend × trust
    """
    issue = await db.query_one("""
        SELECT
            i.*,
            (SELECT AVG(re.trust_score)
             FROM issue_spans s
             JOIN reviews_enriched re ON (s.source, s.review_id, s.review_version)
                                       = (re.source, re.review_id, re.review_version)
             WHERE s.issue_id = i.issue_id) as avg_trust
        FROM issues i
        WHERE i.issue_id = %s
    """, [issue_id])

    # Intensity weight of the strongest span (unknown values fall back to I1)
    i_weight = INTENSITY_WEIGHTS.get(issue['max_intensity'], 1.0)

    # Logarithmic volume so large issues do not dominate linearly
    volume_factor = 1 + math.log(max(1, issue['span_count']))

    # Exponential decay with a half-life of about 30 days (ln 2 / 0.023)
    days_old = (datetime.now() - issue['created_at']).days
    decay = math.exp(-0.023 * days_old)

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    if issue['cr_worse_count'] >= 2:
        trend_modifier = 1.3
    elif issue['cr_better_count'] >= 2:
        trend_modifier = 0.7
    else:
        trend_modifier = 1.0

    trust_factor = issue['avg_trust'] or 1.0

    priority = (
        i_weight * volume_factor * decay *
        recurrence_boost * trend_modifier * trust_factor
    )

    await db.execute("""
        UPDATE issues SET
            priority_score = %s,
            avg_trust_score = %s,
            updated_at = NOW()
        WHERE issue_id = %s
    """, [priority, issue['avg_trust'], issue_id])
```
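The priority formula is easier to reason about as a pure function, separate from the database calls. The sketch below restates it that way, using the same constants; the function name and keyword arguments are hypothetical.

```python
import math

INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0}


def priority_score(max_intensity: str, span_count: int, days_old: int,
                   reopen_count: int = 0, cr_worse: int = 0,
                   cr_better: int = 0, avg_trust: float = 1.0) -> float:
    """Priority = intensity x volume x decay x recurrence x trend x trust."""
    i_weight = INTENSITY_WEIGHTS.get(max_intensity, 1.0)
    volume = 1 + math.log(max(1, span_count))
    decay = math.exp(-0.023 * days_old)          # half-life of about 30 days
    recurrence = 1.0 + 0.5 * math.log2(reopen_count + 1)
    if cr_worse >= 2:
        trend = 1.3
    elif cr_better >= 2:
        trend = 0.7
    else:
        trend = 1.0
    return i_weight * volume * decay * recurrence * trend * avg_trust


baseline = priority_score("I1", span_count=1, days_old=0)    # every factor is 1
month_old = priority_score("I1", span_count=1, days_old=30)  # roughly half of baseline
reopened = priority_score("I3", span_count=1, days_old=0, reopen_count=1)  # 4 x 1.5
```

Because every factor multiplies, a low `avg_trust` scales the whole score down, which is how suspicious reviews are kept from pushing an issue up the ranking.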
### 4.3 Issue Review Drill-Down
```python
async def get_issue_reviews(issue_id: str,
                            sort_by: str = 'date',
                            limit: int = 50,
                            offset: int = 0) -> list[dict]:
    """Fetch one page of reviews for an issue with full details."""
    # Whitelisted ORDER BY fragments: sort_by is never interpolated directly
    order_clause = {
        'date': 's.review_time DESC',
        'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
        'trust': 're.trust_score DESC',
        'rating': 're.rating ASC'
    }.get(sort_by, 's.review_time DESC')

    return await db.query(f"""
        SELECT
            re.review_id,
            re.review_version,
            re.text,
            s.review_time,
            re.rating,
            re.valence,
            s.intensity,
            re.comparative,
            re.trust_score,
            re.quotes,
            re.staff_mentions,
            s.is_primary_match,
            s.weight,
            l.display_name as location_name
        FROM issue_spans s
        JOIN reviews_enriched re ON (s.source, s.review_id, s.review_version)
                                  = (re.source, re.review_id, re.review_version)
        JOIN locations l ON (re.business_id, re.place_id) = (l.business_id, l.place_id)
        WHERE s.issue_id = %s
        ORDER BY {order_clause}
        LIMIT %s OFFSET %s
    """, [issue_id, limit, offset])
```
### 4.4 Strength Score
```
Strength Score = Σ (intensity_weight)
Where:
I1 (mild) → weight = 1
I2 (moderate) → weight = 2
I3 (strong) → weight = 4
One I3 review = 4 I1 reviews = 2 I2 reviews
```
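The same definition in Python, as a small sketch. The function name is hypothetical, and treating unknown intensity values as weight 0 is an assumption.

```python
INTENSITY_WEIGHTS = {"I1": 1, "I2": 2, "I3": 4}


def strength_score(intensities: list[str]) -> int:
    """Sum intensity weights; unknown values contribute 0 (an assumption)."""
    return sum(INTENSITY_WEIGHTS.get(level, 0) for level in intensities)


one_strong = strength_score(["I3"])
four_mild = strength_score(["I1"] * 4)
two_moderate = strength_score(["I2", "I2"])
```

All three values come out equal, which is the equivalence stated in the formula block.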
---
## Part 5: Analytics Spine (Fact Population)
### 5.1 Daily Fact Aggregation Job
```python
from datetime import date, timedelta


async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'):
    """
    Aggregate reviews into fact_timeseries. Run daily.

    v3.1 populates:
    - subject_type='overall', subject_id='all' (per location + 'ALL')
    - subject_type='urt_code', subject_id=<code> (per location + 'ALL')

    Domain rollups are derived at query time from urt_code facts.
    Issue facts are optional (recommended for v3.2).
    """
    if bucket_type == 'day':
        period_start = date
        period_end = date + timedelta(days=1)
    elif bucket_type == 'week':
        # Weeks start on Monday
        period_start = date - timedelta(days=date.weekday())
        period_end = period_start + timedelta(days=7)
    elif bucket_type == 'month':
        period_start = date.replace(day=1)
        # Day 28 + 4 days always lands in the next month
        next_month = period_start.replace(day=28) + timedelta(days=4)
        period_end = next_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")

    # Get owned locations (competitors excluded from 'ALL' rollup)
    owned_locations = await db.query(
        "SELECT place_id FROM locations WHERE business_id = %s AND is_active = TRUE AND location_type = 'owned'",
        [business_id]
    )
    owned_place_ids = [loc['place_id'] for loc in owned_locations]

    # Get competitor locations (facts per place only, no 'ALL' rollup)
    competitor_locations = await db.query(
        "SELECT place_id FROM locations WHERE business_id = %s AND is_active = TRUE AND location_type = 'competitor'",
        [business_id]
    )

    # Per-location facts (owned)
    for loc in owned_locations:
        await populate_location_facts(
            business_id, loc['place_id'], period_start, period_end, bucket_type
        )

    # Per-location facts (competitors: no 'ALL' rollup)
    for loc in competitor_locations:
        await populate_location_facts(
            business_id, loc['place_id'], period_start, period_end, bucket_type
        )

    # All-locations rollup (owned only, place_id='ALL')
    await populate_all_locations_facts(
        business_id, owned_place_ids, period_start, period_end, bucket_type
    )
async def populate_location_facts(business_id: str, place_id: str,
                                  period_start: date, period_end: date,
                                  bucket_type: str):
    """Populate facts for a single location."""
    # Aggregate by URT code
    code_stats = await aggregate_by_code(
        business_id, place_id, period_start, period_end
    )
    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id=place_id,
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Aggregate overall
    overall = await aggregate_overall(
        business_id, place_id, period_start, period_end
    )
    await upsert_fact(
        business_id=business_id,
        place_id=place_id,
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
async def populate_all_locations_facts(business_id: str, place_ids: list[str],
                                       period_start: date, period_end: date,
                                       bucket_type: str):
    """Populate 'ALL' rollup facts across the given (owned) locations.

    Callers pass owned place_ids only; competitors never enter the rollup.
    """
    # Aggregate by URT code across the given locations
    code_stats = await db.query("""
        SELECT
            urt_primary as code,
            COUNT(*) as review_count,
            SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            SUM(CASE intensity
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
            END) as strength_score,
            AVG(rating) as avg_rating
        FROM reviews_enriched
        WHERE business_id = %s
          AND place_id = ANY(%s)
          AND review_time >= %s AND review_time < %s
          AND is_latest = TRUE
        GROUP BY urt_primary
    """, [business_id, place_ids, period_start, period_end])

    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id='ALL',
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Overall rollup
    overall = await db.query_one("""
        SELECT
            COUNT(*) as review_count,
            SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            AVG(rating) as avg_rating
        FROM reviews_enriched
        WHERE business_id = %s
          AND place_id = ANY(%s)
          AND review_time >= %s AND review_time < %s
          AND is_latest = TRUE
    """, [business_id, place_ids, period_start, period_end])

    await upsert_fact(
        business_id=business_id,
        place_id='ALL',
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
```
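
The `strength_score` expression in the rollup query weights each review by intensity (`I1` = 1, `I2` = 2, `I3` = 4, anything else 0). The same arithmetic can be sketched outside SQL. This is only an illustration: it assumes rows are plain dicts with `valence`, `intensity`, and `rating` keys, and the function name `summarize_reviews` is hypothetical rather than part of the pipeline.

```python
from statistics import mean

# Intensity weights copied from the CASE expression in the rollup query.
INTENSITY_WEIGHTS = {"I1": 1, "I2": 2, "I3": 4}


def summarize_reviews(rows: list[dict]) -> dict:
    """Compute the per-code aggregate columns for an in-memory list of rows.

    Hypothetical helper for illustration; the pipeline computes these in SQL.
    """
    # Like SQL AVG, skip rows whose rating is missing.
    ratings = [r["rating"] for r in rows if r.get("rating") is not None]
    return {
        "review_count": len(rows),
        "negative_count": sum(1 for r in rows if r.get("valence") == "V-"),
        "positive_count": sum(1 for r in rows if r.get("valence") == "V+"),
        # Unknown or missing intensities contribute 0, matching ELSE 0.
        "strength_score": sum(
            INTENSITY_WEIGHTS.get(r.get("intensity"), 0) for r in rows
        ),
        "avg_rating": mean(ratings) if ratings else None,
    }


sample = [
    {"valence": "V-", "intensity": "I3", "rating": 1},
    {"valence": "V-", "intensity": "I1", "rating": 2},
    {"valence": "V+", "intensity": "I2", "rating": 5},
]
summary = summarize_reviews(sample)
```

With the three sample rows, one `I3` review contributes more to `strength_score` than the `I1` and `I2` reviews combined, which is the intent of the 1/2/4 weighting.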
### 5.2 Timeline Query (For Charts)
```python
async def get_timeline(business_id: str,
                       place_id: Optional[str],
                       subject_type: str,
                       subject_id: str,
                       start: date,
                       end: date,
                       bucket_type: str = 'week') -> list[dict]:
    """
    Query pre-aggregated facts for line charts.

    Args:
        place_id: Specific place_id, or None for 'ALL' locations rollup
    """
    # Use 'ALL' sentinel for all-locations queries
    effective_place_id = place_id if place_id else 'ALL'

    # Parameter order below must match the order of the %s placeholders.
    return await db.query("""
        SELECT
            period_date,
            review_count,
            negative_count,
            positive_count,
            strength_score,
            negative_strength,
            avg_rating,
            cr_better,
            cr_worse,
            cr_same
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = %s
          AND subject_type = %s
          AND subject_id = %s
          AND bucket_type = %s
          AND period_date BETWEEN %s AND %s
        ORDER BY period_date
    """, [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end])
```
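
A query like this returns only the buckets that have a stored fact row, while a line chart usually needs a continuous axis. One possible way to zero-fill gaps on the client side is sketched below. It assumes weekly buckets keyed by a `period_date` that steps in exact 7-day increments from `start`; the helper name `fill_missing_weeks` is hypothetical.

```python
from datetime import date, timedelta


def fill_missing_weeks(rows: list[dict], start: date, end: date) -> list[dict]:
    """Return one row per 7-day step from start to end, zero-filling gaps.

    Assumes `start` is aligned to a bucket boundary (hypothetical helper).
    """
    by_date = {row["period_date"]: row for row in rows}
    filled = []
    current = start
    while current <= end:
        # Reuse the stored row when present, otherwise emit an empty bucket.
        filled.append(
            by_date.get(current, {"period_date": current, "review_count": 0})
        )
        current += timedelta(days=7)
    return filled


rows = [
    {"period_date": date(2026, 1, 5), "review_count": 12},
    {"period_date": date(2026, 1, 19), "review_count": 7},
]
series = fill_missing_weeks(rows, date(2026, 1, 5), date(2026, 1, 19))
```

Here the week of 2026-01-12 has no stored row, so `series` gains a placeholder with `review_count` of 0 between the two real rows.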
---
## Part 6: Competitor Analysis
### 6.1 Competitor Setup (Clean Model)
Competitors are tracked in both `competitors` (relationship metadata) and `locations` (with `location_type='competitor'`). This preserves FK integrity and enables consistent joins for display names and timezones.

**Competitor Review Storage Rule**: Competitor reviews are stored with the **customer's business_id** and the **competitor's place_id**:
```
reviews_enriched.business_id = <customer_business_id>
reviews_enriched.place_id = <competitor_place_id>
```
The `locations.location_type` column distinguishes ownership:
- `'owned'` — customer's own locations
- `'competitor'` — tracked competitor locations

This keeps all queries and FK constraints working without NULL semantics or special-case logic.
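
Because ownership lives in `location_type` and locations are scoped per tenant, the 'ALL' rollup can be restricted to owned locations with a simple filter. A minimal sketch over in-memory rows, assuming each location is a dict with `business_id`, `place_id`, and `location_type` keys (the helper `owned_place_ids` and the sample IDs are hypothetical):

```python
def owned_place_ids(locations: list[dict], business_id: str) -> list[str]:
    """Return the place_ids a tenant owns, excluding tracked competitors."""
    return [
        loc["place_id"]
        for loc in locations
        if loc["business_id"] == business_id and loc["location_type"] == "owned"
    ]


# The same place_id can appear under two tenants with different roles:
# biz_a owns place_1, while biz_b tracks place_1 as a competitor.
locations = [
    {"business_id": "biz_a", "place_id": "place_1", "location_type": "owned"},
    {"business_id": "biz_a", "place_id": "place_9", "location_type": "competitor"},
    {"business_id": "biz_b", "place_id": "place_1", "location_type": "competitor"},
    {"business_id": "biz_b", "place_id": "place_2", "location_type": "owned"},
]
ids_a = owned_place_ids(locations, "biz_a")
ids_b = owned_place_ids(locations, "biz_b")
```

Filtering on both `business_id` and `location_type` keeps competitor reviews out of a customer's own rollup even though they share that customer's `business_id`.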
```python
async def setup_competitor(business_id: str, competitor_place_id: str,
                           competitor_name: str, relationship: str = 'direct'):
    """Register a competitor for tracking."""
    # 1. Add to locations with location_type='competitor' (enables FK + joins)
    await db.execute("""
        INSERT INTO locations (business_id, place_id, location_type, display_name)
        VALUES (%s, %s, 'competitor', %s)
        ON CONFLICT (business_id, place_id) DO UPDATE SET
            display_name = EXCLUDED.display_name
    """, [business_id, competitor_place_id, competitor_name])

    # 2. Track relationship metadata in competitors table
    await db.execute("""
        INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET
            competitor_name = EXCLUDED.competitor_name,
            relationship = EXCLUDED.relationship
    """, [business_id, competitor_place_id, competitor_name, relationship])


async def pull_competitor_reviews(business_id: str):
    """Pull reviews for all tracked competitors."""
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors
        WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    for comp in competitors:
        # Store competitor reviews with customer's business_id + competitor's place_id
        await pull_reviews_for_competitor(
            business_id=business_id,  # Customer's business_id (NOT NULL)
            place_id=comp['competitor_place_id']
        )
```
### 6.2 Competitor Comparison
```python
async def get_competitor_comparison(business_id: str, code: str,
                                    start: date, end: date,
                                    bucket_type: str = 'week') -> dict:
    """Compare your URT metrics against competitors.

    Rows are restricted to a single bucket_type so that facts stored at more
    than one granularity are not summed together. avg_rating is weighted by
    review_count so that sparse periods do not skew the result.
    """
    # Your metrics (from 'ALL' rollup)
    your_metrics = await db.query_one("""
        SELECT
            SUM(negative_strength) as negative_strength,
            SUM(review_count) as review_count,
            SUM(avg_rating * review_count)
                / NULLIF(SUM(review_count), 0) as avg_rating
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = 'ALL'
          AND subject_type = 'urt_code'
          AND subject_id = %s
          AND bucket_type = %s
          AND period_date BETWEEN %s AND %s
    """, [business_id, code, bucket_type, start, end])

    # Competitor metrics
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    comparison = {
        'your_business': your_metrics or {},
        'competitors': []
    }

    for comp in competitors:
        # Query competitor's facts (tenant-scoped: business_id + place_id)
        comp_metrics = await db.query_one("""
            SELECT
                SUM(negative_strength) as negative_strength,
                SUM(review_count) as review_count,
                SUM(avg_rating * review_count)
                    / NULLIF(SUM(review_count), 0) as avg_rating
            FROM fact_timeseries
            WHERE business_id = %s
              AND place_id = %s
              AND subject_type = 'urt_code'
              AND subject_id = %s
              AND bucket_type = %s
              AND period_date BETWEEN %s AND %s
        """, [business_id, comp['competitor_place_id'], code, bucket_type, start, end])

        comparison['competitors'].append({
            'name': comp['competitor_name'],
            'place_id': comp['competitor_place_id'],
            **(comp_metrics or {})
        })

    return comparison
```
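
When several period rows are collapsed into one figure, a plain mean of per-period `avg_rating` values treats a one-review week the same as a nine-review week. A minimal sketch of a review-count-weighted mean, assuming fact rows are dicts with `avg_rating` and `review_count` keys (the helper name `weighted_avg_rating` is hypothetical):

```python
from typing import Optional


def weighted_avg_rating(rows: list[dict]) -> Optional[float]:
    """Average rating across periods, weighted by each period's review_count."""
    total_reviews = sum(r["review_count"] for r in rows)
    if total_reviews == 0:
        return None  # No reviews in range, so no meaningful average.
    weighted_sum = sum(r["avg_rating"] * r["review_count"] for r in rows)
    return weighted_sum / total_reviews


periods = [
    {"avg_rating": 5.0, "review_count": 1},  # quiet week, one glowing review
    {"avg_rating": 3.0, "review_count": 9},  # busy week, mixed reviews
]
plain = sum(p["avg_rating"] for p in periods) / len(periods)
weighted = weighted_avg_rating(periods)
```

For these two periods the plain mean is 4.0 while the weighted mean is 3.2, which is the true average over all ten reviews.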
---
## Part 7: Report Generation (Facts-First)
```python
async def generate_report(business_id: str, place_id: Optional[str],
                          start: date, end: date) -> dict:
    """Generate report from pre-aggregated facts."""
    # Use 'ALL' for cross-location reports
    effective_place_id = place_id if place_id else 'ALL'

    # 1. Top issues from facts (fast)
    top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end)

    # 2. Strengths from facts
    strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end)

    # 3. Sub-patterns (store results)
    for issue in top_issues[:5]:
        patterns = await discover_and_store_subpatterns(
            business_id, effective_place_id, issue['code'], start, end
        )
        issue['sub_patterns'] = patterns

    # 4. Trends from facts
    trends = await compute_trends_from_facts(business_id, effective_place_id, start, end)

    # 5. Staff analysis
    staff = await analyze_staff(business_id, effective_place_id, start, end)

    # 6. Competitor benchmarks
    competitors = await get_competitor_benchmarks(business_id, start, end)

    payload = {
        'business_id': business_id,
        'place_id': place_id,  # Original (None = all)
        'period': f"{start} to {end}",
        'issues': top_issues,
        'strengths': strengths,
        'trends': trends,
        'staff': staff,
        'competitors': competitors,
    }

    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat()
    }
```
---
## Part 8: KPI-Ready Hooks
### 8.1 Future KPI Integration (Interface Only)
```sql
-- Future: KPI fact table with same grain
CREATE TABLE fact_kpi_timeseries (
    id SERIAL PRIMARY KEY,

    -- Same join keys as fact_timeseries
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,  -- 'ALL' for rollups
    period_date DATE NOT NULL,
    bucket_type TEXT NOT NULL,

    -- KPI metrics
    revenue DECIMAL(12,2),
    transactions INT,
    cancellations INT,
    refunds DECIMAL(12,2),
    support_tickets INT,

    computed_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(business_id, place_id, period_date, bucket_type)
);

-- Join reviews and KPIs:
SELECT
    r.period_date,
    r.negative_strength,
    k.revenue,
    k.cancellations
FROM fact_timeseries r
JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
WHERE r.subject_type = 'overall' AND r.subject_id = 'all';
```
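
The value of sharing a grain is that review facts and KPI facts line up on the same four columns. The join can be sketched in plain Python, assuming both sides are lists of dicts carrying `business_id`, `place_id`, `period_date`, and `bucket_type` (the helper `join_facts` and the sample values are hypothetical):

```python
JOIN_KEYS = ("business_id", "place_id", "period_date", "bucket_type")


def join_facts(review_facts: list[dict], kpi_facts: list[dict]) -> list[dict]:
    """Inner-join review facts to KPI facts on the shared four-column key."""
    kpi_by_key = {tuple(k[c] for c in JOIN_KEYS): k for k in kpi_facts}
    joined = []
    for fact in review_facts:
        kpi = kpi_by_key.get(tuple(fact[c] for c in JOIN_KEYS))
        if kpi is not None:  # Inner join: drop review facts with no KPI row.
            joined.append({
                **fact,
                "revenue": kpi["revenue"],
                "cancellations": kpi["cancellations"],
            })
    return joined


review_facts = [
    {"business_id": "biz_a", "place_id": "ALL", "period_date": "2026-01-05",
     "bucket_type": "week", "negative_strength": 14},
    {"business_id": "biz_a", "place_id": "ALL", "period_date": "2026-01-12",
     "bucket_type": "week", "negative_strength": 9},
]
kpi_facts = [
    {"business_id": "biz_a", "place_id": "ALL", "period_date": "2026-01-05",
     "bucket_type": "week", "revenue": 1200.0, "cancellations": 3},
]
joined = join_facts(review_facts, kpi_facts)
```

Only the week with a matching KPI row survives, mirroring the inner `JOIN ... USING` in the SQL.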
---
## Part 9: Cost Model
| Stage | When | Cost | Notes |
|-------|------|------|-------|
| **Raw Storage** | Per review | $0.00 | ~1KB per review |
| **Embedding** | Per review | $0.00 | Local model, ~50ms |
| **LLM Classification** | Per review | ~$0.0002 | GPT-4o-mini |
| **Fact Aggregation** | Daily job | $0.00 | SQL, <1 minute |
| **Sub-Clustering** | Per report | $0.00 | HDBSCAN, <1s |
| **LLM Narrative** | Per report | ~$0.15 | GPT-4o |

**Total Costs:**

| Volume | Monthly Ingest | Reports (10/mo) | Total |
|--------|---------------|-----------------|-------|
| 1K reviews | $0.20 | $1.50 | **$1.70** |
| 10K reviews | $2.00 | $1.50 | **$3.50** |
| 100K reviews | $20.00 | $1.50 | **$21.50** |
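
The totals follow directly from the two per-unit LLM costs in the stage table; every other stage is listed at $0.00. A small sketch of that arithmetic, treating the approximate per-unit prices as exact for illustration (the function name `monthly_cost` is hypothetical):

```python
from decimal import Decimal

# Approximate per-unit costs from the stage table above.
CLASSIFICATION_COST_PER_REVIEW = Decimal("0.0002")  # LLM classification
NARRATIVE_COST_PER_REPORT = Decimal("0.15")         # LLM narrative


def monthly_cost(reviews: int, reports: int = 10) -> Decimal:
    """Estimated monthly spend: ingest classification plus report narratives."""
    ingest = reviews * CLASSIFICATION_COST_PER_REVIEW
    reporting = reports * NARRATIVE_COST_PER_REPORT
    return ingest + reporting


for volume in (1_000, 10_000, 100_000):
    print(f"{volume:>7} reviews: ${monthly_cost(volume):.2f}")
```

Ingest cost scales linearly with review volume, while report cost stays fixed at the assumed ten reports per month.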
---
## Part 10: Key Innovations
| Innovation | Benefit |
|------------|---------|
| **Relational issue_spans** | Scales, queryable, no array limits |
| **Versioned reviews_enriched** | Handles edited reviews correctly |
| **Tenant-scoped locations** | Same place_id works for multiple businesses |
| **place_id='ALL' sentinel** | Clean all-locations rollups without NULL handling |
| **Unified fact_timeseries spine** | Fast dashboards, KPI-joinable later |
| **Trust score weighting** | Spam resistance without deletion |
| **Competitor separate table** | Clean model, no fake business_ids |
| **Universal join keys** | `(business_id, place_id, period_date, bucket_type)` future-proofs KPI integration |
---
## Document Control
| Field | Value |
|-------|-------|
| **Document** | ReviewIQ Architecture v3.1.2 |
| **Status** | Specification Complete (Production-Ready) |
| **Date** | 2026-01-24 |
| **Dependencies** | URT Specification v5.1, Issue Lifecycle Framework C1 |
| **Source** | Google Reviews only |
| **Cost Target** | <$25/month at 100K reviews |
### Changelog
| Version | Changes |
|---------|---------|
| v3.0 | Issue lifecycle, strength scores, timeline charts |
| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring |
| v3.1.1 | **Reviewed**: Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup, fixed get_timeline params, clarified issue key scope |
| v3.1.2 | **Final**: Versioned issue_spans FK, competitor business_id rule, trust-weighted facts deferred, location_type flag, tenant-scoped enrichment |
### Fixes Applied (v3.1 → v3.1.2; bold rows added in v3.1.2)
| Issue | Fix |
|-------|-----|
| reviews_enriched PK wrong for edits | PK = `(source, review_id, review_version)` + `is_latest` flag |
| raw_id ambiguous under versioning | raw_id references specific raw version |
| locations.place_id prevents multi-tenant | PK = `(business_id, place_id)` (tenant-scoped) |
| Competitor fake business_id pattern | Competitors inserted into `locations` with `location_type='competitor'` |
| fact_timeseries.place_id NOT NULL blocks rollups | `place_id='ALL'` sentinel for all-locations |
| get_timeline param ordering bug | Fixed: params built in correct order |
| Issue entity fields but no extraction | Clarified: v3.1 key is `(business_id, place_id, primary_subcode)` only; entity fields reserved for v3.2 |
| Missing indexes | Added `idx_spans_issue_time`, FK to locations |
| **issue_spans.review_id underspecified** | Added `source`, `review_version` columns + FK to versioned review |
| **Competitor business_id = NULL breaks joins** | Rule: competitor reviews use customer's `business_id` + competitor's `place_id` |
| **trust_weighted_* columns implied populated** | Clarified: columns reserved but not populated in v3.1; deferred to v3.2 |
| **Footer version string** | Fixed: v3.1.1 → v3.1.2 |
| **Competitor fact query missing tenant scope** | Added `business_id` filter to competitor comparison query |
| **reviews_enriched FK conflicts with competitor rule** | Added `location_type` column to `locations`; competitors inserted with `'competitor'` type |
| **issue_events.review_id not versioned** | Added `source`, `review_version` columns to issue_events |
| **Enrichment lookup breaks multi-tenant** | `business_id` now passed from ingest job; validated against locations |
### Deferred to v3.2+
| Feature | Reason |
|---------|--------|
| Entity extraction for issues | Needs NER pipeline |
| journey_step inference | Needs better grounding data |
| intent_signals extraction | Needs action playbooks |
| stability_score tracking | Premature for v1 |
| issue facts in fact_timeseries | Optional performance optimization |
| Trust-weighted fact aggregation | Columns reserved; `SUM(trust_score * intensity_weight)` deferred |
| KPI integration | Placeholder only in v3.1 |
---
*End of ReviewIQ Architecture v3.1.2*