-- =============================================================================
-- Migration: 005_create_pipeline_schema.sql
-- ReviewIQ Pipeline - Decoupled Schema
-- =============================================================================
--
-- Creates a separate 'pipeline' schema for all review classification tables.
-- This keeps the pipeline tables decoupled from the main scraper schema while
-- sharing the same database.
--
-- Tables created:
--   pipeline.reviews_raw       - Immutable audit log of scraped reviews
--   pipeline.reviews_enriched  - Normalized/classified reviews
--   pipeline.review_spans      - Extracted semantic spans
--   pipeline.issues            - Aggregated issues from negative spans
--   pipeline.issue_spans       - Issue-to-span linking
--   pipeline.issue_events      - Audit log for issue changes
--   pipeline.fact_timeseries   - Pre-aggregated metrics for dashboards
--   pipeline.urt_domains       - URT taxonomy domains
--   pipeline.urt_categories    - URT taxonomy categories
--
-- Views created:
--   pipeline.reviews_latest    - Rows of reviews_enriched with is_latest = TRUE
--   pipeline.issues_open       - Open issues with a count of linked spans
--
-- Soft FK: pipeline.reviews_raw.job_id -> public.jobs.job_id (optional)
--
-- Every statement is written to be re-runnable (IF NOT EXISTS, OR REPLACE,
-- ON CONFLICT DO NOTHING, or a duplicate_object guard).
--
-- Date: 2026-01-24
-- =============================================================================

-- Create the pipeline schema
CREATE SCHEMA IF NOT EXISTS pipeline;

COMMENT ON SCHEMA pipeline IS 'ReviewIQ Pipeline - LLM-powered review classification and aggregation';

-- =============================================================================
-- SECTION 1: ENUM TYPES (in pipeline schema)
-- =============================================================================
-- CREATE TYPE has no IF NOT EXISTS form, so each type is created inside a DO
-- block that swallows duplicate_object to keep the migration re-runnable.
--
-- NOTE(review): in this migration only issue_state, subject_type and
-- bucket_type are used as column types. The valence/intensity/specificity/
-- actionability/temporal/evidence/comparative columns below are declared as
-- VARCHAR, so these enums do not constrain them.

-- Valence enum
DO $$
BEGIN
    CREATE TYPE pipeline.valence_type AS ENUM ('V+', 'V-', 'V0', 'V±');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Intensity enum
DO $$
BEGIN
    CREATE TYPE pipeline.intensity_type AS ENUM ('I1', 'I2', 'I3');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Specificity enum
DO $$
BEGIN
    CREATE TYPE pipeline.specificity_type AS ENUM ('S1', 'S2', 'S3');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Actionability enum
DO $$
BEGIN
    CREATE TYPE pipeline.actionability_type AS ENUM ('A1', 'A2', 'A3');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Temporal enum
DO $$
BEGIN
    CREATE TYPE pipeline.temporal_type AS ENUM ('TC', 'TR', 'TH', 'TF');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Evidence enum
DO $$
BEGIN
    CREATE TYPE pipeline.evidence_type AS ENUM ('ES', 'EI', 'EC');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Comparative enum
DO $$
BEGIN
    CREATE TYPE pipeline.comparative_type AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Issue state enum
DO $$
BEGIN
    CREATE TYPE pipeline.issue_state AS ENUM ('open', 'resolved', 'ignored', 'merged');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Subject type enum (for facts)
DO $$
BEGIN
    CREATE TYPE pipeline.subject_type AS ENUM ('overall', 'urt_code', 'domain', 'issue');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Bucket type enum (for facts)
DO $$
BEGIN
    CREATE TYPE pipeline.bucket_type AS ENUM ('day', 'week', 'month');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- =============================================================================
-- SECTION 2: URT TAXONOMY LOOKUP TABLES
-- =============================================================================

-- URT Domain lookup table
CREATE TABLE IF NOT EXISTS pipeline.urt_domains (
    code        CHAR(1) PRIMARY KEY,
    name        VARCHAR(50) NOT NULL,
    description TEXT
);

-- Seed the seven domains; existing rows are left untouched on re-run.
INSERT INTO pipeline.urt_domains (code, name, description) VALUES
    ('O', 'Offering', 'Product/service quality, features, variety'),
    ('P', 'Price', 'Value, pricing, promotions, payment'),
    ('J', 'Journey', 'Timing, process, convenience, accessibility'),
    ('E', 'Environment', 'Physical space, ambiance, cleanliness, digital UX'),
    ('A', 'Attitude', 'Staff behavior, helpfulness, professionalism'),
    ('V', 'Voice', 'Brand, communication, marketing, transparency'),
    ('R', 'Relationship', 'Loyalty, trust, consistency, personalization')
