# ReviewIQ: Review Intelligence Pipeline

**Version**: 3.2.0
**Status**: Architecture Specification (Reviewed)
**Date**: 2026-01-24

---

## Executive Summary

ReviewIQ v3.2 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version introduces the **span layer** — a fine-grained extraction model that identifies and classifies individual semantic units within each review, enabling richer issue routing, causal analysis, and entity-aware aggregation.

**What's New in v3.2**:

- **Span layer**: `review_spans` table extracts individual semantic units from review text
- **URT ENUM types**: Strongly-typed classification fields with database-enforced constraints
- **Causal chain support**: `profile='full'` spans can capture cause/effect relationships
- **Entity extraction**: Named entities (staff, products, locations) linked to spans
- **Reprocessing pattern**: Soft-switch `is_active` flag for atomic span replacement
- **Deterministic issue routing**: SHA256-based issue IDs from grouping keys
- **1:1 span-to-issue mapping**: Each span belongs to exactly one issue

**Design Principles**:

- **Google Reviews only** (for now) — but schema is source-agnostic
- **Relational over arrays** — scales, queries, joins
- **Facts-first reporting** — pre-aggregated spine for fast dashboards
- **KPI-joinable** — `(business_id, place_id, period_date, bucket_type)` as universal key
- **Tenant-scoped locations** — same place_id can exist for multiple businesses
- **Span-first classification** — spans are the atomic unit of analysis; review-level is derived

---

## Part 1: System Architecture

```
                       REVIEWIQ v3.2 PIPELINE

  ┌──────────────┐
  │    Google    │
  │   Reviews    │
  │    (API)     │
  └──────┬───────┘
         │
         ▼
  ┌──────────────────────────────────────────────────────────────────┐
  │ A) SOURCE & STORAGE                                              │
  │                                                                  │
  │   google_connector ───▶ reviews_raw (immutable JSON + metadata)  │
  └────────────────────────────────┬─────────────────────────────────┘
                                   │
                                   ▼
  ┌──────────────────────────────────────────────────────────────────┐
  │ B) ENRICHMENT                                                    │
  │                                                                  │
  │   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐      │
  │   │Normalize │──▶│   LLM    │──▶│  Embed   │──▶│  Trust   │      │
  │   │  + Map   │   │ Classify │   │ (local)  │   │  Score   │      │
  │   └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘      │
  │        └──────────────┴──────┬───────┴──────────────┘            │
  │                              │                                   │
  │                              ▼                                   │
  │    ┌──────────────────────────────────────┐                      │
  │    │ reviews_enriched                     │                      │
  │    │                         │            │                      │
  │    │                         ▼            │                      │
  │    │ review_spans (NEW)                   │◀── Span Extraction   │
  │    │ (per-span classification)            │                      │
  │    └──────────────────────────────────────┘                      │
  │                                                                  │
  └────────────────────────────────┬─────────────────────────────────┘
                                   │
                    ┌──────────────┴───────────────┐
                    ▼                              ▼
  ┌────────────────────────────────┐  ┌─────────────────────────────┐
  │ C) OPERATIONALIZATION          │  │ D) ANALYTICS SPINE          │
  │                                │  │                             │
  │   review_spans                 │  │   Daily/Weekly Jobs:        │
  │        │                       │  │                             │
  │        ▼                       │  │   review_spans              │
  │   issue_spans (1:1 link)       │  │        │                    │
  │        │                       │  │        ▼                    │
  │        ▼                       │  │   fact_timeseries           │
  │   issues (update counters)     │  │   (pre-aggregated metrics)  │
  │        │                       │  │                             │
  │        ▼                       │  │   Keys:                     │
  │   issue_events (log)           │  │   • business_id             │
  │                                │  │   • place_id (or 'ALL')     │
  │                                │  │   • subject_type/id         │
  │                                │  │   • period_date             │
  │                                │  │   • bucket_type             │
  └─────────────────┬──────────────┘  └────────────┬────────────────┘
                    └──────────────┬───────────────┘
                                   │
                                   ▼
  ┌──────────────────────────────────────────────────────────────────┐
  │ E) REPORTING                                                     │
  │                                                                  │
  │   fact_timeseries ──┬──▶ Statistics & Trends                     │
  │                     │                                            │
  │   issues + spans ───┼──▶ Issue Rankings & Drill-Down             │
  │                     │                                            │
  │   embeddings ───────┼──▶ Sub-Pattern Clustering                  │
  │                     │                                            │
  │   competitors ──────┴──▶ Benchmark Comparisons                   │
  │                          │                                       │
  │                          ▼                                       │
  │               LLM Narrative Generation                           │
  │                                                                  │
  └──────────────────────────────────────────────────────────────────┘
```

---

## Part 2: Data Model (SQL DDL)

### 2.0 Required Extensions

```sql
-- btree_gist: Enables GiST index on btree-compatible types (for exclusion constraints)
CREATE EXTENSION IF NOT EXISTS btree_gist;

-- pgcrypto: Provides cryptographic functions (for SHA256-based ID generation)
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- pgvector: Vector similarity search (for embeddings)
CREATE EXTENSION IF NOT EXISTS vector;
```

### 2.1 ENUM Types

```sql
-- URT classification enums (strongly-typed, database-enforced)
CREATE TYPE urt_valence       AS ENUM ('V+', 'V-', 'V0', 'V±');
CREATE TYPE urt_intensity     AS ENUM ('I1', 'I2', 'I3');
CREATE TYPE urt_specificity   AS ENUM ('S1', 'S2', 'S3');
CREATE TYPE urt_actionability AS ENUM ('A1', 'A2', 'A3');
CREATE TYPE urt_temporal      AS ENUM ('TC', 'TR', 'TH', 'TF');
CREATE TYPE urt_evidence      AS ENUM ('ES', 'EI', 'EC');
CREATE TYPE urt_comparative   AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S');
CREATE TYPE urt_profile       AS ENUM ('lite', 'core', 'standard', 'full');
CREATE TYPE urt_confidence    AS ENUM ('high', 'medium', 'low');
CREATE TYPE urt_relation      AS ENUM ('cause_of', 'effect_of', 'contrast', 'resolution');
CREATE TYPE urt_entity_type   AS ENUM ('location', 'staff', 'product', 'process', 'time', 'other');
```

### 2.2 Dimension Tables

```sql
-- Business locations (multi-tenant: same place_id can exist for multiple businesses)
-- Includes both owned locations and tracked competitor locations
CREATE TABLE locations (
    business_id TEXT NOT NULL,          -- Internal business identifier
    place_id TEXT NOT NULL,             -- Google Place ID
    location_type TEXT NOT NULL DEFAULT 'owned'
        CHECK (location_type IN ('owned', 'competitor')),
    display_name TEXT NOT NULL,
    address TEXT,
    city TEXT,
    state TEXT,
    country TEXT,
    timezone TEXT,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    PRIMARY KEY (business_id, place_id)
);

CREATE INDEX idx_locations_place ON locations(place_id);
CREATE INDEX idx_locations_owned ON locations(business_id) WHERE location_type = 'owned';

-- URT code reference
CREATE TABLE urt_codes (
    code TEXT PRIMARY KEY,              -- 'J1.01', 'P1.02', etc.
    domain CHAR(1) NOT NULL,            -- O, P, J, E, A, V, R
    category TEXT NOT NULL,
    subcategory TEXT NOT NULL,
    display_name TEXT NOT NULL,
    description TEXT,
    keywords TEXT[]                     -- For search/matching
);

-- Competitor mapping (separate from locations - no fake business_ids)
CREATE TABLE competitors (
    id SERIAL PRIMARY KEY,
    business_id TEXT NOT NULL,            -- Your business
    competitor_place_id TEXT NOT NULL,    -- Competitor's Google Place ID
    competitor_name TEXT NOT NULL,
    relationship TEXT DEFAULT 'direct',   -- 'direct', 'indirect', 'aspirational'
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(business_id, competitor_place_id)
);

CREATE INDEX idx_competitors_business ON competitors(business_id);
```

### 2.3 Reviews Tables (Raw + Enriched)

```sql
-- Immutable raw review storage (audit + reprocessing)
CREATE TABLE reviews_raw (
    id SERIAL PRIMARY KEY,
    source TEXT NOT NULL DEFAULT 'google',
    review_id TEXT NOT NULL,            -- Google review ID
    place_id TEXT NOT NULL,             -- Google Place ID

    -- Raw payload
    raw_payload JSONB NOT NULL,         -- Complete API response
    review_text TEXT,                   -- Extracted for indexing
    rating SMALLINT,
    review_time TIMESTAMP,
    reviewer_name TEXT,
    reviewer_id TEXT,

    -- Versioning (Google reviews can be edited)
    review_version INT DEFAULT 1,

    -- Ingestion metadata
    pulled_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(source, review_id, review_version)
);

CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC);
CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id);

-- Enriched review with LLM classification + embeddings (versioned)
-- Supports edited reviews: each version is a separate row
CREATE TABLE reviews_enriched (
    -- Versioned primary key (handles edited reviews)
    source TEXT NOT NULL DEFAULT 'google',
    review_id TEXT NOT NULL,            -- Matches reviews_raw.review_id
    review_version INT NOT NULL DEFAULT 1,
    is_latest BOOLEAN NOT NULL DEFAULT TRUE,

    -- Link to raw (specific version)
    raw_id INT NOT NULL REFERENCES reviews_raw(id),

    -- Identity
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,

    -- Core content
    text TEXT NOT NULL,
    text_normalized TEXT,               -- Cleaned for processing
    rating SMALLINT,
    review_time TIMESTAMP NOT NULL,
    language TEXT,

    -- URT Classification (from LLM) — review-level summary, derived from spans in v3.2
    urt_primary TEXT NOT NULL,          -- 'J1.01', 'P1.02', etc.
    urt_secondary TEXT[] DEFAULT '{}',  -- Max 2, different domains
    valence TEXT NOT NULL,              -- 'V+', 'V-', 'V0', 'V±'
    intensity TEXT NOT NULL,            -- 'I1', 'I2', 'I3'
    comparative TEXT DEFAULT 'CR-N',    -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'

    -- Extracted entities (summary from spans)
    staff_mentions TEXT[] DEFAULT '{}',
    quotes JSONB,                       -- {"code": "phrase", ...}

    -- Embedding
    embedding VECTOR(384),

    -- Quality control
    trust_score FLOAT DEFAULT 1.0,      -- 0.0 to 1.0
    dedup_group_id TEXT,                -- Tenant-scoped: format "{business_id}:{hash}"
    is_suspicious BOOLEAN DEFAULT FALSE,

    -- Processing metadata
    classification_model TEXT,
    classification_confidence JSONB,    -- Per-field confidence scores
    processed_at TIMESTAMP DEFAULT NOW(),
    model_version TEXT,

    -- KPI-ready hooks (nullable, computed later)
    kpi_impact_estimate FLOAT,
    kpi_last_computed_at TIMESTAMP,

    PRIMARY KEY (source, review_id, review_version)
);

-- Indexes for common query patterns
CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id)
    WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC)
    WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC)
    WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary)
    WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time)
    WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative)
    WHERE comparative != 'CR-N' AND is_latest = TRUE;
CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score)
    WHERE trust_score < 0.5 AND is_latest = TRUE;
CREATE INDEX idx_enriched_embedding ON reviews_enriched
    USING hnsw (embedding vector_cosine_ops);

-- FK to locations (tenant-scoped)
ALTER TABLE reviews_enriched
    ADD CONSTRAINT fk_enriched_location
    FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);

-- Enforce tenant-scoped dedup format
ALTER TABLE reviews_enriched
    ADD CONSTRAINT chk_dedup_scoped
    CHECK (dedup_group_id IS NULL OR dedup_group_id LIKE business_id || ':%');
```

### 2.4 Span Layer (NEW in v3.2)

The span layer extracts individual semantic units from review text. Each span represents a single classifiable statement with its own URT code, valence, intensity, and optional entity reference.

```sql
-- Review spans: fine-grained semantic units within reviews
CREATE TABLE review_spans (
    span_id TEXT PRIMARY KEY,           -- Deterministic ID (see §9.5)

    -- Parent review reference
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,
    source TEXT NOT NULL DEFAULT 'google',
    review_id TEXT NOT NULL,
    review_version INT NOT NULL,

    -- Span position (within review text)
    span_index INT NOT NULL CHECK (span_index >= 0),
    span_text TEXT NOT NULL,
    span_start INT NOT NULL CHECK (span_start >= 0),
    span_end INT NOT NULL,

    -- Profile level (standard vs full classification)
    profile urt_profile NOT NULL DEFAULT 'standard',

    -- URT Classification (strongly-typed)
    urt_primary TEXT NOT NULL,          -- Tier-3 code: 'J1.01', 'P2.03', etc.
    urt_secondary TEXT[] NOT NULL DEFAULT '{}',
    valence urt_valence NOT NULL,
    intensity urt_intensity NOT NULL,
    comparative urt_comparative NOT NULL DEFAULT 'CR-N',

    -- Extended classification (standard profile)
    specificity urt_specificity NOT NULL DEFAULT 'S2',
    actionability urt_actionability NOT NULL DEFAULT 'A2',
    temporal urt_temporal NOT NULL DEFAULT 'TC',
    evidence urt_evidence NOT NULL DEFAULT 'ES',

    -- Causal relations (full profile only)
    relation_type urt_relation,
    related_span_id TEXT REFERENCES review_spans(span_id),
    causal_chain JSONB,                 -- Full profile: structured cause/effect

    -- Entity extraction
    entity TEXT,                        -- Raw entity mention
    entity_type urt_entity_type,
    entity_normalized TEXT,             -- Normalized form for grouping

    -- Span state
    is_primary BOOLEAN NOT NULL DEFAULT FALSE,  -- Primary span for this review
    is_active BOOLEAN NOT NULL DEFAULT TRUE,    -- Soft-delete for reprocessing
    review_time TIMESTAMP NOT NULL,             -- Denormalized from parent review

    -- Processing metadata
    confidence urt_confidence DEFAULT 'medium',
    usn TEXT,                           -- URT Semantic Notation string
    model_version TEXT,
    ingest_batch_id TEXT,               -- For atomic reprocessing
    created_at TIMESTAMP DEFAULT NOW()

    -- Uniqueness of span_index within a review version is enforced for ACTIVE
    -- spans only, by the partial index uq_spans_active_order below. A table-level
    -- UNIQUE (source, review_id, review_version, span_index) would reject a
    -- replacement span set while the deactivated set is still stored.
);

-- Constraints for review_spans
ALTER TABLE review_spans
    ADD CONSTRAINT chk_span_end CHECK (span_end > span_start);

ALTER TABLE review_spans
    ADD CONSTRAINT chk_primary_tier3
    CHECK (urt_primary ~ '^[OPJEAVR][1-4]\.[0-9]{2}$');

ALTER TABLE review_spans
    ADD CONSTRAINT chk_secondary_max2 CHECK (cardinality(urt_secondary) <= 2);

-- Validate each element in urt_secondary matches the tier-3 pattern.
-- PostgreSQL does not allow subqueries in CHECK constraints, so the array is
-- flattened to a comma-separated string and matched as a whole.
ALTER TABLE review_spans
    ADD CONSTRAINT chk_secondary_tier3 CHECK (
        urt_secondary = '{}'
        OR array_to_string(urt_secondary, ',')
           ~ '^[OPJEAVR][1-4]\.[0-9]{2}(,[OPJEAVR][1-4]\.[0-9]{2})*$'
    );

-- causal_chain only allowed for full profile
ALTER TABLE review_spans
    ADD CONSTRAINT chk_full_only_fields CHECK (
        profile = 'full' OR causal_chain IS NULL
    );

-- No self-referential relations
ALTER TABLE review_spans
    ADD CONSTRAINT chk_no_self_relation
    CHECK (related_span_id IS NULL OR related_span_id != span_id);

-- USN format validation based on profile (URT v5.1 canonical format)
-- Lite:     URT:L:{domain}:{valence}{intensity}
-- Core:     URT:C:{category}:{valence}{intensity}
-- Standard: URT:S:{subcode}[+{sec}]:{valence}{intensity}:{S}{A}{T}.{E}.{CR}
-- Full:     URT:F:{subcode}[+{sec}]:{valence}{intensity}:{S}{A}{T}.{E}.{CR}[:{causal}]
-- Examples: URT:L:O:+2 | URT:C:J1:-3 | URT:S:J1.03:-2:22TC.ES.N | URT:F:J1.01:-3:23TR.ES.S:CD.O,MG.O
ALTER TABLE review_spans
    ADD CONSTRAINT chk_usn_format CHECK (
        usn IS NULL
        OR (profile = 'lite' AND usn ~ '^URT:L:[OPJEAVR]:[+\-0±][123]$')
        OR (profile = 'core' AND usn ~ '^URT:C:[OPJEAVR][1-4]:[+\-0±][123]$')
        OR (profile = 'standard' AND usn ~ '^URT:S:[OPJEAVR][1-4]\.[0-9]{2}(\+[OPJEAVR][1-4]\.[0-9]{2}){0,2}:[+\-0±][123]:[1-3][1-3]T[CRHF]\.E[SIC]\.[NBWS]$')
        OR (profile = 'full' AND usn ~ '^URT:F:[OPJEAVR][1-4]\.[0-9]{2}(\+[OPJEAVR][1-4]\.[0-9]{2}){0,2}:[+\-0±][123]:[1-3][1-3]T[CRHF]\.E[SIC]\.[NBWS](:(CD|MG|SY)\.[STEOFRPCSHX](,(CD|MG|SY)\.[STEOFRPCSHX])*)?$')
    );

-- Foreign keys for review_spans
ALTER TABLE review_spans
    ADD CONSTRAINT fk_spans_review
    FOREIGN KEY (source, review_id, review_version)
    REFERENCES reviews_enriched(source, review_id, review_version)
    ON DELETE CASCADE;

ALTER TABLE review_spans
    ADD CONSTRAINT fk_spans_location
    FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);

ALTER TABLE review_spans
    ADD CONSTRAINT fk_spans_urt_primary
    FOREIGN KEY (urt_primary) REFERENCES urt_codes(code);

-- Indexes for review_spans
CREATE UNIQUE INDEX uq_spans_active_order
    ON review_spans(source, review_id, review_version, span_index)
    WHERE is_active = TRUE;

CREATE UNIQUE INDEX uq_spans_one_primary_active
    ON review_spans(source, review_id, review_version)
    WHERE is_active = TRUE AND is_primary = TRUE;

CREATE INDEX idx_spans_review
    ON review_spans(source, review_id, review_version)
    WHERE is_active = TRUE;

CREATE INDEX idx_spans_business_time
    ON review_spans(business_id, review_time DESC)
    WHERE is_active = TRUE;

CREATE INDEX idx_spans_issue_routing
    ON review_spans(business_id, place_id, urt_primary, entity_normalized)
    WHERE is_active = TRUE AND valence IN ('V-', 'V±');

CREATE INDEX idx_spans_entity
    ON review_spans(business_id, entity_normalized)
    WHERE entity_normalized IS NOT NULL AND is_active = TRUE;

CREATE INDEX idx_spans_batch
    ON review_spans(ingest_batch_id)
    WHERE ingest_batch_id IS NOT NULL;

-- No overlapping spans within the same active review version.
-- A plain GiST index only speeds up overlap lookups; it does not enforce exclusion.
CREATE INDEX ex_spans_no_overlap ON review_spans USING gist (
    source, review_id, review_version, int4range(span_start, span_end)
) WHERE is_active = TRUE;

-- For strict enforcement, use an exclusion constraint instead (it builds its own
-- GiST index, which makes the index above redundant):
ALTER TABLE review_spans
    ADD CONSTRAINT ex_spans_no_overlap_constraint
    EXCLUDE USING gist (
        source WITH =,
        review_id WITH =,
        review_version WITH =,
        int4range(span_start, span_end) WITH &&
    ) WHERE (is_active = TRUE);
```

### 2.5 Issue Tables (Relational, Span-Based)

**v3.2 Issue Key**: `(business_id, place_id, urt_primary, entity_normalized)`. Entity extraction is active in v3.2, but `entity_normalized` defaults to NULL, so spans with the same URT code at the same place route to a single issue regardless of the entity they mention. Distinct entities create distinct issues from v3.3 onward.
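Because an issue ID is a pure function of this key, it can be reproduced outside the database, for example to pre-route spans in a batch job or to check which issue a span will join. The sketch below mirrors `generate_issue_id()` from §3.6 in Python. It assumes the database stores text as UTF-8 (pgcrypto's `digest()` hashes the text bytes in the server encoding), and `issue_id_for` is a hypothetical helper name, not part of this spec.

```python
import hashlib
from typing import Optional


def issue_id_for(business_id: str, place_id: str, urt_primary: str,
                 entity_normalized: Optional[str] = None) -> str:
    """Hypothetical Python mirror of generate_issue_id() (§3.6).

    Assumes UTF-8 on the database side. A None entity becomes '',
    matching COALESCE(p_entity_normalized, '') in the SQL function.
    """
    entity = entity_normalized if entity_normalized is not None else ""
    grouping_key = "|".join([business_id, place_id, urt_primary, entity])
    digest_hex = hashlib.sha256(grouping_key.encode("utf-8")).hexdigest()
    # First 16 hex characters = 64 bits of the SHA-256 digest
    return "ISS-" + digest_hex[:16]


if __name__ == "__main__":
    # v3.2: entity_normalized is NULL, so every span with the same code at
    # the same place routes to one issue, whatever entity it mentions.
    a = issue_id_for("biz-1", "place-123", "J1.01")
    b = issue_id_for("biz-1", "place-123", "J1.01", None)
    assert a == b

    # v3.3+: a populated entity produces a separate issue.
    c = issue_id_for("biz-1", "place-123", "J1.01", "staff:maria")
    assert c != a
    print(a, c)
```

One consequence of joining with `|`: if `business_id` or `entity_normalized` can ever contain that character, two different keys could produce the same string and therefore the same ID, so entity normalization should strip or escape it.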
```sql
-- Issues (aggregated problems)
CREATE TABLE issues (
    issue_id TEXT PRIMARY KEY,          -- Deterministic SHA256-based ID

    -- Grouping keys (v3.2: code + place + entity)
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,
    primary_subcode TEXT NOT NULL,      -- URT code
    domain CHAR(1) NOT NULL,

    -- State machine
    state TEXT NOT NULL DEFAULT 'DETECTED',
    priority_score FLOAT NOT NULL,
    confidence_score FLOAT NOT NULL,

    -- Aggregated metrics (updated via triggers/jobs)
    span_count INT NOT NULL DEFAULT 1,
    max_intensity TEXT NOT NULL,
    avg_trust_score FLOAT DEFAULT 1.0,

    -- CR counters (rolling 30-day window)
    cr_better_count INT DEFAULT 0,
    cr_worse_count INT DEFAULT 0,
    cr_same_count INT DEFAULT 0,

    -- Star drag proxy (avg rating when this issue present vs absent)
    star_drag_estimate FLOAT,

    -- Ownership
    owner_team TEXT,
    owner_individual TEXT,

    -- Timestamps
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    acknowledged_at TIMESTAMP,
    resolved_at TIMESTAMP,
    verified_at TIMESTAMP,

    -- Resolution
    reopen_count INT DEFAULT 0,
    resolution_code TEXT,
    resolution_notes TEXT,
    decline_reason TEXT,

    -- Context (v3.2: entity extraction active)
    entity TEXT,                        -- Product, staff member, feature
    entity_normalized TEXT,             -- Normalized for grouping (defaults NULL in v3.2)

    -- KPI-ready hooks (nullable)
    kpi_impact_estimate FLOAT,
    kpi_impact_confidence FLOAT,
    kpi_last_computed_at TIMESTAMP
);

CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC);
CREATE INDEX idx_issues_place ON issues(place_id, state);
CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode);
CREATE INDEX idx_issues_open ON issues(business_id)
    WHERE state NOT IN ('VERIFIED', 'DECLINED');
CREATE INDEX idx_issues_entity ON issues(business_id, entity_normalized)
    WHERE entity_normalized IS NOT NULL;

-- FK to locations (tenant-scoped)
ALTER TABLE issues
    ADD CONSTRAINT fk_issues_location
    FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);

-- Issue spans: 1:1 link from span to issue (each span belongs to exactly one issue)
CREATE TABLE issue_spans (
    id SERIAL PRIMARY KEY,
    issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE,

    -- Span reference (unique constraint enforces 1:1)
    span_id TEXT NOT NULL REFERENCES review_spans(span_id) ON DELETE CASCADE,

    -- Denormalized for queries (copied from span)
    source TEXT NOT NULL DEFAULT 'google',
    review_id TEXT NOT NULL,
    review_version INT NOT NULL,

    -- Classification snapshot
    is_primary_match BOOLEAN DEFAULT TRUE,  -- Primary vs secondary code match
    intensity TEXT NOT NULL,                -- Copied from span for fast queries
    review_time TIMESTAMP NOT NULL,         -- Denormalized for timeline queries
    weight FLOAT DEFAULT 1.0,               -- For weighted aggregation
    created_at TIMESTAMP DEFAULT NOW(),

    -- One span → exactly one issue (1:1 mapping)
    CONSTRAINT uq_issue_spans_span UNIQUE (span_id)
);

CREATE INDEX idx_issue_spans_issue ON issue_spans(issue_id);
CREATE INDEX idx_issue_spans_review ON issue_spans(source, review_id, review_version);
CREATE INDEX idx_issue_spans_time ON issue_spans(issue_id, review_time DESC);

-- Issue events (audit log)
CREATE TABLE issue_events (
    event_id SERIAL PRIMARY KEY,
    issue_id TEXT NOT NULL REFERENCES issues(issue_id),
    event_type TEXT NOT NULL,           -- 'state_change', 'span_added', 'priority_update'
    from_state TEXT,
    to_state TEXT,
    actor TEXT,                         -- User or 'system'
    notes TEXT,

    -- Triggering span/review reference
    span_id TEXT,
    source TEXT DEFAULT 'google',
    review_id TEXT,
    review_version INT,

    metadata JSONB,                     -- Additional context
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC);
CREATE INDEX idx_events_span ON issue_events(span_id) WHERE span_id IS NOT NULL;
CREATE INDEX idx_events_review ON issue_events(source, review_id, review_version)
    WHERE review_id IS NOT NULL;
```

### 2.6 Unified Analytics Spine

**Design Decision**: Sentinel value conventions (do not normalize):

- `place_id = 'ALL'` — spatial rollup (all locations)
- `subject_id = 'all'` — semantic rollup (all subjects within type)

Case matters: `'ALL'` ≠ `'all'`. This avoids NULL handling while keeping the schema simple.

```sql
-- Fact table: pre-aggregated time-series metrics
CREATE TABLE fact_timeseries (
    id SERIAL PRIMARY KEY,

    -- Universal join keys (KPI-ready)
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,             -- 'ALL' = all locations rollup
    period_date DATE NOT NULL,
    bucket_type TEXT NOT NULL,          -- 'day', 'week', 'month'

    -- Subject (what we're measuring)
    subject_type TEXT NOT NULL,         -- 'urt_code', 'domain', 'overall', 'issue'
    subject_id TEXT NOT NULL,           -- Code, domain letter, issue_id, or 'all'

    -- Volume metrics
    review_count INT NOT NULL DEFAULT 0,
    span_count INT NOT NULL DEFAULT 0,  -- v3.2: span-level counting
    negative_count INT NOT NULL DEFAULT 0,
    positive_count INT NOT NULL DEFAULT 0,
    neutral_count INT NOT NULL DEFAULT 0,
    mixed_count INT NOT NULL DEFAULT 0,

    -- Strength metrics (intensity-weighted)
    strength_score FLOAT NOT NULL DEFAULT 0,
    negative_strength FLOAT NOT NULL DEFAULT 0,
    positive_strength FLOAT NOT NULL DEFAULT 0,

    -- Rating metrics
    avg_rating FLOAT,
    rating_count INT DEFAULT 0,

    -- Intensity distribution
    i1_count INT DEFAULT 0,
    i2_count INT DEFAULT 0,
    i3_count INT DEFAULT 0,

    -- CR signals
    cr_better INT DEFAULT 0,
    cr_worse INT DEFAULT 0,
    cr_same INT DEFAULT 0,

    -- Trust-weighted variants (v3.2: now populated)
    trust_weighted_strength FLOAT,
    trust_weighted_negative FLOAT,

    -- Metadata
    computed_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id)
);

-- Validate 'ALL' sentinel
ALTER TABLE fact_timeseries
    ADD CONSTRAINT chk_place_id_format
    CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$');

-- Optimized indexes for reporting queries
CREATE INDEX idx_facts_lookup ON fact_timeseries(
    business_id, place_id, subject_type, subject_id, period_date DESC
);
CREATE INDEX idx_facts_period ON fact_timeseries(
    business_id, period_date, bucket_type
);
CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id)
    WHERE subject_type = 'urt_code';
CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date)
    WHERE place_id = 'ALL';
CREATE INDEX idx_facts_issue ON fact_timeseries(subject_id)
    WHERE subject_type = 'issue';
```

**v3.2 Fact Population Scope**:

| subject_type | Populated   | Notes                              |
|--------------|-------------|------------------------------------|
| `overall`    | Mandatory   | Business-wide + per-location       |
| `urt_code`   | Mandatory   | Per URT code (from spans)          |
| `domain`     | Derived     | Rollup from urt_code at query time |
| `issue`      | Recommended | Per-issue timelines                |

**v3.2 Rollup Rules**:

- `place_id='ALL'` includes **owned locations only** (not competitors)
- Competitor facts live at their `competitor_place_id`, never in `'ALL'` rollup
- Competitor comparison queries explicitly join on `competitor_place_id`
- Span-level metrics (`span_count`, intensity distribution) are now primary

**v3.2 Trust Score Usage**:

- `trust_score` applied to **issue priority scoring** and **filtering**
- `trust_weighted_strength` / `trust_weighted_negative` now **populated** in v3.2
- Formula: `SUM(trust_score * intensity_weight)` per fact row, where `intensity_weight` is the numeric weight assigned to each span's intensity level (I1, I2, I3)

### 2.7 Sub-Patterns (Persistent Clustering Results)

```sql
-- Stored sub-pattern clustering results
CREATE TABLE subpatterns (
    id SERIAL PRIMARY KEY,

    -- Parent
    subject_type TEXT NOT NULL,         -- 'urt_code', 'issue'
    subject_id TEXT NOT NULL,
    business_id TEXT NOT NULL,
    place_id TEXT,                      -- NULL = all locations

    -- Period
    period_start DATE NOT NULL,
    period_end DATE NOT NULL,

    -- Cluster info
    cluster_id INT NOT NULL,
    label TEXT NOT NULL,

    -- Metrics
    review_count INT NOT NULL,
    span_count INT NOT NULL,            -- v3.2: span-level
    percentage FLOAT NOT NULL,
    avg_intensity FLOAT,

    -- Representative content
    representative_span_id TEXT,        -- v3.2: span reference
    representative_quote TEXT,
    sharpest_span_id TEXT,
    sharpest_quote TEXT,

    -- Embedding (for trend matching)
    centroid VECTOR(384),

    -- Metadata
    computed_at TIMESTAMP DEFAULT NOW(),

    -- NULLS NOT DISTINCT (PostgreSQL 15+): place_id IS NULL means "all locations",
    -- and a plain UNIQUE treats NULLs as distinct, allowing duplicate rollup rows.
    UNIQUE NULLS NOT DISTINCT (subject_type, subject_id, business_id, place_id,
                               period_start, period_end, cluster_id)
);

CREATE INDEX idx_subpatterns_lookup ON subpatterns(
    subject_type, subject_id, business_id, period_end DESC
);
```

---

## Part 3: Triggers and Functions

### 3.1 Span Validation Triggers

```sql
-- Trigger 1: Validate span_end <= length(review text)
CREATE OR REPLACE FUNCTION trg_review_spans_validate_bounds()
RETURNS TRIGGER AS $$
DECLARE
    review_text_length INT;
BEGIN
    SELECT length(text) INTO review_text_length
    FROM reviews_enriched
    WHERE source = NEW.source
      AND review_id = NEW.review_id
      AND review_version = NEW.review_version;

    IF review_text_length IS NULL THEN
        RAISE EXCEPTION 'Parent review not found: (%, %, %)',
            NEW.source, NEW.review_id, NEW.review_version;
    END IF;

    IF NEW.span_end > review_text_length THEN
        RAISE EXCEPTION 'span_end (%) exceeds review text length (%)',
            NEW.span_end, review_text_length;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_review_spans_validate_bounds
    BEFORE INSERT OR UPDATE ON review_spans
    FOR EACH ROW EXECUTE FUNCTION trg_review_spans_validate_bounds();

-- Trigger 2: Validate span_text matches parent substring (conditional)
-- Enabled via session setting: SET reviewiq.validate_span_text = 'on';
CREATE OR REPLACE FUNCTION trg_review_spans_validate_text()
RETURNS TRIGGER AS $$
DECLARE
    review_text TEXT;
    expected_text TEXT;
BEGIN
    -- Skip unless enabled. current_setting(..., true) returns NULL instead of
    -- raising when the setting is absent, so NULL must be treated as 'off'
    -- (a bare "NULL != 'on'" is NULL, which would fall through to validation).
    IF COALESCE(current_setting('reviewiq.validate_span_text', true), 'off') <> 'on' THEN
        RETURN NEW;
    END IF;

    SELECT text INTO review_text
    FROM reviews_enriched
    WHERE source = NEW.source
      AND review_id = NEW.review_id
      AND review_version = NEW.review_version;

    -- span_start/span_end are 0-based and end-exclusive; substring() is 1-based
    expected_text := substring(review_text FROM NEW.span_start + 1
                                           FOR NEW.span_end - NEW.span_start);

    IF NEW.span_text IS DISTINCT FROM expected_text THEN
        RAISE EXCEPTION 'span_text mismatch: expected "%" but got "%"',
            left(expected_text, 50), left(NEW.span_text, 50);
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_review_spans_validate_text
    BEFORE INSERT OR UPDATE ON review_spans
    FOR EACH ROW EXECUTE FUNCTION trg_review_spans_validate_text();

-- Trigger 3: Validate causal_chain JSONB structure
CREATE OR REPLACE FUNCTION trg_review_spans_validate_causal_chain()
RETURNS TRIGGER AS $$
BEGIN
    IF NEW.causal_chain IS NOT NULL THEN
        -- Validate structure using helper function
        IF NOT urt_validate_causal_chain(NEW.causal_chain) THEN
            RAISE EXCEPTION 'Invalid causal_chain structure';
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_review_spans_validate_causal_chain
    BEFORE INSERT OR UPDATE ON review_spans
    FOR EACH ROW
    WHEN (NEW.causal_chain IS NOT NULL)
    EXECUTE FUNCTION trg_review_spans_validate_causal_chain();
```

### 3.2 Causal Chain Validation Function

```sql
-- Validate causal chain structure, codes, and ordering
CREATE OR REPLACE FUNCTION urt_validate_causal_chain(chain JSONB)
RETURNS BOOLEAN AS $$
DECLARE
    link JSONB;
    link_code TEXT;
    link_role TEXT;
    link_order INT;
    prev_order INT := -1;
    valid_roles TEXT[] := ARRAY['cause', 'effect', 'context', 'outcome'];
BEGIN
    -- Must be an array
    IF jsonb_typeof(chain) != 'array' THEN
        RETURN FALSE;
    END IF;

    -- Empty array is valid
    IF jsonb_array_length(chain) = 0 THEN
        RETURN TRUE;
    END IF;

    -- Validate each link
    FOR link IN SELECT * FROM jsonb_array_elements(chain)
    LOOP
        -- Required fields
        IF NOT (link ? 'code' AND link ? 'role' AND link ? 'order') THEN
            RETURN FALSE;
        END IF;

        link_code := link->>'code';
        link_role := link->>'role';
        link_order := (link->>'order')::INT;

        -- Validate code format (tier-3)
        IF link_code !~ '^[OPJEAVR][1-4]\.[0-9]{2}$' THEN
            RETURN FALSE;
        END IF;

        -- Validate role
        IF NOT (link_role = ANY(valid_roles)) THEN
            RETURN FALSE;
        END IF;

        -- Validate order is increasing
        IF link_order <= prev_order THEN
            RETURN FALSE;
        END IF;
        prev_order := link_order;
    END LOOP;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```

### 3.3 Span Relation Validation

```sql
-- Validate related_span_id references span from same review
CREATE OR REPLACE FUNCTION validate_review_relations(
    p_source TEXT,
    p_review_id TEXT,
    p_review_version INT
) RETURNS BOOLEAN AS $$
DECLARE
    invalid_count INT;
BEGIN
    SELECT COUNT(*) INTO invalid_count
    FROM review_spans s
    WHERE s.source = p_source
      AND s.review_id = p_review_id
      AND s.review_version = p_review_version
      AND s.related_span_id IS NOT NULL
      AND NOT EXISTS (
          SELECT 1 FROM review_spans r
          WHERE r.span_id = s.related_span_id
            AND r.source = s.source
            AND r.review_id = s.review_id
            AND r.review_version = s.review_version
      );

    RETURN invalid_count = 0;
END;
$$ LANGUAGE plpgsql;
```

### 3.4 Active Span Set Validation

```sql
-- Validate the active span set of a review version:
-- at least one active span and exactly one active primary span
CREATE OR REPLACE FUNCTION validate_active_spans(
    p_source TEXT,
    p_review_id TEXT,
    p_review_version INT
) RETURNS BOOLEAN AS $$
DECLARE
    active_count INT;
    primary_count INT;
BEGIN
    -- Count active spans
    SELECT COUNT(*), COUNT(*) FILTER (WHERE is_primary)
    INTO active_count, primary_count
    FROM review_spans
    WHERE source = p_source
      AND review_id = p_review_id
      AND review_version = p_review_version
      AND is_active = TRUE;

    -- Must have at least one active span
    IF active_count = 0 THEN
        RETURN FALSE;
    END IF;

    -- Must have exactly one primary span
    IF primary_count != 1 THEN
        RETURN FALSE;
    END IF;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
```

### 3.5 Primary Span Selection

```sql
-- Deterministic primary span selection: I3 > I2 > I1, V- > V± > V0 > V+, then span_index
CREATE OR REPLACE FUNCTION set_primary_span(
    p_source TEXT,
    p_review_id TEXT,
    p_review_version INT
) RETURNS TEXT AS $$
DECLARE
    selected_span_id TEXT;
BEGIN
    -- Clear existing primary
    UPDATE review_spans
    SET is_primary = FALSE
    WHERE source = p_source
      AND review_id = p_review_id
      AND review_version = p_review_version
      AND is_active = TRUE
      AND is_primary = TRUE;

    -- Select new primary using deterministic ordering
    SELECT span_id INTO selected_span_id
    FROM review_spans
    WHERE source = p_source
      AND review_id = p_review_id
      AND review_version = p_review_version
      AND is_active = TRUE
    ORDER BY
        -- Intensity: I3 > I2 > I1
        CASE intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 WHEN 'I1' THEN 3 END,
        -- Valence: V- > V± > V0 > V+
        CASE valence WHEN 'V-' THEN 1 WHEN 'V±' THEN 2 WHEN 'V0' THEN 3 WHEN 'V+' THEN 4 END,
        -- Tiebreaker: first span
        span_index
    LIMIT 1;

    -- Set as primary
    IF selected_span_id IS NOT NULL THEN
        UPDATE review_spans
        SET is_primary = TRUE
        WHERE span_id = selected_span_id;
    END IF;

    RETURN selected_span_id;
END;
$$ LANGUAGE plpgsql;
```

### 3.6 Deterministic Issue ID Generation

```sql
-- Generate deterministic issue_id from grouping key using SHA256
CREATE OR REPLACE FUNCTION generate_issue_id(
    p_business_id TEXT,
    p_place_id TEXT,
    p_urt_primary TEXT,
    p_entity_normalized TEXT DEFAULT NULL
) RETURNS TEXT AS $$
DECLARE
    grouping_key TEXT;
    hash_bytes BYTEA;
BEGIN
    -- Build grouping key (entity_normalized defaults to empty string if NULL)
    grouping_key := p_business_id || '|' || p_place_id || '|' || p_urt_primary || '|' ||
                    COALESCE(p_entity_normalized, '');

    -- Generate SHA256 hash
    hash_bytes := digest(grouping_key, 'sha256');

    -- Return first 16 chars of hex encoding (64 bits of entropy)
    RETURN 'ISS-' || left(encode(hash_bytes, 'hex'), 16);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```

---

## Part 4: Ingest Layer

### 4.1 Google Connector

```python
import json
from datetime import datetime
from typing import Optional


async def pull_reviews(place_id: str, since: Optional[datetime] = None) -> list[dict]:
    """Fetch new/updated reviews
from Google Places API.""" reviews = await google_places_client.get_reviews(place_id, since=since) for review in reviews: await store_raw_review(place_id, review) return reviews async def store_raw_review(place_id: str, review: dict) -> int: """Store immutable raw review payload.""" existing = await db.query_one(""" SELECT id, review_version FROM reviews_raw WHERE source = 'google' AND review_id = %s ORDER BY review_version DESC LIMIT 1 """, [review['review_id']]) version = 1 if existing: if content_changed(existing, review): version = existing['review_version'] + 1 else: return existing['id'] return await db.insert(""" INSERT INTO reviews_raw ( source, review_id, place_id, raw_payload, review_text, rating, review_time, reviewer_name, reviewer_id, review_version, pulled_at ) VALUES ( 'google', %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() ) RETURNING id """, [ review['review_id'], place_id, json.dumps(review), review.get('text'), review.get('rating'), review.get('time'), review.get('author_name'), review.get('author_id'), version ]) ``` ### 4.2 Enrichment Pipeline ```python async def enrich_review(raw_id: int, business_id: str) -> dict: """ Full enrichment: normalize → classify → embed → trust score → extract spans. Args: raw_id: ID from reviews_raw business_id: Tenant context (passed from ingest job, not looked up) """ raw = await db.query_one( "SELECT * FROM reviews_raw WHERE id = %s", [raw_id] ) # 1. Normalize text = normalize_text(raw['review_text']) # 2. Validate place_id exists under this tenant (owned or competitor) location = await db.query_one( "SELECT display_name, location_type FROM locations WHERE business_id = %s AND place_id = %s", [business_id, raw['place_id']] ) if not location: raise ValueError(f"place_id {raw['place_id']} not registered for business {business_id}") # 3. 
Parallel: LLM classify (with span extraction) + embed classify_task = asyncio.create_task(classify_review_with_spans(text)) embed_task = asyncio.create_task(embed_review(text)) classification = await classify_task embedding = await embed_task # 4. Trust score trust_score = compute_trust_score(raw, text, classification) # 5. Dedup check dedup_group_id = await find_dedup_group(embedding, raw['place_id']) # 6. Mark previous versions as not-latest await db.execute(""" UPDATE reviews_enriched SET is_latest = FALSE WHERE source = 'google' AND review_id = %s AND is_latest = TRUE """, [raw['review_id']]) # 7. Store enriched (versioned) enriched = { 'source': 'google', 'review_id': raw['review_id'], 'review_version': raw['review_version'], 'is_latest': True, 'raw_id': raw_id, 'business_id': business_id, 'place_id': raw['place_id'], 'text': raw['review_text'], 'text_normalized': text, 'rating': raw['rating'], 'review_time': raw['review_time'], 'language': detect_language(text), 'embedding': embedding, 'trust_score': trust_score, 'dedup_group_id': dedup_group_id, # Review-level classification derived from primary span 'urt_primary': classification['spans'][0]['urt_primary'] if classification['spans'] else 'O1.01', 'valence': classification['review_valence'], 'intensity': classification['review_intensity'], **classification.get('review_meta', {}), } await upsert_enriched_review(enriched) # 8. Extract and store spans (v3.2) batch_id = f"batch-{raw['review_id']}-{raw['review_version']}-{int(time.time())}" await store_review_spans( enriched, classification['spans'], batch_id ) return enriched ``` ### 4.3 LLM Classification with Span Extraction ```python CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT). Analyze the review and extract SPANS (individual semantic units). Each span is a phrase or sentence expressing one classifiable idea. 
Return JSON:
{
  "spans": [
    {
      "text": "exact phrase from review",
      "start": 0,
      "end": 25,
      "urt_primary": "X1.23",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "ES",
      "entity": "Mike",
      "entity_type": "staff",
      "confidence": "high"
    }
  ],
  "review_valence": "V-",
  "review_intensity": "I2",
  "review_meta": {
    "staff_mentions": ["Mike"],
    "comparative": "CR-N"
  }
}

URT DOMAINS:
- O (Offering): Product/service quality, function, completeness
- P (People): Staff attitude, competence, responsiveness
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance
- A (Access): Availability, accessibility, convenience
- V (Value): Price, transparency, worth
- R (Relationship): Trust, dependability, loyalty

SPAN RULES:
1. Each span = one classifiable semantic unit
2. Spans must not overlap
3. text must be EXACT substring from review
4. start/end are character offsets (0-indexed)
5. First span with highest intensity + negative valence becomes primary

INTENSITY:
- I1: Mild observation, passing mention
- I2: Moderate emphasis, clear statement
- I3: Strong emotion, repeated emphasis, dealbreaker

SPECIFICITY:
- S1: Vague ("it was bad")
- S2: Specific ("the wait was 30 minutes")
- S3: Precise ("waited 32 minutes on Tuesday at 6pm")

ACTIONABILITY:
- A1: No clear action ("didn't like it")
- A2: Implied action ("too slow")
- A3: Explicit action ("need more cashiers during rush hour")

TEMPORAL:
- TC: Current/recent experience
- TR: Recurring pattern
- TH: Historical comparison
- TF: Future expectation

EVIDENCE:
- ES: Subjective opinion
- EI: Indirect evidence
- EC: Concrete/verifiable

Return valid JSON only."""


async def classify_review_with_spans(text: str) -> dict:
    """LLM-powered URT classification with span extraction."""
    response = await llm.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"},
        temperature=0.1
    )

    result = json.loads(response.content)
    result['classification_model'] = 'gpt-4o-mini'
    return result
```

### 4.4 Span Storage

```python
import hashlib


async def store_review_spans(
    enriched: dict,
    spans: list[dict],
    batch_id: str
) -> list[str]:
    """
    Store extracted spans with soft-switch pattern.
    Returns list of span_ids.
    """
    span_ids = []

    for idx, span in enumerate(spans):
        # Generate deterministic span_id
        span_id = generate_span_id(
            enriched['source'], enriched['review_id'],
            enriched['review_version'], idx
        )

        # Build USN string
        usn = build_usn(span, profile='standard')

        await db.execute("""
            INSERT INTO review_spans (
                span_id, business_id, place_id, source, review_id, review_version,
                span_index, span_text, span_start, span_end, profile,
                urt_primary, urt_secondary, valence, intensity, comparative,
                specificity, actionability, temporal, evidence,
                entity, entity_type, entity_normalized,
                is_primary, is_active, review_time, confidence, usn,
                model_version, ingest_batch_id
            ) VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            )
        """, [
            span_id, enriched['business_id'], enriched['place_id'],
            enriched['source'], enriched['review_id'], enriched['review_version'],
            idx, span['text'], span['start'], span['end'], 'standard',
            span['urt_primary'], span.get('urt_secondary', []),
            span['valence'], span['intensity'], span.get('comparative', 'CR-N'),
            span.get('specificity', 'S2'), span.get('actionability', 'A2'),
            span.get('temporal', 'TC'), span.get('evidence', 'ES'),
            span.get('entity'), span.get('entity_type'),
            normalize_entity(span.get('entity')),
            False,  # is_primary set later
            False,  # is_active=FALSE until validated
            enriched['review_time'], span.get('confidence', 'medium'),
            usn, 'gpt-4o-mini', batch_id
        ])

        span_ids.append(span_id)

    # Set primary span
    await set_primary_span_for_batch(
        enriched['source'], enriched['review_id'],
        enriched['review_version'], batch_id
    )

    # Atomic activation (soft-switch)
    await activate_span_batch(
        enriched['source'], enriched['review_id'],
        enriched['review_version'], batch_id
    )

    return span_ids


def generate_span_id(source: str, review_id: str, version: int, index: int) -> str:
    """Generate deterministic span ID."""
    key = f"{source}|{review_id}|{version}|{index}"
    hash_bytes = hashlib.sha256(key.encode()).digest()
    return f"SPN-{hash_bytes[:8].hex()}"


def build_usn(span: dict, profile: str = 'standard') -> str:
    """Build URT Semantic Notation string."""
    base = f"V{span['valence'][-1]}:I{span['intensity'][-1]}:{span['urt_primary']}"

    if profile == 'full':
        base += f":S{span.get('specificity', 'S2')[-1]}"
        base += f":A{span.get('actionability', 'A2')[-1]}"
        base += f":T{span.get('temporal', 'TC')[-1]}"
        base += f":E{span.get('evidence', 'ES')[-1]}"

    return base
```

### 4.5 Reprocessing Pattern

The soft-switch pattern enables atomic span replacement without downtime:

```python
import time


async def reprocess_review_spans(
    source: str,
    review_id: str,
    review_version: int
) -> str:
    """
    Reprocess spans for a review using soft-switch pattern.
    Returns new batch_id.
    """
    # 1. Fetch review
    review = await db.query_one("""
        SELECT * FROM reviews_enriched
        WHERE source = %s AND review_id = %s AND review_version = %s
    """, [source, review_id, review_version])

    # 2. Re-classify with LLM
    classification = await classify_review_with_spans(review['text'])

    # 3. Generate new batch ID
    batch_id = f"reprocess-{review_id}-{review_version}-{int(time.time())}"

    # 4. INSERT new spans with is_active=FALSE
    for idx, span in enumerate(classification['spans']):
        span_id = generate_span_id(source, review_id, review_version, idx)
        # ... insert with is_active=FALSE, ingest_batch_id=batch_id

    # 5. Validate new spans
    if not await validate_span_set(source, review_id, review_version, batch_id):
        # Rollback: delete invalid batch
        await db.execute("""
            DELETE FROM review_spans WHERE ingest_batch_id = %s
        """, [batch_id])
        raise ValueError("New span set failed validation")

    # 6. Set primary span for new batch
    await set_primary_span_for_batch(source, review_id, review_version, batch_id)

    # 7. Atomic switch
    async with db.transaction():
        # Deactivate old spans
        await db.execute("""
            UPDATE review_spans
            SET is_active = FALSE
            WHERE source = %s AND review_id = %s AND review_version = %s
              AND is_active = TRUE AND ingest_batch_id != %s
        """, [source, review_id, review_version, batch_id])

        # Activate new spans
        await db.execute("""
            UPDATE review_spans SET is_active = TRUE WHERE ingest_batch_id = %s
        """, [batch_id])

    return batch_id


async def activate_span_batch(
    source: str,
    review_id: str,
    review_version: int,
    batch_id: str
):
    """Atomically switch from old spans to new batch."""
    async with db.transaction():
        # Deactivate existing active spans
        await db.execute("""
            UPDATE review_spans
            SET is_active = FALSE
            WHERE source = %s AND review_id = %s AND review_version = %s
              AND is_active = TRUE AND ingest_batch_id != %s
        """, [source, review_id, review_version, batch_id])

        # Activate new batch
        await db.execute("""
            UPDATE review_spans SET is_active = TRUE WHERE ingest_batch_id = %s
        """, [batch_id])
```

### 4.6 Trust Score Computation

```python
def compute_trust_score(raw: dict, text: str, classification: dict) -> float:
    """
    Compute trust score (clamped to 0.1 to 1.0) from review quality signals.
    Low trust = likely spam, fake, or low-quality.
    """
    score = 1.0

    # Length penalty
    word_count = len(text.split())
    if word_count < 5:
        score *= 0.5
    elif word_count > 500:
        score *= 0.8

    # Rating/sentiment mismatch
    rating = raw.get('rating')
    valence = classification.get('review_valence')
    if rating and valence:
        if rating >= 4 and valence == 'V-':
            score *= 0.7
        elif rating <= 2 and valence == 'V+':
            score *= 0.7

    # Generic text patterns
    if is_generic_review(text):
        score *= 0.6

    # Span confidence
    spans = classification.get('spans', [])
    if spans:
        low_conf_count = sum(1 for s in spans if s.get('confidence') == 'low')
        if low_conf_count > len(spans) / 2:
            score *= 0.9

    return max(0.1, min(1.0, score))
```

---

## Part 5: Issue Lifecycle Management

### 5.1 Issue Routing (Span-Based)

**v3.2 Issue Key**: `(business_id, place_id, urt_primary, entity_normalized)`

```python
from typing import Optional


async def route_span_to_issue(span: dict) -> Optional[str]:
    """
    Route a span to an existing or new issue.
    Returns issue_id or None if span doesn't warrant an issue.
    """
    # Only negative/mixed spans create issues
    if span['valence'] not in ('V-', 'V±'):
        return None

    # Generate deterministic issue_id from grouping key
    row = await db.query_one("""
        SELECT generate_issue_id(%s, %s, %s, %s) as issue_id
    """, [
        span['business_id'],
        span['place_id'],
        span['urt_primary'],
        span.get('entity_normalized')  # NULL in v3.2
    ])
    issue_id = row['issue_id']

    # Check if issue exists
    existing = await db.query_one("""
        SELECT issue_id, state, span_count FROM issues WHERE issue_id = %s
    """, [issue_id])

    if existing:
        # Add span to existing issue
        await add_span_to_issue(existing['issue_id'], span)
        return existing['issue_id']

    # Create new issue
    if should_create_issue(span):
        return await create_issue_from_span(span, issue_id)

    return None


async def create_issue_from_span(span: dict, issue_id: str) -> str:
    """Create a new issue from a span."""
    domain = span['urt_primary'][0]

    await db.execute("""
        INSERT INTO issues (
            issue_id, business_id, place_id, primary_subcode, domain,
            state, priority_score, confidence_score, span_count,
            max_intensity, entity, entity_normalized
        ) VALUES (
            %s, %s, %s, %s, %s, 'DETECTED', %s, %s, 1, %s, %s, %s
        )
    """, [
        issue_id, span['business_id'], span['place_id'], span['urt_primary'], domain,
        compute_initial_priority(span),
        confidence_to_score(span.get('confidence', 'medium')),
        span['intensity'],
        span.get('entity'),
        span.get('entity_normalized')
    ])

    # Link span to issue
    await db.execute("""
        INSERT INTO issue_spans (
            issue_id, span_id, source, review_id, review_version,
            is_primary_match, intensity, review_time
        ) VALUES (
            %s, %s, %s, %s, %s, TRUE, %s, %s
        )
    """, [
        issue_id, span['span_id'], span['source'], span['review_id'],
        span['review_version'], span['intensity'], span['review_time']
    ])

    await log_issue_event(issue_id, 'created', span_id=span['span_id'])

    return issue_id


async def add_span_to_issue(issue_id: str, span: dict):
    """Add span to existing issue and update counters."""
    # Insert span link (1:1 mapping enforced by UNIQUE constraint)
    await db.execute("""
        INSERT INTO issue_spans (
            issue_id, span_id, source, review_id, review_version,
            is_primary_match, intensity, review_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (span_id) DO NOTHING
    """, [
        issue_id, span['span_id'], span['source'], span['review_id'],
        span['review_version'], True, span['intensity'], span['review_time']
    ])

    # Update issue counters
    await db.execute("""
        UPDATE issues SET
            span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s),
            max_intensity = (
                SELECT CASE MAX(CASE intensity WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END)
                    WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END
                FROM issue_spans WHERE issue_id = %s
            ),
            updated_at = NOW()
        WHERE issue_id = %s
    """, [issue_id, issue_id, issue_id])

    await recalculate_priority(issue_id)
    await log_issue_event(
        issue_id, 'span_added',
        span_id=span['span_id'],
        source=span['source'],
        review_id=span['review_id'],
        review_version=span['review_version']
    )
```

### 5.2 Priority Scoring (Trust-Weighted)

```python
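# --- Illustrative sketch (hypothetical helper, not part of the v3.2 spec) ---
# recalculate_priority() below mixes the priority arithmetic with database I/O.
# This pure-function version applies the same weights, constants, and factors so the
# formula can be checked in isolation. The name priority_formula and its parameter
# list are assumptions made for illustration only.
import math
from typing import Optional


def priority_formula(max_intensity: str, span_count: int, days_old: int,
                     reopen_count: int, cr_worse_count: int, cr_better_count: int,
                     avg_trust: Optional[float] = None) -> float:
    """Priority = intensity × volume × decay × recurrence × trend × trust."""
    i_weight = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}.get(max_intensity, 1.0)
    volume_factor = 1 + math.log(max(1, span_count))      # 1 span -> factor 1.0
    decay = math.exp(-0.023 * days_old)                   # halves roughly every 30 days
    recurrence_boost = 1.0 + 0.5 * math.log2(reopen_count + 1)
    if cr_worse_count >= 2:
        trend_modifier = 1.3                              # repeated CR-W (worse) mentions
    elif cr_better_count >= 2:
        trend_modifier = 0.7                              # repeated CR-B (better) mentions
    else:
        trend_modifier = 1.0
    trust_factor = float(avg_trust or 1.0)                # no trust data -> neutral factor
    return (i_weight * volume_factor * decay
            * recurrence_boost * trend_modifier * trust_factor)


# Usage: a fresh I3 issue with one span, reopened once, average trust 0.8
#   priority_formula('I3', 1, 0, 1, 0, 0, 0.8)  # 4.0 × 1.0 × 1.0 × 1.5 × 1.0 × 0.8 ≈ 4.8
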
import math
from datetime import datetime

INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}


async def recalculate_priority(issue_id: str):
    """
    Priority = intensity × volume × decay × recurrence × trend × trust
    """
    issue = await db.query_one("""
        SELECT i.*,
               (SELECT AVG(re.trust_score)
                FROM issue_spans s
                JOIN review_spans rs ON s.span_id = rs.span_id
                JOIN reviews_enriched re
                  ON (rs.source, rs.review_id, rs.review_version)
                   = (re.source, re.review_id, re.review_version)
                WHERE s.issue_id = i.issue_id) as avg_trust
        FROM issues i
        WHERE i.issue_id = %s
    """, [issue_id])

    i_weight = INTENSITY_WEIGHTS.get(issue['max_intensity'], 1.0)
    volume_factor = 1 + math.log(max(1, issue['span_count']))

    days_old = (datetime.now() - issue['created_at']).days
    decay = math.exp(-0.023 * days_old)  # half-life of roughly 30 days

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    if issue['cr_worse_count'] >= 2:
        trend_modifier = 1.3
    elif issue['cr_better_count'] >= 2:
        trend_modifier = 0.7
    else:
        trend_modifier = 1.0

    # AVG() may come back as Decimal; cast so it multiplies cleanly with floats
    trust_factor = float(issue['avg_trust'] or 1.0)

    priority = (
        i_weight * volume_factor * decay *
        recurrence_boost * trend_modifier * trust_factor
    )

    await db.execute("""
        UPDATE issues
        SET priority_score = %s, avg_trust_score = %s, updated_at = NOW()
        WHERE issue_id = %s
    """, [priority, issue['avg_trust'], issue_id])
```

### 5.3 Issue Span Drill-Down

```python
async def get_issue_spans(issue_id: str, sort_by: str = 'date',
                          limit: int = 50, offset: int = 0) -> list[dict]:
    """Fetch all spans for an issue with full details."""
    # Whitelisted ORDER BY clauses, so interpolating into the f-string is safe
    order_clause = {
        'date': 's.review_time DESC',
        'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
        'trust': 're.trust_score DESC',
    }.get(sort_by, 's.review_time DESC')

    return await db.query(f"""
        SELECT
            rs.span_id, rs.span_text, rs.span_start, rs.span_end,
            rs.urt_primary, rs.valence, rs.intensity,
            rs.specificity, rs.actionability,
            rs.entity, rs.entity_type, rs.usn,
            s.review_time, s.is_primary_match,
            re.review_id, re.review_version,
            re.text as review_text, re.rating, re.trust_score,
            l.display_name as location_name
        FROM issue_spans s
        JOIN review_spans rs ON s.span_id = rs.span_id
        JOIN reviews_enriched re
          ON (rs.source, rs.review_id, rs.review_version)
           = (re.source, re.review_id, re.review_version)
        JOIN locations l
          ON (rs.business_id, rs.place_id) = (l.business_id, l.place_id)
        WHERE s.issue_id = %s AND rs.is_active = TRUE
        ORDER BY {order_clause}
        LIMIT %s OFFSET %s
    """, [issue_id, limit, offset])
```

### 5.4 Strength Score

```
Strength Score = Σ (intensity_weight)

Where:
  I1 (mild)     → weight = 1
  I2 (moderate) → weight = 2
  I3 (strong)   → weight = 4

One I3 span = 4 I1 spans = 2 I2 spans
```

---

## Part 6: Analytics Spine (Fact Population)

### 6.1 Daily Fact Aggregation Job

```python
from datetime import date, timedelta


async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'):
    """
    Aggregate spans into fact_timeseries. Run daily.

    v3.2 populates:
    - subject_type='overall', subject_id='all' (per location + 'ALL')
    - subject_type='urt_code', subject_id=<urt_code> (per location + 'ALL')
    - subject_type='issue', subject_id=<issue_id> (per issue)
    """
    if bucket_type == 'day':
        period_start = date
        period_end = date + timedelta(days=1)
    elif bucket_type == 'week':
        period_start = date - timedelta(days=date.weekday())
        period_end = period_start + timedelta(days=7)
    elif bucket_type == 'month':
        period_start = date.replace(day=1)
        next_month = period_start.replace(day=28) + timedelta(days=4)
        period_end = next_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type}")

    # Get owned locations (competitors excluded from 'ALL' rollup)
    owned_locations = await db.query(
        "SELECT place_id FROM locations "
        "WHERE business_id = %s AND is_active = TRUE AND location_type = 'owned'",
        [business_id]
    )
    owned_place_ids = [loc['place_id'] for loc in owned_locations]

    # Per-location facts (owned)
    for loc in owned_locations:
        await populate_location_facts_from_spans(
            business_id, loc['place_id'], period_start, period_end, bucket_type
        )

    # All-locations rollup (owned only — place_id='ALL')
    await populate_all_locations_facts_from_spans(
        business_id, owned_place_ids, period_start, period_end, bucket_type
    )

    # Issue facts
    await populate_issue_facts(business_id, period_start, period_end, bucket_type)


async def populate_location_facts_from_spans(
    business_id: str,
    place_id: str,
    period_start: date,
    period_end: date,
    bucket_type: str
):
    """Populate facts for a single location from spans."""

    # Aggregate by URT code from spans
    code_stats = await db.query("""
        SELECT
            rs.urt_primary as code,
            COUNT(DISTINCT re.review_id) as review_count,
            COUNT(*) as span_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            SUM(CASE WHEN rs.valence = 'V0' THEN 1 ELSE 0 END) as neutral_count,
            SUM(CASE WHEN rs.valence = 'V±' THEN 1 ELSE 0 END) as mixed_count,
            SUM(CASE rs.intensity::text
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END) as strength_score,
            SUM(CASE WHEN rs.valence = 'V-' THEN
                CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
                ELSE 0 END) as negative_strength,
            SUM(CASE WHEN rs.valence = 'V+' THEN
                CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
                ELSE 0 END) as positive_strength,
            SUM(CASE WHEN rs.intensity::text = 'I1' THEN 1 ELSE 0 END) as i1_count,
            SUM(CASE WHEN rs.intensity::text = 'I2' THEN 1 ELSE 0 END) as i2_count,
            SUM(CASE WHEN rs.intensity::text = 'I3' THEN 1 ELSE 0 END) as i3_count,
            SUM(CASE WHEN rs.comparative::text = 'CR-B' THEN 1 ELSE 0 END) as cr_better,
            SUM(CASE WHEN rs.comparative::text = 'CR-W' THEN 1 ELSE 0 END) as cr_worse,
            SUM(CASE WHEN rs.comparative::text = 'CR-S' THEN 1 ELSE 0 END) as cr_same,
            -- Trust-weighted metrics (v3.2)
            SUM(re.trust_score * CASE rs.intensity::text
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END) as trust_weighted_strength,
            SUM(CASE WHEN rs.valence = 'V-' THEN re.trust_score *
                CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
                ELSE 0 END) as trust_weighted_negative,
            AVG(re.rating) as avg_rating,
            COUNT(re.rating) as rating_count
        FROM review_spans rs
        JOIN reviews_enriched re
          ON (rs.source, rs.review_id, rs.review_version)
           = (re.source, re.review_id, re.review_version)
        WHERE rs.business_id = %s
          AND rs.place_id = %s
          AND rs.review_time >= %s
          AND rs.review_time < %s
          AND rs.is_active = TRUE
          AND re.is_latest = TRUE
        GROUP BY rs.urt_primary
    """, [business_id, place_id, period_start, period_end])

    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id=place_id,
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Aggregate overall
    overall = await db.query_one("""
        SELECT
            COUNT(DISTINCT re.review_id) as review_count,
            COUNT(*) as span_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            AVG(re.rating) as avg_rating
        FROM review_spans rs
        JOIN reviews_enriched re
          ON (rs.source, rs.review_id, rs.review_version)
           = (re.source, re.review_id, re.review_version)
        WHERE rs.business_id = %s
          AND rs.place_id = %s
          AND rs.review_time >= %s
          AND rs.review_time < %s
          AND rs.is_active = TRUE
          AND re.is_latest = TRUE
    """, [business_id, place_id, period_start, period_end])

    await upsert_fact(
        business_id=business_id,
        place_id=place_id,
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
```

### 6.2 Timeline Query (For Charts)

```python
from datetime import date
from typing import Optional


async def get_timeline(business_id: str, place_id: Optional[str],
                       subject_type: str, subject_id: str,
                       start: date, end: date,
                       bucket_type: str = 'week') -> list[dict]:
    """
    Query pre-aggregated facts for line charts.

    Args:
        place_id: Specific place_id, or None for 'ALL' locations rollup
    """
    # Use 'ALL' sentinel for all-locations queries
    effective_place_id = place_id if place_id else 'ALL'

    return await db.query("""
        SELECT period_date, review_count, span_count,
               negative_count, positive_count,
               strength_score, negative_strength, avg_rating,
               cr_better, cr_worse, cr_same,
               trust_weighted_strength, trust_weighted_negative
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = %s
          AND subject_type = %s
          AND subject_id = %s
          AND bucket_type = %s
          AND period_date BETWEEN %s AND %s
        ORDER BY period_date
    """, [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end])
```

---

## Part 7: Competitor Analysis

### 7.1 Competitor Setup (Clean Model)

Competitors are tracked in both `competitors` (relationship metadata) and `locations` (with `location_type='competitor'`). This preserves FK integrity and enables consistent joins for display names/timezones.

**Competitor Review Storage Rule**: Competitor reviews are stored with the **customer's business_id** and the **competitor's place_id**:

```
reviews_enriched.business_id = <customer's business_id>
reviews_enriched.place_id    = <competitor's place_id>
```

The `locations.location_type` column distinguishes ownership:
- `'owned'` — customer's own locations
- `'competitor'` — tracked competitor locations

```python
async def setup_competitor(business_id: str, competitor_place_id: str,
                           competitor_name: str, relationship: str = 'direct'):
    """Register a competitor for tracking."""
    # 1. Add to locations with location_type='competitor'
    await db.execute("""
        INSERT INTO locations (business_id, place_id, location_type, display_name)
        VALUES (%s, %s, 'competitor', %s)
        ON CONFLICT (business_id, place_id) DO UPDATE SET
            display_name = EXCLUDED.display_name
    """, [business_id, competitor_place_id, competitor_name])

    # 2. Track relationship metadata in competitors table
    await db.execute("""
        INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET
            competitor_name = EXCLUDED.competitor_name,
            relationship = EXCLUDED.relationship
    """, [business_id, competitor_place_id, competitor_name, relationship])
```

### 7.2 Competitor Comparison

```python
async def get_competitor_comparison(business_id: str, code: str,
                                    start: date, end: date) -> dict:
    """Compare your URT metrics against competitors."""
    # Your metrics (from 'ALL' rollup). Only daily buckets are summed:
    # weekly/monthly rows cover the same days and would be double-counted.
    your_metrics = await db.query_one("""
        SELECT
            SUM(negative_strength) as negative_strength,
            SUM(span_count) as span_count,
            AVG(avg_rating) as avg_rating,
            SUM(trust_weighted_negative) as trust_weighted_negative
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = 'ALL'
          AND subject_type = 'urt_code'
          AND subject_id = %s
          AND bucket_type = 'day'
          AND period_date BETWEEN %s AND %s
    """, [business_id, code, start, end])

    # Competitor metrics
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors
        WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    comparison = {
        'your_business': your_metrics or {},
        'competitors': []
    }

    for comp in competitors:
        comp_metrics = await db.query_one("""
            SELECT
                SUM(negative_strength) as negative_strength,
                SUM(span_count) as span_count,
                AVG(avg_rating) as avg_rating
            FROM fact_timeseries
            WHERE business_id = %s
              AND place_id = %s
              AND subject_type = 'urt_code'
              AND subject_id = %s
              AND bucket_type = 'day'
              AND period_date BETWEEN %s AND %s
        """, [business_id, comp['competitor_place_id'], code, start, end])

        comparison['competitors'].append({
            'name': comp['competitor_name'],
            'place_id': comp['competitor_place_id'],
            **(comp_metrics or {})
        })

    return comparison
```

---

## Part 8: Report Generation (Facts-First)

```python
from datetime import date, datetime
from typing import Optional


async def generate_report(business_id: str, place_id: Optional[str],
                          start: date, end: date) -> dict:
    """Generate report from pre-aggregated facts."""
    effective_place_id = place_id if place_id else 'ALL'

    # 1. Top issues from facts (fast)
    top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end)

    # 2. Strengths from facts
    strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end)

    # 3. Sub-patterns with span references
    for issue in top_issues[:5]:
        patterns = await discover_and_store_subpatterns(
            business_id, effective_place_id, issue['code'], start, end
        )
        issue['sub_patterns'] = patterns

    # 4. Trends from facts
    trends = await compute_trends_from_facts(business_id, effective_place_id, start, end)

    # 5. Entity analysis (v3.2)
    entities = await analyze_entities(business_id, effective_place_id, start, end)

    # 6. Competitor benchmarks
    competitors = await get_competitor_benchmarks(business_id, start, end)

    payload = {
        'business_id': business_id,
        'place_id': place_id,
        'period': f"{start} to {end}",
        'issues': top_issues,
        'strengths': strengths,
        'trends': trends,
        'entities': entities,
        'competitors': competitors,
    }

    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat()
    }


async def analyze_entities(business_id: str, place_id: str,
                           start: date, end: date) -> list[dict]:
    """Analyze entity mentions from spans."""
    # With the 'ALL' sentinel, only owned locations count (competitors stay out of rollups)
    return await db.query("""
        SELECT
            rs.entity_normalized,
            rs.entity_type::text,
            COUNT(*) as mention_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) as negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) as positive_count,
            AVG(CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 3 END) as avg_intensity,
            array_agg(DISTINCT rs.urt_primary) as codes
        FROM review_spans rs
        JOIN locations l
          ON (rs.business_id, rs.place_id) = (l.business_id, l.place_id)
        WHERE rs.business_id = %s
          AND (rs.place_id = %s OR (%s = 'ALL' AND l.location_type = 'owned'))
          AND rs.review_time >= %s
          AND rs.review_time < %s
          AND rs.entity_normalized IS NOT NULL
          AND rs.is_active = TRUE
        GROUP BY rs.entity_normalized, rs.entity_type
        ORDER BY mention_count DESC
        LIMIT 20
    """, [business_id, place_id, place_id, start, end])
```

---

## Part 9: KPI-Ready Hooks

### 9.1 Future KPI Integration (Interface Only)

```sql
-- Future: KPI fact table with same grain
CREATE TABLE fact_kpi_timeseries (
    id SERIAL PRIMARY KEY,

    -- Same join keys as fact_timeseries
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,          -- 'ALL' for rollups
    period_date DATE NOT NULL,
    bucket_type TEXT NOT NULL,

    -- KPI metrics
    revenue DECIMAL(12,2),
    transactions INT,
    cancellations INT,
    refunds DECIMAL(12,2),
    support_tickets INT,

    computed_at TIMESTAMP DEFAULT NOW(),

    UNIQUE(business_id, place_id, period_date, bucket_type)
);

-- Join reviews and KPIs:
SELECT
    r.period_date,
    r.negative_strength,
    r.trust_weighted_negative,
    k.revenue,
    k.cancellations
FROM fact_timeseries r
JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
WHERE r.subject_type = 'overall' AND r.subject_id = 'all';
```

---

## Part 10: Cost Model

| Stage | When | Cost | Notes |
|-------|------|------|-------|
| **Raw Storage** | Per review | $0.00 | ~1KB per review |
| **Embedding** | Per review | $0.00 | Local model, ~50ms |
| **LLM Classification + Spans** | Per review | ~$0.0003 | GPT-4o-mini (larger prompt) |
| **Fact Aggregation** | Daily job | $0.00 | SQL, <1 minute |
| **Sub-Clustering** | Per report | $0.00 | HDBSCAN, <1s |
| **LLM Narrative** | Per report | ~$0.15 | GPT-4o |

**Total Costs:**

| Volume | Monthly Ingest | Reports (10/mo) | Total |
|--------|---------------|-----------------|-------|
| 1K reviews | $0.30 | $1.50 | **$1.80** |
| 10K reviews | $3.00 | $1.50 | **$4.50** |
| 100K reviews | $30.00 | $1.50 | **$31.50** |

---

## Part 11: Key Innovations

| Innovation | Benefit |
|------------|---------|
| **Span layer** | Fine-grained classification at semantic unit level |
| **1:1 span-to-issue** | Clean data model, no ambiguity in routing |
| **Deterministic issue IDs** | SHA256-based, reproducible from grouping key |
| **Soft-switch reprocessing** | Atomic span replacement without downtime |
| **URT ENUM types** | Database-enforced classification constraints |
| **Causal chain support** | Full profile enables cause/effect analysis |
| **Entity extraction** | Named entity routing for targeted issues |
| **Trust-weighted facts** | Spam resistance with weighted aggregation |
| **USN notation** | Compact semantic notation for spans |

---

## Document Control

| Field | Value |
|-------|-------|
| **Document** | ReviewIQ Architecture v3.2.0 |
| **Status** | Specification Complete |
| **Date** | 2026-01-24 |
| **Dependencies** | URT Specification v5.1, Issue Lifecycle Framework C1 |
| **Source** | Google Reviews only |
| **Cost Target** | <$35/month at 100K reviews |

### Changelog

| Version | Changes |
|---------|---------|
| v3.0 | Issue lifecycle, strength scores, timeline charts |
| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring |
| v3.1.1 | Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup |
| v3.1.2 | Versioned issue_spans FK, competitor business_id rule, trust-weighted facts deferred, location_type flag |
| v3.2.0 | **Span layer**: review_spans table, URT ENUM types, causal chain support, entity extraction, reprocessing pattern, deterministic issue IDs, 1:1 span mapping |

### New in v3.2.0

| Feature | Description |
|---------|-------------|
| **review_spans table** | Fine-grained semantic unit extraction from reviews |
| **URT ENUM types** | 12 strongly-typed enums for classification fields |
| **Required extensions** | btree_gist for exclusion constraints, pgcrypto for SHA256 |
| **Span constraints** | chk_span_end, chk_primary_tier3, chk_secondary_max2, chk_secondary_tier3, chk_full_only_fields, chk_no_self_relation, chk_usn_format |
| **Span indexes** | Active ordering, one-primary enforcement, non-overlap exclusion, issue routing |
| **Validation triggers** | Bounds validation, text matching, causal chain structure |
| **Helper functions** | urt_validate_causal_chain, validate_review_relations, validate_active_spans, set_primary_span, generate_issue_id |
| **Reprocessing pattern** | Soft-switch with is_active flag and ingest_batch_id |
| **issue_spans rewrite** | 1:1 span-to-issue mapping with UNIQUE(span_id) |
| **Trust-weighted facts** | trust_weighted_strength, trust_weighted_negative now populated |
| **span_count in facts** | Span-level counting alongside review_count |

### Deferred to v3.3+

| Feature | Reason |
|---------|--------|
| Distinct entity issues | `entity_normalized` defaults to NULL in v3.2; v3.3 creates separate issues per entity |
| Journey step inference | Needs better grounding data |
| Intent signals extraction | Needs action playbooks |
| Stability score tracking | Premature for current version |
| Span embeddings | Per-span vectors for sub-clustering |

---

*End of ReviewIQ Architecture v3.2.0*