ReviewIQ: Review Intelligence Pipeline
Version: 3.2.0
Status: Architecture Specification (Reviewed)
Date: 2026-01-24
Executive Summary
ReviewIQ v3.2 transforms Google Reviews into actionable business intelligence through a scalable, KPI-ready pipeline. This version introduces the span layer — a fine-grained extraction model that identifies and classifies individual semantic units within each review, enabling richer issue routing, causal analysis, and entity-aware aggregation.
What's New in v3.2:
- Span layer: `review_spans` table extracts individual semantic units from review text
- URT ENUM types: Strongly-typed classification fields with database-enforced constraints
- Causal chain support: `profile='full'` spans can capture cause/effect relationships
- Entity extraction: Named entities (staff, products, locations) linked to spans
- Reprocessing pattern: Soft-switch `is_active` flag for atomic span replacement
- Deterministic issue routing: SHA256-based issue IDs from grouping keys
- 1:1 span-to-issue mapping: Each span belongs to exactly one issue
Design Principles:
- Google Reviews only (for now), but the schema is source-agnostic
- Relational over arrays: scales, queries, joins
- Facts-first reporting: pre-aggregated spine for fast dashboards
- KPI-joinable: `(business_id, place_id, period_date, bucket_type)` as universal key
- Tenant-scoped locations: the same place_id can exist for multiple businesses
- Span-first classification: spans are the atomic unit of analysis; review-level is derived
Part 1: System Architecture
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ REVIEWIQ v3.2 PIPELINE │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Google │ │
│ │ Reviews │ │
│ │ (API) │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ A) SOURCE & STORAGE │ │
│ │ │ │
│ │ google_connector ───▶ reviews_raw (immutable JSON + metadata) │ │
│ │ │ │
│ └──────────────────────────────────┬──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ B) ENRICHMENT │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Normalize │──▶│ LLM │──▶│ Embed │──▶│ Trust │ │ │
│ │ │ + Map │ │ Classify │ │ (local) │ │ Score │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │
│ │ └──────────────┴─────────────┬───────────────┘ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────┐ │ │
│ │ │ reviews_enriched │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ review_spans (NEW) │◀── Span Extraction │ │
│ │ │ (per-span classification) │ │ │
│ │ └────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────┬──────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┴────────────┐ │
│ ▼ ▼ │
│ ┌────────────────────────────┐ ┌────────────────────────────────┐ │
│ │ C) OPERATIONALIZATION │ │ D) ANALYTICS SPINE │ │
│ │ │ │ │ │
│ │ review_spans │ │ Daily/Weekly Jobs: │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ review_spans │ │
│ │ issue_spans (1:1 link) │ │ │ │ │
│ │ │ │ │ ▼ │ │
│ │ ▼ │ │ fact_timeseries │ │
│ │ issues (update counters) │ │ (pre-aggregated metrics) │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ Keys: │ │
│ │ issue_events (log) │ │ • business_id │ │
│ │ │ │ • place_id (or 'ALL') │ │
│ └────────────────────────────┘ │ • subject_type/id │ │
│ │ │ • period_date │ │
│ │ │ • bucket_type │ │
│ │ └────────────────────────────────┘ │
│ └────────────┬────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ E) REPORTING │ │
│ │ │ │
│ │ fact_timeseries ──┬──▶ Statistics & Trends │ │
│ │ │ │ │
│ │ issues + spans ───┼──▶ Issue Rankings & Drill-Down │ │
│ │ │ │ │
│ │ embeddings ───────┼──▶ Sub-Pattern Clustering │ │
│ │ │ │ │
│ │ competitors ──────┴──▶ Benchmark Comparisons │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ LLM Narrative Generation │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
Part 2: Data Model (SQL DDL)
2.0 Required Extensions
-- btree_gist: Enables GiST index on btree-compatible types (for exclusion constraints)
CREATE EXTENSION IF NOT EXISTS btree_gist;
-- pgcrypto: Provides cryptographic functions (for SHA256-based ID generation)
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- pgvector: Vector similarity search (for embeddings)
CREATE EXTENSION IF NOT EXISTS vector;
2.1 ENUM Types
-- URT classification enums (strongly-typed, database-enforced)
CREATE TYPE urt_valence AS ENUM ('V+', 'V-', 'V0', 'V±');
CREATE TYPE urt_intensity AS ENUM ('I1', 'I2', 'I3');
CREATE TYPE urt_specificity AS ENUM ('S1', 'S2', 'S3');
CREATE TYPE urt_actionability AS ENUM ('A1', 'A2', 'A3');
CREATE TYPE urt_temporal AS ENUM ('TC', 'TR', 'TH', 'TF');
CREATE TYPE urt_evidence AS ENUM ('ES', 'EI', 'EC');
CREATE TYPE urt_comparative AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S');
CREATE TYPE urt_profile AS ENUM ('standard', 'full');
CREATE TYPE urt_confidence AS ENUM ('high', 'medium', 'low');
CREATE TYPE urt_relation AS ENUM ('cause_of', 'effect_of', 'contrast', 'resolution');
CREATE TYPE urt_entity_type AS ENUM ('location', 'staff', 'product', 'process', 'time', 'other');
2.2 Dimension Tables
-- Business locations (multi-tenant: same place_id can exist for multiple businesses)
-- Includes both owned locations and tracked competitor locations
CREATE TABLE locations (
business_id TEXT NOT NULL, -- Internal business identifier
place_id TEXT NOT NULL, -- Google Place ID
location_type TEXT NOT NULL DEFAULT 'owned'
CHECK (location_type IN ('owned', 'competitor')),
display_name TEXT NOT NULL,
address TEXT,
city TEXT,
state TEXT,
country TEXT,
timezone TEXT,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (business_id, place_id)
);
CREATE INDEX idx_locations_place ON locations(place_id);
CREATE INDEX idx_locations_owned ON locations(business_id)
WHERE location_type = 'owned';
-- URT code reference
CREATE TABLE urt_codes (
code TEXT PRIMARY KEY, -- 'J1.01', 'P1.02', etc.
domain CHAR(1) NOT NULL, -- O, P, J, E, A, V, R
category TEXT NOT NULL,
subcategory TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
keywords TEXT[] -- For search/matching
);
-- Competitor mapping (separate from locations - no fake business_ids)
CREATE TABLE competitors (
id SERIAL PRIMARY KEY,
business_id TEXT NOT NULL, -- Your business
competitor_place_id TEXT NOT NULL, -- Competitor's Google Place ID
competitor_name TEXT NOT NULL,
relationship TEXT DEFAULT 'direct', -- 'direct', 'indirect', 'aspirational'
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, competitor_place_id)
);
CREATE INDEX idx_competitors_business ON competitors(business_id);
2.3 Reviews Tables (Raw + Enriched)
-- Immutable raw review storage (audit + reprocessing)
CREATE TABLE reviews_raw (
id SERIAL PRIMARY KEY,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Google review ID
place_id TEXT NOT NULL, -- Google Place ID
-- Raw payload
raw_payload JSONB NOT NULL, -- Complete API response
review_text TEXT, -- Extracted for indexing
rating SMALLINT,
review_time TIMESTAMP,
reviewer_name TEXT,
reviewer_id TEXT,
-- Versioning (Google reviews can be edited)
review_version INT DEFAULT 1,
-- Ingestion metadata
pulled_at TIMESTAMP DEFAULT NOW(),
UNIQUE(source, review_id, review_version)
);
CREATE INDEX idx_reviews_raw_place ON reviews_raw(place_id, review_time DESC);
CREATE INDEX idx_reviews_raw_lookup ON reviews_raw(source, review_id);
-- Enriched review with LLM classification + embeddings (versioned)
-- Supports edited reviews: each version is a separate row
CREATE TABLE reviews_enriched (
-- Versioned primary key (handles edited reviews)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL, -- Matches reviews_raw.review_id
review_version INT NOT NULL DEFAULT 1,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
-- Link to raw (specific version)
raw_id INT NOT NULL REFERENCES reviews_raw(id),
-- Identity
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
-- Core content
text TEXT NOT NULL,
text_normalized TEXT, -- Cleaned for processing
rating SMALLINT,
review_time TIMESTAMP NOT NULL,
language TEXT,
-- URT Classification (from LLM) — review-level summary, derived from spans in v3.2
urt_primary TEXT NOT NULL, -- 'J1.01', 'P1.02', etc.
urt_secondary TEXT[] DEFAULT '{}', -- Max 2, different domains
valence TEXT NOT NULL, -- 'V+', 'V-', 'V0', 'V±'
intensity TEXT NOT NULL, -- 'I1', 'I2', 'I3'
comparative TEXT DEFAULT 'CR-N', -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'
-- Extracted entities (summary from spans)
staff_mentions TEXT[] DEFAULT '{}',
quotes JSONB, -- {"code": "phrase", ...}
-- Embedding
embedding VECTOR(384),
-- Quality control
trust_score FLOAT DEFAULT 1.0, -- 0.0 to 1.0
dedup_group_id TEXT, -- Tenant-scoped: format "{business_id}:{hash}"
is_suspicious BOOLEAN DEFAULT FALSE,
-- Processing metadata
classification_model TEXT,
classification_confidence JSONB, -- Per-field confidence scores
processed_at TIMESTAMP DEFAULT NOW(),
model_version TEXT,
-- KPI-ready hooks (nullable, computed later)
kpi_impact_estimate FLOAT,
kpi_last_computed_at TIMESTAMP,
PRIMARY KEY (source, review_id, review_version)
);
-- Indexes for common query patterns
CREATE INDEX idx_enriched_latest ON reviews_enriched(source, review_id)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_business_date ON reviews_enriched(business_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_place_date ON reviews_enriched(place_id, review_time DESC)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_urt_primary ON reviews_enriched(business_id, urt_primary)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_valence ON reviews_enriched(business_id, valence, review_time)
WHERE is_latest = TRUE;
CREATE INDEX idx_enriched_comparative ON reviews_enriched(comparative)
WHERE comparative != 'CR-N' AND is_latest = TRUE;
CREATE INDEX idx_enriched_trust ON reviews_enriched(trust_score)
WHERE trust_score < 0.5 AND is_latest = TRUE;
CREATE INDEX idx_enriched_embedding ON reviews_enriched
USING hnsw (embedding vector_cosine_ops);
-- FK to locations (tenant-scoped)
ALTER TABLE reviews_enriched
ADD CONSTRAINT fk_enriched_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Enforce tenant-scoped dedup format
ALTER TABLE reviews_enriched
ADD CONSTRAINT chk_dedup_scoped
CHECK (dedup_group_id IS NULL OR dedup_group_id LIKE business_id || ':%');
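`chk_dedup_scoped` only fixes the `"{business_id}:"` prefix; how the hash part is computed is not specified in this section. A minimal sketch, assuming the hash is a truncated SHA-256 of lower-cased, whitespace-collapsed review text (both the normalization rule and the 16-character truncation are assumptions; `make_dedup_group_id` and `passes_dedup_check` are hypothetical helpers):

```python
import hashlib
import re
from typing import Optional


def make_dedup_group_id(business_id: str, text: str) -> str:
    """Build a tenant-scoped dedup key "{business_id}:{hash}" (hypothetical helper).

    Assumption: duplicates are detected by hashing lower-cased text with runs of
    whitespace collapsed to one space; the real matching rule is not specified.
    """
    normalized = re.sub(r"\s+", " ", text.strip().lower())
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]
    return f"{business_id}:{digest}"


def passes_dedup_check(dedup_group_id: Optional[str], business_id: str) -> bool:
    """Mirror of chk_dedup_scoped, treating business_id literally (no LIKE wildcards)."""
    return dedup_group_id is None or dedup_group_id.startswith(business_id + ":")
```

Because the business_id is part of the key, identical text under two tenants never shares a dedup group. Note that `LIKE` treats `%` and `_` inside business_id as wildcards, so the SQL check is slightly looser than this literal prefix test for IDs containing those characters.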
2.4 Span Layer (NEW in v3.2)
The span layer extracts individual semantic units from review text. Each span represents a single classifiable statement with its own URT code, valence, intensity, and optional entity reference.
-- Review spans: fine-grained semantic units within reviews
CREATE TABLE review_spans (
span_id TEXT PRIMARY KEY, -- Deterministic ID (see §9.5)
-- Parent review reference
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL,
review_version INT NOT NULL,
-- Span position (within review text)
span_index INT NOT NULL CHECK (span_index >= 0),
span_text TEXT NOT NULL,
span_start INT NOT NULL CHECK (span_start >= 0),
span_end INT NOT NULL,
-- Profile level (standard vs full classification)
profile urt_profile NOT NULL DEFAULT 'standard',
-- URT Classification (strongly-typed)
urt_primary TEXT NOT NULL, -- Tier-3 code: 'J1.01', 'P2.03', etc.
urt_secondary TEXT[] NOT NULL DEFAULT '{}',
valence urt_valence NOT NULL,
intensity urt_intensity NOT NULL,
comparative urt_comparative NOT NULL DEFAULT 'CR-N',
-- Extended classification (standard profile)
specificity urt_specificity NOT NULL DEFAULT 'S2',
actionability urt_actionability NOT NULL DEFAULT 'A2',
temporal urt_temporal NOT NULL DEFAULT 'TC',
evidence urt_evidence NOT NULL DEFAULT 'ES',
-- Causal relations (full profile only)
relation_type urt_relation,
related_span_id TEXT REFERENCES review_spans(span_id),
causal_chain JSONB, -- Full profile: structured cause/effect
-- Entity extraction
entity TEXT, -- Raw entity mention
entity_type urt_entity_type,
entity_normalized TEXT, -- Normalized form for grouping
-- Span state
is_primary BOOLEAN NOT NULL DEFAULT FALSE, -- Primary span for this review
is_active BOOLEAN NOT NULL DEFAULT TRUE, -- Soft-delete for reprocessing
review_time TIMESTAMP NOT NULL, -- Denormalized from parent review
-- Processing metadata
confidence urt_confidence DEFAULT 'medium',
usn TEXT, -- URT Semantic Notation string
model_version TEXT,
ingest_batch_id TEXT, -- For atomic reprocessing
created_at TIMESTAMP DEFAULT NOW()
-- span_index uniqueness is enforced among active spans only (uq_spans_active_order),
-- so reprocessed spans can reuse indices while the previous set is kept inactive
);
-- Constraints for review_spans
ALTER TABLE review_spans
ADD CONSTRAINT chk_span_end
CHECK (span_end > span_start);
ALTER TABLE review_spans
ADD CONSTRAINT chk_primary_tier3
CHECK (urt_primary ~ '^[OPJEAVR][1-4]\.[0-9]{2}$');
ALTER TABLE review_spans
ADD CONSTRAINT chk_secondary_max2
CHECK (cardinality(urt_secondary) <= 2);
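-- Validate each element in urt_secondary matches the tier-3 pattern.
-- PostgreSQL does not allow subqueries in CHECK constraints, so the array is
-- flattened with array_to_string() and matched as a comma-separated list.
-- array_to_string() silently skips NULL elements, so NULLs are rejected separately.
ALTER TABLE review_spans
ADD CONSTRAINT chk_secondary_tier3
CHECK (
urt_secondary = '{}' OR (
array_position(urt_secondary, NULL) IS NULL AND
array_to_string(urt_secondary, ',') ~ '^[OPJEAVR][1-4]\.[0-9]{2}(,[OPJEAVR][1-4]\.[0-9]{2})*$'
)
);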
-- causal_chain only allowed for full profile
ALTER TABLE review_spans
ADD CONSTRAINT chk_full_only_fields
CHECK (
profile = 'full' OR causal_chain IS NULL
);
-- No self-referential relations
ALTER TABLE review_spans
ADD CONSTRAINT chk_no_self_relation
CHECK (related_span_id IS NULL OR related_span_id != span_id);
-- USN format validation based on profile
-- Standard: V[+-0±]:I[123]:CODE (e.g., "V-:I2:J1.01")
-- Full: V[+-0±]:I[123]:CODE:S[123]:A[123]:T[CRHF]:E[SIC] (e.g., "V-:I3:J1.01:S2:A2:TC:ES")
ALTER TABLE review_spans
ADD CONSTRAINT chk_usn_format
CHECK (
usn IS NULL OR
(profile = 'standard' AND usn ~ '^V[+\-0±]:I[123]:[OPJEAVR][1-4]\.[0-9]{2}$') OR
(profile = 'full' AND usn ~ '^V[+\-0±]:I[123]:[OPJEAVR][1-4]\.[0-9]{2}:S[123]:A[123]:T[CRHF]:E[SIC]$')
);
-- Foreign keys for review_spans
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_review
FOREIGN KEY (source, review_id, review_version)
REFERENCES reviews_enriched(source, review_id, review_version)
ON DELETE CASCADE;
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_location
FOREIGN KEY (business_id, place_id)
REFERENCES locations(business_id, place_id);
ALTER TABLE review_spans
ADD CONSTRAINT fk_spans_urt_primary
FOREIGN KEY (urt_primary)
REFERENCES urt_codes(code);
-- Indexes for review_spans
CREATE UNIQUE INDEX uq_spans_active_order
ON review_spans(source, review_id, review_version, span_index)
WHERE is_active = TRUE;
CREATE UNIQUE INDEX uq_spans_one_primary_active
ON review_spans(source, review_id, review_version)
WHERE is_active = TRUE AND is_primary = TRUE;
CREATE INDEX idx_spans_review
ON review_spans(source, review_id, review_version)
WHERE is_active = TRUE;
CREATE INDEX idx_spans_business_time
ON review_spans(business_id, review_time DESC)
WHERE is_active = TRUE;
CREATE INDEX idx_spans_issue_routing
ON review_spans(business_id, place_id, urt_primary, entity_normalized)
WHERE is_active = TRUE AND valence IN ('V-', 'V±');
CREATE INDEX idx_spans_entity
ON review_spans(business_id, entity_normalized)
WHERE entity_normalized IS NOT NULL AND is_active = TRUE;
CREATE INDEX idx_spans_batch
ON review_spans(ingest_batch_id)
WHERE ingest_batch_id IS NOT NULL;
-- Exclusion constraint: no overlapping spans within the same active review version.
-- EXCLUDE builds its own GiST index; btree_gist supplies the = operator classes
-- for the TEXT/INT columns. int4range() defaults to '[)' bounds, so adjacent spans
-- (one ending where the next starts) do not conflict.
ALTER TABLE review_spans
ADD CONSTRAINT ex_spans_no_overlap_constraint
EXCLUDE USING gist (
source WITH =,
review_id WITH =,
review_version WITH =,
int4range(span_start, span_end) WITH &&
)
WHERE (is_active = TRUE);
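`chk_usn_format` implies a simple composition rule: standard USNs join valence, intensity, and code with `:`; full USNs append specificity, actionability, temporal, and evidence. A minimal sketch of building and checking USN strings in the extractor before insert, with the regexes copied from the constraint (`build_usn` and `is_valid_usn` are hypothetical helper names):

```python
import re
from typing import Optional

_CODE = r"[OPJEAVR][1-4]\.[0-9]{2}"
# Same patterns as chk_usn_format, minus ^/$ because fullmatch() anchors both ends
_USN_PATTERNS = {
    "standard": re.compile(rf"V[+\-0±]:I[123]:{_CODE}"),
    "full": re.compile(rf"V[+\-0±]:I[123]:{_CODE}:S[123]:A[123]:T[CRHF]:E[SIC]"),
}


def is_valid_usn(usn: str, profile: str) -> bool:
    """True if usn matches the format required for the given span profile."""
    pattern = _USN_PATTERNS.get(profile)
    return pattern is not None and pattern.fullmatch(usn) is not None


def build_usn(
    valence: str,
    intensity: str,
    code: str,
    profile: str = "standard",
    specificity: Optional[str] = None,
    actionability: Optional[str] = None,
    temporal: Optional[str] = None,
    evidence: Optional[str] = None,
) -> str:
    """Join classification fields into a USN string and validate it (hypothetical helper)."""
    parts = [valence, intensity, code]
    if profile == "full":
        extended = [specificity, actionability, temporal, evidence]
        if any(p is None for p in extended):
            raise ValueError("full profile requires specificity, actionability, temporal and evidence")
        parts.extend(extended)
    usn = ":".join(parts)
    if not is_valid_usn(usn, profile):
        raise ValueError(f"invalid USN for profile {profile!r}: {usn}")
    return usn
```

The CHECK validates format only; it does not verify that the USN agrees with the typed `valence`, `intensity`, and `urt_primary` columns. Deriving the string from those same fields is what keeps the two in sync.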
2.5 Issue Tables (Relational, Span-Based)
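v3.2 Issue Key: (business_id, place_id, urt_primary, entity_normalized), with urt_primary stored as issues.primary_subcode. The key already includes entity_normalized (entity matching is active), but entity_normalized defaults to NULL in v3.2, so spans without a normalized entity share one issue per (business_id, place_id, urt_primary); distinct entities create distinct issues in v3.3+.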
-- Issues (aggregated problems)
CREATE TABLE issues (
issue_id TEXT PRIMARY KEY, -- Deterministic SHA256-based ID
-- Grouping keys (v3.2: code + place + entity)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL,
primary_subcode TEXT NOT NULL, -- URT code
domain CHAR(1) NOT NULL,
-- State machine
state TEXT NOT NULL DEFAULT 'DETECTED',
priority_score FLOAT NOT NULL,
confidence_score FLOAT NOT NULL,
-- Aggregated metrics (updated via triggers/jobs)
span_count INT NOT NULL DEFAULT 1,
max_intensity TEXT NOT NULL,
avg_trust_score FLOAT DEFAULT 1.0,
-- CR counters (rolling 30-day window)
cr_better_count INT DEFAULT 0,
cr_worse_count INT DEFAULT 0,
cr_same_count INT DEFAULT 0,
-- Star drag proxy (avg rating when this issue present vs absent)
star_drag_estimate FLOAT,
-- Ownership
owner_team TEXT,
owner_individual TEXT,
-- Timestamps
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
acknowledged_at TIMESTAMP,
resolved_at TIMESTAMP,
verified_at TIMESTAMP,
-- Resolution
reopen_count INT DEFAULT 0,
resolution_code TEXT,
resolution_notes TEXT,
decline_reason TEXT,
-- Context (v3.2: entity extraction active)
entity TEXT, -- Product, staff member, feature
entity_normalized TEXT, -- Normalized for grouping (defaults NULL in v3.2)
-- KPI-ready hooks (nullable)
kpi_impact_estimate FLOAT,
kpi_impact_confidence FLOAT,
kpi_last_computed_at TIMESTAMP
);
CREATE INDEX idx_issues_business ON issues(business_id, state, priority_score DESC);
CREATE INDEX idx_issues_place ON issues(place_id, state);
CREATE INDEX idx_issues_code ON issues(business_id, primary_subcode);
CREATE INDEX idx_issues_open ON issues(business_id)
WHERE state NOT IN ('VERIFIED', 'DECLINED');
CREATE INDEX idx_issues_entity ON issues(business_id, entity_normalized)
WHERE entity_normalized IS NOT NULL;
-- FK to locations (tenant-scoped)
ALTER TABLE issues
ADD CONSTRAINT fk_issues_location
FOREIGN KEY (business_id, place_id) REFERENCES locations(business_id, place_id);
-- Issue spans: 1:1 link from span to issue (each span belongs to exactly one issue)
CREATE TABLE issue_spans (
id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id) ON DELETE CASCADE,
-- Span reference (unique constraint enforces 1:1)
span_id TEXT NOT NULL REFERENCES review_spans(span_id) ON DELETE CASCADE,
-- Denormalized for queries (copied from span)
source TEXT NOT NULL DEFAULT 'google',
review_id TEXT NOT NULL,
review_version INT NOT NULL,
-- Classification snapshot
is_primary_match BOOLEAN DEFAULT TRUE, -- Primary vs secondary code match
intensity TEXT NOT NULL, -- Copied from span for fast queries
review_time TIMESTAMP NOT NULL, -- Denormalized for timeline queries
weight FLOAT DEFAULT 1.0, -- For weighted aggregation
created_at TIMESTAMP DEFAULT NOW(),
-- One span → exactly one issue (1:1 mapping)
CONSTRAINT uq_issue_spans_span UNIQUE (span_id)
);
CREATE INDEX idx_issue_spans_issue ON issue_spans(issue_id);
CREATE INDEX idx_issue_spans_review ON issue_spans(source, review_id, review_version);
CREATE INDEX idx_issue_spans_time ON issue_spans(issue_id, review_time DESC);
-- Issue events (audit log)
CREATE TABLE issue_events (
event_id SERIAL PRIMARY KEY,
issue_id TEXT NOT NULL REFERENCES issues(issue_id),
event_type TEXT NOT NULL, -- 'state_change', 'span_added', 'priority_update'
from_state TEXT,
to_state TEXT,
actor TEXT, -- User or 'system'
notes TEXT,
-- Triggering span/review reference
span_id TEXT,
source TEXT DEFAULT 'google',
review_id TEXT,
review_version INT,
metadata JSONB, -- Additional context
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_events_issue ON issue_events(issue_id, created_at DESC);
CREATE INDEX idx_events_span ON issue_events(span_id)
WHERE span_id IS NOT NULL;
CREATE INDEX idx_events_review ON issue_events(source, review_id, review_version)
WHERE review_id IS NOT NULL;
2.6 Unified Analytics Spine
Design Decision: Sentinel value conventions (do not normalize):
- `place_id = 'ALL'`: spatial rollup (all owned locations)
- `subject_id = 'all'`: semantic rollup (all subjects within a type)
Case matters: `'ALL'` ≠ `'all'`. This avoids NULL handling while keeping the schema simple.
-- Fact table: pre-aggregated time-series metrics
CREATE TABLE fact_timeseries (
id SERIAL PRIMARY KEY,
-- Universal join keys (KPI-ready)
business_id TEXT NOT NULL,
place_id TEXT NOT NULL, -- 'ALL' = all locations rollup
period_date DATE NOT NULL,
bucket_type TEXT NOT NULL, -- 'day', 'week', 'month'
-- Subject (what we're measuring)
subject_type TEXT NOT NULL, -- 'urt_code', 'domain', 'overall', 'issue'
subject_id TEXT NOT NULL, -- Code, domain letter, issue_id, or 'all'
-- Volume metrics
review_count INT NOT NULL DEFAULT 0,
span_count INT NOT NULL DEFAULT 0, -- v3.2: span-level counting
negative_count INT NOT NULL DEFAULT 0,
positive_count INT NOT NULL DEFAULT 0,
neutral_count INT NOT NULL DEFAULT 0,
mixed_count INT NOT NULL DEFAULT 0,
-- Strength metrics (intensity-weighted)
strength_score FLOAT NOT NULL DEFAULT 0,
negative_strength FLOAT NOT NULL DEFAULT 0,
positive_strength FLOAT NOT NULL DEFAULT 0,
-- Rating metrics
avg_rating FLOAT,
rating_count INT DEFAULT 0,
-- Intensity distribution
i1_count INT DEFAULT 0,
i2_count INT DEFAULT 0,
i3_count INT DEFAULT 0,
-- CR signals
cr_better INT DEFAULT 0,
cr_worse INT DEFAULT 0,
cr_same INT DEFAULT 0,
-- Trust-weighted variants (v3.2: now populated)
trust_weighted_strength FLOAT,
trust_weighted_negative FLOAT,
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(business_id, place_id, period_date, bucket_type, subject_type, subject_id)
);
-- Validate 'ALL' sentinel
ALTER TABLE fact_timeseries
ADD CONSTRAINT chk_place_id_format
CHECK (place_id = 'ALL' OR place_id ~ '^[a-zA-Z0-9_-]+$');
-- Optimized indexes for reporting queries
CREATE INDEX idx_facts_lookup ON fact_timeseries(
business_id, place_id, subject_type, subject_id, period_date DESC
);
CREATE INDEX idx_facts_period ON fact_timeseries(
business_id, period_date, bucket_type
);
CREATE INDEX idx_facts_code ON fact_timeseries(subject_type, subject_id)
WHERE subject_type = 'urt_code';
CREATE INDEX idx_facts_all_locations ON fact_timeseries(business_id, period_date)
WHERE place_id = 'ALL';
CREATE INDEX idx_facts_issue ON fact_timeseries(subject_id)
WHERE subject_type = 'issue';
v3.2 Fact Population Scope:
| subject_type | Populated | Notes |
|---|---|---|
| overall | Mandatory | Business-wide + per-location |
| urt_code | Mandatory | Per URT code (from spans) |
| domain | Derived | Rollup from urt_code at query time |
| issue | Recommended | Per-issue timelines |
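Domain rows are derived rather than stored: every tier-3 code starts with its domain letter (the `chk_primary_tier3` pattern), so the rollup is a group-by on the first character of `subject_id`. A minimal sketch over fact rows already fetched as dicts keyed by column name (the row shape and `rollup_domains` are assumptions); it sums only span-level counters, because `review_count` is not additive across codes:

```python
from collections import defaultdict

# Span-level counters are additive across codes because each span carries exactly
# one urt_primary. review_count is deliberately left out: a review with spans under
# both J1.01 and J1.02 is counted in both code rows, so summing would double count.
SPAN_COUNTERS = ("span_count", "i1_count", "i2_count", "i3_count")


def rollup_domains(code_rows: list[dict]) -> dict[tuple, dict[str, int]]:
    """Roll subject_type='urt_code' fact rows up to one counter dict per domain.

    Result key: (business_id, place_id, period_date, bucket_type, domain_letter).
    """
    totals: dict[tuple, dict[str, int]] = defaultdict(lambda: dict.fromkeys(SPAN_COUNTERS, 0))
    for row in code_rows:
        if row["subject_type"] != "urt_code":
            continue
        domain = row["subject_id"][0]  # e.g. 'J1.01' -> 'J'
        key = (row["business_id"], row["place_id"], row["period_date"], row["bucket_type"], domain)
        for field in SPAN_COUNTERS:
            totals[key][field] += row[field]
    return dict(totals)
```

Distinct review counts per domain would have to be recounted from `review_spans` rather than summed from code rows.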
v3.2 Rollup Rules:
- `place_id='ALL'` includes owned locations only (not competitors)
- Competitor facts live at their `competitor_place_id`, never in the `'ALL'` rollup
- Competitor comparison queries explicitly join on `competitor_place_id`
- Span-level metrics (`span_count`, intensity distribution) are now primary
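These rules mean the `'ALL'` row is built by filtering on `location_type` before summing. Because locations are tenant-scoped, the same place_id can be owned by one business and tracked as a competitor by another, so the lookup must come from `locations` for the same business_id. A minimal sketch (`all_locations_rollup` is a hypothetical helper working on one business, period, and subject at a time):

```python
def all_locations_rollup(
    place_rows: list[tuple[str, int]],
    location_types: dict[str, str],
) -> int:
    """Sum a per-location counter into the place_id='ALL' value.

    place_rows: (place_id, span_count) pairs for one business, period and subject.
    location_types: place_id -> location_type ('owned' or 'competitor'), read from
    locations for the same business_id.
    """
    total = 0
    for place_id, span_count in place_rows:
        if place_id == "ALL":
            raise ValueError("expected per-location rows, got an existing 'ALL' row")
        # Competitor rows stay at their own place_id and never feed the rollup
        if location_types.get(place_id) == "owned":
            total += span_count
    return total
```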
v3.2 Trust Score Usage:
- `trust_score` is applied to issue priority scoring and filtering
- `trust_weighted_strength` / `trust_weighted_negative` are now populated
- Formula: `SUM(trust_score * intensity_weight)` per fact row
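The formula names `intensity_weight` without defining it here. A minimal sketch assuming linear weights I1=1, I2=2, I3=3 (an assumption, not a value from this spec), with each span carrying its parent review's `trust_score`. Whether mixed `V±` spans count toward the negative total is also left open, so it is a parameter (`trust_weighted_metrics` is a hypothetical helper):

```python
from typing import Iterable

# Hypothetical intensity weights; the spec names intensity_weight but does not define it here
INTENSITY_WEIGHT = {"I1": 1.0, "I2": 2.0, "I3": 3.0}


def trust_weighted_metrics(
    spans: Iterable[tuple[str, str, float]],
    negative_valences: tuple[str, ...] = ("V-",),
) -> tuple[float, float]:
    """Return (trust_weighted_strength, trust_weighted_negative) for one fact row.

    spans: (valence, intensity, trust_score) per span, where trust_score is the
    parent review's reviews_enriched.trust_score.
    """
    strength = 0.0
    negative = 0.0
    for valence, intensity, trust_score in spans:
        contribution = trust_score * INTENSITY_WEIGHT[intensity]
        strength += contribution
        if valence in negative_valences:
            negative += contribution
    return strength, negative
```

A low-trust I3 complaint (trust 0.5) contributes 1.5, the same as a fully trusted I1/I2 mix would, which is the intended dampening effect of trust weighting.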
2.7 Sub-Patterns (Persistent Clustering Results)
-- Stored sub-pattern clustering results
CREATE TABLE subpatterns (
id SERIAL PRIMARY KEY,
-- Parent
subject_type TEXT NOT NULL, -- 'urt_code', 'issue'
subject_id TEXT NOT NULL,
business_id TEXT NOT NULL,
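place_id TEXT NOT NULL DEFAULT 'ALL', -- 'ALL' = all-locations rollup (same sentinel as fact_timeseries; NULL would let the UNIQUE below admit duplicates)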
-- Period
period_start DATE NOT NULL,
period_end DATE NOT NULL,
-- Cluster info
cluster_id INT NOT NULL,
label TEXT NOT NULL,
-- Metrics
review_count INT NOT NULL,
span_count INT NOT NULL, -- v3.2: span-level
percentage FLOAT NOT NULL,
avg_intensity FLOAT,
-- Representative content
representative_span_id TEXT, -- v3.2: span reference
representative_quote TEXT,
sharpest_span_id TEXT,
sharpest_quote TEXT,
-- Embedding (for trend matching)
centroid VECTOR(384),
-- Metadata
computed_at TIMESTAMP DEFAULT NOW(),
UNIQUE(subject_type, subject_id, business_id, place_id, period_start, period_end, cluster_id)
);
CREATE INDEX idx_subpatterns_lookup ON subpatterns(
subject_type, subject_id, business_id, period_end DESC
);
Part 3: Triggers and Functions
3.1 Span Validation Triggers
-- Trigger 1: Validate span_end <= length(review text)
CREATE OR REPLACE FUNCTION trg_review_spans_validate_bounds()
RETURNS TRIGGER AS $$
DECLARE
review_text_length INT;
BEGIN
SELECT length(text) INTO review_text_length
FROM reviews_enriched
WHERE source = NEW.source
AND review_id = NEW.review_id
AND review_version = NEW.review_version;
IF review_text_length IS NULL THEN
RAISE EXCEPTION 'Parent review not found: (%, %, %)',
NEW.source, NEW.review_id, NEW.review_version;
END IF;
IF NEW.span_end > review_text_length THEN
RAISE EXCEPTION 'span_end (%) exceeds review text length (%)',
NEW.span_end, review_text_length;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_bounds
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
EXECUTE FUNCTION trg_review_spans_validate_bounds();
-- Trigger 2: Validate span_text matches parent substring (conditional)
-- Enabled via session setting: SET reviewiq.validate_span_text = 'on';
CREATE OR REPLACE FUNCTION trg_review_spans_validate_text()
RETURNS TRIGGER AS $$
DECLARE
review_text TEXT;
expected_text TEXT;
validate_enabled TEXT;
BEGIN
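-- Check if validation is enabled via session setting. With missing_ok = true,
-- current_setting() returns NULL instead of raising when the setting is undefined;
-- COALESCE it so an unset setting means 'off' (NULL != 'on' is NULL, not TRUE).
validate_enabled := COALESCE(current_setting('reviewiq.validate_span_text', true), 'off');
IF validate_enabled <> 'on' THEN
RETURN NEW;
END IF;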
SELECT text INTO review_text
FROM reviews_enriched
WHERE source = NEW.source
AND review_id = NEW.review_id
AND review_version = NEW.review_version;
expected_text := substring(review_text FROM NEW.span_start + 1 FOR NEW.span_end - NEW.span_start);
-- IS DISTINCT FROM also treats a NULL expected_text as a mismatch
IF NEW.span_text IS DISTINCT FROM expected_text THEN
RAISE EXCEPTION 'span_text mismatch: expected "%" but got "%"',
left(expected_text, 50), left(NEW.span_text, 50);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_text
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
EXECUTE FUNCTION trg_review_spans_validate_text();
-- Trigger 3: Validate causal_chain JSONB structure
CREATE OR REPLACE FUNCTION trg_review_spans_validate_causal_chain()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.causal_chain IS NOT NULL THEN
-- Validate structure using helper function
IF NOT urt_validate_causal_chain(NEW.causal_chain) THEN
RAISE EXCEPTION 'Invalid causal_chain structure';
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_review_spans_validate_causal_chain
BEFORE INSERT OR UPDATE ON review_spans
FOR EACH ROW
WHEN (NEW.causal_chain IS NOT NULL)
EXECUTE FUNCTION trg_review_spans_validate_causal_chain();
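The bounds and text triggers, `chk_span_end`, and the overlap exclusion constraint reject bad offsets one row at a time, at insert. An extractor can run equivalent checks on a whole span set first and report every problem at once, regardless of the `reviewiq.validate_span_text` setting. A minimal sketch, assuming offsets count Unicode code points (as Python `str` indexing and PostgreSQL `length()`/`substring()` on a UTF-8 database both do); `SpanOffsets` and `validate_span_set` are hypothetical names:

```python
from itertools import combinations
from typing import NamedTuple


class SpanOffsets(NamedTuple):
    span_index: int
    span_start: int  # 0-based, inclusive
    span_end: int    # 0-based, exclusive
    span_text: str


def validate_span_set(review_text: str, spans: list[SpanOffsets]) -> list[str]:
    """Return problems found; an empty list means the offsets should pass the
    offset CHECKs, the bounds/text triggers and the overlap exclusion constraint."""
    problems = []
    for s in spans:
        if not 0 <= s.span_start < s.span_end:
            problems.append(f"span {s.span_index}: require 0 <= span_start < span_end")
            continue
        if s.span_end > len(review_text):
            problems.append(
                f"span {s.span_index}: span_end ({s.span_end}) exceeds review text length ({len(review_text)})"
            )
            continue
        # Python slice [start:end] == SQL substring(text FROM start + 1 FOR end - start)
        if review_text[s.span_start:s.span_end] != s.span_text:
            problems.append(f"span {s.span_index}: span_text mismatch")
    # Half-open ranges overlap only if each starts before the other ends,
    # matching int4range(span_start, span_end) && ... with default '[)' bounds
    for a, b in combinations(spans, 2):
        if a.span_start < b.span_end and b.span_start < a.span_end:
            problems.append(f"spans {a.span_index} and {b.span_index} overlap")
    return problems
```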
3.2 Causal Chain Validation Function
-- Validate causal chain structure, codes, and ordering
CREATE OR REPLACE FUNCTION urt_validate_causal_chain(chain JSONB)
RETURNS BOOLEAN AS $$
DECLARE
link JSONB;
link_code TEXT;
link_role TEXT;
link_order INT;
prev_order INT := -1;
valid_roles TEXT[] := ARRAY['cause', 'effect', 'context', 'outcome'];
BEGIN
-- Must be an array
IF jsonb_typeof(chain) != 'array' THEN
RETURN FALSE;
END IF;
-- Empty array is valid
IF jsonb_array_length(chain) = 0 THEN
RETURN TRUE;
END IF;
-- Validate each link
FOR link IN SELECT * FROM jsonb_array_elements(chain)
LOOP
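-- Required fields: each link must be an object with non-null code and role and a
-- non-negative integer order. A JSON null passes the ? key test but yields SQL NULL
-- via ->>, which would make every check below evaluate to NULL and silently pass.
IF jsonb_typeof(link) IS DISTINCT FROM 'object'
OR link->>'code' IS NULL
OR link->>'role' IS NULL
OR jsonb_typeof(link->'order') IS DISTINCT FROM 'number'
OR (link->>'order') !~ '^[0-9]+$' THEN
RETURN FALSE;
END IF;
link_code := link->>'code';
link_role := link->>'role';
link_order := (link->>'order')::INT;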
-- Validate code format (tier-3)
IF link_code !~ '^[OPJEAVR][1-4]\.[0-9]{2}$' THEN
RETURN FALSE;
END IF;
-- Validate role
IF NOT (link_role = ANY(valid_roles)) THEN
RETURN FALSE;
END IF;
-- Validate order is increasing
IF link_order <= prev_order THEN
RETURN FALSE;
END IF;
prev_order := link_order;
END LOOP;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
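Checking a causal chain in the extractor avoids a failed insert later. A minimal Python sketch of the same structural rules (an array of link objects, tier-3 `code`, `role` from the four allowed values, non-negative integer `order` strictly increasing), operating on already-parsed JSON; `is_valid_causal_chain` is a hypothetical helper, not part of the schema:

```python
import re
from typing import Any

TIER3 = re.compile(r"[OPJEAVR][1-4]\.[0-9]{2}")
VALID_ROLES = {"cause", "effect", "context", "outcome"}


def is_valid_causal_chain(chain: Any) -> bool:
    """Client-side mirror of urt_validate_causal_chain for a parsed JSON value."""
    if not isinstance(chain, list):
        return False
    prev_order = -1  # first order must be >= 0, as in the SQL function
    for link in chain:
        if not isinstance(link, dict):
            return False
        code, role, order = link.get("code"), link.get("role"), link.get("order")
        if not isinstance(code, str) or TIER3.fullmatch(code) is None:
            return False
        if not isinstance(role, str) or role not in VALID_ROLES:
            return False
        # bool is a subclass of int in Python, so reject it explicitly
        if not isinstance(order, int) or isinstance(order, bool) or order <= prev_order:
            return False
        prev_order = order
    return True
```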
3.3 Span Relation Validation
-- Validate related_span_id references span from same review
CREATE OR REPLACE FUNCTION validate_review_relations(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS BOOLEAN AS $$
DECLARE
invalid_count INT;
BEGIN
SELECT COUNT(*) INTO invalid_count
FROM review_spans s
WHERE s.source = p_source
AND s.review_id = p_review_id
AND s.review_version = p_review_version
AND s.related_span_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM review_spans r
WHERE r.span_id = s.related_span_id
AND r.source = s.source
AND r.review_id = s.review_id
AND r.review_version = s.review_version
);
RETURN invalid_count = 0;
END;
$$ LANGUAGE plpgsql;
3.4 Active Span Set Validation
-- Validate the active span set of a review version: at least one active span and exactly one primary
CREATE OR REPLACE FUNCTION validate_active_spans(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS BOOLEAN AS $$
DECLARE
active_count INT;
primary_count INT;
BEGIN
-- Count active spans
SELECT COUNT(*), COUNT(*) FILTER (WHERE is_primary)
INTO active_count, primary_count
FROM review_spans
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE;
-- Must have at least one active span
IF active_count = 0 THEN
RETURN FALSE;
END IF;
-- Must have exactly one primary span
IF primary_count != 1 THEN
RETURN FALSE;
END IF;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
3.5 Primary Span Selection
-- Deterministic primary span selection: I3 > I2 > I1, V- > V± > V0 > V+, then span_index
CREATE OR REPLACE FUNCTION set_primary_span(
p_source TEXT,
p_review_id TEXT,
p_review_version INT
)
RETURNS TEXT AS $$
DECLARE
selected_span_id TEXT;
BEGIN
-- Clear existing primary
UPDATE review_spans
SET is_primary = FALSE
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE
AND is_primary = TRUE;
-- Select new primary using deterministic ordering
SELECT span_id INTO selected_span_id
FROM review_spans
WHERE source = p_source
AND review_id = p_review_id
AND review_version = p_review_version
AND is_active = TRUE
ORDER BY
-- Intensity: I3 > I2 > I1
CASE intensity
WHEN 'I3' THEN 1
WHEN 'I2' THEN 2
WHEN 'I1' THEN 3
END,
-- Valence: V- > V± > V0 > V+
CASE valence
WHEN 'V-' THEN 1
WHEN 'V±' THEN 2
WHEN 'V0' THEN 3
WHEN 'V+' THEN 4
END,
-- Tiebreaker: first span
span_index
LIMIT 1;
-- Set as primary
IF selected_span_id IS NOT NULL THEN
UPDATE review_spans
SET is_primary = TRUE
WHERE span_id = selected_span_id;
END IF;
RETURN selected_span_id;
END;
$$ LANGUAGE plpgsql;
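Reproducing the ordering in application code lets the extractor or its tests predict which span `set_primary_span` will mark. A minimal sketch using the same rank tables as the CASE expressions (`ActiveSpan` and `select_primary_span` are hypothetical names):

```python
from dataclasses import dataclass
from typing import Optional

# Lower rank sorts first, matching the CASE expressions in set_primary_span
INTENSITY_RANK = {"I3": 1, "I2": 2, "I1": 3}
VALENCE_RANK = {"V-": 1, "V±": 2, "V0": 3, "V+": 4}


@dataclass(frozen=True)
class ActiveSpan:
    span_id: str
    span_index: int
    intensity: str
    valence: str


def select_primary_span(spans: list[ActiveSpan]) -> Optional[str]:
    """Return the span_id set_primary_span would pick, or None if there are no active spans."""
    if not spans:
        return None
    best = min(spans, key=lambda s: (INTENSITY_RANK[s.intensity], VALENCE_RANK[s.valence], s.span_index))
    return best.span_id
```

Because intensity is compared before valence, a strongly positive span (I3, V+) outranks a mildly negative one (I2, V-).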
3.6 Deterministic Issue ID Generation
```sql
-- Generate deterministic issue_id from grouping key using SHA256
CREATE OR REPLACE FUNCTION generate_issue_id(
    p_business_id TEXT,
    p_place_id TEXT,
    p_urt_primary TEXT,
    p_entity_normalized TEXT DEFAULT NULL
)
RETURNS TEXT AS $$
DECLARE
    grouping_key TEXT;
    hash_bytes BYTEA;
BEGIN
    -- Build grouping key (entity_normalized defaults to empty string if NULL)
    grouping_key := p_business_id || '|' || p_place_id || '|' || p_urt_primary || '|'
                    || COALESCE(p_entity_normalized, '');

    -- Generate SHA256 hash
    hash_bytes := digest(grouping_key, 'sha256');

    -- Return first 16 chars of hex encoding (64 bits of entropy)
    RETURN 'ISS-' || left(encode(hash_bytes, 'hex'), 16);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```
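Application code sometimes needs to predict an issue ID without a database round-trip. The following is a minimal Python mirror of `generate_issue_id`. It assumes the database encoding is UTF-8, so that pgcrypto's `digest()` hashes the same bytes that `str.encode("utf-8")` produces. It also assumes hex output is lowercase, which is what both `encode(..., 'hex')` and `hexdigest()` return.

```python
import hashlib
from typing import Optional


def generate_issue_id(business_id: str, place_id: str, urt_primary: str,
                      entity_normalized: Optional[str] = None) -> str:
    """Application-side mirror of the SQL generate_issue_id function."""
    # A NULL entity maps to '', matching COALESCE(p_entity_normalized, '')
    grouping_key = "|".join([business_id, place_id, urt_primary, entity_normalized or ""])
    digest = hashlib.sha256(grouping_key.encode("utf-8")).hexdigest()
    # First 16 hex chars = 64 bits, like left(encode(hash_bytes, 'hex'), 16)
    return "ISS-" + digest[:16]


a = generate_issue_id("biz-1", "place-9", "J1.01")
b = generate_issue_id("biz-1", "place-9", "J1.01", None)
print(a == b, len(a))  # → True 20
```

The `|` separator has a caveat when IDs themselves can contain `|`. In that case two different tuples can produce the same grouping key: for example, business `a|b` with place `c` versus business `a` with place `b|c`.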
Part 4: Ingest Layer
4.1 Google Connector
```python
import json
from datetime import datetime
from typing import Optional


async def pull_reviews(place_id: str, since: Optional[datetime] = None) -> list[dict]:
    """Fetch new/updated reviews from Google Places API."""
    reviews = await google_places_client.get_reviews(place_id, since=since)
    for review in reviews:
        await store_raw_review(place_id, review)
    return reviews


async def store_raw_review(place_id: str, review: dict) -> int:
    """Store immutable raw review payload; a content change creates a new version."""
    # Latest stored version, including the fields content_changed() compares
    existing = await db.query_one("""
        SELECT id, review_version, review_text, rating FROM reviews_raw
        WHERE source = 'google' AND review_id = %s
        ORDER BY review_version DESC LIMIT 1
    """, [review['review_id']])

    version = 1
    if existing:
        if content_changed(existing, review):
            version = existing['review_version'] + 1
        else:
            return existing['id']  # unchanged: reuse the stored row

    return await db.insert("""
        INSERT INTO reviews_raw (
            source, review_id, place_id, raw_payload,
            review_text, rating, review_time, reviewer_name, reviewer_id,
            review_version, pulled_at
        ) VALUES (
            'google', %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, NOW()
        )
        RETURNING id
    """, [
        review['review_id'], place_id, json.dumps(review),
        review.get('text'), review.get('rating'),
        review.get('time'), review.get('author_name'), review.get('author_id'),
        version
    ])
```
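`store_raw_review` calls `content_changed` without defining it. The following is a sketch of one possible version. It is hypothetical: which fields count as "content" is an assumption. This version compares the review text (ignoring whitespace-only differences) and the star rating. To work, the lookup in `store_raw_review` must select `review_text` and `rating` in addition to `id` and `review_version`.

```python
def content_changed(existing: dict, review: dict) -> bool:
    """Hypothetical helper: does a re-pulled review need a new version?

    `existing` is a reviews_raw row (review_text, rating columns);
    `review` is the Google payload (text, rating keys), as used in
    store_raw_review. Whitespace-only edits are ignored.
    """
    def norm(text) -> str:
        # Collapse runs of whitespace; treat a missing text as ''
        return " ".join((text or "").split())

    return (
        norm(existing.get("review_text")) != norm(review.get("text"))
        or existing.get("rating") != review.get("rating")
    )


row = {"review_text": "Great  food", "rating": 5}
print(content_changed(row, {"text": "Great food", "rating": 4}))  # → True
```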
4.2 Enrichment Pipeline
```python
import asyncio
import time


async def enrich_review(raw_id: int, business_id: str) -> dict:
    """
    Full enrichment: normalize → classify → embed → trust score → extract spans.

    Args:
        raw_id: ID from reviews_raw
        business_id: Tenant context (passed from ingest job, not looked up)
    """
    raw = await db.query_one(
        "SELECT * FROM reviews_raw WHERE id = %s", [raw_id]
    )
    if raw is None:
        raise ValueError(f"reviews_raw row {raw_id} not found")

    # 1. Normalize
    text = normalize_text(raw['review_text'])

    # 2. Validate place_id exists under this tenant (owned or competitor)
    location = await db.query_one(
        "SELECT display_name, location_type FROM locations "
        "WHERE business_id = %s AND place_id = %s",
        [business_id, raw['place_id']]
    )
    if not location:
        raise ValueError(
            f"place_id {raw['place_id']} not registered for business {business_id}"
        )

    # 3. Parallel: LLM classify (with span extraction) + embed
    classification, embedding = await asyncio.gather(
        classify_review_with_spans(text),
        embed_review(text),
    )
    spans = classification['spans']

    # 4. Trust score
    trust_score = compute_trust_score(raw, text, classification)

    # 5. Dedup check
    dedup_group_id = await find_dedup_group(embedding, raw['place_id'])

    # 6. Build the enriched record (versioned)
    enriched = {
        'source': 'google',
        'review_id': raw['review_id'],
        'review_version': raw['review_version'],
        'is_latest': True,
        'raw_id': raw_id,
        'business_id': business_id,
        'place_id': raw['place_id'],
        'text': raw['review_text'],
        'text_normalized': text,
        'rating': raw['rating'],
        'review_time': raw['review_time'],
        'language': detect_language(text),
        'embedding': embedding,
        'trust_score': trust_score,
        'dedup_group_id': dedup_group_id,
        # Review-level code comes from the span set_primary_span will select
        # (I3 > I2 > I1, then V- > V± > V0 > V+, then lowest index),
        # falling back to 'O1.01' when no spans were extracted
        'urt_primary': min(
            enumerate(spans),
            key=lambda p: (
                {'I3': 0, 'I2': 1, 'I1': 2}[p[1]['intensity']],
                {'V-': 0, 'V±': 1, 'V0': 2, 'V+': 3}[p[1]['valence']],
                p[0],
            ),
        )[1]['urt_primary'] if spans else 'O1.01',
        'valence': classification['review_valence'],
        'intensity': classification['review_intensity'],
        **classification.get('review_meta', {}),
    }

    # 7. Mark previous versions as not-latest and store this version in one
    #    transaction, so there is no window in which no version is latest
    async with db.transaction():
        await db.execute("""
            UPDATE reviews_enriched
            SET is_latest = FALSE
            WHERE source = 'google' AND review_id = %s AND is_latest = TRUE
        """, [raw['review_id']])
        await upsert_enriched_review(enriched)

    # 8. Extract and store spans (v3.2)
    batch_id = f"batch-{raw['review_id']}-{raw['review_version']}-{int(time.time())}"
    await store_review_spans(enriched, spans, batch_id)

    return enriched
```
4.3 LLM Classification with Span Extraction
```python
import json

CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).
Analyze the review and extract SPANS (individual semantic units). Each span is a phrase or sentence expressing one classifiable idea.

Return JSON:
{
  "spans": [
    {
      "text": "exact phrase from review",
      "start": 0,
      "end": 25,
      "urt_primary": "X1.23",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "ES",
      "entity": "Mike",
      "entity_type": "staff",
      "confidence": "high"
    }
  ],
  "review_valence": "V-",
  "review_intensity": "I2",
  "review_meta": {
    "staff_mentions": ["Mike"],
    "comparative": "CR-N"
  }
}

URT DOMAINS:
- O (Offering): Product/service quality, function, completeness
- P (People): Staff attitude, competence, responsiveness
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance
- A (Access): Availability, accessibility, convenience
- V (Value): Price, transparency, worth
- R (Relationship): Trust, dependability, loyalty

SPAN RULES:
1. Each span = one classifiable semantic unit
2. Spans must not overlap
3. text must be EXACT substring from review
4. start/end are character offsets (0-indexed)
5. First span with highest intensity + negative valence becomes primary

INTENSITY:
- I1: Mild observation, passing mention
- I2: Moderate emphasis, clear statement
- I3: Strong emotion, repeated emphasis, dealbreaker

SPECIFICITY:
- S1: Vague ("it was bad")
- S2: Specific ("the wait was 30 minutes")
- S3: Precise ("waited 32 minutes on Tuesday at 6pm")

ACTIONABILITY:
- A1: No clear action ("didn't like it")
- A2: Implied action ("too slow")
- A3: Explicit action ("need more cashiers during rush hour")

TEMPORAL:
- TC: Current/recent experience
- TR: Recurring pattern
- TH: Historical comparison
- TF: Future expectation

EVIDENCE:
- ES: Subjective opinion
- EI: Indirect evidence
- EC: Concrete/verifiable

Return valid JSON only."""


async def classify_review_with_spans(text: str) -> dict:
    """LLM-powered URT classification with span extraction."""
    response = await llm.chat(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"},
        temperature=0.1
    )
    result = json.loads(response.content)
    result.setdefault('spans', [])  # downstream code indexes result['spans']
    result['classification_model'] = 'gpt-4o-mini'
    return result
```
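LLM output does not always follow SPAN RULES 2 to 4, so checking spans before storage catches bad offsets early. The following is a minimal sketch. `check_spans` is a hypothetical helper, and it assumes `end` is exclusive (Python slice semantics), which the prompt does not state explicitly. The check is in addition to, not a replacement for, the database's bounds and text-matching triggers.

```python
def check_spans(text: str, spans: list[dict]) -> list[str]:
    """Return SPAN RULES violations (empty list = valid).

    Assumes 0-indexed, end-exclusive offsets: span text == text[start:end].
    """
    errors = []
    prev_end = -1
    # Check overlap in positional order, but report original indexes
    for idx, span in sorted(enumerate(spans), key=lambda p: p[1]["start"]):
        start, end = span["start"], span["end"]
        if not (0 <= start < end <= len(text)):
            errors.append(f"span {idx}: offsets {start}:{end} out of bounds")
            continue
        if text[start:end] != span["text"]:
            errors.append(f"span {idx}: text does not match review[{start}:{end}]")
        if start < prev_end:
            errors.append(f"span {idx}: overlaps previous span")
        prev_end = max(prev_end, end)
    return errors


review = "Food was great but the wait was 30 minutes"
good = [
    {"text": "Food was great", "start": 0, "end": 14},
    {"text": "the wait was 30 minutes", "start": 19, "end": 42},
]
print(check_spans(review, good))  # → []
```

When the span text is an exact substring but the offsets are wrong, one option is to recompute `start` with `review.find(span["text"])` rather than discarding the span.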
4.4 Span Storage
```python
import hashlib


async def store_review_spans(
    enriched: dict,
    spans: list[dict],
    batch_id: str
) -> list[str]:
    """
    Store extracted spans with soft-switch pattern.
    Returns list of span_ids.
    """
    span_ids = []
    for idx, span in enumerate(spans):
        # Generate deterministic span_id
        span_id = generate_span_id(
            enriched['source'],
            enriched['review_id'],
            enriched['review_version'],
            idx
        )

        # Build USN string
        usn = build_usn(span, profile='standard')

        await db.execute("""
            INSERT INTO review_spans (
                span_id, business_id, place_id, source, review_id, review_version,
                span_index, span_text, span_start, span_end,
                profile, urt_primary, urt_secondary, valence, intensity, comparative,
                specificity, actionability, temporal, evidence,
                entity, entity_type, entity_normalized,
                is_primary, is_active, review_time,
                confidence, usn, model_version, ingest_batch_id
            ) VALUES (
                %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s, %s
            )
        """, [
            span_id, enriched['business_id'], enriched['place_id'],
            enriched['source'], enriched['review_id'], enriched['review_version'],
            idx, span['text'], span['start'], span['end'],
            'standard', span['urt_primary'], span.get('urt_secondary', []),
            span['valence'], span['intensity'], span.get('comparative', 'CR-N'),
            span.get('specificity', 'S2'), span.get('actionability', 'A2'),
            span.get('temporal', 'TC'), span.get('evidence', 'ES'),
            span.get('entity'), span.get('entity_type'),
            normalize_entity(span.get('entity')),
            False,  # is_primary: set below for the whole batch
            False,  # is_active: FALSE until the batch is activated
            enriched['review_time'],
            span.get('confidence', 'medium'), usn, 'gpt-4o-mini', batch_id
        ])
        span_ids.append(span_id)

    # Set primary span
    await set_primary_span_for_batch(
        enriched['source'],
        enriched['review_id'],
        enriched['review_version'],
        batch_id
    )

    # Atomic activation (soft-switch)
    await activate_span_batch(
        enriched['source'],
        enriched['review_id'],
        enriched['review_version'],
        batch_id
    )

    return span_ids


def generate_span_id(source: str, review_id: str, version: int, index: int) -> str:
    """Generate deterministic span ID."""
    key = f"{source}|{review_id}|{version}|{index}"
    hash_bytes = hashlib.sha256(key.encode()).digest()
    return f"SPN-{hash_bytes[:8].hex()}"


def build_usn(span: dict, profile: str = 'standard') -> str:
    """Build URT Semantic Notation string, e.g. 'V-:I2:J1.01'."""
    base = f"V{span['valence'][-1]}:I{span['intensity'][-1]}:{span['urt_primary']}"
    if profile == 'full':
        base += f":S{span.get('specificity', 'S2')[-1]}"
        base += f":A{span.get('actionability', 'A2')[-1]}"
        base += f":T{span.get('temporal', 'TC')[-1]}"
        base += f":E{span.get('evidence', 'ES')[-1]}"
    return base
```
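Because USN strings are stored on every span, they can be read back without re-joining the individual columns. The following is a minimal sketch of a hypothetical inverse of `build_usn`, under the assumptions below:

- `parse_usn` is not part of the codebase.
- URT codes contain no `:`.
- A standard USN has three fields (`V-:I2:J1.01`), and a full one appends four more (`:S2:A2:TC:ES`).

```python
def parse_usn(usn: str) -> dict:
    """Hypothetical inverse of build_usn: split a USN back into fields."""
    parts = usn.split(":")
    if len(parts) not in (3, 7):
        raise ValueError(f"unexpected USN shape: {usn!r}")
    fields = {"valence": parts[0], "intensity": parts[1], "urt_primary": parts[2]}
    if len(parts) == 7:  # profile='full'
        fields.update(specificity=parts[3], actionability=parts[4],
                      temporal=parts[5], evidence=parts[6])
    return fields


print(parse_usn("V-:I2:J1.01"))
```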
4.5 Reprocessing Pattern
The soft-switch pattern enables atomic span replacement without downtime:
```python
import time


async def reprocess_review_spans(
    source: str,
    review_id: str,
    review_version: int
) -> str:
    """
    Reprocess spans for a review using soft-switch pattern.
    Returns new batch_id.
    """
    # 1. Fetch review
    review = await db.query_one("""
        SELECT * FROM reviews_enriched
        WHERE source = %s AND review_id = %s AND review_version = %s
    """, [source, review_id, review_version])
    if review is None:
        raise ValueError(f"No enriched review {source}/{review_id} v{review_version}")

    # 2. Re-classify with LLM
    classification = await classify_review_with_spans(review['text'])

    # 3. Generate new batch ID
    batch_id = f"reprocess-{review_id}-{review_version}-{int(time.time())}"

    # 4. INSERT new spans with is_active=FALSE
    for idx, span in enumerate(classification['spans']):
        span_id = generate_span_id(source, review_id, review_version, idx)
        # ... insert with is_active=FALSE, ingest_batch_id=batch_id

    # 5. Set primary span for new batch before validating it, since a valid
    #    span set must contain exactly one primary span
    await set_primary_span_for_batch(source, review_id, review_version, batch_id)

    # 6. Validate new spans
    if not await validate_span_set(source, review_id, review_version, batch_id):
        # Rollback: delete invalid batch; the old active spans are untouched
        await db.execute("""
            DELETE FROM review_spans
            WHERE ingest_batch_id = %s
        """, [batch_id])
        raise ValueError("New span set failed validation")

    # 7. Atomic switch: deactivate old spans and activate the new batch
    #    in a single transaction
    await activate_span_batch(source, review_id, review_version, batch_id)

    return batch_id
```
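The atomic switch works only because the deactivating and activating UPDATEs commit together. The following self-contained sketch demonstrates this. It uses Python's built-in `sqlite3` as a stand-in for PostgreSQL, with a cut-down `review_spans` table that keeps only the columns the switch touches. The span and batch IDs are arbitrary labels. The sketch does not model concurrent readers.

```python
import sqlite3

# In-memory stand-in for review_spans (only the columns the switch touches)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE review_spans (
        span_id TEXT, review_id TEXT, ingest_batch_id TEXT,
        is_active INTEGER NOT NULL DEFAULT 0
    )
""")
conn.executemany(
    "INSERT INTO review_spans VALUES (?, ?, ?, ?)",
    [("old-0", "r1", "batch-A", 1), ("old-1", "r1", "batch-A", 1),
     ("new-0", "r1", "batch-B", 0)],  # new batch staged inactive
)
conn.commit()


def activate(conn: sqlite3.Connection, review_id: str, batch_id: str) -> None:
    """Deactivate the old spans and activate the new batch in one transaction."""
    with conn:  # commits if both UPDATEs succeed, rolls back if either raises
        conn.execute(
            "UPDATE review_spans SET is_active = 0 "
            "WHERE review_id = ? AND is_active = 1 AND ingest_batch_id != ?",
            (review_id, batch_id),
        )
        conn.execute(
            "UPDATE review_spans SET is_active = 1 WHERE ingest_batch_id = ?",
            (batch_id,),
        )


activate(conn, "r1", "batch-B")
active = [row[0] for row in conn.execute(
    "SELECT span_id FROM review_spans WHERE is_active = 1 ORDER BY span_id")]
print(active)  # → ['new-0']
```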
```python
async def activate_span_batch(
    source: str,
    review_id: str,
    review_version: int,
    batch_id: str
):
    """Atomically switch from old spans to new batch."""
    async with db.transaction():
        # Deactivate existing active spans
        await db.execute("""
            UPDATE review_spans
            SET is_active = FALSE
            WHERE source = %s AND review_id = %s AND review_version = %s
              AND is_active = TRUE
              AND ingest_batch_id != %s
        """, [source, review_id, review_version, batch_id])

        # Activate new batch
        await db.execute("""
            UPDATE review_spans
            SET is_active = TRUE
            WHERE ingest_batch_id = %s
        """, [batch_id])
```
4.6 Trust Score Computation
```python
def compute_trust_score(raw: dict, text: str, classification: dict) -> float:
    """
    Compute trust score (0.1 to 1.0) based on review quality signals.
    Low trust = likely spam, fake, or low-quality.
    """
    score = 1.0

    # Length penalty
    word_count = len(text.split())
    if word_count < 5:
        score *= 0.5
    elif word_count > 500:
        score *= 0.8

    # Rating/sentiment mismatch
    rating = raw.get('rating')
    valence = classification.get('review_valence')
    if rating and valence:
        if rating >= 4 and valence == 'V-':
            score *= 0.7
        elif rating <= 2 and valence == 'V+':
            score *= 0.7

    # Generic text patterns
    if is_generic_review(text):
        score *= 0.6

    # Span confidence
    spans = classification.get('spans', [])
    if spans:
        low_conf_count = sum(1 for s in spans if s.get('confidence') == 'low')
        if low_conf_count > len(spans) / 2:
            score *= 0.9

    return max(0.1, min(1.0, score))
```
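The penalties multiply, so their combined effect is easiest to see with concrete inputs. The following is a minimal sketch using the same multipliers as `compute_trust_score`. `trust_from_signals` is a hypothetical pure variant that takes each signal directly. In particular, the `generic` flag stands in for the result of `is_generic_review`.

```python
def trust_from_signals(word_count: int, rating, valence, generic: bool,
                       low_conf_spans: int, total_spans: int) -> float:
    """Same multipliers as compute_trust_score, with signals passed in."""
    score = 1.0
    if word_count < 5:
        score *= 0.5          # very short review
    elif word_count > 500:
        score *= 0.8          # very long review
    if rating and valence:
        if (rating >= 4 and valence == "V-") or (rating <= 2 and valence == "V+"):
            score *= 0.7      # stars contradict the text
    if generic:
        score *= 0.6          # generic / template text
    if total_spans and low_conf_spans > total_spans / 2:
        score *= 0.9          # mostly low-confidence spans
    return max(0.1, min(1.0, score))


# A 4-word, 5-star review whose text reads negative: 1.0 × 0.5 × 0.7 = 0.35
print(trust_from_signals(4, 5, "V-", False, 0, 1))
```

With these multipliers, the lowest reachable score is 0.5 × 0.7 × 0.6 × 0.9 = 0.189. The 0.1 floor therefore acts as a safeguard for future penalties rather than an active bound today.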
Part 5: Issue Lifecycle Management
5.1 Issue Routing (Span-Based)
v3.2 Issue Key: (business_id, place_id, urt_primary, entity_normalized). In v3.2 the entity_normalized component is always NULL; per-entity issues are deferred to v3.3.
```python
from typing import Optional


async def route_span_to_issue(span: dict) -> Optional[str]:
    """
    Route a span to an existing or new issue.
    Returns issue_id or None if span doesn't warrant an issue.
    """
    # Only negative/mixed spans create issues
    if span['valence'] not in ('V-', 'V±'):
        return None

    # Generate deterministic issue_id from grouping key. Entity grouping is
    # deferred to v3.3, so the entity component is NULL in v3.2 even when
    # the span itself carries an entity_normalized value.
    row = await db.query_one("""
        SELECT generate_issue_id(%s, %s, %s, %s) AS issue_id
    """, [
        span['business_id'],
        span['place_id'],
        span['urt_primary'],
        None,
    ])
    issue_id = row['issue_id']

    # Check if issue exists
    existing = await db.query_one("""
        SELECT issue_id, state, span_count
        FROM issues
        WHERE issue_id = %s
    """, [issue_id])

    if existing:
        # Add span to existing issue
        await add_span_to_issue(existing['issue_id'], span)
        return existing['issue_id']

    # Create new issue
    if should_create_issue(span):
        return await create_issue_from_span(span, issue_id)

    return None


async def create_issue_from_span(span: dict, issue_id: str) -> str:
    """Create a new issue from a span."""
    domain = span['urt_primary'][0]

    await db.execute("""
        INSERT INTO issues (
            issue_id, business_id, place_id, primary_subcode, domain,
            state, priority_score, confidence_score,
            span_count, max_intensity, entity, entity_normalized
        ) VALUES (
            %s, %s, %s, %s, %s,
            'DETECTED', %s, %s,
            1, %s, %s, %s
        )
    """, [
        issue_id, span['business_id'], span['place_id'],
        span['urt_primary'], domain,
        compute_initial_priority(span),
        confidence_to_score(span.get('confidence', 'medium')),
        span['intensity'],
        span.get('entity'), span.get('entity_normalized')
    ])

    # Link span to issue
    await db.execute("""
        INSERT INTO issue_spans (
            issue_id, span_id, source, review_id, review_version,
            is_primary_match, intensity, review_time
        ) VALUES (
            %s, %s, %s, %s, %s,
            TRUE, %s, %s
        )
    """, [
        issue_id, span['span_id'], span['source'],
        span['review_id'], span['review_version'],
        span['intensity'], span['review_time']
    ])

    await log_issue_event(issue_id, 'created', span_id=span['span_id'])
    return issue_id


async def add_span_to_issue(issue_id: str, span: dict):
    """Add span to existing issue and update counters."""
    # Insert span link (1:1 mapping enforced by UNIQUE constraint)
    await db.execute("""
        INSERT INTO issue_spans (
            issue_id, span_id, source, review_id, review_version,
            is_primary_match, intensity, review_time
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (span_id) DO NOTHING
    """, [
        issue_id, span['span_id'], span['source'],
        span['review_id'], span['review_version'],
        True, span['intensity'], span['review_time']
    ])

    # Update issue counters
    await db.execute("""
        UPDATE issues SET
            span_count = (SELECT COUNT(*) FROM issue_spans WHERE issue_id = %s),
            max_intensity = (
                SELECT CASE MAX(CASE intensity
                                    WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END)
                    WHEN 3 THEN 'I3' WHEN 2 THEN 'I2' ELSE 'I1' END
                FROM issue_spans WHERE issue_id = %s
            ),
            updated_at = NOW()
        WHERE issue_id = %s
    """, [issue_id, issue_id, issue_id])

    await recalculate_priority(issue_id)
    await log_issue_event(
        issue_id, 'span_added',
        span_id=span['span_id'],
        source=span['source'],
        review_id=span['review_id'],
        review_version=span['review_version']
    )
```
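The routing rule can be checked in memory. Each negative or mixed span maps to exactly one issue ID derived from its grouping key, and positive or neutral spans are skipped. The following is a minimal sketch with these assumptions:

- `issue_id_for` and `route_spans` are hypothetical helpers.
- The entity slot is left empty, as in v3.2.
- The sketch does not model `should_create_issue`, issue state, or counters.

```python
import hashlib
from collections import defaultdict


def issue_id_for(span: dict) -> str:
    # Same grouping key as generate_issue_id, with an empty entity slot (v3.2)
    key = f"{span['business_id']}|{span['place_id']}|{span['urt_primary']}|"
    return "ISS-" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]


def route_spans(spans: list[dict]) -> dict[str, list[str]]:
    """Group negative/mixed spans by issue key; each routed span gets one issue."""
    issues: dict[str, list[str]] = defaultdict(list)
    for span in spans:
        if span["valence"] not in ("V-", "V±"):
            continue  # positive/neutral spans never open issues
        issues[issue_id_for(span)].append(span["span_id"])
    return dict(issues)


example = [
    {"span_id": "s1", "business_id": "b", "place_id": "p", "urt_primary": "J1.01", "valence": "V-"},
    {"span_id": "s2", "business_id": "b", "place_id": "p", "urt_primary": "J1.01", "valence": "V±"},
    {"span_id": "s3", "business_id": "b", "place_id": "p", "urt_primary": "J1.01", "valence": "V+"},
    {"span_id": "s4", "business_id": "b", "place_id": "p", "urt_primary": "P1.01", "valence": "V-"},
]
routed = route_spans(example)
print(sorted(routed.values()))  # → [['s1', 's2'], ['s4']]
```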
5.2 Priority Scoring (Trust-Weighted)
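The priority formula implemented by `recalculate_priority` can be exercised without a database. The following is a minimal sketch with the same constants. `priority_score` is a hypothetical pure helper that takes the relevant issue columns as arguments. The decay factor `exp(-0.023 × days)` halves roughly every 30 days (ln 2 / 0.023 ≈ 30.1).

```python
import math

INTENSITY_WEIGHTS = {"I1": 1.0, "I2": 2.0, "I3": 4.0}


def priority_score(max_intensity: str, span_count: int, days_old: int,
                   reopen_count: int, cr_worse: int, cr_better: int,
                   avg_trust) -> float:
    """intensity × volume × decay × recurrence × trend × trust."""
    i_weight = INTENSITY_WEIGHTS.get(max_intensity, 1.0)
    volume_factor = 1 + math.log(max(1, span_count))
    decay = math.exp(-0.023 * days_old)
    recurrence_boost = 1.0 + 0.5 * math.log2(reopen_count + 1)
    if cr_worse >= 2:
        trend = 1.3
    elif cr_better >= 2:
        trend = 0.7
    else:
        trend = 1.0
    trust = float(avg_trust) if avg_trust is not None else 1.0
    return i_weight * volume_factor * decay * recurrence_boost * trend * trust


# Baseline: one fresh I1 span, never reopened, no trend, no trust data → 1.0
print(priority_score("I1", 1, 0, 0, 0, 0, None))
```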
```python
import math
from datetime import datetime

INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}


async def recalculate_priority(issue_id: str):
    """
    Priority = intensity × volume × decay × recurrence × trend × trust
    """
    issue = await db.query_one("""
        SELECT
            i.*,
            (SELECT AVG(re.trust_score)
             FROM issue_spans s
             JOIN review_spans rs ON s.span_id = rs.span_id
             JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
                                       = (re.source, re.review_id, re.review_version)
             WHERE s.issue_id = i.issue_id) AS avg_trust
        FROM issues i
        WHERE i.issue_id = %s
    """, [issue_id])

    i_weight = INTENSITY_WEIGHTS.get(issue['max_intensity'], 1.0)
    volume_factor = 1 + math.log(max(1, issue['span_count']))

    # Exponential decay, half-life ≈ 30 days (ln 2 / 0.023). now() uses
    # created_at's tzinfo so both naive and tz-aware timestamps subtract cleanly.
    created_at = issue['created_at']
    days_old = (datetime.now(created_at.tzinfo) - created_at).days
    decay = math.exp(-0.023 * days_old)

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    if issue['cr_worse_count'] >= 2:
        trend_modifier = 1.3
    elif issue['cr_better_count'] >= 2:
        trend_modifier = 0.7
    else:
        trend_modifier = 1.0

    # AVG() may come back as Decimal; convert before mixing with floats
    avg_trust = issue['avg_trust']
    trust_factor = float(avg_trust) if avg_trust is not None else 1.0

    priority = (
        i_weight * volume_factor * decay *
        recurrence_boost * trend_modifier * trust_factor
    )

    await db.execute("""
        UPDATE issues SET
            priority_score = %s,
            avg_trust_score = %s,
            updated_at = NOW()
        WHERE issue_id = %s
    """, [priority, avg_trust, issue_id])
```
5.3 Issue Span Drill-Down
```python
async def get_issue_spans(issue_id: str,
                          sort_by: str = 'date',
                          limit: int = 50,
                          offset: int = 0) -> list[dict]:
    """Fetch all spans for an issue with full details."""
    # Whitelisted ORDER BY clauses (never interpolate user input directly)
    order_clause = {
        'date': 's.review_time DESC',
        'intensity': "CASE s.intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
        'trust': 're.trust_score DESC',
    }.get(sort_by, 's.review_time DESC')

    return await db.query(f"""
        SELECT
            rs.span_id,
            rs.span_text,
            rs.span_start,
            rs.span_end,
            rs.urt_primary,
            rs.valence,
            rs.intensity,
            rs.specificity,
            rs.actionability,
            rs.entity,
            rs.entity_type,
            rs.usn,
            s.review_time,
            s.is_primary_match,
            re.review_id,
            re.review_version,
            re.text AS review_text,
            re.rating,
            re.trust_score,
            l.display_name AS location_name
        FROM issue_spans s
        JOIN review_spans rs ON s.span_id = rs.span_id
        JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
                                  = (re.source, re.review_id, re.review_version)
        JOIN locations l ON (rs.business_id, rs.place_id) = (l.business_id, l.place_id)
        WHERE s.issue_id = %s
          AND rs.is_active = TRUE
        ORDER BY {order_clause}
        LIMIT %s OFFSET %s
    """, [issue_id, limit, offset])
```
5.4 Strength Score
Strength Score = Σ intensity_weight, summed over the spans in scope

Where:
- I1 (mild) → weight = 1
- I2 (moderate) → weight = 2
- I3 (strong) → weight = 4

Weights double per tier, so one I3 span = 4 I1 spans = 2 I2 spans.
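The strength score is a plain weighted sum, which the fact job computes in SQL with `CASE` expressions. The following is a minimal Python equivalent for spot-checking fact rows. `strength_score` is a hypothetical helper name.

```python
STRENGTH_WEIGHTS = {"I1": 1, "I2": 2, "I3": 4}


def strength_score(intensities: list[str]) -> int:
    """Sum of per-span intensity weights (I1=1, I2=2, I3=4)."""
    return sum(STRENGTH_WEIGHTS[i] for i in intensities)


print(strength_score(["I3"]), strength_score(["I1"] * 4), strength_score(["I2"] * 2))  # → 4 4 4
```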
Part 6: Analytics Spine (Fact Population)
6.1 Daily Fact Aggregation Job
```python
from datetime import date, timedelta


async def populate_facts(business_id: str, date: date, bucket_type: str = 'day'):
    """
    Aggregate spans into fact_timeseries. Run daily.

    v3.2 populates:
    - subject_type='overall', subject_id='all' (per location + 'ALL')
    - subject_type='urt_code', subject_id=<code> (per location + 'ALL')
    - subject_type='issue', subject_id=<issue_id> (per issue)
    """
    if bucket_type == 'day':
        period_start = date
        period_end = date + timedelta(days=1)
    elif bucket_type == 'week':
        period_start = date - timedelta(days=date.weekday())
        period_end = period_start + timedelta(days=7)
    elif bucket_type == 'month':
        period_start = date.replace(day=1)
        next_month = period_start.replace(day=28) + timedelta(days=4)
        period_end = next_month.replace(day=1)
    else:
        raise ValueError(f"Unsupported bucket_type: {bucket_type!r}")

    # Per-location facts for every active location (owned and competitor),
    # so competitor comparisons can read per-place_id facts
    locations = await db.query(
        "SELECT place_id, location_type FROM locations "
        "WHERE business_id = %s AND is_active = TRUE",
        [business_id]
    )
    for loc in locations:
        await populate_location_facts_from_spans(
            business_id, loc['place_id'], period_start, period_end, bucket_type
        )

    # All-locations rollup (place_id='ALL'): owned locations only,
    # competitors are excluded
    owned_place_ids = [
        loc['place_id'] for loc in locations if loc['location_type'] == 'owned'
    ]
    await populate_all_locations_facts_from_spans(
        business_id, owned_place_ids, period_start, period_end, bucket_type
    )

    # Issue facts
    await populate_issue_facts(business_id, period_start, period_end, bucket_type)


async def populate_location_facts_from_spans(
    business_id: str,
    place_id: str,
    period_start: date,
    period_end: date,
    bucket_type: str
):
    """Populate facts for a single location from spans."""
    # Aggregate by URT code from spans
    code_stats = await db.query("""
        SELECT
            rs.urt_primary AS code,
            COUNT(DISTINCT re.review_id) AS review_count,
            COUNT(*) AS span_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) AS negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) AS positive_count,
            SUM(CASE WHEN rs.valence = 'V0' THEN 1 ELSE 0 END) AS neutral_count,
            SUM(CASE WHEN rs.valence = 'V±' THEN 1 ELSE 0 END) AS mixed_count,
            SUM(CASE rs.intensity::text
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
            END) AS strength_score,
            SUM(CASE WHEN rs.valence = 'V-' THEN
                CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
            ELSE 0 END) AS negative_strength,
            SUM(CASE WHEN rs.valence = 'V+' THEN
                CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
            ELSE 0 END) AS positive_strength,
            SUM(CASE WHEN rs.intensity::text = 'I1' THEN 1 ELSE 0 END) AS i1_count,
            SUM(CASE WHEN rs.intensity::text = 'I2' THEN 1 ELSE 0 END) AS i2_count,
            SUM(CASE WHEN rs.intensity::text = 'I3' THEN 1 ELSE 0 END) AS i3_count,
            SUM(CASE WHEN rs.comparative::text = 'CR-B' THEN 1 ELSE 0 END) AS cr_better,
            SUM(CASE WHEN rs.comparative::text = 'CR-W' THEN 1 ELSE 0 END) AS cr_worse,
            SUM(CASE WHEN rs.comparative::text = 'CR-S' THEN 1 ELSE 0 END) AS cr_same,
            -- Trust-weighted metrics (v3.2)
            SUM(re.trust_score * CASE rs.intensity::text
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0
            END) AS trust_weighted_strength,
            SUM(CASE WHEN rs.valence = 'V-' THEN
                re.trust_score * CASE rs.intensity::text WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 4 ELSE 0 END
            ELSE 0 END) AS trust_weighted_negative,
            -- Span-weighted: a review's rating counts once per matching span
            AVG(re.rating) AS avg_rating,
            COUNT(re.rating) AS rating_count
        FROM review_spans rs
        JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
                                  = (re.source, re.review_id, re.review_version)
        WHERE rs.business_id = %s
          AND rs.place_id = %s
          AND rs.review_time >= %s AND rs.review_time < %s
          AND rs.is_active = TRUE
          AND re.is_latest = TRUE
        GROUP BY rs.urt_primary
    """, [business_id, place_id, period_start, period_end])

    for stat in code_stats:
        await upsert_fact(
            business_id=business_id,
            place_id=place_id,
            period_date=period_start,
            bucket_type=bucket_type,
            subject_type='urt_code',
            subject_id=stat['code'],
            metrics=stat
        )

    # Aggregate overall
    overall = await db.query_one("""
        SELECT
            COUNT(DISTINCT re.review_id) AS review_count,
            COUNT(*) AS span_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) AS negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) AS positive_count,
            -- Span-weighted, as above
            AVG(re.rating) AS avg_rating
        FROM review_spans rs
        JOIN reviews_enriched re ON (rs.source, rs.review_id, rs.review_version)
                                  = (re.source, re.review_id, re.review_version)
        WHERE rs.business_id = %s
          AND rs.place_id = %s
          AND rs.review_time >= %s AND rs.review_time < %s
          AND rs.is_active = TRUE
          AND re.is_latest = TRUE
    """, [business_id, place_id, period_start, period_end])

    await upsert_fact(
        business_id=business_id,
        place_id=place_id,
        period_date=period_start,
        bucket_type=bucket_type,
        subject_type='overall',
        subject_id='all',
        metrics=overall
    )
```
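The bucket boundaries computed at the top of `populate_facts` are easier to test as a pure function. The following is a minimal sketch; `period_bounds` is a hypothetical helper. Windows are half-open `[start, end)`, matching the `review_time >= start AND review_time < end` filters. Weeks start on Monday because they are built from `date.weekday()`.

```python
from datetime import date, timedelta


def period_bounds(anchor: date, bucket_type: str) -> tuple[date, date]:
    """Half-open [start, end) window for a fact bucket."""
    if bucket_type == "day":
        return anchor, anchor + timedelta(days=1)
    if bucket_type == "week":
        start = anchor - timedelta(days=anchor.weekday())  # Monday of that week
        return start, start + timedelta(days=7)
    if bucket_type == "month":
        start = anchor.replace(day=1)
        # Day 28 + 4 days always lands in the next month; snap to its 1st
        end = (start.replace(day=28) + timedelta(days=4)).replace(day=1)
        return start, end
    raise ValueError(f"unknown bucket_type: {bucket_type!r}")


print(period_bounds(date(2025, 12, 17), "month"))
```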
6.2 Timeline Query (For Charts)
```python
from datetime import date
from typing import Optional


async def get_timeline(business_id: str,
                       place_id: Optional[str],
                       subject_type: str,
                       subject_id: str,
                       start: date,
                       end: date,
                       bucket_type: str = 'week') -> list[dict]:
    """
    Query pre-aggregated facts for line charts.

    Args:
        place_id: Specific place_id, or None for 'ALL' locations rollup
    """
    # Use 'ALL' sentinel for all-locations queries
    effective_place_id = place_id if place_id else 'ALL'

    return await db.query("""
        SELECT
            period_date,
            review_count,
            span_count,
            negative_count,
            positive_count,
            strength_score,
            negative_strength,
            avg_rating,
            cr_better,
            cr_worse,
            cr_same,
            trust_weighted_strength,
            trust_weighted_negative
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = %s
          AND subject_type = %s
          AND subject_id = %s
          AND bucket_type = %s
          AND period_date BETWEEN %s AND %s
        ORDER BY period_date
    """, [business_id, effective_place_id, subject_type, subject_id, bucket_type, start, end])
```
Part 7: Competitor Analysis
7.1 Competitor Setup (Clean Model)
Competitors are tracked in both competitors (relationship metadata) and locations (with location_type='competitor'). This preserves FK integrity and enables consistent joins for display names/timezones.
Competitor Review Storage Rule: Competitor reviews are stored with the customer's business_id and the competitor's place_id:
- `reviews_enriched.business_id = <customer_business_id>`
- `reviews_enriched.place_id = <competitor_place_id>`

The locations.location_type column distinguishes ownership:
- `'owned'`: customer's own locations
- `'competitor'`: tracked competitor locations
```python
async def setup_competitor(business_id: str, competitor_place_id: str,
                           competitor_name: str, relationship: str = 'direct'):
    """Register a competitor for tracking."""
    # 1. Add to locations with location_type='competitor'
    await db.execute("""
        INSERT INTO locations (business_id, place_id, location_type, display_name)
        VALUES (%s, %s, 'competitor', %s)
        ON CONFLICT (business_id, place_id) DO UPDATE SET
            display_name = EXCLUDED.display_name
    """, [business_id, competitor_place_id, competitor_name])

    # 2. Track relationship metadata in competitors table
    await db.execute("""
        INSERT INTO competitors (business_id, competitor_place_id, competitor_name, relationship)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (business_id, competitor_place_id) DO UPDATE SET
            competitor_name = EXCLUDED.competitor_name,
            relationship = EXCLUDED.relationship
    """, [business_id, competitor_place_id, competitor_name, relationship])
```
7.2 Competitor Comparison
```python
from datetime import date


async def get_competitor_comparison(business_id: str, code: str,
                                    start: date, end: date) -> dict:
    """Compare your URT metrics against competitors."""
    # Your metrics (from 'ALL' rollup). avg_rating here is an unweighted
    # mean of per-period averages.
    your_metrics = await db.query_one("""
        SELECT
            SUM(negative_strength) AS negative_strength,
            SUM(span_count) AS span_count,
            AVG(avg_rating) AS avg_rating,
            SUM(trust_weighted_negative) AS trust_weighted_negative
        FROM fact_timeseries
        WHERE business_id = %s
          AND place_id = 'ALL'
          AND subject_type = 'urt_code'
          AND subject_id = %s
          AND period_date BETWEEN %s AND %s
    """, [business_id, code, start, end])

    # Competitor metrics
    competitors = await db.query("""
        SELECT competitor_place_id, competitor_name
        FROM competitors WHERE business_id = %s AND is_active = TRUE
    """, [business_id])

    comparison = {
        'your_business': your_metrics or {},
        'competitors': []
    }

    for comp in competitors:
        comp_metrics = await db.query_one("""
            SELECT
                SUM(negative_strength) AS negative_strength,
                SUM(span_count) AS span_count,
                AVG(avg_rating) AS avg_rating
            FROM fact_timeseries
            WHERE business_id = %s
              AND place_id = %s
              AND subject_type = 'urt_code'
              AND subject_id = %s
              AND period_date BETWEEN %s AND %s
        """, [business_id, comp['competitor_place_id'], code, start, end])

        comparison['competitors'].append({
            'name': comp['competitor_name'],
            'place_id': comp['competitor_place_id'],
            **(comp_metrics or {})
        })

    return comparison
```
Part 8: Report Generation (Facts-First)
```python
from datetime import date, datetime
from typing import Optional


async def generate_report(business_id: str, place_id: Optional[str],
                          start: date, end: date) -> dict:
    """Generate report from pre-aggregated facts."""
    effective_place_id = place_id if place_id else 'ALL'

    # 1. Top issues from facts (fast)
    top_issues = await get_top_issues_from_facts(business_id, effective_place_id, start, end)

    # 2. Strengths from facts
    strengths = await get_strengths_from_facts(business_id, effective_place_id, start, end)

    # 3. Sub-patterns with span references
    for issue in top_issues[:5]:
        patterns = await discover_and_store_subpatterns(
            business_id, effective_place_id, issue['code'], start, end
        )
        issue['sub_patterns'] = patterns

    # 4. Trends from facts
    trends = await compute_trends_from_facts(business_id, effective_place_id, start, end)

    # 5. Entity analysis (v3.2)
    entities = await analyze_entities(business_id, effective_place_id, start, end)

    # 6. Competitor benchmarks
    competitors = await get_competitor_benchmarks(business_id, start, end)

    payload = {
        'business_id': business_id,
        'place_id': place_id,
        'period': f"{start} to {end}",
        'issues': top_issues,
        'strengths': strengths,
        'trends': trends,
        'entities': entities,
        'competitors': competitors,
    }

    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat()
    }


async def analyze_entities(business_id: str, place_id: str,
                           start: date, end: date) -> list[dict]:
    """Analyze entity mentions from spans."""
    return await db.query("""
        SELECT
            rs.entity_normalized,
            rs.entity_type::text,
            COUNT(*) AS mention_count,
            SUM(CASE WHEN rs.valence = 'V-' THEN 1 ELSE 0 END) AS negative_count,
            SUM(CASE WHEN rs.valence = 'V+' THEN 1 ELSE 0 END) AS positive_count,
            AVG(CASE rs.intensity::text
                WHEN 'I1' THEN 1 WHEN 'I2' THEN 2 WHEN 'I3' THEN 3
            END) AS avg_intensity,
            array_agg(DISTINCT rs.urt_primary) AS codes
        FROM review_spans rs
        WHERE rs.business_id = %s
          AND (
                rs.place_id = %s
                -- 'ALL' covers owned locations only, matching the fact rollup
                OR (%s = 'ALL' AND rs.place_id IN (
                        SELECT l.place_id FROM locations l
                        WHERE l.business_id = rs.business_id
                          AND l.location_type = 'owned'
                          AND l.is_active = TRUE))
              )
          AND rs.review_time >= %s AND rs.review_time < %s
          AND rs.entity_normalized IS NOT NULL
          AND rs.is_active = TRUE
        GROUP BY rs.entity_normalized, rs.entity_type
        ORDER BY mention_count DESC
        LIMIT 20
    """, [business_id, place_id, place_id, start, end])
```
Part 9: KPI-Ready Hooks
9.1 Future KPI Integration (Interface Only)
```sql
-- Future: KPI fact table with same grain
CREATE TABLE fact_kpi_timeseries (
    id SERIAL PRIMARY KEY,

    -- Same join keys as fact_timeseries
    business_id TEXT NOT NULL,
    place_id TEXT NOT NULL,          -- 'ALL' for rollups
    period_date DATE NOT NULL,
    bucket_type TEXT NOT NULL,

    -- KPI metrics
    revenue DECIMAL(12,2),
    transactions INT,
    cancellations INT,
    refunds DECIMAL(12,2),
    support_tickets INT,

    computed_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(business_id, place_id, period_date, bucket_type)
);

-- Join reviews and KPIs:
SELECT
    r.period_date,
    r.negative_strength,
    r.trust_weighted_negative,
    k.revenue,
    k.cancellations
FROM fact_timeseries r
JOIN fact_kpi_timeseries k USING (business_id, place_id, period_date, bucket_type)
WHERE r.subject_type = 'overall' AND r.subject_id = 'all';
```
Part 10: Cost Model
| Stage | When | Cost | Notes |
|---|---|---|---|
| Raw Storage | Per review | $0.00 | ~1KB per review |
| Embedding | Per review | $0.00 | Local model, ~50ms |
| LLM Classification + Spans | Per review | ~$0.0003 | GPT-4o-mini (larger prompt) |
| Fact Aggregation | Daily job | $0.00 | SQL, <1 minute |
| Sub-Clustering | Per report | $0.00 | HDBSCAN, <1s |
| LLM Narrative | Per report | ~$0.15 | GPT-4o |
Total Costs:
| Volume | Monthly Ingest | Reports (10/mo) | Total |
|---|---|---|---|
| 1K reviews | $0.30 | $1.50 | $1.80 |
| 10K reviews | $3.00 | $1.50 | $4.50 |
| 100K reviews | $30.00 | $1.50 | $31.50 |
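The totals follow from the two paid stages alone: classification per review and narrative per report. The following is a minimal sketch that reproduces the table. It assumes 10 reports per month and treats the other stages as $0.00, as listed above. `monthly_cost` is a hypothetical helper.

```python
# Approximate per-unit costs from the stage table (USD)
CLASSIFY_PER_REVIEW = 0.0003   # GPT-4o-mini classification + spans
NARRATIVE_PER_REPORT = 0.15    # GPT-4o narrative


def monthly_cost(reviews: int, reports: int = 10) -> float:
    """Estimated monthly LLM spend; storage, embeddings, facts and clustering are ~$0."""
    return reviews * CLASSIFY_PER_REVIEW + reports * NARRATIVE_PER_REPORT


for volume in (1_000, 10_000, 100_000):
    print(f"{volume:>7,} reviews: ${monthly_cost(volume):.2f}")
```

Against the <$35/month target, the budget covers roughly (35 - 1.50) / 0.0003 ≈ 111K reviews per month at 10 reports.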
Part 11: Key Innovations
| Innovation | Benefit |
|---|---|
| Span layer | Fine-grained classification at semantic unit level |
| 1:1 span-to-issue | Clean data model, no ambiguity in routing |
| Deterministic issue IDs | SHA256-based, reproducible from grouping key |
| Soft-switch reprocessing | Atomic span replacement without downtime |
| URT ENUM types | Database-enforced classification constraints |
| Causal chain support | Full profile enables cause/effect analysis |
| Entity extraction | Named entity routing for targeted issues |
| Trust-weighted facts | Spam resistance with weighted aggregation |
| USN notation | Compact semantic notation for spans |
Document Control
| Field | Value |
|---|---|
| Document | ReviewIQ Architecture v3.2.0 |
| Status | Specification Complete |
| Date | 2026-01-24 |
| Dependencies | URT Specification v5.1, Issue Lifecycle Framework C1 |
| Source | Google Reviews only |
| Cost Target | <$35/month at 100K reviews |
Changelog
| Version | Changes |
|---|---|
| v3.0 | Issue lifecycle, strength scores, timeline charts |
| v3.1 | Relational refactor: issue_spans, fact_timeseries, raw/enriched split, multi-location, competitors, trust scoring |
| v3.1.1 | Versioned enriched PK, tenant-scoped locations, 'ALL' sentinel, competitor cleanup |
| v3.1.2 | Versioned issue_spans FK, competitor business_id rule, trust-weighted facts deferred, location_type flag |
| v3.2.0 | Span layer: review_spans table, URT ENUM types, causal chain support, entity extraction, reprocessing pattern, deterministic issue IDs, 1:1 span mapping |
New in v3.2.0
| Feature | Description |
|---|---|
| review_spans table | Fine-grained semantic unit extraction from reviews |
| URT ENUM types | 12 strongly-typed enums for classification fields |
| Required extensions | btree_gist for exclusion constraints, pgcrypto for SHA256 |
| Span constraints | chk_span_end, chk_primary_tier3, chk_secondary_max2, chk_secondary_tier3, chk_full_only_fields, chk_no_self_relation, chk_usn_format |
| Span indexes | Active ordering, one-primary enforcement, non-overlap exclusion, issue routing |
| Validation triggers | Bounds validation, text matching, causal chain structure |
| Helper functions | urt_validate_causal_chain, validate_review_relations, validate_active_spans, set_primary_span, generate_issue_id |
| Reprocessing pattern | Soft-switch with is_active flag and ingest_batch_id |
| issue_spans rewrite | 1:1 span-to-issue mapping with UNIQUE(span_id) |
| Trust-weighted facts | trust_weighted_strength, trust_weighted_negative now populated |
| span_count in facts | Span-level counting alongside review_count |
Deferred to v3.3+
| Feature | Reason |
|---|---|
| Distinct entity issues | entity_normalized defaults to NULL in v3.2; v3.3 creates separate issues per entity |
| Journey step inference | Needs better grounding data |
| Intent signals extraction | Needs action playbooks |
| Stability score tracking | Premature for current version |
| Span embeddings | Per-span vectors for sub-clustering |
End of ReviewIQ Architecture v3.2.0