ON CONFLICT (code) DO NOTHING;

-- URT Tier-2 categories lookup table
CREATE TABLE IF NOT EXISTS pipeline.urt_categories (
    code        VARCHAR(5) PRIMARY KEY,
    domain_code CHAR(1) NOT NULL REFERENCES pipeline.urt_domains(code),
    name        VARCHAR(100) NOT NULL,
    description TEXT
);

-- Seed four categories per domain; existing rows are left untouched on re-run.
INSERT INTO pipeline.urt_categories (code, domain_code, name) VALUES
    ('O1', 'O', 'Core Product/Service'),
    ('O2', 'O', 'Product Features'),
    ('O3', 'O', 'Variety & Selection'),
    ('O4', 'O', 'Customization'),
    ('P1', 'P', 'Value Perception'),
    ('P2', 'P', 'Pricing Structure'),
    ('P3', 'P', 'Promotions & Deals'),
    ('P4', 'P', 'Payment Process'),
    ('J1', 'J', 'Wait Times'),
    ('J2', 'J', 'Booking & Reservations'),
    ('J3', 'J', 'Navigation & Convenience'),
    ('J4', 'J', 'Accessibility'),
    ('E1', 'E', 'Physical Environment'),
    ('E2', 'E', 'Ambiance & Atmosphere'),
    ('E3', 'E', 'Cleanliness'),
    ('E4', 'E', 'Digital Experience'),
    ('A1', 'A', 'Friendliness'),
    ('A2', 'A', 'Helpfulness'),
    ('A3', 'A', 'Professionalism'),
    ('A4', 'A', 'Knowledge & Expertise'),
    ('V1', 'V', 'Brand Identity'),
    ('V2', 'V', 'Communication'),
    ('V3', 'V', 'Marketing'),
    ('V4', 'V', 'Transparency'),
    ('R1', 'R', 'Loyalty'),
    ('R2', 'R', 'Trust'),
    ('R3', 'R', 'Consistency'),
    ('R4', 'R', 'Personalization')
ON CONFLICT (code) DO NOTHING;

COMMENT ON TABLE pipeline.urt_domains IS 'URT v5.1 top-level domains';
COMMENT ON TABLE pipeline.urt_categories IS 'URT v5.1 Tier-2 categories';

-- =============================================================================
-- SECTION 3: STAGE 1 - RAW & ENRICHED REVIEWS
-- =============================================================================

-- Raw reviews table (immutable audit log)
CREATE TABLE IF NOT EXISTS pipeline.reviews_raw (
    id              BIGSERIAL PRIMARY KEY,

    -- Link to scraper job (soft FK to public.jobs)
    job_id          UUID,

    source          VARCHAR(20) NOT NULL DEFAULT 'google',
    review_id       VARCHAR(255) NOT NULL,
    place_id        VARCHAR(255) NOT NULL,
    raw_payload     JSONB NOT NULL DEFAULT '{}',
    review_text     TEXT,
    rating          SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5),
    review_time     TIMESTAMP WITH TIME ZONE NOT NULL,
    reviewer_name   VARCHAR(255) NOT NULL,
    reviewer_id     VARCHAR(255),
    review_version  INTEGER NOT NULL DEFAULT 1,
    pulled_at       TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    created_at      TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    CONSTRAINT reviews_raw_unique UNIQUE (source, review_id, review_version)
);

-- Indexes for reviews_raw
CREATE INDEX IF NOT EXISTS idx_reviews_raw_job_id
    ON pipeline.reviews_raw(job_id)
    WHERE job_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_reviews_raw_place_id
    ON pipeline.reviews_raw(place_id);
CREATE INDEX IF NOT EXISTS idx_reviews_raw_review_time
    ON pipeline.reviews_raw(review_time);
CREATE INDEX IF NOT EXISTS idx_reviews_raw_pulled_at
    ON pipeline.reviews_raw(pulled_at);

COMMENT ON TABLE pipeline.reviews_raw IS 'Immutable raw review data as scraped from source';
COMMENT ON COLUMN pipeline.reviews_raw.job_id IS 'Optional link to public.jobs.job_id for traceability';

