diff --git a/.artifacts/ReviewIQ-Architecture-v3.1.md b/.artifacts/ReviewIQ-Architecture-v3.1.md new file mode 100644 index 0000000..9782940 --- /dev/null +++ b/.artifacts/ReviewIQ-Architecture-v3.1.md @@ -0,0 +1,1366 @@ +# ReviewIQ: Review Intelligence Pipeline + +**Version**: 3.1.1 +**Status**: Architecture Specification (Reviewed) +**Date**: 2026-01-24 + +--- + +## Executive Summary + +ReviewIQ v3.1 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version refactors the data model for multi-location support, replaces array-based aggregation with proper relational design, and introduces a unified analytics spine for fast reporting. + +**What's New in v3.1**: +- **Relational issue tracking**: `issue_spans` table replaces `review_ids[]` arrays +- **Multi-location ready**: `place_id` as first-class key throughout +- **Split storage**: `reviews_raw` + `reviews_enriched` for audit and reprocessing +- **Unified analytics spine**: `fact_timeseries` with universal join keys +- **Quality controls**: `trust_score` and `dedup_group_id` for spam filtering +- **Competitor analysis**: Same pipeline, separate tracking table +- **KPI-ready hooks**: Nullable columns for future business metric integration + +**Design Principles**: +- **Google Reviews only** (for now) — but schema is source-agnostic +- **Relational over arrays** — scales, queries, joins +- **Facts-first reporting** — pre-aggregated spine for fast dashboards +- **KPI-joinable** — `(business_id, place_id, period_date, bucket_type)` as universal key +- **Tenant-scoped locations** — same place_id can exist for multiple businesses + +--- + +## Part 1: System Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────────────────┐ +│ REVIEWIQ v3.1 PIPELINE │ +├─────────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ │ +│ │ Google │ │ +│ │ Reviews │ │ +│ │ (API) │ │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────────────┐ │ +│ │ A) SOURCE & STORAGE │ │ +│ │ │ │ +│ │ google_connector ───▶ reviews_raw (immutable JSON + metadata) │ │ +│ │ │ │ +│ └──────────────────────────────────┬──────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────────────┐ │ +│ │ B) ENRICHMENT │ │ +│ │ │ │ +│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ +│ │ │Normalize │──▶│ LLM │──▶│ Embed │──▶│ Trust │ │ │ +│ │ │ + Map │ │ Classify │ │ (local) │ │ Score │ │ │ +│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ +│ │ │ │ │ │ +│ │ └────────────────────┬───────────────────────┘ │ │ +│ │ ▼ │ │ +│ │ reviews_enriched (versioned) │ │ +│ │ │ │ +│ └──────────────────────────────┬──────────────────────────────────────────────┘ │ +│ │ │ +│ ┌────────────┴────────────┐ │ +│ ▼ ▼ │ +│ ┌────────────────────────────┐ ┌────────────────────────────────┐ │ +│ │ C) OPERATIONALIZATION │ │ D) ANALYTICS SPINE │ │ +│ │ │ │ │ │ +│ │ reviews_enriched │ │ Daily/Weekly Jobs: │ │ +│ │ │ │ │ │ │ +│ │ ▼ │ │ reviews_enriched │ │ +│ │ Match/Create Issue │ │ │ │ │ +│ │ │ │ │ ▼ │ │ +│ │ ▼ │ │ fact_timeseries │ │ +│ │ issue_spans (link) │ │ (pre-aggregated metrics) │ │ +│ │ │ │ │ │ │ +│ │ ▼ │ │ Keys: │ │ +│ │ issues (update counters) │ │ • business_id │ │ +│ │ │ │ │ • place_id (or 'ALL') │ │ +│ │ ▼ │ │ • subject_type/id │ │ +│ │ issue_events (log) │ │ • period_date │ │ +│ │ │ │ • bucket_type │ │ +│ └────────────────────────────┘ └────────────────────────────────┘ │ +│ │ │ │ +│ └────────────┬────────────┘ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────────────┐ │ +│ │ E) REPORTING │ │ +│ │ │ │ +│ │ fact_timeseries ──┬──▶ Statistics & Trends │ │ +│ │ │ │ │ +│ │ issues + spans ───┼──▶ Issue Rankings & Drill-Down │ │ +│ │ │ │ │ +│ │ embeddings ───────┼──▶ Sub-Pattern Clustering │ │ +│ │ │ │ │ +│ │ competitors ──────┴──▶ Benchmark Comparisons │ │ +│ │ │ │ │ +│ │ ▼ │ │ +│ │ LLM Narrative Generation │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Part 2: Data Model (SQL DDL) + +### 2.1 Dimension Tables + +```sql +-- Business locations (multi-tenant: same place_id can exist for multiple businesses) +CREATE TABLE locations ( + business_id TEXT NOT NULL, -- Internal business identifier + place_id TEXT NOT NULL, -- Google Place ID + display_name TEXT NOT NULL, + address TEXT, + city TEXT, + state TEXT, + country TEXT, + timezone TEXT, + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + + PRIMARY KEY (business_id, place_id) +); + +CREATE INDEX idx_locations_place ON locations(place_id); + +-- URT code reference +CREATE TABLE urt_codes ( + code TEXT PRIMARY KEY, -- 'J1.01', 'P1.02', etc. + domain CHAR(1) NOT NULL, -- O, P, J, E, A, V, R + category TEXT NOT NULL, + subcategory TEXT NOT NULL, + display_name TEXT NOT NULL, + description TEXT, + keywords TEXT[] -- For search/matching +); + +-- Competitor mapping (separate from locations - no fake business_ids) +CREATE TABLE competitors ( + id SERIAL PRIMARY KEY, + business_id TEXT NOT NULL, -- Your business + competitor_place_id TEXT NOT NULL, -- Competitor's Google Place ID + competitor_name TEXT NOT NULL, + relationship TEXT DEFAULT 'direct', -- 'direct', 'indirect', 'aspirational' + is_active BOOLEAN DEFAULT TRUE, + created_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(business_id, competitor_place_id) +); + +CREATE INDEX idx_competitors_business ON competitors(business_id); +``` + +### 2.2 Reviews Tables (Raw + Enriched) + +```sql +-- Immutable raw review storage (audit + reprocessing) +CREATE TABLE reviews_raw ( + id SERIAL PRIMARY KEY, + source TEXT NOT NULL DEFAULT 'google', + review_id TEXT NOT NULL, -- Google review ID + place_id TEXT NOT NULL, -- Google Place ID + + -- Raw payload + raw_payload JSONB NOT NULL, -- Complete API response + review_text TEXT, -- Extracted for indexing + rating SMALLINT, + review_time TIMESTAMP, + reviewer_name TEXT, + reviewer_id TEXT, + + -- Versioning (Google reviews can be edited) + review_version INT DEFAULT 1, + + -- Ingestion metadata + pulled_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(source, review_id, review_version) +); + +CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC); +CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id); + +-- Enriched review with LLM classification + embeddings (versioned) +-- Supports edited reviews: each version is a separate row +CREATE TABLE reviews_enriched ( + -- Versioned primary key (handles edited reviews) + source TEXT NOT NULL DEFAULT 'google', + review_id TEXT NOT NULL, -- Matches reviews_raw.review_id + review_version INT NOT NULL DEFAULT 1, + is_latest BOOLEAN NOT NULL DEFAULT TRUE, + + -- Link to raw (specific version) + raw_id INT NOT NULL REFERENCES reviews_raw(id), + + -- Identity + business_id TEXT NOT NULL, + place_id TEXT NOT NULL, + + -- Core content + text TEXT NOT NULL, + text_normalized TEXT, -- Cleaned for processing + rating SMALLINT, + review_time TIMESTAMP NOT NULL, + language TEXT, + + -- URT Classification (from LLM) + urt_primary TEXT NOT NULL, -- 'J1.01', 'P1.02', etc. + urt_secondary TEXT[] DEFAULT '{}', -- Max 2, different domains + valence TEXT NOT NULL, -- 'V+', 'V-', 'V0', 'V±' + intensity TEXT NOT NULL, -- 'I1', 'I2', 'I3' + comparative TEXT DEFAULT 'CR-N', -- 'CR-N', 'CR-B', 'CR-W', 'CR-S' + + -- Extracted entities (reserved for v3.2, nullable) + staff_mentions TEXT[] DEFAULT '{}', + quotes JSONB, -- {"code": "phrase", ...} + + -- Embedding + embedding VECTOR(384), + + -- Quality control + trust_score FLOAT DEFAULT 1.0, -- 0.0 to 1.0 + dedup_group_id TEXT, -- Groups duplicate/near-duplicate reviews + is_suspicious BOOLEAN DEFAULT FALSE, + + -- Processing metadata + classification_model TEXT, + classification_confidence JSONB, -- Per-field confidence scores + processed_at TIMESTAMP DEFAULT NOW(), + model_version TEXT, + + -- KPI-ready hooks (nullable, computed later) + kpi_impact_estimate FLOAT, + kpi_last_computed_at TIMESTAMP, + + PRIMARY KEY (source, review_id, review_version) +); + +-- Indexes for common query patterns +CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id) + WHERE is_latest = TRUE; +CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC) + WHERE is_latest = TRUE; +CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC) + WHERE is_latest = TRUE; +CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary) + WHERE is_latest = TRUE; +CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time) + WHERE is_latest = TRUE; +CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative) + WHERE comparative != 'CR-N' AND is_latest = TRUE; +CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score) + WHERE trust_score < 0.5 AND is_latest = TRUE; +CREATE INDEX idx_enriched_embedding ON reviews_enriched + USING hnsw (embedding vector_cosine_ops); + +-- FK to locations (tenant-scoped) +ALTER TABLE reviews_enriched + ADD CONSTRAINT fk_enriched_location + FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id); +``` + +### 2.3 Issue Tables (Relational, No Arrays) + +**Note**: In v3.1, the issue grouping key is `(business_id, place_id, primary_subcode)` only. The `entity` and `entity_normalized` fields are reserved for v3.2+ when entity extraction is implemented. + +```sql +-- Issues (aggregated problems) +CREATE TABLE issues ( + issue_id TEXT PRIMARY KEY, + + -- Grouping keys (v3.1: code + place only) + business_id TEXT NOT NULL, + place_id TEXT NOT NULL, + primary_subcode TEXT NOT NULL, -- URT code + domain CHAR(1) NOT NULL, + + -- State machine + state TEXT NOT NULL DEFAULT 'DETECTED', + priority_score FLOAT NOT NULL, + confidence_score FLOAT NOT NULL, + + -- Aggregated metrics (updated via triggers/jobs) + span_count INT NOT NULL DEFAULT 1, + max_intensity TEXT NOT NULL, + avg_trust_score FLOAT DEFAULT 1.0, + + -- CR counters (rolling 30-day window) + cr_better_count INT DEFAULT 0, + cr_worse_count INT DEFAULT 0, + cr_same_count INT DEFAULT 0, + + -- Star drag proxy (avg rating when this issue present vs absent) + star_drag_estimate FLOAT, + + -- Ownership + owner_team TEXT, + owner_individual TEXT, + + -- Timestamps + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + acknowledged_at TIMESTAMP, + resolved_at TIMESTAMP, + verified_at TIMESTAMP, + + -- Resolution + reopen_count INT DEFAULT 0, + resolution_code TEXT, + resolution_notes TEXT, + decline_reason TEXT, + + -- Context (RESERVED for v3.2 - entity extraction) + entity TEXT, -- Product, staff member, feature + entity_normalized TEXT, + + -- KPI-ready hooks (nullable) + kpi_impact_estimate FLOAT, + kpi_impact_confidence FLOAT, + kpi_last_computed_at TIMESTAMP +); + +CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC); +CREATE INDEX idx_issues_place ON issues(place_id, state); +CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode); +CREATE INDEX idx_issues_open ON issues(business_id) + WHERE state NOT IN ('VERIFIED', 'DECLINED'); + +-- FK to locations (tenant-scoped) +ALTER TABLE issues + ADD CONSTRAINT fk_issues_location + FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id); + +-- Issue spans (link table: issue ↔ review) +CREATE TABLE issue_spans ( + id SERIAL PRIMARY KEY, + issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE, + review_id TEXT NOT NULL, -- Matches reviews_enriched (source, review_id, version) + + -- Span metadata + is_primary_match BOOLEAN DEFAULT TRUE, -- Primary vs secondary code match + weight FLOAT DEFAULT 1.0, -- For weighted aggregation + intensity TEXT NOT NULL, -- Copied from review for fast queries + + -- Denormalized for timeline queries (avoids join) + review_time TIMESTAMP NOT NULL, + + created_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(issue_id, review_id) +); + +CREATE INDEX idx_spans_issue ON issue_spans(issue_id); +CREATE INDEX idx_spans_review ON issue_spans(review_id); +CREATE INDEX idx_spans_issue_time ON issue_spans(issue_id, review_time DESC); + +-- Issue events (audit log) +CREATE TABLE issue_events ( + event_id SERIAL PRIMARY KEY, + issue_id TEXT NOT NULL REFERENCES issues(issue_id), + + event_type TEXT NOT NULL, -- 'state_change', 'span_added', 'priority_update' + from_state TEXT, + to_state TEXT, + + actor TEXT, -- User or 'system' + notes TEXT, + review_id TEXT, -- Triggering review if applicable + + metadata JSONB, -- Additional context + created_at TIMESTAMP DEFAULT NOW() +); + +CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC); +``` + +### 2.4 Unified Analytics Spine + +**Design Decision**: `place_id = 'ALL'` is the sentinel for "all locations" rollups. This avoids NULL handling complexity while keeping the schema simple. + +```sql +-- Fact table: pre-aggregated time-series metrics +CREATE TABLE fact_timeseries ( + id SERIAL PRIMARY KEY, + + -- Universal join keys (KPI-ready) + business_id TEXT NOT NULL, + place_id TEXT NOT NULL, -- 'ALL' = all locations rollup + period_date DATE NOT NULL, + bucket_type TEXT NOT NULL, -- 'day', 'week', 'month' + + -- Subject (what we're measuring) + subject_type TEXT NOT NULL, -- 'urt_code', 'domain', 'overall' + subject_id TEXT NOT NULL, -- Code, domain letter, or 'all' + + -- Volume metrics + review_count INT NOT NULL DEFAULT 0, + negative_count INT NOT NULL DEFAULT 0, + positive_count INT NOT NULL DEFAULT 0, + neutral_count INT NOT NULL DEFAULT 0, + mixed_count INT NOT NULL DEFAULT 0, + + -- Strength metrics (intensity-weighted) + strength_score FLOAT NOT NULL DEFAULT 0, + negative_strength FLOAT NOT NULL DEFAULT 0, + positive_strength FLOAT NOT NULL DEFAULT 0, + + -- Rating metrics + avg_rating FLOAT, + rating_count INT DEFAULT 0, + + -- Intensity distribution + i1_count INT DEFAULT 0, + i2_count INT DEFAULT 0, + i3_count INT DEFAULT 0, + + -- CR signals + cr_better INT DEFAULT 0, + cr_worse INT DEFAULT 0, + cr_same INT DEFAULT 0, + + -- Trust-weighted variants + trust_weighted_strength FLOAT, + trust_weighted_negative FLOAT, + + -- Metadata + computed_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id) +); + +-- Validate 'ALL' sentinel +ALTER TABLE fact_timeseries + ADD CONSTRAINT chk_place_id_format + CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$'); + +-- Optimized indexes for reporting queries +CREATE INDEX idx_facts_lookup ON fact_timeseries( + business_id, place_id, subject_type, subject_id, period_date DESC +); +CREATE INDEX idx_facts_period ON fact_timeseries( + business_id, period_date, bucket_type +); +CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id) + WHERE subject_type = 'urt_code'; +CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date) + WHERE place_id = 'ALL'; +``` + +**v3.1 Fact Population Scope**: +| subject_type | Populated | Notes | +|--------------|-----------|-------| +| `overall` | ✅ Mandatory | Business-wide + per-location | +| `urt_code` | ✅ Mandatory | Per URT code | +| `domain` | ⚡ Derived | Rollup from urt_code at query time | +| `issue` | 🔜 Optional | Recommended for issue timelines (v3.2) | + +### 2.5 Sub-Patterns (Persistent Clustering Results) + +```sql +-- Stored sub-pattern clustering results +CREATE TABLE subpatterns ( + id SERIAL PRIMARY KEY, + + -- Parent + subject_type TEXT NOT NULL, -- 'urt_code', 'issue' + subject_id TEXT NOT NULL, + business_id TEXT NOT NULL, + place_id TEXT, -- NULL = all locations + + -- Period + period_start DATE NOT NULL, + period_end DATE NOT NULL, + + -- Cluster info + cluster_id INT NOT NULL, + label TEXT NOT NULL, + + -- Metrics + review_count INT NOT NULL, + percentage FLOAT NOT NULL, + avg_intensity FLOAT, + + -- Representative content + representative_review_id TEXT, + representative_quote TEXT, + sharpest_review_id TEXT, + sharpest_quote TEXT, + + -- Embedding (for trend matching) + centroid VECTOR(384), + + -- Metadata + computed_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(subject_type, subject_id, business_id, place_id, period_start, period_end, cluster_id) +); + +CREATE INDEX idx_subpatterns_lookup ON subpatterns( + subject_type, subject_id, business_id, period_end DESC +); +``` + +--- + +## Part 3: Ingest Layer + +### 3.1 Google Connector + +```python +async def pull_reviews(place_id: str, since: datetime = None) -> list[dict]: + """Fetch new/updated reviews from Google Places API.""" + + reviews = await google_places_client.get_reviews(place_id, since=since) + + for review in reviews: + await store_raw_review(place_id, review) + + return reviews + + +async def store_raw_review(place_id: str, review: dict) -> int: + """Store immutable raw review payload.""" + + existing = await db.query_one(""" + SELECT id, review_version FROM reviews_raw + WHERE source = 'google' AND review_id = %s + ORDER BY review_version DESC LIMIT 1 + """, [review['review_id']]) + + version = 1 + if existing: + if content_changed(existing, review): + version = existing['review_version'] + 1 + else: + return existing['id'] + + return await db.insert(""" + INSERT INTO reviews_raw ( + source, review_id, place_id, raw_payload, + review_text, rating, review_time, reviewer_name, reviewer_id, + review_version, pulled_at + ) VALUES ( + 'google', %s, %s, %s, + %s, %s, %s, %s, %s, + %s, NOW() + ) + RETURNING id + """, [ + review['review_id'], place_id, json.dumps(review), + review.get('text'), review.get('rating'), + review.get('time'), review.get('author_name'), review.get('author_id'), + version + ]) +``` + +### 3.2 Enrichment Pipeline + +```python +async def enrich_review(raw_id: int) -> dict: + """Full enrichment: normalize → classify → embed → trust score.""" + + raw = await db.query_one( + "SELECT * FROM reviews_raw WHERE id = %s", [raw_id] + ) + + # 1. Normalize + text = normalize_text(raw['review_text']) + + # 2. Map to business + location = await db.query_one( + "SELECT business_id FROM locations WHERE place_id = %s", + [raw['place_id']] + ) + + # 3. Parallel: LLM classify + embed + classify_task = asyncio.create_task(classify_review_llm(text)) + embed_task = asyncio.create_task(embed_review(text)) + + classification = await classify_task + embedding = await embed_task + + # 4. Trust score + trust_score = compute_trust_score(raw, text, classification) + + # 5. Dedup check + dedup_group_id = await find_dedup_group(embedding, raw['place_id']) + + # 6. Mark previous versions as not-latest + await db.execute(""" + UPDATE reviews_enriched + SET is_latest = FALSE + WHERE source = 'google' AND review_id = %s AND is_latest = TRUE + """, [raw['review_id']]) + + # 7. Store enriched (versioned) + enriched = { + 'source': 'google', + 'review_id': raw['review_id'], + 'review_version': raw['review_version'], + 'is_latest': True, + 'raw_id': raw_id, + 'business_id': location['business_id'], + 'place_id': raw['place_id'], + 'text': raw['review_text'], + 'text_normalized': text, + 'rating': raw['rating'], + 'review_time': raw['review_time'], + 'language': detect_language(text), + 'embedding': embedding, + 'trust_score': trust_score, + 'dedup_group_id': dedup_group_id, + **classification, + } + + await upsert_enriched_review(enriched) + + return enriched +``` + +### 3.3 LLM Classification + +```python +CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT). + +Analyze the review and return JSON: + +{ + "urt_primary": "X1.23", + "urt_secondary": ["Y2.34"], + "valence": "V-", + "intensity": "I2", + "comparative": "CR-N", + "staff_mentions": ["Mike"], + "quotes": { + "X1.23": "exact phrase", + "Y2.34": "another phrase" + }, + "confidence": { + "urt_primary": 0.92, + "valence": 0.95, + "intensity": 0.88 + } +} + +URT DOMAINS: +- O (Offering): Product/service quality, function, completeness +- P (People): Staff attitude, competence, responsiveness +- J (Journey): Timing, ease, reliability, resolution +- E (Environment): Physical space, digital interface, ambiance +- A (Access): Availability, accessibility, convenience +- V (Value): Price, transparency, worth +- R (Relationship): Trust, dependability, loyalty + +RULES: +1. Primary = most impactful topic +2. Secondary must be DIFFERENT domains +3. V± only for genuinely mixed sentiment +4. CR-B/W/S only for explicit self-comparison +5. quotes = EXACT phrases from review + +Return valid JSON only.""" + + +async def classify_review_llm(text: str) -> dict: + """LLM-powered URT classification.""" + + response = await llm.chat( + model="gpt-4o-mini", + messages=[ + {"role": "system", "content": CLASSIFICATION_PROMPT}, + {"role": "user", "content": text} + ], + response_format={"type": "json_object"}, + temperature=0.1 + ) + + result = json.loads(response.content) + result['classification_model'] = 'gpt-4o-mini' + + return result +``` + +### 3.4 Trust Score Computation + +```python +def compute_trust_score(raw: dict, text: str, classification: dict) -> float: + """ + Compute trust score (0.0 to 1.0) based on review quality signals. + Low trust = likely spam, fake, or low-quality. + """ + score = 1.0 + + # Length penalty + word_count = len(text.split()) + if word_count < 5: + score *= 0.5 + elif word_count > 500: + score *= 0.8 + + # Rating/sentiment mismatch + rating = raw.get('rating') + valence = classification.get('valence') + if rating and valence: + if rating >= 4 and valence == 'V-': + score *= 0.7 + elif rating <= 2 and valence == 'V+': + score *= 0.7 + + # Generic text patterns + if is_generic_review(text): + score *= 0.6 + + # LLM confidence + confidence = classification.get('confidence', {}) + avg_confidence = np.mean(list(confidence.values())) if confidence else 1.0 + if avg_confidence < 0.7: + score *= 0.9 + + return max(0.1, min(1.0, score)) +``` + +--- + +## Part 4: Issue Lifecycle Management + +### 4.1 Issue Aggregation (Relational) + +**v3.1 Issue Key**: `(business_id, place_id, primary_subcode)` — entity matching is deferred to v3.2. + +```python +async def aggregate_to_issue(review: dict) -> Optional[str]: + """Match review to existing issue or create new one.""" + + if review['valence'] not in ('V-', 'V±'): + return None + + # Find matching open issues using relational query + # v3.1: match on (business_id, place_id, primary_subcode) only + matching = await db.query(""" + SELECT i.issue_id, i.primary_subcode, i.span_count + FROM issues i + WHERE i.business_id = %s + AND i.place_id = %s + AND i.primary_subcode = %s + AND i.state NOT IN ('VERIFIED', 'DECLINED') + AND i.created_at > NOW() - INTERVAL '30 days' + ORDER BY i.priority_score DESC + LIMIT 1 + """, [review['business_id'], review['place_id'], review['urt_primary']]) + + if matching: + issue = matching[0] + await add_span_to_issue(issue['issue_id'], review) + return issue['issue_id'] + + if should_create_issue(review): + return await create_issue(review) + + return None + + +async def add_span_to_issue(issue_id: str, review: dict): + """Add review span to issue and update counters.""" + + # Insert span (with denormalized review_time for timeline queries) + await db.execute(""" + INSERT INTO issue_spans (issue_id, review_id, is_primary_match, intensity, review_time) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (issue_id, review_id) DO NOTHING + """, [issue_id, review['review_id'], True, review['intensity'], review['review_time']]) + + # Update issue counters + await db.execute(""" + UPDATE issues SET + span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s), + max_intensity = ( + SELECT CASE MAX(CASE intensity + WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END) + WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END + FROM issue_spans WHERE issue_id = %s + ), + updated_at = NOW() + WHERE issue_id = %s + """, [issue_id, issue_id, issue_id]) + + await recalculate_priority(issue_id) + await log_issue_event(issue_id, 'span_added', review_id=review['review_id']) +``` + +### 4.2 Priority Scoring (Trust-Weighted) + +```python +INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0} + +async def recalculate_priority(issue_id: str): + """ + Priority = intensity × volume × decay × recurrence × trend × trust + """ + + issue = await db.query_one(""" + SELECT + i.*, + (SELECT AVG(re.trust_score) + FROM issue_spans s + JOIN reviews_enriched re ON s.review_id = re.review_id + WHERE s.issue_id = i.issue_id AND re.is_latest = TRUE) as avg_trust + FROM issues i + WHERE i.issue_id = %s + """, [issue_id]) + + intensity_num = {'I1': 1, 'I2': 2, 'I3': 3}.get(issue['max_intensity'], 1) + i_weight = INTENSITY_WEIGHTS.get(f"I{intensity_num}", 1.0) + + volume_factor = 1 + math.log(max(1, issue['span_count'])) + + days_old = (datetime.now() - issue['created_at']).days + decay = math.exp(-0.023 * days_old) + + recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1) + + if issue['cr_worse_count'] >= 2: + trend_modifier = 1.3 + elif issue['cr_better_count'] >= 2: + trend_modifier = 0.7 + else: + trend_modifier = 1.0 + + trust_factor = issue['avg_trust'] or 1.0 + + priority = ( + i_weight * volume_factor * decay * + recurrence_boost * trend_modifier * trust_factor + ) + + await db.execute(""" + UPDATE issues SET + priority_score = %s, + avg_trust_score = %s, + updated_at = NOW() + WHERE issue_id = %s + """, [priority, issue['avg_trust'], issue_id]) +``` + +### 4.3 Issue Review Drill-Down + +```python +async def get_issue_reviews(issue_id: str, + sort_by: str = 'date', + limit: int = 50, + offset: int = 0) -> list[dict]: + """Fetch all reviews for an issue with full details.""" + + order_clause = { + 'date': 's.review_time DESC', + 'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END", + 'trust': 're.trust_score DESC', + 'rating': 're.rating ASC' + }.get(sort_by, 's.review_time DESC') + + return await db.query(f""" + SELECT + re.review_id, + re.text, + s.review_time, + re.rating, + re.valence, + s.intensity, + re.comparative, + re.trust_score, + re.quotes, + re.staff_mentions, + s.is_primary_match, + s.weight, + l.display_name as location_name + FROM issue_spans s + JOIN reviews_enriched re ON s.review_id = re.review_id AND re.is_latest = TRUE + JOIN locations l ON (re.business_id, re.place_id) = (l.business_id, l.place_id) + WHERE s.issue_id = %s + ORDER BY {order_clause} + LIMIT %s OFFSET %s + """, [issue_id, limit, offset]) +``` + +### 4.4 Strength Score + +``` +Strength Score = Σ (intensity_weight) + +Where: + I1 (mild) → weight = 1 + I2 (moderate) → weight = 2 + I3 (strong) → weight = 4 + +One I3 review = 4 I1 reviews = 2 I2 reviews +``` + +--- + +## Part 5: Analytics Spine (Fact Population) + +### 5.1 Daily Fact Aggregation Job + +```python +async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'): + """ + Aggregate reviews into fact_timeseries. Run daily. + + v3.1 populates: + - subject_type='overall', subject_id='all' (per location + 'ALL') + - subject_type='urt_code', subject_id= (per location + 'ALL') + + Domain rollups are derived at query time from urt_code facts. + Issue facts are optional (recommended for v3.2). + """ + + if bucket_type == 'day': + period_start = date + period_end = date + timedelta(days=1) + elif bucket_type == 'week': + period_start = date - timedelta(days=date.weekday()) + period_end = period_start + timedelta(days=7) + elif bucket_type == 'month': + period_start = date.replace(day=1) + next_month = period_start.replace(day=28) + timedelta(days=4) + period_end = next_month.replace(day=1) + + locations = await db.query( + "SELECT place_id FROM locations WHERE business_id = %s AND is_active = TRUE", + [business_id] + ) + + all_place_ids = [loc['place_id'] for loc in locations] + + # Per-location facts + for loc in locations: + place_id = loc['place_id'] + await populate_location_facts( + business_id, place_id, period_start, period_end, bucket_type + ) + + # All-locations rollup (place_id='ALL') + await populate_all_locations_facts( + business_id, all_place_ids, period_start, period_end, bucket_type + ) + + +async def populate_location_facts(business_id: str, place_id: str, + period_start: date, period_end: date, + bucket_type: str): + """Populate facts for a single location.""" + + # Aggregate by URT code + code_stats = await aggregate_by_code( + business_id, place_id, period_start, period_end + ) + + for stat in code_stats: + await upsert_fact( + business_id=business_id, + place_id=place_id, + period_date=period_start, + bucket_type=bucket_type, + subject_type='urt_code', + subject_id=stat['code'], + metrics=stat + ) + + # Aggregate overall + overall = await aggregate_overall( + business_id, place_id, period_start, period_end + ) + await upsert_fact( + business_id=business_id, + place_id=place_id, + period_date=period_start, + bucket_type=bucket_type, + subject_type='overall', + subject_id='all', + metrics=overall + ) + + +async def populate_all_locations_facts(business_id: str, place_ids: list[str], + period_start: date, period_end: date, + bucket_type: str): + """Populate 'ALL' rollup facts across all locations.""" + + # Aggregate by URT code across all locations + code_stats = await db.query(""" + SELECT + urt_primary as code, + COUNT(*) as review_count, + SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count, + SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count, + SUM(CASE intensity + WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 + END) as strength_score, + AVG(rating) as avg_rating + FROM reviews_enriched + WHERE business_id = %s + AND place_id = ANY(%s) + AND review_time >= %s AND review_time < %s + AND is_latest = TRUE + GROUP BY urt_primary + """, [business_id, place_ids, period_start, period_end]) + + for stat in code_stats: + await upsert_fact( + business_id=business_id, + place_id='ALL', + period_date=period_start, + bucket_type=bucket_type, + subject_type='urt_code', + subject_id=stat['code'], + metrics=stat + ) + + # Overall rollup + overall = await db.query_one(""" + SELECT + COUNT(*) as review_count, + SUM(CASE WHEN valence = 'V-' THEN 1 ELSE 0 END) as negative_count, + SUM(CASE WHEN valence = 'V+' THEN 1 ELSE 0 END) as positive_count, + AVG(rating) as avg_rating + FROM reviews_enriched + WHERE business_id = %s + AND place_id = ANY(%s) + AND review_time >= %s AND review_time < %s + AND is_latest = TRUE + """, [business_id, place_ids, period_start, period_end]) + + await upsert_fact( + business_id=business_id, + place_id='ALL', + period_date=period_start, + bucket_type=bucket_type, + subject_type='overall', + subject_id='all', + metrics=overall + ) +``` + +### 5.2 Timeline Query (For Charts) + +```python +async def get_timeline(business_id: str, + place_id: Optional[str], + subject_type: str, + subject_id: str, + start: date, + end: date, + bucket_type: str = 'week') -> list[dict]: + """ + Query pre-aggregated facts for line charts. + + Args: + place_id: Specific place_id, or None for 'ALL' locations rollup + """ + + # Use 'ALL' sentinel for all-locations queries + effective_place_id = place_id if place_id else 'ALL' + + return await db.query(""" + SELECT + period_date, + review_count, + negative_count, + positive_count, + strength_score, + negative_strength, + avg_rating, + cr_better, + cr_worse, + cr_same + FROM fact_timeseries + WHERE business_id = %s + AND place_id = %s + AND subject_type = %s + AND subject_id = %s + AND bucket_type = %s + AND period_date BETWEEN %s AND %s + ORDER BY period_date + """, [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end]) +``` + +--- + +## Part 6: Competitor Analysis + +### 6.1 Competitor Setup (Clean Model) + +Competitors are tracked in the `competitors` table only. They are **not** injected into `locations` with fake business_ids. Competitor reviews are stored in `reviews_raw` and `reviews_enriched` with `business_id = NULL` or a dedicated `__competitors__` partition. + +```python +async def setup_competitor(business_id: str, competitor_place_id: str, + competitor_name: str, relationship: str = 'direct'): + """Register a competitor for tracking.""" + + await db.execute(""" + INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship) + VALUES (%s, %s, %s, %s) + ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET + competitor_name = EXCLUDED.competitor_name, + relationship = EXCLUDED.relationship + """, [business_id, competitor_place_id, competitor_name, relationship]) + + +async def pull_competitor_reviews(business_id: str): + """Pull reviews for all tracked competitors.""" + + competitors = await db.query(""" + SELECT competitor_place_id, competitor_name + FROM competitors + WHERE business_id = %s AND is_active = TRUE + """, [business_id]) + + for comp in competitors: + # Store competitor reviews with special business_id marker + await pull_reviews_for_competitor( + business_id=business_id, + place_id=comp['competitor_place_id'] + ) +``` + +### 6.2 Competitor Comparison + +```python +async def get_competitor_comparison(business_id: str, code: str, + start: date, end: date) -> dict: + """Compare your URT metrics against competitors.""" + + # Your metrics (from 'ALL' rollup) + your_metrics = await db.query_one(""" + SELECT + SUM(negative_strength) as negative_strength, + SUM(review_count) as review_count, + AVG(avg_rating) as avg_rating + FROM fact_timeseries + WHERE business_id = %s + AND place_id = 'ALL' + AND subject_type = 'urt_code' + AND subject_id = %s + AND period_date BETWEEN %s AND %s + """, [business_id, code, start, end]) + + # Competitor metrics + competitors = await db.query(""" + SELECT competitor_place_id, competitor_name + FROM competitors WHERE business_id = %s AND is_active = TRUE + """, [business_id]) + + comparison = { + 'your_business': your_metrics or {}, + 'competitors': [] + } + + for comp in competitors: + # Query competitor's facts (stored with their place_id) + comp_metrics = await db.query_one(""" + SELECT + SUM(negative_strength) as negative_strength, + SUM(review_count) as review_count, + AVG(avg_rating) as avg_rating + FROM fact_timeseries + WHERE place_id = %s + AND subject_type = 'urt_code' + AND subject_id = %s + AND period_date BETWEEN %s AND %s + """, [comp['competitor_place_id'], code, start, end]) + + comparison['competitors'].append({ + 'name': comp['competitor_name'], + 'place_id': comp['competitor_place_id'], + **(comp_metrics or {}) + }) + + return comparison +``` + +--- + +## Part 7: Report Generation (Facts-First) + +```python +async def generate_report(business_id: str, place_id: Optional[str], + start: date, end: date) -> dict: + """Generate report from pre-aggregated facts.""" + + # Use 'ALL' for cross-location reports + effective_place_id = place_id if place_id else 'ALL' + + # 1. Top issues from facts (fast) + top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end) + + # 2. Strengths from facts + strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end) + + # 3. Sub-patterns (store results) + for issue in top_issues[:5]: + patterns = await discover_and_store_subpatterns( + business_id, effective_place_id, issue['code'], start, end + ) + issue['sub_patterns'] = patterns + + # 4. Trends from facts + trends = await compute_trends_from_facts(business_id, effective_place_id, start, end) + + # 5. Staff analysis + staff = await analyze_staff(business_id, effective_place_id, start, end) + + # 6. Competitor benchmarks + competitors = await get_competitor_benchmarks(business_id, start, end) + + payload = { + 'business_id': business_id, + 'place_id': place_id, # Original (None = all) + 'period': f"{start} to {end}", + 'issues': top_issues, + 'strengths': strengths, + 'trends': trends, + 'staff': staff, + 'competitors': competitors, + } + + narrative = await generate_narrative(payload) + + return { + 'payload': payload, + 'narrative': narrative, + 'generated_at': datetime.now().isoformat() + } +``` + +--- + +## Part 8: KPI-Ready Hooks + +### 8.1 Future KPI Integration (Interface Only) + +```sql +-- Future: KPI fact table with same grain +CREATE TABLE fact_kpi_timeseries ( + id SERIAL PRIMARY KEY, + + -- Same join keys as fact_timeseries + business_id TEXT NOT NULL, + place_id TEXT NOT NULL, -- 'ALL' for rollups + period_date DATE NOT NULL, + bucket_type TEXT NOT NULL, + + -- KPI metrics + revenue DECIMAL(12,2), + transactions INT, + cancellations INT, + refunds DECIMAL(12,2), + support_tickets INT, + + computed_at TIMESTAMP DEFAULT NOW(), + + UNIQUE(business_id, place_id, period_date, bucket_type) +); + +-- Join reviews and KPIs: +SELECT + r.period_date, + r.negative_strength, + k.revenue, + k.cancellations +FROM fact_timeseries r +JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type) +WHERE r.subject_type = 'overall' AND r.subject_id = 'all'; +``` + +--- + +## Part 9: Cost Model + +| Stage | When | Cost | Notes | +|-------|------|------|-------| +| **Raw Storage** | Per review | $0.00 | ~1KB per review | +| **Embedding** | Per review | $0.00 | Local model, ~50ms | +| **LLM Classification** | Per review | ~$0.0002 | GPT-4o-mini | +| **Fact Aggregation** | Daily job | $0.00 | SQL, <1 minute | +| **Sub-Clustering** | Per report | $0.00 | HDBSCAN, <1s | +| **LLM Narrative** | Per report | ~$0.15 | GPT-4o | + +**Total Costs:** + +| Volume | Monthly Ingest | Reports (10/mo) | Total | +|--------|---------------|-----------------|-------| +| 1K reviews | $0.20 | $1.50 | **$1.70** | +| 10K reviews | $2.00 | $1.50 | **$3.50** | +| 100K reviews | $20.00 | $1.50 | **$21.50** | + +--- + +## Part 10: Key Innovations + +| Innovation | Benefit | +|------------|---------| +| **Relational issue_spans** | Scales, queryable, no array limits | +| **Versioned reviews_enriched** | Handles edited reviews correctly | +| **Tenant-scoped locations** | Same place_id works for multiple businesses | +| **place_id='ALL' sentinel** | Clean all-locations rollups without NULL handling | +| **Unified fact_timeseries spine** | Fast dashboards, KPI-joinable later | +| **Trust score weighting** | Spam resistance without deletion | +| **Competitor separate table** | Clean model, no fake business_ids | +| **Universal join keys** | `(business_id, place_id, period, bucket)` future-proofs KPI integration | + +--- + +## Document Control + +| Field | Value | +|-------|-------| +| **Document** | ReviewIQ Architecture v3.1.1 | +| **Status** | Specification Complete (Reviewed) | +| **Date** | 2026-01-24 | +| **Dependencies** | URT Specification v5.1, Issue Lifecycle Framework C1 | +| **Source** | Google Reviews only | +| **Cost Target** | <$25/month at 100K reviews | + +### Changelog + +| Version | Changes | +|---------|---------| +| v3.0 | Issue lifecycle, strength scores, timeline charts | +| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring | +| v3.1.1 | **Reviewed**: Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup, fixed get_timeline params, clarified issue key scope | + +### Fixes Applied (v3.1.1) + +| Issue | Fix | +|-------|-----| +| reviews_enriched PK wrong for edits | PK = `(source, review_id, review_version)` + `is_latest` flag | +| raw_id ambiguous under versioning | raw_id references specific raw version | +| locations.place_id prevents multi-tenant | PK = `(business_id, place_id)` (tenant-scoped) | +| Competitor fake business_id pattern | Removed; competitors table is separate, no injection into locations | +| fact_timeseries.place_id NOT NULL blocks rollups | `place_id='ALL'` sentinel for all-locations | +| get_timeline param ordering bug | Fixed: params built in correct order | +| Issue entity fields but no extraction | Clarified: v3.1 key is `(business_id, place_id, primary_subcode)` only; entity fields reserved for v3.2 | +| Missing indexes | Added `idx_spans_issue_time`, FK to locations | + +### Deferred to v3.2+ + +| Feature | Reason | +|---------|--------| +| Entity extraction for issues | Needs NER pipeline | +| journey_step inference | Needs better grounding data | +| intent_signals extraction | Needs action playbooks | +| stability_score tracking | Premature for v1 | +| issue facts in fact_timeseries | Optional performance optimization | +| KPI integration | Placeholder only in v3.1 | + +--- + +*End of ReviewIQ Architecture v3.1.1*