ReviewIQ: Review Intelligence Pipeline
Version: 3.1.1
Status: Architecture Specification (Reviewed)
Date: 2026-01-24
Executive Summary
ReviewIQ v3.1 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version refactors the data model for multi-location support, replaces array-based aggregation with proper relational design, and introduces a unified analytics spine for fast reporting.
What's New in v3.1:
- Relational issue tracking: issue_spans table replaces review_ids[] arrays
- Multi-location ready: place_id as first-class key throughout
- Split storage: reviews_raw + reviews_enriched for audit and reprocessing
- Unified analytics spine: fact_timeseries with universal join keys
- Quality controls: trust_score and dedup_group_id for spam filtering
- Competitor analysis: Same pipeline, separate tracking table
- KPI-ready hooks: Nullable columns for future business metric integration
Design Principles:
- Google Reviews only (for now), but the schema is source-agnostic
- Relational over arrays: scales, queries, joins
- Facts-first reporting: pre-aggregated spine for fast dashboards
- KPI-joinable: (business_id, place_id, period_date, bucket_type) as universal key
- Tenant-scoped locations: same place_id can exist for multiple businesses
Part 1: System Architecture
REVIEWIQ v3.1 PIPELINE

Google Reviews (API)
  │
  ▼
A) SOURCE & STORAGE
   google_connector ──▶ reviews_raw (immutable JSON + metadata)
  │
  ▼
B) ENRICHMENT
   Normalize + Map ──▶ LLM Classify ──▶ Embed (local) ──▶ Trust Score
   outputs merge into reviews_enriched (versioned)
  │
  ├──▶ C) OPERATIONALIZATION
  │      reviews_enriched
  │        ──▶ Match/Create Issue
  │        ──▶ issue_spans (link)
  │        ──▶ issues (update counters)
  │        ──▶ issue_events (log)
  │
  ├──▶ D) ANALYTICS SPINE (daily/weekly jobs)
  │      reviews_enriched ──▶ fact_timeseries (pre-aggregated metrics)
  │      Keys: business_id, place_id (or 'ALL'), subject_type/id,
  │            period_date, bucket_type
  │
  ▼
E) REPORTING
   fact_timeseries ──▶ Statistics & Trends
   issues + spans  ──▶ Issue Rankings & Drill-Down
   embeddings      ──▶ Sub-Pattern Clustering
   competitors     ──▶ Benchmark Comparisons
   all four feed LLM Narrative Generation
Part 2: Data Model (SQL DDL)
2.1 Dimension Tables
-- Business locations (multi-tenant: same place_id can exist for multiple businesses)
CREATE TABLE locations (
business_id TEXT NOT NULL, -- Internal business identifier
place_id TEXT NOT NULL, -- Google Place ID
display_name TEXT NOT NULL,
address TEXT,
city TEXT,
state TEXT,
country TEXT,
timezone TEXT,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (business_id, place_id)
);
CREATE INDEX idx_locations_place ON locations(place_id);
-- URT code reference
CREATE TABLE urt_codes (
code TEXT PRIMARY KEY, -- 'J1.01', 'P1.02', etc.
domain CHAR(1) NOT NULL, -- O, P, J, E, A, V, R
category TEXT NOT NULL,
subcategory TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
keywords TEXT[] -- For search/matching
);
-- Competitor mapping (separate from locations - no fake business_ids)
CREATE TABLE competitors (
id SERIAL PRIMARY KEY,
business_id TEXT NOT NULL, -- Your business
competitor_place_id TEXT NOT NULL, -- Competitor's Google Place ID
competitor_name TEXT NOT NULL,
relationship TEXT DEFAULT 'direct', -- 'direct', 'indirect', 'aspirational'
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, competitor_place_id)
);
CREATE INDEX idx_competitors_business ON competitors(business_id);
2.2 Reviews Tables (Raw + Enriched)
-- Immutable raw review storage (audit + reprocessing)
CREATE TABLE reviews_raw (
id SERIAL PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Google review ID
place_id TEXT NOT NULL, -- Google Place ID
-- Raw payload
raw_payload JSONB NOT NULL, -- Complete API response
review_text TEXT, -- Extracted for indexing
rating SMALLINT,
review_time TIMESTAMP,
reviewer_name TEXT,
reviewer_id TEXT,
-- Versioning (Google reviews can be edited)
review_version INT DEFAULT 1,
-- Ingestion metadata
pulled_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source, review_id, review_version)
);
CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC);
CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id);
-- Enriched review with LLM classification + embeddings (versioned)
-- Supports edited reviews: each version is a separate row
CREATE TABLE reviews_enriched (
-- Versioned primary key (handles edited reviews)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Matches reviews_raw.review_id
review_version INT NOT NULL DEFAULT 1,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
-- Link to raw (specific version)
raw_id INT NOT NULL REFERENCES reviews_raw(id),
-- Identity
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
-- Core content
text TEXT NOT NULL,
text_normalized TEXT, -- Cleaned for processing
rating SMALLINT,
review_time TIMESTAMP NOT NULL,
language TEXT,
-- URT Classification (from LLM)
urt_primary TEXT NOT NULL, -- 'J1.01', 'P1.02', etc.
urt_secondary TEXT[] DEFAULT '{}', -- Max 2, different domains
valence TEXT NOT NULL, -- 'V+', 'V-', 'V0', 'V±'
intensity TEXT NOT NULL, -- 'I1', 'I2', 'I3'
comparative TEXT DEFAULT 'CR-N', -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'
-- Extracted entities (reserved for v3.2, nullable)
staff_mentions TEXT[] DEFAULT '{}',
quotes JSONB, -- {"code": "phrase", ...}
-- Embedding
embedding VECTOR(384),
-- Quality control
trust_score FLOAT DEFAULT 1.0, -- 0.0 to 1.0
dedup_group_id TEXT, -- Groups duplicate/near-duplicate reviews
is_suspicious BOOLEAN DEFAULT FALSE,
-- Processing metadata
classification_model TEXT,
classification_confidence JSONB, -- Per-field confidence scores
processed_at TIMESTAMP DEFAULT NOW(),
model_version TEXT,
-- KPI-ready hooks (nullable, computed later)
kpi_impact_estimate FLOAT,
kpi_last_computed_at TIMESTAMP,
PRIMARY KEY (source, review_id, review_version)
);
-- Indexes for common query patterns
CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative)
WHERE comparative != 'CR-N' AND is_latest = TRUE;
CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score)
WHERE trust_score < 0.5 AND is_latest = TRUE;
CREATE INDEX idx_enriched_embedding ON reviews_enriched
USING hnsw (embedding vector_cosine_ops);
-- FK to locations (tenant-scoped)
ALTER TABLE reviews_enriched
ADD CONSTRAINT fk_enriched_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
2.3 Issue Tables (Relational, No Arrays)
Note: In v3.1, the issue grouping key is (business_id, place_id, primary_subcode) only. The entity and entity_normalized fields are reserved for v3.2+ when entity extraction is implemented.
-- Issues (aggregated problems)
CREATE TABLE issues (
issue_id TEXT PRIMARY KEY,
-- Grouping keys (v3.1: code + place only)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
primary_subcode TEXT NOT NULL, -- URT code
domain CHAR(1) NOT NULL,
-- State machine
state TEXT NOT NULL DEFAULT 'DETECTED',
priority_score FLOAT NOT NULL,
confidence_score FLOAT NOT NULL,
-- Aggregated metrics (updated via triggers/jobs)
span_count INT NOT NULL DEFAULT 1,
max_intensity TEXT NOT NULL,
avg_trust_score FLOAT DEFAULT 1.0,
-- CR counters (rolling 30-day window)
cr_better_count INT DEFAULT 0,
cr_worse_count INT DEFAULT 0,
cr_same_count INT DEFAULT 0,
-- Star drag proxy (avg rating when this issue present vs absent)
star_drag_estimate FLOAT,
-- Ownership
owner_team TEXT,
owner_individual TEXT,
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
acknowledged_at TIMESTAMP,
resolved_at TIMESTAMP,
verified_at TIMESTAMP,
-- Resolution
reopen_count INT DEFAULT 0,
resolution_code TEXT,
resolution_notes TEXT,
decline_reason TEXT,
-- Context (RESERVED for v3.2 - entity extraction)
entity TEXT, -- Product, staff member, feature
entity_normalized TEXT,
-- KPI-ready hooks (nullable)
kpi_impact_estimate FLOAT,
kpi_impact_confidence FLOAT,
kpi_last_computed_at TIMESTAMP
);
CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC);
CREATE INDEX idx_issues_place ON issues(place_id, state);
CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode);
CREATE INDEX idx_issues_open ON issues(business_id)
WHERE state NOT IN ('VERIFIED', 'DECLINED');
-- FK to locations (tenant-scoped)
ALTER TABLE issues
ADD CONSTRAINT fk_issues_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Issue spans (link table: issue ↔ review)
CREATE TABLE issue_spans (
id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE,
review_id TEXT NOT NULL, -- Matches reviews_enriched (source, review_id, version)
-- Span metadata
is_primary_match BOOLEAN DEFAULT TRUE, -- Primary vs secondary code match
weight FLOAT DEFAULT 1.0, -- For weighted aggregation
intensity TEXT NOT NULL, -- Copied from review for fast queries
-- Denormalized for timeline queries (avoids join)
review_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(issue_id, review_id)
);
CREATE INDEX idx_spans_issue ON issue_spans(issue_id);
CREATE INDEX idx_spans_review ON issue_spans(review_id);
CREATE INDEX idx_spans_issue_time ON issue_spans(issue_id, review_time DESC);
-- Issue events (audit log)
CREATE TABLE issue_events (
event_id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id),
event_type TEXT NOT NULL, -- 'state_change', 'span_added', 'priority_update'
from_state TEXT,
to_state TEXT,
actor TEXT, -- User or 'system'
notes TEXT,
review_id TEXT, -- Triggering review if applicable
metadata JSONB, -- Additional context
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC);
2.4 Unified Analytics Spine
Design Decision: place_id = 'ALL' is the sentinel for "all locations" rollups. This avoids NULL handling complexity while keeping the schema simple.
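As a minimal sketch of how application code might apply this convention, the helpers below resolve an optional location filter to the sentinel and mirror the character-set check used by the fact table's chk_place_id_format constraint. The names effective_place_id, is_valid_place_id and ALL_LOCATIONS are hypothetical and not part of the spec.

```python
import re
from typing import Optional

ALL_LOCATIONS = "ALL"  # sentinel for the all-locations rollup (hypothetical constant name)
_PLACE_ID_RE = re.compile(r"[a-zA-Z0-9_-]+")  # same character set as chk_place_id_format


def effective_place_id(place_id: Optional[str]) -> str:
    """Map None (or an empty string) to the 'ALL' sentinel; pass real IDs through."""
    return place_id if place_id else ALL_LOCATIONS


def is_valid_place_id(place_id: str) -> bool:
    """True for the sentinel or any non-empty ID made only of allowed characters."""
    return place_id == ALL_LOCATIONS or _PLACE_ID_RE.fullmatch(place_id) is not None
```

Because 'ALL' itself satisfies the character-set pattern, the sentinel only stays unambiguous as long as no real place ID is literally 'ALL'.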
-- Fact table: pre-aggregated time-series metrics
CREATE TABLE fact_timeseries (
id SERIAL PRIMARY KEY,
-- Universal join keys (KPI-ready)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' = all locations rollup
period_date DATE NOT NULL,
bucket_type TEXT NOT NULL, -- 'day', 'week', 'month'
-- Subject (what we're measuring)
subject_type TEXT NOT NULL, -- 'urt_code', 'domain', 'overall'
subject_id TEXT NOT NULL, -- Code, domain letter, or 'all'
-- Volume metrics
review_count INT NOT NULL DEFAULT 0,
negative_count INT NOT NULL DEFAULT 0,
positive_count INT NOT NULL DEFAULT 0,
neutral_count INT NOT NULL DEFAULT 0,
mixed_count INT NOT NULL DEFAULT 0,
-- Strength metrics (intensity-weighted)
strength_score FLOAT NOT NULL DEFAULT 0,
negative_strength FLOAT NOT NULL DEFAULT 0,
positive_strength FLOAT NOT NULL DEFAULT 0,
-- Rating metrics
avg_rating FLOAT,
rating_count INT DEFAULT 0,
-- Intensity distribution
i1_count INT DEFAULT 0,
i2_count INT DEFAULT 0,
i3_count INT DEFAULT 0,
-- CR signals
cr_better INT DEFAULT 0,
cr_worse INT DEFAULT 0,
cr_same INT DEFAULT 0,
-- Trust-weighted variants
trust_weighted_strength FLOAT,
trust_weighted_negative FLOAT,
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id)
);
-- Validate 'ALL' sentinel
ALTER TABLE fact_timeseries
ADD CONSTRAINT chk_place_id_format
CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$');
-- Optimized indexes for reporting queries
CREATE INDEX idx_facts_lookup ON fact_timeseries(
business_id, place_id, subject_type, subject_id, period_date DESC
);
CREATE INDEX idx_facts_period ON fact_timeseries(
business_id, period_date, bucket_type
);
CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id)
WHERE subject_type = 'urt_code';
CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date)
WHERE place_id = 'ALL';
v3.1 Fact Population Scope:
| subject_type | Populated | Notes |
|---|---|---|
| overall | ✅ Mandatory | Business-wide + per-location |
| urt_code | ✅ Mandatory | Per URT code |
| domain | ⚡ Derived | Rollup from urt_code at query time |
| issue | 🔜 Optional | Recommended for issue timelines (v3.2) |
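The query-time domain rollup could look roughly like the sketch below. It assumes the domain is the first character of the URT code (consistent with examples such as 'J1.01' and the CHAR(1) domain column) and that each input row is a dict of urt_code facts. The function name rollup_to_domain is hypothetical.

```python
from collections import defaultdict


def rollup_to_domain(code_facts: list[dict]) -> dict[str, dict]:
    """Sum urt_code fact rows into per-domain totals, keyed by domain letter."""
    totals: dict[str, dict] = defaultdict(
        lambda: {"review_count": 0, "negative_count": 0, "strength_score": 0.0}
    )
    for row in code_facts:
        domain = row["subject_id"][0]  # assumption: 'J1.01' -> 'J'
        bucket = totals[domain]
        bucket["review_count"] += row["review_count"]
        bucket["negative_count"] += row["negative_count"]
        bucket["strength_score"] += row["strength_score"]
    return dict(totals)
```

Only additive metrics are summed here. A ratio such as avg_rating would need to be re-weighted by its count rather than averaged across rows.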
2.5 Sub-Patterns (Persistent Clustering Results)
-- Stored sub-pattern clustering results
CREATE TABLE subpatterns (
id SERIAL PRIMARY KEY,
-- Parent
subject_type TEXT NOT NULL, -- 'urt_code', 'issue'
subject_id TEXT NOT NULL,
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' = all locations rollup (same sentinel as fact_timeseries; keeps the UNIQUE constraint effective)
-- Period
period_start DATE NOT NULL,
period_end DATE NOT NULL,
-- Cluster info
cluster_id INT NOT NULL,
label TEXT NOT NULL,
-- Metrics
review_count INT NOT NULL,
percentage FLOAT NOT NULL,
avg_intensity FLOAT,
-- Representative content
representative_review_id TEXT,
representative_quote TEXT,
sharpest_review_id TEXT,
sharpest_quote TEXT,
-- Embedding (for trend matching)
centroid VECTOR(384),
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(subject_type, subject_id, business_id, place_id, period_start, period_end, cluster_id)
);
CREATE INDEX idx_subpatterns_lookup ON subpatterns(
subject_type, subject_id, business_id, period_end DESC
);
Part 3: Ingest Layer
3.1 Google Connector
import json
from datetime import datetime
from typing import Optional

async def pull_reviews(place_id: str, since: Optional[datetime] = None) -> list[dict]:
    """Fetch new/updated reviews from Google Places API."""
    reviews = await google_places_client.get_reviews(place_id, since=since)
    for review in reviews:
        await store_raw_review(place_id, review)
    return reviews

async def store_raw_review(place_id: str, review: dict) -> int:
    """Store immutable raw review payload and return the reviews_raw row id."""
    # Fetch the latest stored version, including the fields content_changed() compares
    existing = await db.query_one("""
        SELECT id, review_version, review_text, rating FROM reviews_raw
        WHERE source = 'google' AND review_id = %s
        ORDER BY review_version DESC LIMIT 1
    """, [review['review_id']])

    version = 1
    if existing:
        if content_changed(existing, review):
            # Edited review: keep the old row and insert a new version
            version = existing['review_version'] + 1
        else:
            # Unchanged: reuse the existing row
            return existing['id']

    return await db.insert("""
        INSERT INTO reviews_raw (
            source, review_id, place_id, raw_payload,
            review_text, rating, review_time, reviewer_name, reviewer_id,
            review_version, pulled_at
        ) VALUES (
            'google', %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, NOW()
        )
        RETURNING id
    """, [
        review['review_id'], place_id, json.dumps(review),
        review.get('text'), review.get('rating'),
        review.get('time'), review.get('author_name'), review.get('author_id'),
        version
    ])
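store_raw_review relies on content_changed, which this spec does not define. One possible shape, offered only as a sketch, compares the stored text and rating against the incoming payload. It assumes the stored row is a dict carrying review_text and rating, and that the incoming review uses the text and rating keys seen in the insert above.

```python
def content_changed(existing: dict, review: dict) -> bool:
    """Return True when the incoming review differs from the stored version.

    Hypothetical helper: only text (ignoring surrounding whitespace) and rating
    are compared; other payload fields are treated as non-versioning changes.
    """
    old_text = (existing.get("review_text") or "").strip()
    new_text = (review.get("text") or "").strip()
    return old_text != new_text or existing.get("rating") != review.get("rating")
```

Comparing a small set of fields avoids creating a new version every time unrelated payload metadata shifts between pulls.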
3.2 Enrichment Pipeline
import asyncio

async def enrich_review(raw_id: int) -> dict:
    """Full enrichment: normalize → classify → embed → trust score."""
    raw = await db.query_one(
        "SELECT * FROM reviews_raw WHERE id = %s", [raw_id]
    )

    # 1. Normalize
    text = normalize_text(raw['review_text'])

    # 2. Map to business
    # NOTE: place_id alone is ambiguous when several tenants share a location;
    # this lookup assumes a single owning business per place_id.
    location = await db.query_one(
        "SELECT business_id FROM locations WHERE place_id = %s",
        [raw['place_id']]
    )
    if location is None:
        raise LookupError(f"No location registered for place_id {raw['place_id']}")

    # 3. Parallel: LLM classify + embed
    classification, embedding = await asyncio.gather(
        classify_review_llm(text), embed_review(text)
    )

    # 4. Trust score
    trust_score = compute_trust_score(raw, text, classification)

    # 5. Dedup check
    dedup_group_id = await find_dedup_group(embedding, raw['place_id'])

    # 6. Mark previous versions as not-latest
    await db.execute("""
        UPDATE reviews_enriched
        SET is_latest = FALSE
        WHERE source = 'google' AND review_id = %s AND is_latest = TRUE
    """, [raw['review_id']])

    # 7. Store enriched (versioned)
    # The LLM's 'confidence' key maps to the classification_confidence column
    fields = {k: v for k, v in classification.items() if k != 'confidence'}
    enriched = {
        'source': 'google',
        'review_id': raw['review_id'],
        'review_version': raw['review_version'],
        'is_latest': True,
        'raw_id': raw_id,
        'business_id': location['business_id'],
        'place_id': raw['place_id'],
        'text': raw['review_text'],
        'text_normalized': text,
        'rating': raw['rating'],
        'review_time': raw['review_time'],
        'language': detect_language(text),
        'embedding': embedding,
        'trust_score': trust_score,
        'dedup_group_id': dedup_group_id,
        'classification_confidence': classification.get('confidence'),
        **fields,
    }
    await upsert_enriched_review(enriched)
    return enriched
3.3 LLM Classification
CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).
Analyze the review and return JSON:
{
"urt_primary": "X1.23",
"urt_secondary": ["Y2.34"],
"valence": "V-",
"intensity": "I2",
"comparative": "CR-N",
"staff_mentions": ["Mike"],
"quotes": {
"X1.23": "exact phrase",
"Y2.34": "another phrase"
},
"confidence": {
"urt_primary": 0.92,
"valence": 0.95,
"intensity": 0.88
}
}
URT DOMAINS:
- O (Offering): Product/service quality, function, completeness
- P (People): Staff attitude, competence, responsiveness
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance
- A (Access): Availability, accessibility, convenience
- V (Value): Price, transparency, worth
- R (Relationship): Trust, dependability, loyalty
RULES:
1. Primary = most impactful topic
2. Secondary must be DIFFERENT domains
3. V± only for genuinely mixed sentiment
4. CR-B/W/S only for explicit self-comparison
5. quotes = EXACT phrases from review
Return valid JSON only."""
async def classify_review_llm(text: str) -> dict:
    """LLM-powered URT classification."""
    response = await llm.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"},
        temperature=0.1
    )
    result = json.loads(response.content)
    result['classification_model'] = 'gpt-4o-mini'
    return result
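Since the classifier output feeds NOT NULL columns, it may be worth checking the JSON against the prompt's rules before storing it. The sketch below is one hypothetical way to do that. The function name validate_classification and its error strings are illustrative, and it reads "different domains" as meaning that primary and secondary codes must all differ in their first letter.

```python
VALENCES = {"V+", "V-", "V0", "V±"}
INTENSITIES = {"I1", "I2", "I3"}
COMPARATIVES = {"CR-N", "CR-B", "CR-W", "CR-S"}
DOMAINS = set("OPJEAVR")


def validate_classification(result: dict) -> list[str]:
    """Return a list of rule violations; an empty list means the payload passed."""
    errors = []
    primary = result.get("urt_primary", "")
    secondary = result.get("urt_secondary", [])

    if not primary or primary[0] not in DOMAINS:
        errors.append("urt_primary missing or has unknown domain")
    if len(secondary) > 2:
        errors.append("more than 2 secondary codes")

    # Assumption: the domain is the first character of each code
    domains = [code[0] for code in [primary, *secondary] if code]
    if len(domains) != len(set(domains)):
        errors.append("primary/secondary codes share a domain")

    if result.get("valence") not in VALENCES:
        errors.append("invalid valence")
    if result.get("intensity") not in INTENSITIES:
        errors.append("invalid intensity")
    if result.get("comparative", "CR-N") not in COMPARATIVES:
        errors.append("invalid comparative")
    return errors
```

A caller could retry the LLM call or route the review to manual triage when the returned list is non-empty.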
3.4 Trust Score Computation
def compute_trust_score(raw: dict, text: str, classification: dict) -> float:
    """
    Compute trust score (0.0 to 1.0) based on review quality signals.
    Low trust = likely spam, fake, or low-quality.
    """
    score = 1.0

    # Length penalty
    word_count = len(text.split())
    if word_count < 5:
        score *= 0.5
    elif word_count > 500:
        score *= 0.8

    # Rating/sentiment mismatch
    rating = raw.get('rating')
    valence = classification.get('valence')
    if rating and valence:
        if rating >= 4 and valence == 'V-':
            score *= 0.7
        elif rating <= 2 and valence == 'V+':
            score *= 0.7

    # Generic text patterns
    if is_generic_review(text):
        score *= 0.6

    # LLM confidence (plain mean of the per-field scores; no numpy needed)
    confidence = classification.get('confidence', {})
    avg_confidence = sum(confidence.values()) / len(confidence) if confidence else 1.0
    if avg_confidence < 0.7:
        score *= 0.9

    # Clamp: the floor of 0.1 keeps even suspicious reviews from being zero-weighted
    return max(0.1, min(1.0, score))
Part 4: Issue Lifecycle Management
4.1 Issue Aggregation (Relational)
v3.1 Issue Key: (business_id, place_id, primary_subcode) — entity matching is deferred to v3.2.
from typing import Optional

async def aggregate_to_issue(review: dict) -> Optional[str]:
    """Match review to existing issue or create new one."""
    # Only negative or mixed reviews feed issues
    if review['valence'] not in ('V-', 'V±'):
        return None

    # Find matching open issues using relational query
    # v3.1: match on (business_id, place_id, primary_subcode) only
    matching = await db.query("""
        SELECT i.issue_id, i.primary_subcode, i.span_count
        FROM issues i
        WHERE i.business_id = %s
          AND i.place_id = %s
          AND i.primary_subcode = %s
          AND i.state NOT IN ('VERIFIED', 'DECLINED')
          AND i.created_at > NOW() - INTERVAL '30 days'
        ORDER BY i.priority_score DESC
        LIMIT 1
    """, [review['business_id'], review['place_id'], review['urt_primary']])

    if matching:
        issue = matching[0]
        await add_span_to_issue(issue['issue_id'], review)
        return issue['issue_id']

    if should_create_issue(review):
        return await create_issue(review)
    return None

async def add_span_to_issue(issue_id: str, review: dict):
    """Add review span to issue and update counters."""
    # Insert span (with denormalized review_time for timeline queries)
    await db.execute("""
        INSERT INTO issue_spans (issue_id, review_id, is_primary_match, intensity, review_time)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (issue_id, review_id) DO NOTHING
    """, [issue_id, review['review_id'], True, review['intensity'], review['review_time']])

    # Update issue counters
    await db.execute("""
        UPDATE issues SET
            span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s),
            max_intensity = (
                SELECT CASE MAX(CASE intensity
                    WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END)
                    WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END
                FROM issue_spans WHERE issue_id = %s
            ),
            updated_at = NOW()
        WHERE issue_id = %s
    """, [issue_id, issue_id, issue_id])

    await recalculate_priority(issue_id)
    await log_issue_event(issue_id, 'span_added', review_id=review['review_id'])
4.2 Priority Scoring (Trust-Weighted)
import math
from datetime import datetime

INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}

async def recalculate_priority(issue_id: str):
    """
    Priority = intensity × volume × decay × recurrence × trend × trust
    """
    issue = await db.query_one("""
        SELECT
            i.*,
            (SELECT AVG(re.trust_score)
             FROM issue_spans s
             JOIN reviews_enriched re ON s.review_id = re.review_id
             WHERE s.issue_id = i.issue_id AND re.is_latest = TRUE) as avg_trust
        FROM issues i
        WHERE i.issue_id = %s
    """, [issue_id])

    # Intensity weight of the strongest span
    i_weight = INTENSITY_WEIGHTS.get(issue['max_intensity'], 1.0)

    # Logarithmic volume: each additional span adds less than the previous one
    volume_factor = 1 + math.log(max(1, issue['span_count']))

    # Exponential decay with a half-life of about 30 days (ln 2 / 0.023)
    days_old = (datetime.now() - issue['created_at']).days
    decay = math.exp(-0.023 * days_old)

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    if issue['cr_worse_count'] >= 2:
        trend_modifier = 1.3
    elif issue['cr_better_count'] >= 2:
        trend_modifier = 0.7
    else:
        trend_modifier = 1.0

    # Fall back to 1.0 only when no trust data exists (a genuine 0.0 is kept)
    avg_trust = issue['avg_trust']
    trust_factor = avg_trust if avg_trust is not None else 1.0

    priority = (
        i_weight * volume_factor * decay *
        recurrence_boost * trend_modifier * trust_factor
    )

    await db.execute("""
        UPDATE issues SET
            priority_score = %s,
            avg_trust_score = %s,
            updated_at = NOW()
        WHERE issue_id = %s
    """, [priority, trust_factor, issue_id])
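To make the formula easier to reason about and unit-test, the arithmetic can be pulled out of the database code into a pure function. The sketch below reuses the constants from recalculate_priority. The function name priority_score and its keyword arguments are hypothetical.

```python
import math

INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0}


def priority_score(max_intensity: str, span_count: int, days_old: int,
                   reopen_count: int = 0, cr_worse: int = 0, cr_better: int = 0,
                   avg_trust: float = 1.0) -> float:
    """Priority = intensity × volume × decay × recurrence × trend × trust."""
    i_weight = INTENSITY_WEIGHTS.get(max_intensity, 1.0)
    volume = 1 + math.log(max(1, span_count))
    decay = math.exp(-0.023 * days_old)  # half-life = ln(2) / 0.023, about 30 days
    recurrence = 1.0 + 0.5 * math.log2(reopen_count + 1)
    if cr_worse >= 2:
        trend = 1.3
    elif cr_better >= 2:
        trend = 0.7
    else:
        trend = 1.0
    return i_weight * volume * decay * recurrence * trend * avg_trust
```

For a brand-new single-span I1 issue every factor is 1, so the score is 1.0. The same issue at 30 days old scores roughly half that, which is where the 0.023 decay constant comes from.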
4.3 Issue Review Drill-Down
async def get_issue_reviews(issue_id: str,
                            sort_by: str = 'date',
                            limit: int = 50,
                            offset: int = 0) -> list[dict]:
    """Fetch all reviews for an issue with full details."""
    # Whitelist of sort expressions; unknown keys fall back to date order,
    # so user input never reaches the SQL string directly
    order_clause = {
        'date': 's.review_time DESC',
        'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
        'trust': 're.trust_score DESC',
        'rating': 're.rating ASC'
    }.get(sort_by, 's.review_time DESC')

    return await db.query(f"""
        SELECT
            re.review_id,
            re.text,
            s.review_time,
            re.rating,
            re.valence,
            s.intensity,
            re.comparative,
            re.trust_score,
            re.quotes,
            re.staff_mentions,
            s.is_primary_match,
            s.weight,
            l.display_name as location_name
        FROM issue_spans s
        JOIN reviews_enriched re ON s.review_id = re.review_id AND re.is_latest = TRUE
        JOIN locations l ON (re.business_id, re.place_id) = (l.business_id, l.place_id)
        WHERE s.issue_id = %s
        ORDER BY {order_clause}
        LIMIT %s OFFSET %s
    """, [issue_id, limit, offset])
4.4 Strength Score
Strength Score = Σ (intensity_weight)
Where:
I1 (mild) → weight = 1
I2 (moderate) → weight = 2
I3 (strong) → weight = 4
One I3 review = 4 I1 reviews = 2 I2 reviews
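The weighting above can be checked with a few lines of Python. This is a sketch only: the function name strength_score is illustrative, and unknown intensity labels count as 0 to match the ELSE 0 branch in the SQL aggregation of Part 5.

```python
INTENSITY_WEIGHTS = {"I1": 1, "I2": 2, "I3": 4}


def strength_score(intensities: list[str]) -> int:
    """Sum the intensity weights of a set of reviews; unknown labels count as 0."""
    return sum(INTENSITY_WEIGHTS.get(label, 0) for label in intensities)
```

Doubling the weight at each level means a single strong complaint carries as much signal as four mild ones.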
Part 5: Analytics Spine (Fact Population)
5.1 Daily Fact Aggregation Job
from datetime import date, timedelta

async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'):
    """
    Aggregate reviews into fact_timeseries. Run daily.

    v3.1 populates:
    - subject_type='overall', subject_id='all' (per location + 'ALL')
    - subject_type='urt_code', subject_id=<code> (per location + 'ALL')

    Domain rollups are derived at query time from urt_code facts.
    Issue facts are optional (recommended for v3.2).
    """
    # Half-open period [period_start, period_end)
    if bucket_type == 'day':
        period_start = date
        period_end = date + timedelta(days=1)
    elif bucket_type == 'week':
        # Weeks start on Monday (weekday() == 0)
        period_start = date - timedelta(days=date.weekday())
        period_end = period_start + timedelta(days=7)
    elif bucket_type == 'month':
        period_start = date.replace(day=1)
        # Day 28 + 4 days always lands in the next month
        next_month = period_start.replace(day=28) + timedelta(days=4)
        period_end = next_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")

    locations = await db.query(
        "SELECT place_id FROM locations WHERE business_id = %s AND is_active = TRUE",
        [business_id]
    )
    all_place_ids = [loc['place_id'] for loc in locations]

    # Per-location facts
    for loc in locations:
        place_id = loc['place_id']
        await populate_location_facts(
            business_id, place_id, period_start, period_end, bucket_type
        )

    # All-locations rollup (place_id='ALL')
    await populate_all_locations_facts(
        business_id, all_place_ids, period_start, period_end, bucket_type
    )
async def populate_location_facts(business_id: str, place_id: str,
                                  period_start: date, period_end: date,
                                  bucket_type: str):
    """Populate facts for a single location."""
    # Aggregate by URT code
    code_stats = await aggregate_by_code(
        business_id, place_id, period_start, period_end
    )
    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id=place_id,
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Aggregate overall
    overall = await aggregate_overall(
        business_id, place_id, period_start, period_end
    )
    await upsert_fact(
        business_id=business_id,
        place_id=place_id,
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
async def populate_all_locations_facts(business_id: str, place_ids: list[str],
                                       period_start: date, period_end: date,
                                       bucket_type: str):
    """Populate 'ALL' rollup facts across all locations."""
    # Aggregate by URT code across all locations.
    # negative_strength (assumed here to be the intensity-weighted sum over V- reviews)
    # is included because the competitor comparison in Part 6 reads it from 'ALL' rows.
    code_stats = await db.query("""
        SELECT
            urt_primary as code,
            COUNT(*) as review_count,
            SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            SUM(CASE intensity
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
            END) as strength_score,
            SUM(CASE WHEN valence = 'V-' THEN
                CASE intensity WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
            ELSE 0 END) as negative_strength,
            AVG(rating) as avg_rating
        FROM reviews_enriched
        WHERE business_id = %s
          AND place_id = ANY(%s)
          AND review_time >= %s AND review_time < %s
          AND is_latest = TRUE
        GROUP BY urt_primary
    """, [business_id, place_ids, period_start, period_end])

    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id='ALL',
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Overall rollup
    overall = await db.query_one("""
        SELECT
            COUNT(*) as review_count,
            SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            AVG(rating) as avg_rating
        FROM reviews_enriched
        WHERE business_id = %s
          AND place_id = ANY(%s)
          AND review_time >= %s AND review_time < %s
          AND is_latest = TRUE
    """, [business_id, place_ids, period_start, period_end])

    await upsert_fact(
        business_id=business_id,
        place_id='ALL',
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
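The bucket arithmetic in populate_facts is easy to get wrong at month and year boundaries, so it may help to isolate it in a pure helper that can be tested without a database. The following is a sketch under that assumption. The name period_bounds is hypothetical, and it returns the same half-open [start, end) range the job uses.

```python
from datetime import date, timedelta


def period_bounds(day: date, bucket_type: str) -> tuple[date, date]:
    """Return the half-open [start, end) period containing `day`."""
    if bucket_type == "day":
        start = day
        end = day + timedelta(days=1)
    elif bucket_type == "week":
        start = day - timedelta(days=day.weekday())  # Monday of that week
        end = start + timedelta(days=7)
    elif bucket_type == "month":
        start = day.replace(day=1)
        # Day 28 exists in every month; adding 4 days always crosses into the next one
        end = (start.replace(day=28) + timedelta(days=4)).replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")
    return start, end
```

Because the end bound is exclusive, it pairs with the review_time >= start AND review_time < end filters in the aggregation queries without double-counting reviews at midnight.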
5.2 Timeline Query (For Charts)
async def get_timeline(business_id: str,
                       place_id: Optional[str],
                       subject_type: str,
                       subject_id: str,
                       start: date,
                       end: date,
                       bucket_type: str = 'week') -> list[dict]:
    """
    Query pre-aggregated facts for line charts.

    Args:
        place_id: Specific place_id, or None for 'ALL' locations rollup
    """
    # Use 'ALL' sentinel for all-locations queries
    effective_place_id = place_id if place_id else 'ALL'

    return await db.query("""
        SELECT
            period_date,
            review_count,
            negative_count,
            positive_count,
            strength_score,
            negative_strength,
            avg_rating,
            cr_better,
            cr_worse,
            cr_same
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = %s
          AND subject_type = %s
          AND subject_id = %s
          AND bucket_type = %s
          AND period_date BETWEEN %s AND %s
        ORDER BY period_date
    """, [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end])
Part 6: Competitor Analysis
6.1 Competitor Setup (Clean Model)
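Competitors are tracked in the competitors table only. They are not injected into locations with fake business_ids. Competitor reviews flow through the same reviews_raw and reviews_enriched tables, stored with business_id = NULL or under a dedicated __competitors__ partition. Note that reviews_enriched as defined in section 2.2 declares business_id NOT NULL with a foreign key to locations; competitor rows cannot satisfy both without a locations entry, so those constraints must be relaxed (or the competitor partition exempted) before competitor reviews can be stored.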
async def setup_competitor(business_id: str, competitor_place_id: str,
                           competitor_name: str, relationship: str = 'direct'):
    """Register a competitor for tracking (idempotent upsert)."""
    await db.execute("""
        INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET
            competitor_name = EXCLUDED.competitor_name,
            relationship = EXCLUDED.relationship
    """, [business_id, competitor_place_id, competitor_name, relationship])

async def pull_competitor_reviews(business_id: str):
    """Pull reviews for all tracked competitors."""
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors
        WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    for comp in competitors:
        # Pull and store reviews for this competitor's place_id; the link to the
        # tracking business lives in the competitors table, not in a fake business_id
        await pull_reviews_for_competitor(
            business_id=business_id,
            place_id=comp['competitor_place_id']
        )
6.2 Competitor Comparison
from datetime import date

async def get_competitor_comparison(business_id: str, code: str,
                                    start: date, end: date) -> dict:
    """Compare your URT metrics against competitors."""
    # Your metrics (from 'ALL' rollup).
    # No bucket_type filter is applied: if several granularities are stored
    # for the same dates, add one to avoid double counting.
    # AVG(avg_rating) is an unweighted mean of the per-period averages.
    your_metrics = await db.query_one("""
        SELECT
            SUM(negative_strength) as negative_strength,
            SUM(review_count) as review_count,
            AVG(avg_rating) as avg_rating
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = 'ALL'
          AND subject_type = 'urt_code'
          AND subject_id = %s
          AND period_date BETWEEN %s AND %s
    """, [business_id, code, start, end])

    # Competitor metrics
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    comparison = {
        'your_business': your_metrics or {},
        'competitors': []
    }

    for comp in competitors:
        # Query competitor's facts (stored with their place_id)
        comp_metrics = await db.query_one("""
            SELECT
                SUM(negative_strength) as negative_strength,
                SUM(review_count) as review_count,
                AVG(avg_rating) as avg_rating
            FROM fact_timeseries
            WHERE place_id = %s
              AND subject_type = 'urt_code'
              AND subject_id = %s
              AND period_date BETWEEN %s AND %s
        """, [comp['competitor_place_id'], code, start, end])

        comparison['competitors'].append({
            'name': comp['competitor_name'],
            'place_id': comp['competitor_place_id'],
            **(comp_metrics or {})
        })

    return comparison
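The dict returned by get_competitor_comparison holds raw sums, which are hard to compare across businesses with very different review volumes. The following is a minimal sketch of one possible normalization, assuming the dict shape shown above. The helper names negative_rate and rank_by_negative_rate, the "negative strength per review" ratio, and the sample values are all illustrative assumptions, not part of the specification.

```python
from typing import Optional


def negative_rate(metrics: dict) -> Optional[float]:
    """Negative strength per review, or None when the row has no usable data."""
    count = metrics.get('review_count') or 0
    strength = metrics.get('negative_strength')
    if not count or strength is None:
        return None
    return float(strength) / float(count)


def rank_by_negative_rate(comparison: dict) -> list:
    """Return (name, rate) pairs sorted from highest to lowest rate.

    Rows without data (for example a competitor with no facts in the
    period, where the SUMs come back as NULL) are skipped.
    """
    rows = [('your_business', negative_rate(comparison.get('your_business', {})))]
    for comp in comparison.get('competitors', []):
        rows.append((comp['name'], negative_rate(comp)))
    ranked = [(name, rate) for name, rate in rows if rate is not None]
    return sorted(ranked, key=lambda pair: pair[1], reverse=True)


# Made-up sample in the shape produced by get_competitor_comparison.
example = {
    'your_business': {'negative_strength': 12.0, 'review_count': 100, 'avg_rating': 4.2},
    'competitors': [
        {'name': 'Competitor A', 'place_id': 'place_a',
         'negative_strength': 30.0, 'review_count': 120, 'avg_rating': 3.9},
        {'name': 'Competitor B', 'place_id': 'place_b',
         'negative_strength': None, 'review_count': None, 'avg_rating': None},
    ],
}
ranking = rank_by_negative_rate(example)
print(ranking)
```

In this sample, Competitor B has no facts for the period and is left out of the ranking rather than being treated as a zero.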
Part 7: Report Generation (Facts-First)
from datetime import date, datetime
from typing import Optional

async def generate_report(business_id: str, place_id: Optional[str],
                          start: date, end: date) -> dict:
    """Generate report from pre-aggregated facts."""
    # Use 'ALL' for cross-location reports
    effective_place_id = place_id if place_id else 'ALL'

    # 1. Top issues from facts (fast)
    top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end)

    # 2. Strengths from facts
    strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end)

    # 3. Sub-patterns for the top five issues (store results)
    for issue in top_issues[:5]:
        patterns = await discover_and_store_subpatterns(
            business_id, effective_place_id, issue['code'], start, end
        )
        issue['sub_patterns'] = patterns

    # 4. Trends from facts
    trends = await compute_trends_from_facts(business_id, effective_place_id, start, end)

    # 5. Staff analysis
    staff = await analyze_staff(business_id, effective_place_id, start, end)

    # 6. Competitor benchmarks
    competitors = await get_competitor_benchmarks(business_id, start, end)

    payload = {
        'business_id': business_id,
        'place_id': place_id,  # Original (None = all)
        'period': f"{start} to {end}",
        'issues': top_issues,
        'strengths': strengths,
        'trends': trends,
        'staff': staff,
        'competitors': competitors,
    }

    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat()
    }
Part 8: KPI-Ready Hooks
8.1 Future KPI Integration (Interface Only)
-- Future: KPI fact table with same grain
CREATE TABLE fact_kpi_timeseries (
    id SERIAL PRIMARY KEY,

    -- Same join keys as fact_timeseries
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,  -- 'ALL' for rollups
    period_date DATE NOT NULL,
    bucket_type TEXT NOT NULL,

    -- KPI metrics
    revenue DECIMAL(12,2),
    transactions INT,
    cancellations INT,
    refunds DECIMAL(12,2),
    support_tickets INT,

    computed_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(business_id, place_id, period_date, bucket_type)
);

-- Join reviews and KPIs:
SELECT
    r.period_date,
    r.negative_strength,
    k.revenue,
    k.cancellations
FROM fact_timeseries r
JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
WHERE r.subject_type = 'overall' AND r.subject_id = 'all';
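The join above works because both fact tables share the same grain. The sketch below exercises that idea end to end, under stated assumptions: it uses Python's built-in sqlite3 module as a stand-in for the production database, trims both tables to a few columns, and inserts made-up rows. The bucket_type value 'day' and the IDs biz_1 and place_a are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE fact_timeseries (
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,
    period_date TEXT NOT NULL,
    bucket_type TEXT NOT NULL,
    subject_type TEXT NOT NULL,
    subject_id TEXT NOT NULL,
    negative_strength REAL
);
CREATE TABLE fact_kpi_timeseries (
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,
    period_date TEXT NOT NULL,
    bucket_type TEXT NOT NULL,
    revenue REAL,
    cancellations INTEGER,
    UNIQUE (business_id, place_id, period_date, bucket_type)
);
""")

# Review facts: two 'ALL' rollup rows and one per-location row.
conn.executemany("INSERT INTO fact_timeseries VALUES (?, ?, ?, ?, ?, ?, ?)", [
    ('biz_1', 'ALL', '2026-01-01', 'day', 'overall', 'all', 3.5),
    ('biz_1', 'ALL', '2026-01-02', 'day', 'overall', 'all', 1.0),
    ('biz_1', 'place_a', '2026-01-01', 'day', 'overall', 'all', 2.0),
])

# KPI facts exist only at the 'ALL' rollup level in this sample.
conn.executemany("INSERT INTO fact_kpi_timeseries VALUES (?, ?, ?, ?, ?, ?)", [
    ('biz_1', 'ALL', '2026-01-01', 'day', 1200.0, 4),
    ('biz_1', 'ALL', '2026-01-02', 'day', 1500.0, 1),
])

rows = conn.execute("""
    SELECT r.period_date, r.negative_strength, k.revenue, k.cancellations
    FROM fact_timeseries r
    JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
    WHERE r.subject_type = 'overall' AND r.subject_id = 'all'
    ORDER BY r.period_date
""").fetchall()

for row in rows:
    print(row)
```

Because place_id is part of the join key, the place_a review row finds no KPI match and drops out of the inner join. Rollup rows pair with rollup rows only, which is what the 'ALL' sentinel is meant to guarantee.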
Part 9: Cost Model
| Stage | When | Cost | Notes |
|---|---|---|---|
| Raw Storage | Per review | $0.00 | ~1KB per review |
| Embedding | Per review | $0.00 | Local model, ~50ms |
| LLM Classification | Per review | ~$0.0002 | GPT-4o-mini |
| Fact Aggregation | Daily job | $0.00 | SQL, <1 minute |
| Sub-Clustering | Per report | $0.00 | HDBSCAN, <1s |
| LLM Narrative | Per report | ~$0.15 | GPT-4o |
Total Costs:
| Volume | Monthly Ingest | Reports (10/mo) | Total |
|---|---|---|---|
| 1K reviews | $0.20 | $1.50 | $1.70 |
| 10K reviews | $2.00 | $1.50 | $3.50 |
| 100K reviews | $20.00 | $1.50 | $21.50 |
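The totals follow directly from the two non-zero per-unit costs in the stage table. As a quick check, the sketch below reproduces the three rows; the function name monthly_cost and the constant names are illustrative, and the per-unit figures are the approximate values quoted above rather than measured prices.

```python
CLASSIFICATION_COST_PER_REVIEW = 0.0002  # LLM Classification row (approximate)
NARRATIVE_COST_PER_REPORT = 0.15         # LLM Narrative row (approximate)


def monthly_cost(reviews: int, reports: int = 10) -> dict:
    """Estimate monthly spend from review volume and report count.

    All other stages are listed at $0.00 in the stage table, so only
    classification and narrative generation contribute.
    """
    ingest = reviews * CLASSIFICATION_COST_PER_REVIEW
    reporting = reports * NARRATIVE_COST_PER_REPORT
    return {
        'ingest': round(ingest, 2),
        'reports': round(reporting, 2),
        'total': round(ingest + reporting, 2),
    }


for volume in (1_000, 10_000, 100_000):
    print(volume, monthly_cost(volume))
```

At the default of 10 reports per month, narrative generation dominates at low volume, while classification dominates from roughly 10K reviews upward.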
Part 10: Key Innovations
| Innovation | Benefit |
|---|---|
| Relational issue_spans | Scales, queryable, no array limits |
| Versioned reviews_enriched | Handles edited reviews correctly |
| Tenant-scoped locations | Same place_id works for multiple businesses |
| place_id='ALL' sentinel | Clean all-locations rollups without NULL handling |
| Unified fact_timeseries spine | Fast dashboards, KPI-joinable later |
| Trust score weighting | Spam resistance without deletion |
| Competitor separate table | Clean model, no fake business_ids |
| Universal join keys | (business_id, place_id, period, bucket) future-proofs KPI integration |
Document Control
| Field | Value |
|---|---|
| Document | ReviewIQ Architecture v3.1.1 |
| Status | Specification Complete (Reviewed) |
| Date | 2026-01-24 |
| Dependencies | URT Specification v5.1, Issue Lifecycle Framework C1 |
| Source | Google Reviews only |
| Cost Target | <$25/month at 100K reviews |
Changelog
| Version | Changes |
|---|---|
| v3.0 | Issue lifecycle, strength scores, timeline charts |
| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring |
| v3.1.1 | Reviewed: Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup, fixed get_timeline params, clarified issue key scope |
Fixes Applied (v3.1.1)
| Issue | Fix |
|---|---|
| reviews_enriched PK wrong for edits | PK = (source, review_id, review_version) + is_latest flag |
| raw_id ambiguous under versioning | raw_id references specific raw version |
| locations.place_id prevents multi-tenant | PK = (business_id, place_id) (tenant-scoped) |
| Competitor fake business_id pattern | Removed; competitors table is separate, no injection into locations |
| fact_timeseries.place_id NOT NULL blocks rollups | place_id='ALL' sentinel for all-locations |
| get_timeline param ordering bug | Fixed: params built in correct order |
| Issue entity fields but no extraction | Clarified: v3.1 key is (business_id, place_id, primary_subcode) only; entity fields reserved for v3.2 |
| Missing indexes | Added idx_spans_issue_time, FK to locations |
Deferred to v3.2+
| Feature | Reason |
|---|---|
| Entity extraction for issues | Needs NER pipeline |
| journey_step inference | Needs better grounding data |
| intent_signals extraction | Needs action playbooks |
| stability_score tracking | Premature for v1 |
| issue facts in fact_timeseries | Optional performance optimization |
| KPI integration | Placeholder only in v3.1 |
End of ReviewIQ Architecture v3.1.1