-- Enriched reviews table (mutable, updated by classification)
CREATE TABLE IF NOT EXISTS pipeline.reviews_enriched (
    id                          BIGSERIAL PRIMARY KEY,
    source                      VARCHAR(20) NOT NULL DEFAULT 'google',
    review_id                   VARCHAR(255) NOT NULL,
    review_version              INTEGER NOT NULL DEFAULT 1,
    is_latest                   BOOLEAN NOT NULL DEFAULT TRUE,
    raw_id                      BIGINT REFERENCES pipeline.reviews_raw(id),

    -- Tenant context
    business_id                 VARCHAR(255) NOT NULL,
    place_id                    VARCHAR(255) NOT NULL,

    -- Content
    text                        TEXT NOT NULL,
    text_normalized             TEXT NOT NULL,
    rating                      SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5),
    review_time                 TIMESTAMP WITH TIME ZONE NOT NULL,

    -- Normalization fields
    language                    VARCHAR(10) NOT NULL DEFAULT 'en',
    taxonomy_version            VARCHAR(20) NOT NULL DEFAULT 'v5.1',

    -- Classification fields (NULL until Stage 2)
    urt_primary                 VARCHAR(10),
    urt_secondary               VARCHAR(10)[] DEFAULT '{}',
    valence                     VARCHAR(5),
    intensity                   VARCHAR(5),
    comparative                 VARCHAR(10),
    staff_mentions              VARCHAR(255)[] DEFAULT '{}',
    quotes                      JSONB DEFAULT '{}',
    embedding                   REAL[] DEFAULT '{}',
    trust_score                 REAL,
    classification_model        VARCHAR(100),
    classification_confidence   JSONB DEFAULT '{}',
    processed_at                TIMESTAMP WITH TIME ZONE,
    created_at                  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    CONSTRAINT reviews_enriched_unique UNIQUE (source, review_id, review_version)
);

-- Indexes for reviews_enriched
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_business_id
    ON pipeline.reviews_enriched(business_id);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_place_id
    ON pipeline.reviews_enriched(place_id);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_review_time
    ON pipeline.reviews_enriched(review_time);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_urt_primary
    ON pipeline.reviews_enriched(urt_primary)
    WHERE urt_primary IS NOT NULL;
-- Work queue for classification: latest review versions with no urt_primary yet.
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_unclassified
    ON pipeline.reviews_enriched(review_time DESC)
    WHERE urt_primary IS NULL AND is_latest = TRUE;
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_valence
    ON pipeline.reviews_enriched(valence)
    WHERE valence IS NOT NULL;
-- Index the referencing column of the raw_id foreign key: PostgreSQL does not
-- create one automatically, and without it joins back to reviews_raw and FK
-- checks on reviews_raw changes have to scan reviews_enriched.
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_raw_id
    ON pipeline.reviews_enriched(raw_id);

COMMENT ON TABLE pipeline.reviews_enriched IS 'Enriched reviews with normalization and classification';

-- =============================================================================
-- SECTION 4: STAGE 2 - REVIEW SPANS
-- =============================================================================

CREATE TABLE IF NOT EXISTS pipeline.review_spans (
    id                  BIGSERIAL PRIMARY KEY,
    span_id             VARCHAR(50) NOT NULL UNIQUE,

    -- Context
    business_id         VARCHAR(255) NOT NULL,
    place_id            VARCHAR(255) NOT NULL,
    source              VARCHAR(20) NOT NULL DEFAULT 'google',
    review_id           VARCHAR(255) NOT NULL,
    review_version      INTEGER NOT NULL DEFAULT 1,

    -- Position
    span_index          INTEGER NOT NULL CHECK (span_index >= 0),
    span_text           TEXT NOT NULL,
    span_start          INTEGER NOT NULL CHECK (span_start >= 0),
    span_end            INTEGER NOT NULL CHECK (span_end > span_start),

    -- Classification profile
    profile             VARCHAR(20) NOT NULL DEFAULT 'standard',

    -- Core URT classification
    urt_primary         VARCHAR(10) NOT NULL,
    urt_secondary       VARCHAR(10)[] DEFAULT '{}',
    valence             VARCHAR(5) NOT NULL,
    intensity           VARCHAR(5) NOT NULL,
    comparative         VARCHAR(10) NOT NULL DEFAULT 'CR-N',

    -- Extended classification (standard/full profile)
    specificity         VARCHAR(5),
    actionability       VARCHAR(5),
    temporal            VARCHAR(5),
    evidence            VARCHAR(5),

    -- Entity extraction
    entity              VARCHAR(255),
    entity_type         VARCHAR(20),
    entity_normalized   VARCHAR(255),

    -- Causal relations (full profile)
    relation_type       VARCHAR(20),
    related_span_id     VARCHAR(50),
    causal_chain        JSONB,

    -- Flags
    is_primary          BOOLEAN NOT NULL DEFAULT FALSE,
    is_active           BOOLEAN NOT NULL DEFAULT TRUE,

    -- Time reference
    review_time         TIMESTAMP WITH TIME ZONE NOT NULL,

    -- Metadata
    confidence          VARCHAR(10) NOT NULL DEFAULT 'medium',
    usn                 VARCHAR(100) NOT NULL,
    taxonomy_version    VARCHAR(20) NOT NULL,
    model_version       VARCHAR(100) NOT NULL,
    ingest_batch_id     VARCHAR(50) NOT NULL,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    -- Foreign key to review
    CONSTRAINT fk_review
        FOREIGN KEY (source, review_id, review_version)
        REFERENCES pipeline.reviews_enriched(source, review_id, review_version)
);

