# ReviewIQ: Review Intelligence Pipeline

**Version**: 3.2.0
**Status**: Architecture Specification (Reviewed)
**Date**: 2026-01-24

---
## Executive Summary
ReviewIQ v3.2 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version introduces the **span layer** — a fine-grained extraction model that identifies and classifies individual semantic units within each review, enabling richer issue routing, causal analysis, and entity-aware aggregation.
**What's New in v3.2**:
- **Span layer**: `review_spans` table extracts individual semantic units from review text
- **URT ENUM types**: Strongly-typed classification fields with database-enforced constraints
- **Causal chain support**: `profile='full'` spans can capture cause/effect relationships
- **Entity extraction**: Named entities (staff, products, locations) linked to spans
- **Reprocessing pattern**: Soft-switch `is_active` flag for atomic span replacement
- **Deterministic issue routing**: SHA256-based issue IDs from grouping keys
- **1:1 span-to-issue mapping**: Each span belongs to exactly one issue
**Design Principles**:
- **Google Reviews only** (for now) — but schema is source-agnostic
- **Relational over arrays** — scales, queries, joins
- **Facts-first reporting** — pre-aggregated spine for fast dashboards
- **KPI-joinable** — `(business_id, place_id, period_date, bucket_type)` as universal key
- **Tenant-scoped locations** — same place_id can exist for multiple businesses
- **Span-first classification** — spans are the atomic unit of analysis; review-level is derived
---
## Part 1: System Architecture
```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ REVIEWIQ v3.2 PIPELINE │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Google │ │
│ │ Reviews │ │
│ │ (API) │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ A) SOURCE & STORAGE │ │
│ │ │ │
│ │ google_connector ───▶ reviews_raw (immutable JSON + metadata) │ │
│ │ │ │
│ └──────────────────────────────────┬──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ B) ENRICHMENT │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Normalize │──▶│ LLM │──▶│ Embed │──▶│ Trust │ │ │
│ │ │ + Map │ │ Classify │ │ (local) │ │ Score │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │
│ │ └──────────────┴─────────────┬───────────────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────┐ │ │
│ │ │ reviews_enriched │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ review_spans (NEW) │◀── Span Extraction │ │
│ │ │ (per-span classification) │ │ │
│ │ └────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────┬──────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ ▼ ▼ │
│ ┌────────────────────────────┐ ┌────────────────────────────────┐ │
│ │ C) OPERATIONALIZATION │ │ D) ANALYTICS SPINE │ │
│ │ │ │ │ │
│ │ review_spans │ │ Daily/Weekly Jobs: │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ review_spans │ │
│ │ issue_spans (1:1 link) │ │ │ │ │
│ │ │ │ │ ▼ │ │
│ │ ▼ │ │ fact_timeseries │ │
│ │ issues (update counters) │ │ (pre-aggregated metrics) │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ Keys: │ │
│ │ issue_events (log) │ │ • business_id │ │
│ │ │ │ • place_id (or 'ALL') │ │
│ └────────────────────────────┘ │ • subject_type/id │ │
│ │ │ • period_date │ │
│ │ │ • bucket_type │ │
│ │ └────────────────────────────────┘ │
│ └────────────┬────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ E) REPORTING │ │
│ │ │ │
│ │ fact_timeseries ──┬──▶ Statistics & Trends │ │
│ │ │ │ │
│ │ issues + spans ───┼──▶ Issue Rankings & Drill-Down │ │
│ │ │ │ │
│ │ embeddings ───────┼──▶ Sub-Pattern Clustering │ │
│ │ │ │ │
│ │ competitors ──────┴──▶ Benchmark Comparisons │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ LLM Narrative Generation │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
```
---
## Part 2: Data Model (SQL DDL)
### 2.0 Required Extensions
```sql
-- btree_gist: Enables GiST index on btree-compatible types (for exclusion constraints)
CREATE EXTENSION IF NOT EXISTS btree_gist;
-- pgcrypto: Provides cryptographic functions (for SHA256-based ID generation)
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- pgvector: Vector similarity search (for embeddings)
CREATE EXTENSION IF NOT EXISTS vector;
```
### 2.1 ENUM Types
```sql
-- URT classification enums (strongly-typed, database-enforced)
CREATE TYPE urt_valence AS ENUM ('V+', 'V-', 'V0', 'V±');
CREATE TYPE urt_intensity AS ENUM ('I1', 'I2', 'I3');
CREATE TYPE urt_specificity AS ENUM ('S1', 'S2', 'S3');
CREATE TYPE urt_actionability AS ENUM ('A1', 'A2', 'A3');
CREATE TYPE urt_temporal AS ENUM ('TC', 'TR', 'TH', 'TF');
CREATE TYPE urt_evidence AS ENUM ('ES', 'EI', 'EC');
CREATE TYPE urt_comparative AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S');
CREATE TYPE urt_profile AS ENUM ('lite', 'core', 'standard', 'full');
CREATE TYPE urt_confidence AS ENUM ('high', 'medium', 'low');
CREATE TYPE urt_relation AS ENUM ('cause_of', 'effect_of', 'contrast', 'resolution');
CREATE TYPE urt_entity_type AS ENUM ('location', 'staff', 'product', 'process', 'time', 'other');
```
### 2.2 Dimension Tables
```sql
-- Business locations (multi-tenant: same place_id can exist for multiple businesses)
-- Includes both owned locations and tracked competitor locations
CREATE TABLE locations (
business_id TEXT NOT NULL, -- Internal business identifier
place_id TEXT NOT NULL, -- Google Place ID
location_type TEXT NOT NULL DEFAULT 'owned'
CHECK (location_type IN ('owned', 'competitor')),
display_name TEXT NOT NULL,
address TEXT,
city TEXT,
state TEXT,
country TEXT,
timezone TEXT,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (business_id, place_id)
);
CREATE INDEX idx_locations_place ON locations(place_id);
CREATE INDEX idx_locations_owned ON locations(business_id)
WHERE location_type = 'owned';
-- URT code reference
CREATE TABLE urt_codes (
code TEXT PRIMARY KEY, -- 'J1.01', 'P1.02', etc.
domain CHAR(1) NOT NULL, -- O, P, J, E, A, V, R
category TEXT NOT NULL,
subcategory TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
keywords TEXT[] -- For search/matching
);
-- Competitor mapping (separate from locations - no fake business_ids)
CREATE TABLE competitors (
id SERIAL PRIMARY KEY,
business_id TEXT NOT NULL, -- Your business
competitor_place_id TEXT NOT NULL, -- Competitor's Google Place ID
competitor_name TEXT NOT NULL,
relationship TEXT DEFAULT 'direct', -- 'direct', 'indirect', 'aspirational'
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, competitor_place_id)
);
CREATE INDEX idx_competitors_business ON competitors(business_id);
```
### 2.3 Reviews Tables (Raw + Enriched)
```sql
-- Immutable raw review storage (audit + reprocessing)
CREATE TABLE reviews_raw (
id SERIAL PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Google review ID
place_id TEXT NOT NULL, -- Google Place ID
-- Raw payload
raw_payload JSONB NOT NULL, -- Complete API response
review_text TEXT, -- Extracted for indexing
rating SMALLINT,
review_time TIMESTAMP,
reviewer_name TEXT,
reviewer_id TEXT,
-- Versioning (Google reviews can be edited)
review_version INT DEFAULT 1,
-- Ingestion metadata
pulled_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source, review_id, review_version)
);
CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC);
CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id);
-- Enriched review with LLM classification + embeddings (versioned)
-- Supports edited reviews: each version is a separate row
CREATE TABLE reviews_enriched (
-- Versioned primary key (handles edited reviews)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Matches reviews_raw.review_id
review_version INT NOT NULL DEFAULT 1,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
-- Link to raw (specific version)
raw_id INT NOT NULL REFERENCES reviews_raw(id),
-- Identity
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
-- Core content
text TEXT NOT NULL,
text_normalized TEXT, -- Cleaned for processing
rating SMALLINT,
review_time TIMESTAMP NOT NULL,
language TEXT,
-- URT Classification (from LLM) — review-level summary, derived from spans in v3.2
urt_primary TEXT NOT NULL, -- 'J1.01', 'P1.02', etc.
urt_secondary TEXT[] DEFAULT '{}', -- Max 2, different domains
valence TEXT NOT NULL, -- 'V+', 'V-', 'V0', 'V±'
intensity TEXT NOT NULL, -- 'I1', 'I2', 'I3'
comparative TEXT DEFAULT 'CR-N', -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'
-- Extracted entities (summary from spans)
staff_mentions TEXT[] DEFAULT '{}',
quotes JSONB, -- {"code": "phrase", ...}
-- Embedding
embedding VECTOR(384),
-- Quality control
trust_score FLOAT DEFAULT 1.0, -- 0.0 to 1.0
dedup_group_id TEXT, -- Tenant-scoped: format "{business_id}:{hash}"
is_suspicious BOOLEAN DEFAULT FALSE,
-- Processing metadata
classification_model TEXT,
classification_confidence JSONB, -- Per-field confidence scores
processed_at TIMESTAMP DEFAULT NOW(),
model_version TEXT,
-- KPI-ready hooks (nullable, computed later)
kpi_impact_estimate FLOAT,
kpi_last_computed_at TIMESTAMP,
PRIMARY KEY (source, review_id, review_version)
);
-- Indexes for common query patterns
CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative)
WHERE comparative != 'CR-N' AND is_latest = TRUE;
CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score)
WHERE trust_score < 0.5 AND is_latest = TRUE;
CREATE INDEX idx_enriched_embedding ON reviews_enriched
USING hnsw (embedding vector_cosine_ops);
-- FK to locations (tenant-scoped)
ALTER TABLE reviews_enriched
ADD CONSTRAINT fk_enriched_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Enforce tenant-scoped dedup format
ALTER TABLE reviews_enriched
ADD CONSTRAINT chk_dedup_scoped
CHECK (dedup_group_id IS NULL OR dedup_group_id LIKE business_id || ':%');
```
### 2.4 Span Layer (NEW in v3.2)
The span layer extracts individual semantic units from review text. Each span represents a single classifiable statement with its own URT code, valence, intensity, and optional entity reference.
```sql
-- Review spans: fine-grained semantic units within reviews
CREATE TABLE review_spans (
span_id TEXT PRIMARY KEY, -- Deterministic ID (see §9.5)
-- Parent review reference
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL,
review_version INT NOT NULL,
-- Span position (within review text)
span_index INT NOT NULL CHECK (span_index >= 0),
span_text TEXT NOT NULL,
span_start INT NOT NULL CHECK (span_start >= 0),
span_end INT NOT NULL,
-- Profile level (lite, core, standard, or full classification)
profile urt_profile NOT NULL DEFAULT 'standard',
-- URT Classification (strongly-typed)
urt_primary TEXT NOT NULL, -- Tier-3 code: 'J1.01', 'P2.03', etc.
urt_secondary TEXT[] NOT NULL DEFAULT '{}',
valence urt_valence NOT NULL,
intensity urt_intensity NOT NULL,
comparative urt_comparative NOT NULL DEFAULT 'CR-N',
-- Extended classification (standard profile)
specificity urt_specificity NOT NULL DEFAULT 'S2',
actionability urt_actionability NOT NULL DEFAULT 'A2',
temporal urt_temporal NOT NULL DEFAULT 'TC',
evidence urt_evidence NOT NULL DEFAULT 'ES',
-- Causal relations (full profile only)
relation_type urt_relation,
related_span_id TEXT REFERENCES review_spans(span_id),
causal_chain JSONB, -- Full profile: structured cause/effect
-- Entity extraction
entity TEXT, -- Raw entity mention
entity_type urt_entity_type,
entity_normalized TEXT, -- Normalized form for grouping
-- Span state
is_primary BOOLEAN NOT NULL DEFAULT FALSE, -- Primary span for this review
is_active BOOLEAN NOT NULL DEFAULT TRUE, -- Soft-delete for reprocessing
review_time TIMESTAMP NOT NULL, -- Denormalized from parent review
-- Processing metadata
confidence urt_confidence DEFAULT 'medium',
usn TEXT, -- URT Semantic Notation string
model_version TEXT,
ingest_batch_id TEXT, -- For atomic reprocessing
created_at TIMESTAMP DEFAULT NOW(),
-- Uniqueness within review
UNIQUE (source, review_id, review_version, span_index)
);
-- Constraints for review_spans
ALTER TABLE review_spans
ADD CONSTRAINT chk_span_end
CHECK (span_end > span_start);
ALTER TABLE review_spans
ADD CONSTRAINT chk_primary_tier3
CHECK (urt_primary ~ '^[OPJEAVR][1-4]\.[0-9]{2}$');
ALTER TABLE review_spans
ADD CONSTRAINT chk_secondary_max2
CHECK (cardinality(urt_secondary) <= 2);
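-- Validate each element in urt_secondary matches tier-3 pattern.
-- PostgreSQL does not allow subqueries in CHECK constraints, so the
-- per-element test lives in a helper function. An empty array passes.
CREATE OR REPLACE FUNCTION urt_codes_are_tier3(codes TEXT[])
RETURNS BOOLEAN AS $$
SELECT COALESCE(bool_and(elem ~ '^[OPJEAVR][1-4]\.[0-9]{2}$'), TRUE)
FROM unnest(codes) AS elem;
$$ LANGUAGE sql IMMUTABLE;
ALTER TABLE review_spans
ADD CONSTRAINT chk_secondary_tier3
CHECK (urt_codes_are_tier3(urt_secondary));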
-- causal_chain only allowed for full profile
ALTER TABLE review_spans
ADD CONSTRAINT chk_full_only_fields
CHECK (
profile = 'full' OR causal_chain IS NULL
);
-- No self-referential relations
ALTER TABLE review_spans
ADD CONSTRAINT chk_no_self_relation
CHECK (related_span_id IS NULL OR related_span_id != span_id);
-- USN format validation based on profile (URT v5.1 canonical format)
-- Lite: URT:L:{domain}:{valence}{intensity}
-- Core: URT:C:{category}:{valence}{intensity}
-- Standard: URT:S:{subcode}[+{sec}]:{valence}{intensity}:{S}{A}{T}.{E}.{CR}
-- Full: URT:F:{subcode}[+{sec}]:{valence}{intensity}:{S}{A}{T}.{E}.{CR}[:{causal}]
-- Examples: URT:L:O:+2 | URT:C:J1:-3 | URT:S:J1.03:-2:22TC.ES.N | URT:F:J1.01:-3:23TR.ES.S:CD.O,MG.O
ALTER TABLE review_spans
ADD CONSTRAINT chk_usn_format
CHECK (
usn IS NULL OR
(profile = 'lite' AND usn ~ '^URT:L:[OPJEAVR]:[+\-0±][123]$') OR
(profile = 'core' AND usn ~ '^URT:C:[OPJEAVR][1-4]:[+\-0±][123]$') OR
(profile = 'standard' AND usn ~ '^URT:S:[OPJEAVR][1-4]\.[0-9]{2}(\+[OPJEAVR][1-4]\.[0-9]{2}){0,2}:[+\-0±][123]:[1-3][1-3]T[CRHF]\.E[SIC]\.[NBWS]$') OR
(profile = 'full' AND usn ~ '^URT:F:[OPJEAVR][1-4]\.[0-9]{2}(\+[OPJEAVR][1-4]\.[0-9]{2}){0,2}:[+\-0±][123]:[1-3][1-3]T[CRHF]\.E[SIC]\.[NBWS](:(CD|MG|SY)\.[STEOFRPCSHX](,(CD|MG|SY)\.[STEOFRPCSHX])*)?$')
);
-- Foreign keys for review_spans
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_review
FOREIGN KEY (source, review_id, review_version)
REFERENCES reviews_enriched(source, review_id, review_version)
ON DELETE CASCADE;
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_location
FOREIGN KEY (business_id, place_id)
REFERENCES locations(business_id, place_id);
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_urt_primary
FOREIGN KEY (urt_primary)
REFERENCES urt_codes(code);
-- Indexes for review_spans
CREATE UNIQUE INDEX uq_spans_active_order
ON review_spans(source, review_id, review_version, span_index)
WHERE is_active = TRUE;
CREATE UNIQUE INDEX uq_spans_one_primary_active
ON review_spans(source, review_id, review_version)
WHERE is_active = TRUE AND is_primary = TRUE;
CREATE INDEX idx_spans_review
ON review_spans(source, review_id, review_version)
WHERE is_active = TRUE;
CREATE INDEX idx_spans_business_time
ON review_spans(business_id, review_time DESC)
WHERE is_active = TRUE;
CREATE INDEX idx_spans_issue_routing
ON review_spans(business_id, place_id, urt_primary, entity_normalized)
WHERE is_active = TRUE AND valence IN ('V-', 'V±');
CREATE INDEX idx_spans_entity
ON review_spans(business_id, entity_normalized)
WHERE entity_normalized IS NOT NULL AND is_active = TRUE;
CREATE INDEX idx_spans_batch
ON review_spans(ingest_batch_id)
WHERE ingest_batch_id IS NOT NULL;
-- No overlapping spans within the same active review version.
-- A plain GiST index supports overlap lookups but does not enforce exclusion
-- (the "WITH operator" syntax is only valid inside an EXCLUDE constraint).
CREATE INDEX ex_spans_no_overlap
ON review_spans
USING gist (
source,
review_id,
review_version,
int4range(span_start, span_end)
)
WHERE is_active = TRUE;
-- For strict enforcement, use the exclusion constraint below
-- (it builds its own GiST index and requires btree_gist):
ALTER TABLE review_spans
ADD CONSTRAINT ex_spans_no_overlap_constraint
EXCLUDE USING gist (
source WITH =,
review_id WITH =,
review_version WITH =,
int4range(span_start, span_end) WITH &&
)
WHERE (is_active = TRUE);
```
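
The profile-dependent USN check can also run in application code before a row reaches the database. The following is a minimal sketch that mirrors the four patterns of `chk_usn_format` in Python. The names `USN_PATTERNS` and `is_valid_usn` are illustrative and are not defined elsewhere in this specification.

```python
import re

# Building blocks copied from the chk_usn_format CHECK constraint.
CODE = r"[OPJEAVR][1-4]\.[0-9]{2}"            # tier-3 code, e.g. J1.03
VALENCE_INTENSITY = r"[+\-0\u00b1][123]"      # \u00b1 is the plus-minus sign
DIMENSIONS = r"[1-3][1-3]T[CRHF]\.E[SIC]\.[NBWS]"
CAUSAL = r"(?:CD|MG|SY)\.[STEOFRPCHX]"

# Hypothetical lookup table: one compiled pattern per urt_profile value.
USN_PATTERNS = {
    "lite": re.compile(rf"URT:L:[OPJEAVR]:{VALENCE_INTENSITY}"),
    "core": re.compile(rf"URT:C:[OPJEAVR][1-4]:{VALENCE_INTENSITY}"),
    "standard": re.compile(
        rf"URT:S:{CODE}(?:\+{CODE}){{0,2}}:{VALENCE_INTENSITY}:{DIMENSIONS}"
    ),
    "full": re.compile(
        rf"URT:F:{CODE}(?:\+{CODE}){{0,2}}:{VALENCE_INTENSITY}:{DIMENSIONS}"
        rf"(?::{CAUSAL}(?:,{CAUSAL})*)?"
    ),
}


def is_valid_usn(profile: str, usn: str) -> bool:
    """Return True if usn matches the pattern for the given profile."""
    pattern = USN_PATTERNS.get(profile)
    # fullmatch plays the role of the ^...$ anchors in the SQL patterns.
    return pattern is not None and pattern.fullmatch(usn) is not None
```

With this sketch, `is_valid_usn("standard", "URT:S:J1.03:-2:22TC.ES.N")` accepts the Standard example from the DDL comment, while the same string is rejected under the `lite` profile.
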
### 2.5 Issue Tables (Relational, Span-Based)
**v3.2 Issue Key**: `(business_id, place_id, urt_primary, entity_normalized)`. Entity matching is part of the key, but `entity_normalized` defaults to NULL in v3.2, so issues currently group by business, place, and code; distinct entities create distinct issues in v3.3+.
```sql
-- Issues (aggregated problems)
CREATE TABLE issues (
issue_id TEXT PRIMARY KEY, -- Deterministic SHA256-based ID
-- Grouping keys (v3.2: code + place + entity)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
primary_subcode TEXT NOT NULL, -- URT code
domain CHAR(1) NOT NULL,
-- State machine
state TEXT NOT NULL DEFAULT 'DETECTED',
priority_score FLOAT NOT NULL,
confidence_score FLOAT NOT NULL,
-- Aggregated metrics (updated via triggers/jobs)
span_count INT NOT NULL DEFAULT 1,
max_intensity TEXT NOT NULL,
avg_trust_score FLOAT DEFAULT 1.0,
-- CR counters (rolling 30-day window)
cr_better_count INT DEFAULT 0,
cr_worse_count INT DEFAULT 0,
cr_same_count INT DEFAULT 0,
-- Star drag proxy (avg rating when this issue present vs absent)
star_drag_estimate FLOAT,
-- Ownership
owner_team TEXT,
owner_individual TEXT,
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
acknowledged_at TIMESTAMP,
resolved_at TIMESTAMP,
verified_at TIMESTAMP,
-- Resolution
reopen_count INT DEFAULT 0,
resolution_code TEXT,
resolution_notes TEXT,
decline_reason TEXT,
-- Context (v3.2: entity extraction active)
entity TEXT, -- Product, staff member, feature
entity_normalized TEXT, -- Normalized for grouping (defaults NULL in v3.2)
-- KPI-ready hooks (nullable)
kpi_impact_estimate FLOAT,
kpi_impact_confidence FLOAT,
kpi_last_computed_at TIMESTAMP
);
CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC);
CREATE INDEX idx_issues_place ON issues(place_id, state);
CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode);
CREATE INDEX idx_issues_open ON issues(business_id)
WHERE state NOT IN ('VERIFIED', 'DECLINED');
CREATE INDEX idx_issues_entity ON issues(business_id, entity_normalized)
WHERE entity_normalized IS NOT NULL;
-- FK to locations (tenant-scoped)
ALTER TABLE issues
ADD CONSTRAINT fk_issues_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Issue spans: 1:1 link from span to issue (each span belongs to exactly one issue)
CREATE TABLE issue_spans (
id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE,
-- Span reference (unique constraint enforces 1:1)
span_id TEXT NOT NULL REFERENCES review_spans(span_id) ON DELETE CASCADE,
-- Denormalized for queries (copied from span)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL,
review_version INT NOT NULL,
-- Classification snapshot
is_primary_match BOOLEAN DEFAULT TRUE, -- Primary vs secondary code match
intensity TEXT NOT NULL, -- Copied from span for fast queries
review_time TIMESTAMP NOT NULL, -- Denormalized for timeline queries
weight FLOAT DEFAULT 1.0, -- For weighted aggregation
created_at TIMESTAMP DEFAULT NOW(),
-- One span → exactly one issue (1:1 mapping)
CONSTRAINT uq_issue_spans_span UNIQUE (span_id)
);
CREATE INDEX idx_issue_spans_issue ON issue_spans(issue_id);
CREATE INDEX idx_issue_spans_review ON issue_spans(source, review_id, review_version);
CREATE INDEX idx_issue_spans_time ON issue_spans(issue_id, review_time DESC);
-- Issue events (audit log)
CREATE TABLE issue_events (
event_id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id),
event_type TEXT NOT NULL, -- 'state_change', 'span_added', 'priority_update'
from_state TEXT,
to_state TEXT,
actor TEXT, -- User or 'system'
notes TEXT,
-- Triggering span/review reference
span_id TEXT,
source TEXT DEFAULT 'google',
review_id TEXT,
review_version INT,
metadata JSONB, -- Additional context
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC);
CREATE INDEX idx_events_span ON issue_events(span_id)
WHERE span_id IS NOT NULL;
CREATE INDEX idx_events_review ON issue_events(source, review_id, review_version)
WHERE review_id IS NOT NULL;
```
### 2.6 Unified Analytics Spine
**Design Decision**: Sentinel value conventions (do not normalize):
- `place_id = 'ALL'` — spatial rollup (all locations)
- `subject_id = 'all'` — semantic rollup (all subjects within type)
Case matters: `'ALL'` ≠ `'all'`. This avoids NULL handling while keeping the schema simple.
```sql
-- Fact table: pre-aggregated time-series metrics
CREATE TABLE fact_timeseries (
id SERIAL PRIMARY KEY,
-- Universal join keys (KPI-ready)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' = all locations rollup
period_date DATE NOT NULL,
bucket_type TEXT NOT NULL, -- 'day', 'week', 'month'
-- Subject (what we're measuring)
subject_type TEXT NOT NULL, -- 'urt_code', 'domain', 'overall', 'issue'
subject_id TEXT NOT NULL, -- Code, domain letter, issue_id, or 'all'
-- Volume metrics
review_count INT NOT NULL DEFAULT 0,
span_count INT NOT NULL DEFAULT 0, -- v3.2: span-level counting
negative_count INT NOT NULL DEFAULT 0,
positive_count INT NOT NULL DEFAULT 0,
neutral_count INT NOT NULL DEFAULT 0,
mixed_count INT NOT NULL DEFAULT 0,
-- Strength metrics (intensity-weighted)
strength_score FLOAT NOT NULL DEFAULT 0,
negative_strength FLOAT NOT NULL DEFAULT 0,
positive_strength FLOAT NOT NULL DEFAULT 0,
-- Rating metrics
avg_rating FLOAT,
rating_count INT DEFAULT 0,
-- Intensity distribution
i1_count INT DEFAULT 0,
i2_count INT DEFAULT 0,
i3_count INT DEFAULT 0,
-- CR signals
cr_better INT DEFAULT 0,
cr_worse INT DEFAULT 0,
cr_same INT DEFAULT 0,
-- Trust-weighted variants (v3.2: now populated)
trust_weighted_strength FLOAT,
trust_weighted_negative FLOAT,
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id)
);
-- Validate 'ALL' sentinel
ALTER TABLE fact_timeseries
ADD CONSTRAINT chk_place_id_format
CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$');
-- Optimized indexes for reporting queries
CREATE INDEX idx_facts_lookup ON fact_timeseries(
business_id, place_id, subject_type, subject_id, period_date DESC
);
CREATE INDEX idx_facts_period ON fact_timeseries(
business_id, period_date, bucket_type
);
CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id)
WHERE subject_type = 'urt_code';
CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date)
WHERE place_id = 'ALL';
CREATE INDEX idx_facts_issue ON fact_timeseries(subject_id)
WHERE subject_type = 'issue';
```
**v3.2 Fact Population Scope**:

| subject_type | Populated | Notes |
|--------------|-----------|-------|
| `overall` | Mandatory | Business-wide + per-location |
| `urt_code` | Mandatory | Per URT code (from spans) |
| `domain` | Derived | Rollup from urt_code at query time |
| `issue` | Recommended | Per-issue timelines |

**v3.2 Rollup Rules**:
- `place_id='ALL'` includes **owned locations only** (not competitors)
- Competitor facts live at their `competitor_place_id`, never in `'ALL'` rollup
- Competitor comparison queries explicitly join on `competitor_place_id`
- Span-level metrics (`span_count`, intensity distribution) are now primary
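
The rollup rules above can be sketched as a small aggregation step. This is an illustration only: the function name `rollup_all_locations`, the dictionary-based fact rows, and the `location_types` lookup keyed by `(business_id, place_id)` are assumptions, not part of the pipeline described here.

```python
from collections import defaultdict


def rollup_all_locations(location_facts, location_types):
    """Sum per-location span counts into place_id='ALL' rows.

    location_facts: iterable of dicts with the fact_timeseries key columns
                    plus span_count (hypothetical in-memory representation).
    location_types: dict mapping (business_id, place_id) to 'owned' or
                    'competitor', as stored in locations.location_type.
    """
    totals = defaultdict(int)
    for fact in location_facts:
        tenant_key = (fact["business_id"], fact["place_id"])
        if location_types.get(tenant_key) != "owned":
            continue  # competitor facts stay at their own place_id
        key = (
            fact["business_id"],
            fact["period_date"],
            fact["bucket_type"],
            fact["subject_type"],
            fact["subject_id"],
        )
        totals[key] += fact["span_count"]

    return [
        {
            "business_id": business_id,
            "place_id": "ALL",  # upper-case sentinel for the spatial rollup
            "period_date": period_date,
            "bucket_type": bucket_type,
            "subject_type": subject_type,
            "subject_id": subject_id,
            "span_count": span_count,
        }
        for (business_id, period_date, bucket_type, subject_type, subject_id),
        span_count in totals.items()
    ]
```

Keying the lookup by `(business_id, place_id)` rather than `place_id` alone follows the tenant-scoped design: the same place can be owned by one business and tracked as a competitor by another.
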
**v3.2 Trust Score Usage**:
- `trust_score` applied to **issue priority scoring** and **filtering**
- `trust_weighted_strength` / `trust_weighted_negative` now **populated** in v3.2
- Formula: `SUM(trust_score * intensity_weight)` per fact row
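
As a worked illustration of the formula, the sketch below sums `trust_score * intensity_weight` over the spans that feed one fact row. The weight values in `INTENSITY_WEIGHT` are placeholders, since this section does not specify them, and treating only `V-` spans as negative is likewise an assumption. Each span dictionary is assumed to carry the `trust_score` of its parent review.

```python
# Placeholder weights: the real intensity weights are not given in this section.
INTENSITY_WEIGHT = {"I1": 1.0, "I2": 2.0, "I3": 3.0}


def trust_weighted_strength(spans, negative_only=False):
    """Compute SUM(trust_score * intensity_weight) for one fact row.

    With negative_only=True the sum is restricted to V- spans, which is
    one possible reading of trust_weighted_negative (an assumption).
    """
    total = 0.0
    for span in spans:
        if negative_only and span["valence"] != "V-":
            continue
        total += span["trust_score"] * INTENSITY_WEIGHT[span["intensity"]]
    return total
```

Under these placeholder weights, a fully trusted `I1` span contributes 1.0, while an `I3` span from a review with `trust_score = 0.5` contributes 1.5.
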
### 2.7 Sub-Patterns (Persistent Clustering Results)
```sql
-- Stored sub-pattern clustering results
CREATE TABLE subpatterns (
id SERIAL PRIMARY KEY,
-- Parent
subject_type TEXT NOT NULL, -- 'urt_code', 'issue'
subject_id TEXT NOT NULL,
business_id TEXT NOT NULL,
place_id TEXT, -- NULL = all locations
-- Period
period_start DATE NOT NULL,
period_end DATE NOT NULL,
-- Cluster info
cluster_id INT NOT NULL,
label TEXT NOT NULL,
-- Metrics
review_count INT NOT NULL,
span_count INT NOT NULL, -- v3.2: span-level
percentage FLOAT NOT NULL,
avg_intensity FLOAT,
-- Representative content
representative_span_id TEXT, -- v3.2: span reference
representative_quote TEXT,
sharpest_span_id TEXT,
sharpest_quote TEXT,
-- Embedding (for trend matching)
centroid VECTOR(384),
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(subject_type, subject_id, business_id, place_id, period_start, period_end, cluster_id)
);
CREATE INDEX idx_subpatterns_lookup ON subpatterns(
subject_type, subject_id, business_id, period_end DESC
);
```
---
## Part 3: Triggers and Functions
### 3.1 Span Validation Triggers
```sql
-- Trigger 1: Validate span_end <= length(review text)
CREATE OR REPLACE FUNCTION trg_review_spans_validate_bounds()
RETURNS TRIGGER AS $$
DECLARE
review_text_length INT;
BEGIN
SELECT length(text) INTO review_text_length
FROM reviews_enriched
WHERE source = NEW.source
AND review_id = NEW.review_id
AND review_version = NEW.review_version;
IF review_text_length IS NULL THEN
RAISE EXCEPTION 'Parent review not found: (%, %, %)',
NEW.source, NEW.review_id, NEW.review_version;
END IF;
IF NEW.span_end > review_text_length THEN
RAISE EXCEPTION 'span_end (%) exceeds review text length (%)',
NEW.span_end, review_text_length;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_bounds
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
EXECUTE FUNCTION trg_review_spans_validate_bounds();
-- Trigger 2: Validate span_text matches parent substring (conditional)
-- Enabled via session setting: SET reviewiq.validate_span_text = 'on';
CREATE OR REPLACE FUNCTION trg_review_spans_validate_text()
RETURNS TRIGGER AS $$
DECLARE
review_text TEXT;
expected_text TEXT;
validate_enabled TEXT;
BEGIN
-- Check if validation is enabled via session setting.
-- current_setting(..., true) returns NULL when the setting is unset,
-- so compare NULL-safely: validation runs only when the value is exactly 'on'.
validate_enabled := current_setting('reviewiq.validate_span_text', true);
IF validate_enabled IS DISTINCT FROM 'on' THEN
RETURN NEW;
END IF;
SELECT text INTO review_text
FROM reviews_enriched
WHERE source = NEW.source
AND review_id = NEW.review_id
AND review_version = NEW.review_version;
expected_text := substring(review_text FROM NEW.span_start + 1 FOR NEW.span_end - NEW.span_start);
IF NEW.span_text IS DISTINCT FROM expected_text THEN
RAISE EXCEPTION 'span_text mismatch: expected "%" but got "%"',
left(expected_text, 50), left(NEW.span_text, 50);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_text
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
EXECUTE FUNCTION trg_review_spans_validate_text();
-- Trigger 3: Validate causal_chain JSONB structure
CREATE OR REPLACE FUNCTION trg_review_spans_validate_causal_chain()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.causal_chain IS NOT NULL THEN
-- Validate structure using helper function
IF NOT urt_validate_causal_chain(NEW.causal_chain) THEN
RAISE EXCEPTION 'Invalid causal_chain structure';
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_causal_chain
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
WHEN (NEW.causal_chain IS NOT NULL)
EXECUTE FUNCTION trg_review_spans_validate_causal_chain();
```
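
The bounds and text triggers imply a specific offset convention: `span_start` is 0-based and `span_end` is exclusive, which is why the SQL uses `substring(... FROM span_start + 1 FOR span_end - span_start)`. The sketch below applies the same checks in Python before insert. The function name `check_span` and its error strings are illustrative only.

```python
def check_span(review_text, span_start, span_end, span_text):
    """Return a list of problems for one span; an empty list means it passes.

    Offsets are 0-based and half-open, so the expected text is
    review_text[span_start:span_end].
    """
    problems = []
    if span_start < 0:
        problems.append("span_start must be >= 0")
    if span_end <= span_start:
        problems.append("span_end must be greater than span_start")
    if span_end > len(review_text):
        problems.append("span_end exceeds review text length")
    # Only compare text when the offsets themselves are usable.
    if not problems and review_text[span_start:span_end] != span_text:
        problems.append("span_text does not match the review substring")
    return problems
```

For the review text `"Great food but slow service"`, the span `(15, 27, "slow service")` passes, while an end offset of 40 is reported as exceeding the text length.
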
### 3.2 Causal Chain Validation Function
```sql
-- Validate causal chain structure, codes, and ordering
CREATE OR REPLACE FUNCTION urt_validate_causal_chain(chain JSONB)
RETURNS BOOLEAN AS $$
DECLARE
link JSONB;
link_code TEXT;
link_role TEXT;
link_order INT;
prev_order INT := -1;
valid_roles TEXT[] := ARRAY['cause', 'effect', 'context', 'outcome'];
BEGIN
-- Must be an array
IF jsonb_typeof(chain) != 'array' THEN
RETURN FALSE;
END IF;
-- Empty array is valid
IF jsonb_array_length(chain) = 0 THEN
RETURN TRUE;
END IF;
-- Validate each link
FOR link IN SELECT * FROM jsonb_array_elements(chain)
LOOP
-- Required fields
IF NOT (link ? 'code' AND link ? 'role' AND link ? 'order') THEN
RETURN FALSE;
END IF;
link_code := link->>'code';
link_role := link->>'role';
link_order := (link->>'order')::INT;
-- Validate code format (tier-3)
IF link_code !~ '^[OPJEAVR][1-4]\.[0-9]{2}$' THEN
RETURN FALSE;
END IF;
-- Validate role
IF NOT (link_role = ANY(valid_roles)) THEN
RETURN FALSE;
END IF;
-- Validate order is increasing
IF link_order <= prev_order THEN
RETURN FALSE;
END IF;
prev_order := link_order;
END LOOP;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```
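
The same structural rules can be checked on the application side before the JSONB value is written. The following sketch mirrors `urt_validate_causal_chain` in Python; the name `is_valid_causal_chain` is hypothetical. It is slightly stricter than the SQL version in one respect: `order` must already be an integer, whereas the SQL function casts the JSON value with `::INT`.

```python
import re

TIER3_CODE = re.compile(r"[OPJEAVR][1-4]\.[0-9]{2}")
VALID_ROLES = {"cause", "effect", "context", "outcome"}


def is_valid_causal_chain(chain):
    """Return True if chain is a list of links with valid code, role, order."""
    if not isinstance(chain, list):
        return False
    prev_order = -1  # same starting point as the SQL function
    for link in chain:
        if not isinstance(link, dict):
            return False
        if not {"code", "role", "order"} <= link.keys():
            return False
        code, role, order = link["code"], link["role"], link["order"]
        if not isinstance(code, str) or TIER3_CODE.fullmatch(code) is None:
            return False
        if not isinstance(role, str) or role not in VALID_ROLES:
            return False
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(order, bool) or not isinstance(order, int):
            return False
        if order <= prev_order:
            return False  # order values must be strictly increasing
        prev_order = order
    return True
```

Because `prev_order` starts at -1, the first link needs an `order` of 0 or greater, and an empty list is accepted, matching the SQL behavior.
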
### 3.3 Span Relation Validation
```sql
-- Validate related_span_id references span from same review
CREATE OR REPLACE FUNCTION validate_review_relations(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS BOOLEAN AS $$
DECLARE
invalid_count INT;
BEGIN
SELECT COUNT(*) INTO invalid_count
FROM review_spans s
WHERE s.source = p_source
AND s.review_id = p_review_id
AND s.review_version = p_review_version
AND s.related_span_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM review_spans r
WHERE r.span_id = s.related_span_id
AND r.source = s.source
AND r.review_id = s.review_id
AND r.review_version = s.review_version
);
RETURN invalid_count = 0;
END;
$$ LANGUAGE plpgsql;
```
### 3.4 Active Span Set Validation
```sql
-- Validate exactly one active span set per review version
CREATE OR REPLACE FUNCTION validate_active_spans(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS BOOLEAN AS $$
DECLARE
active_count INT;
primary_count INT;
BEGIN
-- Count active spans
SELECT COUNT(*), COUNT(*) FILTER (WHERE is_primary)
INTO active_count, primary_count
FROM review_spans
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE;
-- Must have at least one active span
IF active_count = 0 THEN
RETURN FALSE;
END IF;
-- Must have exactly one primary span
IF primary_count != 1 THEN
RETURN FALSE;
END IF;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
```
### 3.5 Primary Span Selection
```sql
-- Deterministic primary span selection: I3 > I2 > I1, V- > V± > V0 > V+, then span_index
CREATE OR REPLACE FUNCTION set_primary_span(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS TEXT AS $$
DECLARE
selected_span_id TEXT;
BEGIN
-- Clear existing primary
UPDATE review_spans
SET is_primary = FALSE
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE
AND is_primary = TRUE;
-- Select new primary using deterministic ordering
SELECT span_id INTO selected_span_id
FROM review_spans
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE
ORDER BY
-- Intensity: I3 > I2 > I1
CASE intensity
WHEN 'I3' THEN 1
WHEN 'I2' THEN 2
WHEN 'I1' THEN 3
END,
-- Valence: V- > V± > V0 > V+
CASE valence
WHEN 'V-' THEN 1
            WHEN 'V±' THEN 2
WHEN 'V0' THEN 3
WHEN 'V+' THEN 4
END,
-- Tiebreaker: first span
span_index
LIMIT 1;
-- Set as primary
IF selected_span_id IS NOT NULL THEN
UPDATE review_spans
SET is_primary = TRUE
WHERE span_id = selected_span_id;
END IF;
RETURN selected_span_id;
END;
$$ LANGUAGE plpgsql;
```
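The same ordering can be mirrored in application code, for example to predict which span will become primary before any rows are written. The sketch below is a hypothetical helper, not part of the schema. It assumes each span is a dict with `intensity`, `valence`, and `span_index` keys, and it ranks unrecognized codes last (the SQL `CASE` yields NULL for them, which PostgreSQL also sorts last in ascending order).

```python
from typing import Optional

# Lower rank wins, matching the CASE expressions in set_primary_span.
INTENSITY_RANK = {"I3": 1, "I2": 2, "I1": 3}
VALENCE_RANK = {"V-": 1, "V±": 2, "V0": 3, "V+": 4}
UNKNOWN = 99  # assumption: unrecognized codes sort last, like SQL NULLs


def pick_primary(spans: list[dict]) -> Optional[dict]:
    """Return the span that the deterministic ordering would mark as primary."""
    if not spans:
        return None
    return min(
        spans,
        key=lambda s: (
            INTENSITY_RANK.get(s["intensity"], UNKNOWN),
            VALENCE_RANK.get(s["valence"], UNKNOWN),
            s["span_index"],
        ),
    )


spans = [
    {"span_index": 0, "intensity": "I2", "valence": "V-"},
    {"span_index": 1, "intensity": "I3", "valence": "V+"},
    {"span_index": 2, "intensity": "I3", "valence": "V-"},
]
primary = pick_primary(spans)  # span 2: ties span 1 on intensity, more negative
```

Intensity outranks valence, so a strong positive span (index 1) beats a moderate negative one (index 0); valence only breaks ties within the same intensity.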
### 3.6 Deterministic Issue ID Generation
```sql
-- Generate deterministic issue_id from grouping key using SHA256
CREATE OR REPLACE FUNCTION generate_issue_id(
p_business_id TEXT,
p_place_id TEXT,
p_urt_primary TEXT,
p_entity_normalized TEXT DEFAULT NULL
)
RETURNS TEXT AS $$
DECLARE
grouping_key TEXT;
hash_bytes BYTEA;
BEGIN
-- Build grouping key (entity_normalized defaults to empty string if NULL)
grouping_key := p_business_id || '|' || p_place_id || '|' || p_urt_primary || '|' || COALESCE(p_entity_normalized, '');
-- Generate SHA256 hash
hash_bytes := digest(grouping_key, 'sha256');
-- Return first 16 chars of hex encoding (64 bits of entropy)
RETURN 'ISS-' || left(encode(hash_bytes, 'hex'), 16);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```
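When application code needs the issue ID without a database round trip, the function can be reproduced client-side. This is a minimal sketch, assuming the database encoding is UTF-8 so that `digest()` hashes the same bytes; `issue_id_for` is a hypothetical name.

```python
import hashlib
from typing import Optional


def issue_id_for(business_id: str, place_id: str, urt_primary: str,
                 entity_normalized: Optional[str] = None) -> str:
    """Mirror generate_issue_id: 'ISS-' plus the first 16 hex chars of SHA-256."""
    grouping_key = "|".join(
        [business_id, place_id, urt_primary, entity_normalized or ""]
    )
    digest = hashlib.sha256(grouping_key.encode("utf-8")).hexdigest()
    return "ISS-" + digest[:16]


# A missing entity and an empty entity map to the same issue.
same = issue_id_for("biz1", "place1", "P1.02") == issue_id_for("biz1", "place1", "P1.02", "")
```

Because the ID depends only on the grouping key, two spans with the same business, place, URT code, and normalized entity always route to the same issue.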
---
## Part 4: Ingest Layer
### 4.1 Google Connector
```python
import json
from datetime import datetime
from typing import Optional


async def pull_reviews(place_id: str, since: Optional[datetime] = None) -> list[dict]:
"""Fetch new/updated reviews from Google Places API."""
reviews = await google_places_client.get_reviews(place_id, since=since)
for review in reviews:
await store_raw_review(place_id, review)
return reviews
async def store_raw_review(place_id: str, review: dict) -> int:
"""Store immutable raw review payload."""
existing = await db.query_one("""
        SELECT id, review_version, review_text, rating FROM reviews_raw
WHERE source = 'google' AND review_id = %s
ORDER BY review_version DESC LIMIT 1
""", [review['review_id']])
version = 1
if existing:
if content_changed(existing, review):
version = existing['review_version'] + 1
else:
return existing['id']
return await db.insert("""
INSERT INTO reviews_raw (
source, review_id, place_id, raw_payload,
review_text, rating, review_time, reviewer_name, reviewer_id,
review_version, pulled_at
) VALUES (
'google', %s, %s, %s,
%s, %s, %s, %s, %s,
%s, NOW()
)
RETURNING id
""", [
review['review_id'], place_id, json.dumps(review),
review.get('text'), review.get('rating'),
review.get('time'), review.get('author_name'), review.get('author_id'),
version
])
```
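`content_changed` is called above but not defined in this section. A minimal sketch, assuming the lookup row carries the stored `review_text` and `rating`, and that an edit to either field is what should trigger a new `review_version`:

```python
def content_changed(existing: dict, review: dict) -> bool:
    """Hypothetical helper: has the reviewer edited the text or the rating?"""
    return (
        existing.get("review_text") != review.get("text")
        or existing.get("rating") != review.get("rating")
    )


stored = {"id": 7, "review_version": 1, "review_text": "Slow service", "rating": 2}
unchanged = content_changed(stored, {"text": "Slow service", "rating": 2})
edited = content_changed(stored, {"text": "Slow service, but fixed now", "rating": 4})
```

Comparing only text and rating ignores metadata churn in the payload (for example a refreshed profile photo URL), which would otherwise create spurious versions.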
### 4.2 Enrichment Pipeline
```python
async def enrich_review(raw_id: int, business_id: str) -> dict:
"""
Full enrichment: normalize → classify → embed → trust score → extract spans.
Args:
raw_id: ID from reviews_raw
business_id: Tenant context (passed from ingest job, not looked up)
"""
raw = await db.query_one(
"SELECT * FROM reviews_raw WHERE id = %s", [raw_id]
)
# 1. Normalize
text = normalize_text(raw['review_text'])
# 2. Validate place_id exists under this tenant (owned or competitor)
location = await db.query_one(
"SELECT display_name, location_type FROM locations WHERE business_id = %s AND place_id = %s",
[business_id, raw['place_id']]
)
if not location:
raise ValueError(f"place_id {raw['place_id']} not registered for business {business_id}")
# 3. Parallel: LLM classify (with span extraction) + embed
    classification, embedding = await asyncio.gather(
        classify_review_with_spans(text),
        embed_review(text),
    )
# 4. Trust score
trust_score = compute_trust_score(raw, text, classification)
# 5. Dedup check
dedup_group_id = await find_dedup_group(embedding, raw['place_id'])
# 6. Mark previous versions as not-latest
await db.execute("""
UPDATE reviews_enriched
SET is_latest = FALSE
WHERE source = 'google' AND review_id = %s AND is_latest = TRUE
""", [raw['review_id']])
# 7. Store enriched (versioned)
enriched = {
'source': 'google',
'review_id': raw['review_id'],
'review_version': raw['review_version'],
'is_latest': True,
'raw_id': raw_id,
'business_id': business_id,
'place_id': raw['place_id'],
'text': raw['review_text'],
'text_normalized': text,
'rating': raw['rating'],
'review_time': raw['review_time'],
'language': detect_language(text),
'embedding': embedding,
'trust_score': trust_score,
'dedup_group_id': dedup_group_id,
        # Review-level classification: the first extracted span stands in here;
        # the deterministic primary span is chosen later, during span storage
'urt_primary': classification['spans'][0]['urt_primary'] if classification['spans'] else 'O1.01',
'valence': classification['review_valence'],
'intensity': classification['review_intensity'],
**classification.get('review_meta', {}),
}
await upsert_enriched_review(enriched)
# 8. Extract and store spans (v3.2)
batch_id = f"batch-{raw['review_id']}-{raw['review_version']}-{int(time.time())}"
await store_review_spans(
enriched,
classification['spans'],
batch_id
)
return enriched
```
### 4.3 LLM Classification with Span Extraction
```python
CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).
Analyze the review and extract SPANS (individual semantic units). Each span is a phrase or sentence expressing one classifiable idea.
Return JSON:
{
"spans": [
{
"text": "exact phrase from review",
"start": 0,
"end": 25,
"urt_primary": "X1.23",
"urt_secondary": [],
"valence": "V-",
"intensity": "I2",
"comparative": "CR-N",
"specificity": "S2",
"actionability": "A2",
"temporal": "TC",
"evidence": "ES",
"entity": "Mike",
"entity_type": "staff",
"confidence": "high"
}
],
"review_valence": "V-",
"review_intensity": "I2",
"review_meta": {
"staff_mentions": ["Mike"],
"comparative": "CR-N"
}
}
URT DOMAINS:
- O (Offering): Product/service quality, function, completeness
- P (People): Staff attitude, competence, responsiveness
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance
- A (Access): Availability, accessibility, convenience
- V (Value): Price, transparency, worth
- R (Relationship): Trust, dependability, loyalty
SPAN RULES:
1. Each span = one classifiable semantic unit
2. Spans must not overlap
3. text must be EXACT substring from review
4. start/end are character offsets (0-indexed)
5. The primary span is the one with the highest intensity, then the most negative valence (V- > V± > V0 > V+), then the earliest position
INTENSITY:
- I1: Mild observation, passing mention
- I2: Moderate emphasis, clear statement
- I3: Strong emotion, repeated emphasis, dealbreaker
SPECIFICITY:
- S1: Vague ("it was bad")
- S2: Specific ("the wait was 30 minutes")
- S3: Precise ("waited 32 minutes on Tuesday at 6pm")
ACTIONABILITY:
- A1: No clear action ("didn't like it")
- A2: Implied action ("too slow")
- A3: Explicit action ("need more cashiers during rush hour")
TEMPORAL:
- TC: Current/recent experience
- TR: Recurring pattern
- TH: Historical comparison
- TF: Future expectation
EVIDENCE:
- ES: Subjective opinion
- EI: Indirect evidence
- EC: Concrete/verifiable
Return valid JSON only."""
async def classify_review_with_spans(text: str) -> dict:
"""LLM-powered URT classification with span extraction."""
response = await llm.chat(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": CLASSIFICATION_PROMPT},
{"role": "user", "content": text}
],
response_format={"type": "json_object"},
temperature=0.1
)
result = json.loads(response.content)
result['classification_model'] = 'gpt-4o-mini'
return result
```
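LLM output does not always honor the span rules, so it is worth checking them before storage. The sketch below is a hypothetical validator for rules 2 to 4; it assumes `end` is an exclusive offset (the prompt does not say), and `span_errors` is not a name from the source.

```python
def span_errors(text: str, spans: list[dict]) -> list[str]:
    """Return human-readable violations of the span rules (empty list = valid)."""
    errors = []
    previous_end = 0
    # Check spans in text order; the index i refers to that sorted order.
    for i, span in enumerate(sorted(spans, key=lambda s: s["start"])):
        start, end = span["start"], span["end"]
        if not (0 <= start < end <= len(text)):
            errors.append(f"span {i}: offsets {start}..{end} out of range")
            continue
        if text[start:end] != span["text"]:
            errors.append(f"span {i}: text does not match review[{start}:{end}]")
        if start < previous_end:
            errors.append(f"span {i}: overlaps the previous span")
        previous_end = max(previous_end, end)
    return errors


review = "Great food but the wait was 30 minutes."
phrase = "the wait was 30 minutes"
start = review.index(phrase)
spans = [
    {"text": "Great food", "start": 0, "end": len("Great food")},
    {"text": phrase, "start": start, "end": start + len(phrase)},
]
problems = span_errors(review, spans)  # no violations for this input
```

A non-empty result could be used to retry the classification or to recompute offsets from `text` with `str.find`, rather than storing misaligned spans.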
### 4.4 Span Storage
```python
async def store_review_spans(
enriched: dict,
spans: list[dict],
batch_id: str
) -> list[str]:
"""
Store extracted spans with soft-switch pattern.
Returns list of span_ids.
"""
span_ids = []
for idx, span in enumerate(spans):
# Generate deterministic span_id
span_id = generate_span_id(
enriched['source'],
enriched['review_id'],
enriched['review_version'],
idx
)
# Build USN string
usn = build_usn(span, profile='standard')
await db.execute("""
INSERT INTO review_spans (
span_id, business_id, place_id, source, review_id, review_version,
span_index, span_text, span_start, span_end,
profile, urt_primary, urt_secondary, valence, intensity, comparative,
specificity, actionability, temporal, evidence,
entity, entity_type, entity_normalized,
is_primary, is_active, review_time,
confidence, usn, model_version, ingest_batch_id
) VALUES (
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s,
%s, %s, %s,
%s, %s, %s, %s
)
""", [
span_id, enriched['business_id'], enriched['place_id'],
enriched['source'], enriched['review_id'], enriched['review_version'],
idx, span['text'], span['start'], span['end'],
'standard', span['urt_primary'], span.get('urt_secondary', []),
span['valence'], span['intensity'], span.get('comparative', 'CR-N'),
span.get('specificity', 'S2'), span.get('actionability', 'A2'),
span.get('temporal', 'TC'), span.get('evidence', 'ES'),
span.get('entity'), span.get('entity_type'),
normalize_entity(span.get('entity')),
False, # is_primary set later
False, # is_active=FALSE until validated
enriched['review_time'],
span.get('confidence', 'medium'), usn, 'gpt-4o-mini', batch_id
])
span_ids.append(span_id)
# Set primary span
await set_primary_span_for_batch(
enriched['source'],
enriched['review_id'],
enriched['review_version'],
batch_id
)
# Atomic activation (soft-switch)
await activate_span_batch(
enriched['source'],
enriched['review_id'],
enriched['review_version'],
batch_id
)
return span_ids
def generate_span_id(source: str, review_id: str, version: int, index: int) -> str:
"""Generate deterministic span ID."""
key = f"{source}|{review_id}|{version}|{index}"
hash_bytes = hashlib.sha256(key.encode()).digest()
return f"SPN-{hash_bytes[:8].hex()}"
def build_usn(span: dict, profile: str = 'standard') -> str:
"""Build URT Semantic Notation string."""
base = f"V{span['valence'][-1]}:I{span['intensity'][-1]}:{span['urt_primary']}"
if profile == 'full':
base += f":S{span.get('specificity', 'S2')[-1]}"
base += f":A{span.get('actionability', 'A2')[-1]}"
base += f":T{span.get('temporal', 'TC')[-1]}"
base += f":E{span.get('evidence', 'ES')[-1]}"
return base
```
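`normalize_entity` is called during span storage but not defined in this section. Since `entity_normalized` is part of the issue grouping key, variants of the same name should collapse to one value. A minimal sketch, assuming Unicode folding, whitespace collapsing, and case-insensitive matching are sufficient (the real rules may also handle titles or nicknames):

```python
import re
import unicodedata
from typing import Optional


def normalize_entity(entity: Optional[str]) -> Optional[str]:
    """Hypothetical normalizer: Unicode-fold, trim, collapse spaces, lowercase."""
    if entity is None:
        return None
    folded = unicodedata.normalize("NFKC", entity)
    collapsed = re.sub(r"\s+", " ", folded).strip()
    # Empty results become None so they are stored as NULL, not ''.
    return collapsed.casefold() or None


variants = {normalize_entity(name) for name in ["Mike", " mike ", "MIKE"]}
```

All three spellings collapse to a single key, so mentions of the same staff member route to the same issue.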
### 4.5 Reprocessing Pattern
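The soft-switch pattern replaces a review's spans atomically and without downtime: the new batch is inserted with `is_active = FALSE`, validated, and then swapped in within a single transaction, so readers that filter on `is_active = TRUE` never see a partial span set: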
```python
async def reprocess_review_spans(
source: str,
review_id: str,
review_version: int
) -> str:
"""
Reprocess spans for a review using soft-switch pattern.
Returns new batch_id.
"""
# 1. Fetch review
review = await db.query_one("""
SELECT * FROM reviews_enriched
WHERE source = %s AND review_id = %s AND review_version = %s
""", [source, review_id, review_version])
# 2. Re-classify with LLM
classification = await classify_review_with_spans(review['text'])
# 3. Generate new batch ID
batch_id = f"reprocess-{review_id}-{review_version}-{int(time.time())}"
# 4. INSERT new spans with is_active=FALSE
for idx, span in enumerate(classification['spans']):
span_id = generate_span_id(source, review_id, review_version, idx)
# ... insert with is_active=FALSE, ingest_batch_id=batch_id
# 5. Validate new spans
if not await validate_span_set(source, review_id, review_version, batch_id):
# Rollback: delete invalid batch
await db.execute("""
DELETE FROM review_spans
WHERE ingest_batch_id = %s
""", [batch_id])
raise ValueError("New span set failed validation")
# 6. Set primary span for new batch
await set_primary_span_for_batch(source, review_id, review_version, batch_id)
# 7. Atomic switch
async with db.transaction():
# Deactivate old spans
await db.execute("""
UPDATE review_spans
SET is_active = FALSE
WHERE source = %s AND review_id = %s AND review_version = %s
AND is_active = TRUE
AND ingest_batch_id != %s
""", [source, review_id, review_version, batch_id])
# Activate new spans
await db.execute("""
UPDATE review_spans
SET is_active = TRUE
WHERE ingest_batch_id = %s
""", [batch_id])
return batch_id
async def activate_span_batch(
source: str,
review_id: str,
review_version: int,
batch_id: str
):
"""Atomically switch from old spans to new batch."""
async with db.transaction():
# Deactivate existing active spans
await db.execute("""
UPDATE review_spans
SET is_active = FALSE
WHERE source = %s AND review_id = %s AND review_version = %s
AND is_active = TRUE
AND ingest_batch_id != %s
""", [source, review_id, review_version, batch_id])
# Activate new batch
await db.execute("""
UPDATE review_spans
SET is_active = TRUE
WHERE ingest_batch_id = %s
""", [batch_id])
```
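The switch itself can be tried in isolation. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL, with a reduced, hypothetical `review_spans` table covering a single review; only the two-step update inside one transaction is taken from the code above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE review_spans ("
    " span_text TEXT, ingest_batch_id TEXT, is_active INTEGER)"
)
conn.executemany(
    "INSERT INTO review_spans VALUES (?, ?, ?)",
    [
        ("slow service", "batch-old", 1),   # currently visible
        ("rude cashier", "batch-old", 1),
        ("slow checkout", "batch-new", 0),  # staged, not yet visible
    ],
)


def activate(conn: sqlite3.Connection, batch_id: str) -> None:
    """Deactivate every other batch and activate batch_id in one transaction."""
    with conn:  # commits on success, rolls back if either UPDATE raises
        conn.execute(
            "UPDATE review_spans SET is_active = 0"
            " WHERE is_active = 1 AND ingest_batch_id != ?",
            (batch_id,),
        )
        conn.execute(
            "UPDATE review_spans SET is_active = 1 WHERE ingest_batch_id = ?",
            (batch_id,),
        )


activate(conn, "batch-new")
active = [row[0] for row in conn.execute(
    "SELECT span_text FROM review_spans WHERE is_active = 1"
)]
```

Old rows are kept with `is_active = 0` rather than deleted, which leaves an audit trail and makes it possible to switch back by activating the previous batch.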
### 4.6 Trust Score Computation
```python
def compute_trust_score(raw: dict, text: str, classification: dict) -> float:
"""
Compute trust score (0.0 to 1.0) based on review quality signals.
Low trust = likely spam, fake, or low-quality.
"""
score = 1.0
# Length penalty
word_count = len(text.split())
if word_count < 5:
score *= 0.5
elif word_count > 500:
score *= 0.8
# Rating/sentiment mismatch
rating = raw.get('rating')
valence = classification.get('review_valence')
if rating and valence:
if rating >= 4 and valence == 'V-':
score *= 0.7
elif rating <= 2 and valence == 'V+':
score *= 0.7
# Generic text patterns
if is_generic_review(text):
score *= 0.6
# Span confidence
spans = classification.get('spans', [])
if spans:
low_conf_count = sum(1 for s in spans if s.get('confidence') == 'low')
if low_conf_count > len(spans) / 2:
score *= 0.9
return max(0.1, min(1.0, score))
```
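`is_generic_review` is referenced above but not defined in this section. One possible shape, shown only as a sketch: treat a review as generic when, after stripping punctuation, it is nothing more than a stock phrase. The phrase list is hypothetical and would need tuning on real data.

```python
import re

# Hypothetical phrase list, for illustration only.
GENERIC_PHRASES = {
    "good", "great", "nice", "ok", "bad",
    "great service", "good place", "highly recommend", "would recommend",
}


def is_generic_review(text: str) -> bool:
    """Return True when the whole review is just a stock phrase."""
    cleaned = re.sub(r"[^\w\s]", "", text).strip().lower()
    cleaned = re.sub(r"\s+", " ", cleaned)
    return cleaned in GENERIC_PHRASES


generic = is_generic_review("Great service!!!")
specific = is_generic_review("Great service, but Mike overcharged me")
```

Under `compute_trust_score`, a two-word review such as "Great service!" takes both the length penalty and the generic penalty: 1.0 × 0.5 × 0.6 = 0.3, assuming no rating mismatch and no low-confidence majority.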
---
## Part 5: Issue Lifecycle Management
### 5.1 Issue Routing (Span-Based)
**v3.2 Issue Key**: `(business_id, place_id, urt_primary, entity_normalized)`
```python
async def route_span_to_issue(span: dict) -> Optional[str]:
"""
Route a span to an existing or new issue.
Returns issue_id or None if span doesn't warrant an issue.
"""
# Only negative/mixed spans create issues
    if span['valence'] not in ('V-', 'V±'):
return None
# Generate deterministic issue_id from grouping key
issue_id = await db.query_one("""
SELECT generate_issue_id(%s, %s, %s, %s) as issue_id
""", [
span['business_id'],
span['place_id'],
span['urt_primary'],
span.get('entity_normalized') # NULL in v3.2
])
issue_id = issue_id['issue_id']
# Check if issue exists
existing = await db.query_one("""
SELECT issue_id, state, span_count
FROM issues
WHERE issue_id = %s
""", [issue_id])
if existing:
# Add span to existing issue
await add_span_to_issue(existing['issue_id'], span)
return existing['issue_id']
# Create new issue
if should_create_issue(span):
return await create_issue_from_span(span, issue_id)
return None
async def create_issue_from_span(span: dict, issue_id: str) -> str:
"""Create a new issue from a span."""
domain = span['urt_primary'][0]
await db.execute("""
INSERT INTO issues (
issue_id, business_id, place_id, primary_subcode, domain,
state, priority_score, confidence_score,
span_count, max_intensity, entity, entity_normalized
) VALUES (
%s, %s, %s, %s, %s,
'DETECTED', %s, %s,
1, %s, %s, %s
)
""", [
issue_id, span['business_id'], span['place_id'],
span['urt_primary'], domain,
compute_initial_priority(span),
confidence_to_score(span.get('confidence', 'medium')),
span['intensity'],
span.get('entity'), span.get('entity_normalized')
])
# Link span to issue
await db.execute("""
INSERT INTO issue_spans (
issue_id, span_id, source, review_id, review_version,
is_primary_match, intensity, review_time
) VALUES (
%s, %s, %s, %s, %s,
TRUE, %s, %s
)
""", [
issue_id, span['span_id'], span['source'],
span['review_id'], span['review_version'],
span['intensity'], span['review_time']
])
await log_issue_event(issue_id, 'created', span_id=span['span_id'])
return issue_id
async def add_span_to_issue(issue_id: str, span: dict):
"""Add span to existing issue and update counters."""
# Insert span link (1:1 mapping enforced by UNIQUE constraint)
await db.execute("""
INSERT INTO issue_spans (
issue_id, span_id, source, review_id, review_version,
is_primary_match, intensity, review_time
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (span_id) DO NOTHING
""", [
issue_id, span['span_id'], span['source'],
span['review_id'], span['review_version'],
True, span['intensity'], span['review_time']
])
# Update issue counters
await db.execute("""
UPDATE issues SET
span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s),
max_intensity = (
SELECT CASE MAX(CASE intensity
WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END)
WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END
FROM issue_spans WHERE issue_id = %s
),
updated_at = NOW()
WHERE issue_id = %s
""", [issue_id, issue_id, issue_id])
await recalculate_priority(issue_id)
await log_issue_event(
issue_id, 'span_added',
span_id=span['span_id'],
source=span['source'],
review_id=span['review_id'],
review_version=span['review_version']
)
```
### 5.2 Priority Scoring (Trust-Weighted)
```python
INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}
async def recalculate_priority(issue_id: str):
"""
Priority = intensity × volume × decay × recurrence × trend × trust
"""
issue = await db.query_one("""
SELECT
i.*,
(SELECT AVG(re.trust_score)
FROM issue_spans s
JOIN review_spans rs ON s.span_id = rs.span_id
JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
= (re.source, re.review_id, re.review_version)
WHERE s.issue_id = i.issue_id) as avg_trust
FROM issues i
WHERE i.issue_id = %s
""", [issue_id])
    i_weight = INTENSITY_WEIGHTS.get(issue['max_intensity'], 1.0)
volume_factor = 1 + math.log(max(1, issue['span_count']))
days_old = (datetime.now() - issue['created_at']).days
decay = math.exp(-0.023 * days_old)
recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)
if issue['cr_worse_count'] >= 2:
trend_modifier = 1.3
elif issue['cr_better_count'] >= 2:
trend_modifier = 0.7
else:
trend_modifier = 1.0
trust_factor = issue['avg_trust'] or 1.0
priority = (
i_weight * volume_factor * decay *
recurrence_boost * trend_modifier * trust_factor
)
await db.execute("""
UPDATE issues SET
priority_score = %s,
avg_trust_score = %s,
updated_at = NOW()
WHERE issue_id = %s
""", [priority, issue['avg_trust'], issue_id])
```
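To see how the factors interact, the formula can be pulled out into a pure function with no database access. This is a sketch with hypothetical parameter names; the constants are the ones used in `recalculate_priority`. The decay constant 0.023 corresponds to a half-life of ln(2) / 0.023, roughly 30 days.

```python
import math

INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0}


def priority_score(max_intensity: str, span_count: int, days_old: int,
                   reopen_count: int = 0, cr_worse: int = 0, cr_better: int = 0,
                   avg_trust: float = 1.0) -> float:
    """Pure-function version of the formula in recalculate_priority."""
    i_weight = INTENSITY_WEIGHTS.get(max_intensity, 1.0)
    volume = 1 + math.log(max(1, span_count))       # natural log of span volume
    decay = math.exp(-0.023 * days_old)             # half-life of about 30 days
    recurrence = 1.0 + 0.5 * math.log2(reopen_count + 1)
    trend = 1.3 if cr_worse >= 2 else 0.7 if cr_better >= 2 else 1.0
    return i_weight * volume * decay * recurrence * trend * avg_trust


fresh = priority_score("I3", span_count=1, days_old=0)   # only the I3 weight applies
aged = priority_score("I3", span_count=1, days_old=30)   # about half of fresh
```

A brand-new single-span I3 issue scores exactly its intensity weight because every other factor is 1.0; volume and recurrence grow logarithmically, so neither can dominate the score on its own.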
### 5.3 Issue Span Drill-Down
```python
async def get_issue_spans(issue_id: str,
sort_by: str = 'date',
limit: int = 50,
offset: int = 0) -> list[dict]:
"""Fetch all spans for an issue with full details."""
order_clause = {
'date': 's.review_time DESC',
'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
'trust': 're.trust_score DESC',
}.get(sort_by, 's.review_time DESC')
return await db.query(f"""
SELECT
rs.span_id,
rs.span_text,
rs.span_start,
rs.span_end,
rs.urt_primary,
rs.valence,
rs.intensity,
rs.specificity,
rs.actionability,
rs.entity,
rs.entity_type,
rs.usn,
s.review_time,
s.is_primary_match,
re.review_id,
re.review_version,
re.text as review_text,
re.rating,
re.trust_score,
l.display_name as location_name
FROM issue_spans s
JOIN review_spans rs ON s.span_id = rs.span_id
JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
= (re.source, re.review_id, re.review_version)
JOIN locations l ON (rs.business_id, rs.place_id) = (l.business_id, l.place_id)
WHERE s.issue_id = %s
AND rs.is_active = TRUE
ORDER BY {order_clause}
LIMIT %s OFFSET %s
""", [issue_id, limit, offset])
```
### 5.4 Strength Score
```
Strength Score = Σ (intensity_weight)
Where:
I1 (mild) → weight = 1
I2 (moderate) → weight = 2
I3 (strong) → weight = 4
One I3 span = 4 I1 spans = 2 I2 spans
```
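The equivalence above is easy to check in code. A minimal sketch, assuming unknown intensity codes contribute 0 (as the `ELSE 0` branch does in the fact aggregation SQL); `strength_score` is a hypothetical helper name.

```python
STRENGTH_WEIGHTS = {"I1": 1, "I2": 2, "I3": 4}


def strength_score(intensities: list[str]) -> int:
    """Sum the intensity weights of a set of spans."""
    return sum(STRENGTH_WEIGHTS.get(code, 0) for code in intensities)


one_strong = strength_score(["I3"])
four_mild = strength_score(["I1"] * 4)
two_moderate = strength_score(["I2", "I2"])
```

All three inputs produce the same score, so a single dealbreaker complaint carries as much weight as four passing mentions.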
---
## Part 6: Analytics Spine (Fact Population)
### 6.1 Daily Fact Aggregation Job
```python
async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'):
"""
Aggregate spans into fact_timeseries. Run daily.
v3.2 populates:
- subject_type='overall', subject_id='all' (per location + 'ALL')
- subject_type='urt_code', subject_id=<code> (per location + 'ALL')
- subject_type='issue', subject_id=<issue_id> (per issue)
"""
if bucket_type == 'day':
period_start = date
period_end = date + timedelta(days=1)
elif bucket_type == 'week':
period_start = date - timedelta(days=date.weekday())
period_end = period_start + timedelta(days=7)
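    elif bucket_type == 'month':
        period_start = date.replace(day=1)
        # Day 28 + 4 days always lands in the next month; snap back to its first day
        next_month = period_start.replace(day=28) + timedelta(days=4)
        period_end = next_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")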
# Get owned locations (competitors excluded from 'ALL' rollup)
owned_locations = await db.query(
"SELECT place_id FROM locations WHERE business_id = %s AND is_active = TRUE AND location_type = 'owned'",
[business_id]
)
owned_place_ids = [loc['place_id'] for loc in owned_locations]
# Per-location facts (owned)
for loc in owned_locations:
await populate_location_facts_from_spans(
business_id, loc['place_id'], period_start, period_end, bucket_type
)
# All-locations rollup (owned only — place_id='ALL')
await populate_all_locations_facts_from_spans(
business_id, owned_place_ids, period_start, period_end, bucket_type
)
# Issue facts
await populate_issue_facts(business_id, period_start, period_end, bucket_type)
async def populate_location_facts_from_spans(
business_id: str,
place_id: str,
period_start: date,
period_end: date,
bucket_type: str
):
"""Populate facts for a single location from spans."""
# Aggregate by URT code from spans
code_stats = await db.query("""
SELECT
rs.urt_primary as code,
COUNT(DISTINCT re.review_id) as review_count,
COUNT(*) as span_count,
SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
SUM(CASE WHEN rs.valence = 'V0' THEN 1 ELSE 0 END) as neutral_count,
            SUM(CASE WHEN rs.valence = 'V±' THEN 1 ELSE 0 END) as mixed_count,
SUM(CASE rs.intensity::text
WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
END) as strength_score,
SUM(CASE WHEN rs.valence = 'V-' THEN
CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
ELSE 0 END) as negative_strength,
SUM(CASE WHEN rs.valence = 'V+' THEN
CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
ELSE 0 END) as positive_strength,
SUM(CASE WHEN rs.intensity::text = 'I1' THEN 1 ELSE 0 END) as i1_count,
SUM(CASE WHEN rs.intensity::text = 'I2' THEN 1 ELSE 0 END) as i2_count,
SUM(CASE WHEN rs.intensity::text = 'I3' THEN 1 ELSE 0 END) as i3_count,
SUM(CASE WHEN rs.comparative::text = 'CR-B' THEN 1 ELSE 0 END) as cr_better,
SUM(CASE WHEN rs.comparative::text = 'CR-W' THEN 1 ELSE 0 END) as cr_worse,
SUM(CASE WHEN rs.comparative::text = 'CR-S' THEN 1 ELSE 0 END) as cr_same,
-- Trust-weighted metrics (v3.2)
SUM(re.trust_score * CASE rs.intensity::text
WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
END) as trust_weighted_strength,
SUM(CASE WHEN rs.valence = 'V-' THEN
re.trust_score * CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
ELSE 0 END) as trust_weighted_negative,
AVG(re.rating) as avg_rating,
COUNT(re.rating) as rating_count
FROM review_spans rs
JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
= (re.source, re.review_id, re.review_version)
WHERE rs.business_id = %s
AND rs.place_id = %s
AND rs.review_time >= %s AND rs.review_time < %s
AND rs.is_active = TRUE
AND re.is_latest = TRUE
GROUP BY rs.urt_primary
""", [business_id, place_id, period_start, period_end])
for stat in code_stats:
await upsert_fact(
business_id=business_id,
place_id=place_id,
period_date=period_start,
bucket_type=bucket_type,
subject_type='urt_code',
subject_id=stat['code'],
metrics=stat
)
# Aggregate overall
overall = await db.query_one("""
SELECT
COUNT(DISTINCT re.review_id) as review_count,
COUNT(*) as span_count,
SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
AVG(re.rating) as avg_rating
FROM review_spans rs
JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
= (re.source, re.review_id, re.review_version)
WHERE rs.business_id = %s
AND rs.place_id = %s
AND rs.review_time >= %s AND rs.review_time < %s
AND rs.is_active = TRUE
AND re.is_latest = TRUE
""", [business_id, place_id, period_start, period_end])
await upsert_fact(
business_id=business_id,
place_id=place_id,
period_date=period_start,
bucket_type=bucket_type,
subject_type='overall',
subject_id='all',
metrics=overall
)
```
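The period arithmetic at the top of `populate_facts` is the part most prone to off-by-one errors, so it helps to isolate it. The sketch below is a hypothetical standalone helper (`bucket_bounds` is not a name from the source) that returns the same half-open `[period_start, period_end)` range used by the `review_time >= %s AND review_time < %s` filters.

```python
from datetime import date, timedelta


def bucket_bounds(day: date, bucket_type: str = "day") -> tuple[date, date]:
    """Return the half-open [start, end) period that contains `day`."""
    if bucket_type == "day":
        return day, day + timedelta(days=1)
    if bucket_type == "week":
        start = day - timedelta(days=day.weekday())   # back to Monday
        return start, start + timedelta(days=7)
    if bucket_type == "month":
        start = day.replace(day=1)
        # Day 28 + 4 days always lands in the next month, in any calendar month.
        next_month = start.replace(day=28) + timedelta(days=4)
        return start, next_month.replace(day=1)
    raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")


week = bucket_bounds(date(2026, 1, 24), "week")
december = bucket_bounds(date(2025, 12, 15), "month")   # crosses the year boundary
```

Half-open ranges mean a review timestamped exactly at midnight belongs to one bucket only, so adjacent periods never double-count.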
### 6.2 Timeline Query (For Charts)
```python
async def get_timeline(business_id: str,
place_id: Optional[str],
subject_type: str,
subject_id: str,
start: date,
end: date,
bucket_type: str = 'week') -> list[dict]:
"""
Query pre-aggregated facts for line charts.
Args:
place_id: Specific place_id, or None for 'ALL' locations rollup
"""
# Use 'ALL' sentinel for all-locations queries
effective_place_id = place_id if place_id else 'ALL'
return await db.query("""
SELECT
period_date,
review_count,
span_count,
negative_count,
positive_count,
strength_score,
negative_strength,
avg_rating,
cr_better,
cr_worse,
cr_same,
trust_weighted_strength,
trust_weighted_negative
FROM fact_timeseries
WHERE business_id = %s
AND place_id = %s
AND subject_type = %s
AND subject_id = %s
AND bucket_type = %s
AND period_date BETWEEN %s AND %s
ORDER BY period_date
""", [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end])
```
---
## Part 7: Competitor Analysis
### 7.1 Competitor Setup (Clean Model)
Competitors are tracked in both `competitors` (relationship metadata) and `locations` (with `location_type='competitor'`). This preserves FK integrity and enables consistent joins for display names/timezones.
**Competitor Review Storage Rule**: Competitor reviews are stored with the **customer's business_id** and the **competitor's place_id**:
```
reviews_enriched.business_id = <customer_business_id>
reviews_enriched.place_id = <competitor_place_id>
```
The `locations.location_type` column distinguishes ownership:
- `'owned'` — customer's own locations
- `'competitor'` — tracked competitor locations
```python
async def setup_competitor(business_id: str, competitor_place_id: str,
competitor_name: str, relationship: str = 'direct'):
"""Register a competitor for tracking."""
# 1. Add to locations with location_type='competitor'
await db.execute("""
INSERT INTO locations (business_id, place_id, location_type, display_name)
VALUES (%s, %s, 'competitor', %s)
ON CONFLICT (business_id, place_id) DO UPDATE SET
display_name = EXCLUDED.display_name
""", [business_id, competitor_place_id, competitor_name])
# 2. Track relationship metadata in competitors table
await db.execute("""
INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship)
VALUES (%s, %s, %s, %s)
ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET
competitor_name = EXCLUDED.competitor_name,
relationship = EXCLUDED.relationship
""", [business_id, competitor_place_id, competitor_name, relationship])
```
### 7.2 Competitor Comparison
```python
async def get_competitor_comparison(business_id: str, code: str,
start: date, end: date) -> dict:
"""Compare your URT metrics against competitors."""
# Your metrics (from 'ALL' rollup)
your_metrics = await db.query_one("""
SELECT
SUM(negative_strength) as negative_strength,
SUM(span_count) as span_count,
AVG(avg_rating) as avg_rating,
SUM(trust_weighted_negative) as trust_weighted_negative
FROM fact_timeseries
WHERE business_id = %s
AND place_id = 'ALL'
AND subject_type = 'urt_code'
AND subject_id = %s
AND period_date BETWEEN %s AND %s
""", [business_id, code, start, end])
# Competitor metrics
competitors = await db.query("""
SELECT competitor_place_id, competitor_name
FROM competitors WHERE business_id = %s AND is_active = TRUE
""", [business_id])
comparison = {
'your_business': your_metrics or {},
'competitors': []
}
for comp in competitors:
comp_metrics = await db.query_one("""
SELECT
SUM(negative_strength) as negative_strength,
SUM(span_count) as span_count,
AVG(avg_rating) as avg_rating
FROM fact_timeseries
WHERE business_id = %s
AND place_id = %s
AND subject_type = 'urt_code'
AND subject_id = %s
AND period_date BETWEEN %s AND %s
""", [business_id, comp['competitor_place_id'], code, start, end])
comparison['competitors'].append({
'name': comp['competitor_name'],
'place_id': comp['competitor_place_id'],
**(comp_metrics or {})
})
return comparison
```
---
## Part 8: Report Generation (Facts-First)
```python
async def generate_report(business_id: str, place_id: Optional[str],
start: date, end: date) -> dict:
"""Generate report from pre-aggregated facts."""
effective_place_id = place_id if place_id else 'ALL'
# 1. Top issues from facts (fast)
top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end)
# 2. Strengths from facts
strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end)
# 3. Sub-patterns with span references
for issue in top_issues[:5]:
patterns = await discover_and_store_subpatterns(
business_id, effective_place_id, issue['code'], start, end
)
issue['sub_patterns'] = patterns
# 4. Trends from facts
trends = await compute_trends_from_facts(business_id, effective_place_id, start, end)
# 5. Entity analysis (v3.2)
entities = await analyze_entities(business_id, effective_place_id, start, end)
# 6. Competitor benchmarks
competitors = await get_competitor_benchmarks(business_id, start, end)
payload = {
'business_id': business_id,
'place_id': place_id,
'period': f"{start} to {end}",
'issues': top_issues,
'strengths': strengths,
'trends': trends,
'entities': entities,
'competitors': competitors,
}
narrative = await generate_narrative(payload)
return {
'payload': payload,
'narrative': narrative,
'generated_at': datetime.now().isoformat()
}
async def analyze_entities(business_id: str, place_id: str,
start: date, end: date) -> list[dict]:
"""Analyze entity mentions from spans."""
return await db.query("""
SELECT
rs.entity_normalized,
rs.entity_type::text,
COUNT(*) as mention_count,
SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
AVG(CASE rs.intensity::text
WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 3
END) as avg_intensity,
array_agg(DISTINCT rs.urt_primary) as codes
FROM review_spans rs
WHERE rs.business_id = %s
AND (rs.place_id = %s OR %s = 'ALL')
AND rs.review_time >= %s AND rs.review_time < %s
AND rs.entity_normalized IS NOT NULL
AND rs.is_active = TRUE
GROUP BY rs.entity_normalized, rs.entity_type
ORDER BY mention_count DESC
LIMIT 20
""", [business_id, place_id, place_id, start, end])
```
---
## Part 9: KPI-Ready Hooks
### 9.1 Future KPI Integration (Interface Only)
```sql
-- Future: KPI fact table with same grain
CREATE TABLE fact_kpi_timeseries (
id SERIAL PRIMARY KEY,
-- Same join keys as fact_timeseries
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' for rollups
period_date DATE NOT NULL,
bucket_type TEXT NOT NULL,
-- KPI metrics
revenue DECIMAL(12,2),
transactions INT,
cancellations INT,
refunds DECIMAL(12,2),
support_tickets INT,
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, place_id, period_date, bucket_type)
);
-- Join reviews and KPIs:
SELECT
r.period_date,
r.negative_strength,
r.trust_weighted_negative,
k.revenue,
k.cancellations
FROM fact_timeseries r
JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
WHERE r.subject_type = 'overall' AND r.subject_id = 'all';
```
---
## Part 10: Cost Model
| Stage | When | Cost | Notes |
|-------|------|------|-------|
| **Raw Storage** | Per review | $0.00 | ~1KB per review |
| **Embedding** | Per review | $0.00 | Local model, ~50ms |
| **LLM Classification + Spans** | Per review | ~$0.0003 | GPT-4o-mini (larger prompt) |
| **Fact Aggregation** | Daily job | $0.00 | SQL, <1 minute |
| **Sub-Clustering** | Per report | $0.00 | HDBSCAN, <1s |
| **LLM Narrative** | Per report | ~$0.15 | GPT-4o |
**Total Costs:**
| Volume | Monthly Ingest | Reports (10/mo) | Total |
|--------|---------------|-----------------|-------|
| 1K reviews | $0.30 | $1.50 | **$1.80** |
| 10K reviews | $3.00 | $1.50 | **$4.50** |
| 100K reviews | $30.00 | $1.50 | **$31.50** |
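
The totals above follow directly from the two per-unit LLM costs in the stage table; every other stage is listed at $0.00. As a rough sketch of that arithmetic (the function and constant names are illustrative, not part of the pipeline):

```python
# Per-unit costs taken from the Cost Model stage table.
LLM_CLASSIFICATION_PER_REVIEW = 0.0003  # GPT-4o-mini, classification + spans
LLM_NARRATIVE_PER_REPORT = 0.15         # GPT-4o narrative


def monthly_cost(reviews_per_month: int, reports_per_month: int = 10) -> dict:
    """Estimate monthly spend; all non-LLM stages are treated as free."""
    ingest = reviews_per_month * LLM_CLASSIFICATION_PER_REVIEW
    reports = reports_per_month * LLM_NARRATIVE_PER_REPORT
    return {
        "ingest": round(ingest, 2),
        "reports": round(reports, 2),
        "total": round(ingest + reports, 2),
    }


for volume in (1_000, 10_000, 100_000):
    cost = monthly_cost(volume)
    print(f"{volume:>7,} reviews: ingest ${cost['ingest']:.2f}, "
          f"reports ${cost['reports']:.2f}, total ${cost['total']:.2f}")
```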
---
## Part 11: Key Innovations
| Innovation | Benefit |
|------------|---------|
| **Span layer** | Fine-grained classification at semantic unit level |
| **1:1 span-to-issue** | Clean data model, no ambiguity in routing |
| **Deterministic issue IDs** | SHA256-based, reproducible from grouping key |
| **Soft-switch reprocessing** | Atomic span replacement without downtime |
| **URT ENUM types** | Database-enforced classification constraints |
| **Causal chain support** | Full profile enables cause/effect analysis |
| **Entity extraction** | Named entity routing for targeted issues |
| **Trust-weighted facts** | Spam resistance with weighted aggregation |
| **USN notation** | Compact semantic notation for spans |
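
To illustrate what "deterministic issue IDs" means in practice, the sketch below hashes a grouping key with SHA256 so that the same key always yields the same ID. It is not the `generate_issue_id` helper itself: the choice of key fields (`business_id`, `place_id`, `urt_primary`, `entity_normalized`), the separator, and the function name are assumptions made for illustration, and the sample values are made up.

```python
import hashlib
from typing import Optional


def issue_id_sketch(
    business_id: str,
    place_id: str,
    urt_primary: str,
    entity_normalized: Optional[str] = None,
) -> str:
    """Derive a reproducible ID from a (hypothetical) grouping key."""
    # A NULL entity is mapped to the empty string so the key is always defined.
    parts = [business_id, place_id, urt_primary, entity_normalized or ""]
    # The ASCII unit separator avoids collisions such as ("ab", "c") vs ("a", "bc").
    key = "\x1f".join(parts)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


# The same grouping key always maps to the same ID, so reprocessing a review
# routes its spans to the existing issue rather than creating a new one.
first = issue_id_sketch("biz-1", "place-9", "CODE-A")
second = issue_id_sketch("biz-1", "place-9", "CODE-A")
print(first == second)
```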
---
## Document Control
| Field | Value |
|-------|-------|
| **Document** | ReviewIQ Architecture v3.2.0 |
| **Status** | Specification Complete |
| **Date** | 2026-01-24 |
| **Dependencies** | URT Specification v5.1, Issue Lifecycle Framework C1 |
| **Source** | Google Reviews only |
| **Cost Target** | <$35/month at 100K reviews |
### Changelog
| Version | Changes |
|---------|---------|
| v3.0 | Issue lifecycle, strength scores, timeline charts |
| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring |
| v3.1.1 | Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup |
| v3.1.2 | Versioned issue_spans FK, competitor business_id rule, trust-weighted facts deferred, location_type flag |
| v3.2.0 | **Span layer**: review_spans table, URT ENUM types, causal chain support, entity extraction, reprocessing pattern, deterministic issue IDs, 1:1 span mapping |
### New in v3.2.0
| Feature | Description |
|---------|-------------|
| **review_spans table** | Fine-grained semantic unit extraction from reviews |
| **URT ENUM types** | 12 strongly-typed enums for classification fields |
| **Required extensions** | btree_gist for exclusion constraints, pgcrypto for SHA256 |
| **Span constraints** | chk_span_end, chk_primary_tier3, chk_secondary_max2, chk_secondary_tier3, chk_full_only_fields, chk_no_self_relation, chk_usn_format |
| **Span indexes** | Active ordering, one-primary enforcement, non-overlap exclusion, issue routing |
| **Validation triggers** | Bounds validation, text matching, causal chain structure |
| **Helper functions** | urt_validate_causal_chain, validate_review_relations, validate_active_spans, set_primary_span, generate_issue_id |
| **Reprocessing pattern** | Soft-switch with is_active flag and ingest_batch_id |
| **issue_spans rewrite** | 1:1 span-to-issue mapping with UNIQUE(span_id) |
| **Trust-weighted facts** | trust_weighted_strength, trust_weighted_negative now populated |
| **span_count in facts** | Span-level counting alongside review_count |
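
The soft-switch idea behind the reprocessing pattern can be sketched as a single transaction that deactivates the old spans and activates the new batch together. The example below is a simplified stand-in, not the production procedure: it uses an in-memory SQLite database instead of PostgreSQL, a cut-down hypothetical `review_spans` schema, and an illustrative `soft_switch` function.

```python
import sqlite3


def soft_switch(conn: sqlite3.Connection, review_id: str, new_batch_id: str) -> None:
    """Deactivate a review's current spans and activate a new batch atomically."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(
            "UPDATE review_spans SET is_active = 0 "
            "WHERE review_id = ? AND is_active = 1",
            (review_id,),
        )
        conn.execute(
            "UPDATE review_spans SET is_active = 1 "
            "WHERE review_id = ? AND ingest_batch_id = ?",
            (review_id, new_batch_id),
        )


conn = sqlite3.connect(":memory:")
# Cut-down schema for illustration; the real table has many more columns.
conn.execute(
    "CREATE TABLE review_spans ("
    "span_id INTEGER PRIMARY KEY, review_id TEXT, ingest_batch_id TEXT, "
    "is_active INTEGER NOT NULL DEFAULT 0, span_text TEXT)"
)
conn.executemany(
    "INSERT INTO review_spans (review_id, ingest_batch_id, is_active, span_text) "
    "VALUES (?, ?, ?, ?)",
    [
        ("r1", "batch-1", 1, "old span"),
        ("r1", "batch-2", 0, "new span a"),
        ("r1", "batch-2", 0, "new span b"),
    ],
)
conn.commit()

soft_switch(conn, "r1", "batch-2")
active = [
    row[0]
    for row in conn.execute(
        "SELECT span_text FROM review_spans WHERE is_active = 1 ORDER BY span_id"
    )
]
print(active)
```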
### Deferred to v3.3+
| Feature | Reason |
|---------|--------|
| Distinct entity issues | `entity_normalized` defaults to NULL in v3.2; v3.3 creates separate issues per entity |
| Journey step inference | Needs better grounding data |
| Intent signals extraction | Needs action playbooks |
| Stability score tracking | Premature for current version |
| Span embeddings | Per-span vectors for sub-clustering |
---
*End of ReviewIQ Architecture v3.2.0*