-- Indexes for review_spans
CREATE INDEX IF NOT EXISTS idx_spans_business_id
    ON pipeline.review_spans(business_id);
CREATE INDEX IF NOT EXISTS idx_spans_place_id
    ON pipeline.review_spans(place_id);
CREATE INDEX IF NOT EXISTS idx_spans_review_time
    ON pipeline.review_spans(review_time);
CREATE INDEX IF NOT EXISTS idx_spans_urt_primary
    ON pipeline.review_spans(urt_primary);
CREATE INDEX IF NOT EXISTS idx_spans_valence
    ON pipeline.review_spans(valence);
CREATE INDEX IF NOT EXISTS idx_spans_intensity
    ON pipeline.review_spans(intensity);
CREATE INDEX IF NOT EXISTS idx_spans_is_active
    ON pipeline.review_spans(is_active)
    WHERE is_active = TRUE;
CREATE INDEX IF NOT EXISTS idx_spans_is_primary
    ON pipeline.review_spans(is_primary)
    WHERE is_primary = TRUE;
CREATE INDEX IF NOT EXISTS idx_spans_entity_normalized
    ON pipeline.review_spans(entity_normalized)
    WHERE entity_normalized IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_spans_batch
    ON pipeline.review_spans(ingest_batch_id);
-- Index the referencing columns of fk_review: PostgreSQL does not create one
-- automatically, and without it fetching the spans of a review (or checking
-- the FK when a reviews_enriched row changes) has to scan review_spans.
CREATE INDEX IF NOT EXISTS idx_spans_review
    ON pipeline.review_spans(source, review_id, review_version);

-- Index for the Stage 3 query over active negative/mixed spans, newest first.
-- The predicate only covers is_active and valence; whether a span is still
-- unrouted (no row in pipeline.issue_spans) is not part of the index and has
-- to be checked by the query itself.
CREATE INDEX IF NOT EXISTS idx_spans_unrouted_negative
    ON pipeline.review_spans(review_time DESC)
    WHERE is_active = TRUE AND valence IN ('V-', 'V±');

COMMENT ON TABLE pipeline.review_spans IS 'Extracted semantic spans with URT classification from reviews';

-- =============================================================================
-- SECTION 5: STAGE 3 - ISSUES
-- =============================================================================

-- Issues table
CREATE TABLE IF NOT EXISTS pipeline.issues (
    id                  BIGSERIAL PRIMARY KEY,
    issue_id            VARCHAR(50) NOT NULL UNIQUE,

    -- Context
    business_id         VARCHAR(255) NOT NULL,
    place_id            VARCHAR(255) NOT NULL,

    -- Classification
    primary_subcode     VARCHAR(10) NOT NULL,
    domain              CHAR(1) NOT NULL,

    -- State
    state               pipeline.issue_state NOT NULL DEFAULT 'open',
    priority_score      REAL NOT NULL DEFAULT 1.0,
    confidence_score    REAL NOT NULL DEFAULT 1.0,

    -- Aggregates
    span_count          INTEGER NOT NULL DEFAULT 1,
    max_intensity       VARCHAR(5) NOT NULL DEFAULT 'I1',

    -- Entity (optional - for entity-specific issues)
    entity              VARCHAR(255),
    entity_normalized   VARCHAR(255),

    -- Metadata
    taxonomy_version    VARCHAR(20) NOT NULL,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Issue-span links (many-to-one: an issue can have many spans, but the UNIQUE
-- constraint on span_id means each span is linked to at most one issue)
CREATE TABLE IF NOT EXISTS pipeline.issue_spans (
    id                  BIGSERIAL PRIMARY KEY,
    issue_id            VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id),
    -- NOTE(review): span_id is not declared as a foreign key to
    -- pipeline.review_spans(span_id) -- confirm this is intentional.
    span_id             VARCHAR(50) NOT NULL UNIQUE,

    -- Review reference
    source              VARCHAR(20) NOT NULL DEFAULT 'google',
    review_id           VARCHAR(255) NOT NULL,
    review_version      INTEGER NOT NULL DEFAULT 1,

    -- Match info
    is_primary_match    BOOLEAN NOT NULL DEFAULT TRUE,
    intensity           VARCHAR(5) NOT NULL,
    review_time         TIMESTAMP WITH TIME ZONE NOT NULL,
    created_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Issue events (audit log)
CREATE TABLE IF NOT EXISTS pipeline.issue_events (
    id          BIGSERIAL PRIMARY KEY,
    issue_id    VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id),
    event_type  VARCHAR(50) NOT NULL,
    span_id     VARCHAR(50),
    old_value   TEXT,
    new_value   TEXT,
    metadata    JSONB,
    created_at  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Indexes for issues
CREATE INDEX IF NOT EXISTS idx_issues_business_id
    ON pipeline.issues(business_id);
CREATE INDEX IF NOT EXISTS idx_issues_place_id
    ON pipeline.issues(place_id);
CREATE INDEX IF NOT EXISTS idx_issues_state
    ON pipeline.issues(state);
CREATE INDEX IF NOT EXISTS idx_issues_primary_subcode
    ON pipeline.issues(primary_subcode);
CREATE INDEX IF NOT EXISTS idx_issues_domain
    ON pipeline.issues(domain);
CREATE INDEX IF NOT EXISTS idx_issues_entity_normalized
    ON pipeline.issues(entity_normalized)
    WHERE entity_normalized IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_issues_priority
    ON pipeline.issues(priority_score DESC)
    WHERE state = 'open';
CREATE INDEX IF NOT EXISTS idx_issues_created
    ON pipeline.issues(created_at);
CREATE INDEX IF NOT EXISTS idx_issues_updated
    ON pipeline.issues(updated_at);

-- Indexes for issue_spans
CREATE INDEX IF NOT EXISTS idx_issue_spans_issue_id
    ON pipeline.issue_spans(issue_id);
CREATE INDEX IF NOT EXISTS idx_issue_spans_review_time
    ON pipeline.issue_spans(review_time);

-- Indexes for issue_events
CREATE INDEX IF NOT EXISTS idx_issue_events_issue_id
    ON pipeline.issue_events(issue_id);
CREATE INDEX IF NOT EXISTS idx_issue_events_created
    ON pipeline.issue_events(created_at);
CREATE INDEX IF NOT EXISTS idx_issue_events_type
    ON pipeline.issue_events(event_type);

COMMENT ON TABLE pipeline.issues IS 'Aggregated issues derived from negative/mixed spans';
COMMENT ON TABLE pipeline.issue_spans IS 'Links between issues and their source spans';
COMMENT ON TABLE pipeline.issue_events IS 'Audit log for issue state changes';

-- =============================================================================
-- SECTION 6: STAGE 4 - FACT TIMESERIES
-- =============================================================================

CREATE TABLE IF NOT EXISTS pipeline.fact_timeseries (
    id                      BIGSERIAL PRIMARY KEY,

    -- Dimension keys
    business_id             VARCHAR(255) NOT NULL,
    place_id                VARCHAR(255) NOT NULL,  -- Or 'ALL' for rollup
    period_date             DATE NOT NULL,
    bucket_type             pipeline.bucket_type NOT NULL DEFAULT 'day',
    subject_type            pipeline.subject_type NOT NULL DEFAULT 'urt_code',
    subject_id              VARCHAR(50) NOT NULL,  -- URT code, domain letter, or issue_id
    taxonomy_version        VARCHAR(20) NOT NULL,

    -- Core counts
    review_count            INTEGER NOT NULL DEFAULT 0,
    span_count              INTEGER NOT NULL DEFAULT 0,

    -- Valence counts
    negative_count          INTEGER NOT NULL DEFAULT 0,
    positive_count          INTEGER NOT NULL DEFAULT 0,
    neutral_count           INTEGER NOT NULL DEFAULT 0,
    mixed_count             INTEGER NOT NULL DEFAULT 0,

    -- Strength scores
    strength_score          REAL NOT NULL DEFAULT 0.0,
    negative_strength       REAL NOT NULL DEFAULT 0.0,
    positive_strength       REAL NOT NULL DEFAULT 0.0,

    -- Rating
    avg_rating              REAL,

    -- Intensity counts
    i1_count                INTEGER NOT NULL DEFAULT 0,
    i2_count                INTEGER NOT NULL DEFAULT 0,
    i3_count                INTEGER NOT NULL DEFAULT 0,

    -- Comparative counts
    cr_better               INTEGER NOT NULL DEFAULT 0,
    cr_worse                INTEGER NOT NULL DEFAULT 0,
    cr_same                 INTEGER NOT NULL DEFAULT 0,

    -- Trust-weighted metrics
    trust_weighted_strength REAL NOT NULL DEFAULT 0.0,
    trust_weighted_negative REAL NOT NULL DEFAULT 0.0,

    -- Metadata
    computed_at             TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    created_at              TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

    -- Unique constraint for upsert
    CONSTRAINT fact_timeseries_unique UNIQUE (
        business_id,
        place_id,
        period_date,
        bucket_type,
        subject_type,
        subject_id,
        taxonomy_version
    )
);

-- Indexes for fact_timeseries
CREATE INDEX IF NOT EXISTS idx_facts_business_id
    ON pipeline.fact_timeseries(business_id);
CREATE INDEX IF NOT EXISTS idx_facts_place_id
    ON pipeline.fact_timeseries(place_id);
CREATE INDEX IF NOT EXISTS idx_facts_period
    ON pipeline.fact_timeseries(period_date);
CREATE INDEX IF NOT EXISTS idx_facts_bucket
    ON pipeline.fact_timeseries(bucket_type);
CREATE INDEX IF NOT EXISTS idx_facts_subject_type
    ON pipeline.fact_timeseries(subject_type);
CREATE INDEX IF NOT EXISTS idx_facts_subject_id
    ON pipeline.fact_timeseries(subject_id);

-- Composite index for common dashboard queries
CREATE INDEX IF NOT EXISTS idx_facts_dashboard
    ON pipeline.fact_timeseries(
        business_id,
        place_id,
        bucket_type,
        period_date DESC
    );

-- Index for specific code trends
CREATE INDEX IF NOT EXISTS idx_facts_code_trend
    ON pipeline.fact_timeseries(
        business_id,
        subject_id,
        bucket_type,
        period_date DESC
    )
    WHERE subject_type = 'urt_code';

-- Index for domain aggregates
CREATE INDEX IF NOT EXISTS idx_facts_domain
    ON pipeline.fact_timeseries(
        business_id,
        subject_id,
        bucket_type,
        period_date DESC
    )
    WHERE subject_type = 'domain';

COMMENT ON TABLE pipeline.fact_timeseries IS 'Pre-aggregated time series facts for dashboard queries';

-- =============================================================================
-- SECTION 7: HELPER VIEWS
-- =============================================================================

-- View for latest enriched reviews only.
-- PostgreSQL expands `*` when the view is created, so columns added to
-- reviews_enriched afterwards are not exposed until this view is re-created.
CREATE OR REPLACE VIEW pipeline.reviews_latest AS
SELECT *
FROM pipeline.reviews_enriched
WHERE is_latest = TRUE;

-- View for open issues with span counts.
-- Grouping by the primary key i.id is enough for i.* to be selectable.
-- COUNT(s.id) counts only matched issue_spans rows, so an open issue with no
-- links yields total_spans = 0 through the LEFT JOIN.
CREATE OR REPLACE VIEW pipeline.issues_open AS
SELECT
    i.*,
    COUNT(s.id) AS total_spans
FROM pipeline.issues i
LEFT JOIN pipeline.issue_spans s
    ON i.issue_id = s.issue_id
WHERE i.state = 'open'
GROUP BY i.id;

COMMENT ON VIEW pipeline.reviews_latest IS 'Latest version of each review';
COMMENT ON VIEW pipeline.issues_open IS 'Open issues with total span counts';