diff --git a/core/database.py b/core/database.py index dbc64b7..f68dccc 100644 --- a/core/database.py +++ b/core/database.py @@ -182,6 +182,66 @@ class DatabaseManager: log.info("Database schema initialized") + async def run_migrations(self, migrations_dir: str = "migrations/versions"): + """ + Run versioned migrations from SQL files. + + Args: + migrations_dir: Path to directory containing .sql migration files. + Files are run in sorted order. + + Returns: + Number of migrations applied. + """ + from pathlib import Path + + migrations_path = Path(migrations_dir) + if not migrations_path.exists(): + log.warning(f"Migrations directory not found: {migrations_dir}") + return 0 + + async with self.pool.acquire() as conn: + # Create migrations tracking table + await conn.execute(""" + CREATE TABLE IF NOT EXISTS _migrations ( + id SERIAL PRIMARY KEY, + filename VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + ) + """) + + # Get already applied migrations + applied = await conn.fetch("SELECT filename FROM _migrations") + applied_set = {r["filename"] for r in applied} + + # Find and run pending migrations + migration_files = sorted(migrations_path.glob("*.sql")) + migrations_run = 0 + + for migration_file in migration_files: + filename = migration_file.name + if filename in applied_set: + continue + + log.info(f"Running migration: {filename}") + + async with conn.transaction(): + try: + sql = migration_file.read_text() + await conn.execute(sql) + await conn.execute( + "INSERT INTO _migrations (filename) VALUES ($1)", + filename, + ) + migrations_run += 1 + log.info(f"Migration {filename} applied successfully") + except Exception as e: + log.error(f"Migration {filename} failed: {e}") + raise + + log.info(f"Ran {migrations_run} migrations") + return migrations_run + # ==================== Job Operations ==================== async def create_job( diff --git a/migrations/versions/005_create_pipeline_schema.sql b/migrations/versions/005_create_pipeline_schema.sql new file mode 100644 index 0000000..a260f79 --- /dev/null +++ b/migrations/versions/005_create_pipeline_schema.sql @@ -0,0 +1,544 @@ +-- ============================================================================= +-- Migration: 005_create_pipeline_schema.sql +-- ReviewIQ Pipeline - Decoupled Schema +-- ============================================================================= +-- +-- Creates a separate 'pipeline' schema for all review classification tables. +-- This keeps the pipeline tables decoupled from the main scraper schema while +-- sharing the same database. +-- +-- Tables created: +-- pipeline.reviews_raw - Immutable audit log of scraped reviews +-- pipeline.reviews_enriched - Normalized/classified reviews +-- pipeline.review_spans - Extracted semantic spans +-- pipeline.issues - Aggregated issues from negative spans +-- pipeline.issue_spans - Issue-to-span linking +-- pipeline.issue_events - Audit log for issue changes +-- pipeline.fact_timeseries - Pre-aggregated metrics for dashboards +-- pipeline.urt_domains - URT taxonomy domains +-- pipeline.urt_categories - URT taxonomy categories +-- +-- Soft FK: pipeline.reviews_raw.job_id -> public.jobs.job_id (optional) +-- +-- Date: 2026-01-24 +-- ============================================================================= + +-- Create the pipeline schema +CREATE SCHEMA IF NOT EXISTS pipeline; + +COMMENT ON SCHEMA pipeline IS 'ReviewIQ Pipeline - LLM-powered review classification and aggregation'; + + +-- ============================================================================= +-- SECTION 1: ENUM TYPES (in pipeline schema) +-- ============================================================================= + +-- Valence enum +DO $$ BEGIN + CREATE TYPE pipeline.valence_type AS ENUM ('V+', 'V-', 'V0', 'V±'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Intensity enum +DO $$ BEGIN + CREATE TYPE pipeline.intensity_type AS ENUM ('I1', 'I2', 'I3'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Specificity enum +DO $$ BEGIN + CREATE TYPE pipeline.specificity_type AS ENUM ('S1', 'S2', 'S3'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Actionability enum +DO $$ BEGIN + CREATE TYPE pipeline.actionability_type AS ENUM ('A1', 'A2', 'A3'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Temporal enum +DO $$ BEGIN + CREATE TYPE pipeline.temporal_type AS ENUM ('TC', 'TR', 'TH', 'TF'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Evidence enum +DO $$ BEGIN + CREATE TYPE pipeline.evidence_type AS ENUM ('ES', 'EI', 'EC'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Comparative enum +DO $$ BEGIN + CREATE TYPE pipeline.comparative_type AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Issue state enum +DO $$ BEGIN + CREATE TYPE pipeline.issue_state AS ENUM ('open', 'resolved', 'ignored', 'merged'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Subject type enum (for facts) +DO $$ BEGIN + CREATE TYPE pipeline.subject_type AS ENUM ('overall', 'urt_code', 'domain', 'issue'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + +-- Bucket type enum (for facts) +DO $$ BEGIN + CREATE TYPE pipeline.bucket_type AS ENUM ('day', 'week', 'month'); +EXCEPTION + WHEN duplicate_object THEN NULL; +END $$; + + +-- ============================================================================= +-- SECTION 2: URT TAXONOMY LOOKUP TABLES +-- ============================================================================= + +-- URT Domain lookup table +CREATE TABLE IF NOT EXISTS pipeline.urt_domains ( + code CHAR(1) PRIMARY KEY, + name VARCHAR(50) NOT NULL, + description TEXT +); + +INSERT INTO pipeline.urt_domains (code, name, description) VALUES + ('O', 'Offering', 'Product/service quality, features, variety'), + ('P', 'Price', 'Value, pricing, promotions, payment'), + ('J', 'Journey', 'Timing, process, convenience, accessibility'), + ('E', 'Environment', 'Physical space, ambiance, cleanliness, digital UX'), + ('A', 'Attitude', 'Staff behavior, helpfulness, professionalism'), + ('V', 'Voice', 'Brand, communication, marketing, transparency'), + ('R', 'Relationship', 'Loyalty, trust, consistency, personalization') +ON CONFLICT (code) DO NOTHING; + +-- URT Tier-2 categories lookup table +CREATE TABLE IF NOT EXISTS pipeline.urt_categories ( + code VARCHAR(5) PRIMARY KEY, + domain_code CHAR(1) NOT NULL REFERENCES pipeline.urt_domains(code), + name VARCHAR(100) NOT NULL, + description TEXT +); + +INSERT INTO pipeline.urt_categories (code, domain_code, name) VALUES + ('O1', 'O', 'Core Product/Service'), + ('O2', 'O', 'Product Features'), + ('O3', 'O', 'Variety & Selection'), + ('O4', 'O', 'Customization'), + ('P1', 'P', 'Value Perception'), + ('P2', 'P', 'Pricing Structure'), + ('P3', 'P', 'Promotions & Deals'), + ('P4', 'P', 'Payment Process'), + ('J1', 'J', 'Wait Times'), + ('J2', 'J', 'Booking & Reservations'), + ('J3', 'J', 'Navigation & Convenience'), + ('J4', 'J', 'Accessibility'), + ('E1', 'E', 'Physical Environment'), + ('E2', 'E', 'Ambiance & Atmosphere'), + ('E3', 'E', 'Cleanliness'), + ('E4', 'E', 'Digital Experience'), + ('A1', 'A', 'Friendliness'), + ('A2', 'A', 'Helpfulness'), + ('A3', 'A', 'Professionalism'), + ('A4', 'A', 'Knowledge & Expertise'), + ('V1', 'V', 'Brand Identity'), + ('V2', 'V', 'Communication'), + ('V3', 'V', 'Marketing'), + ('V4', 'V', 'Transparency'), + ('R1', 'R', 'Loyalty'), + ('R2', 'R', 'Trust'), + ('R3', 'R', 'Consistency'), + ('R4', 'R', 'Personalization') +ON CONFLICT (code) DO NOTHING; + +COMMENT ON TABLE pipeline.urt_domains IS 'URT v5.1 top-level domains'; +COMMENT ON TABLE pipeline.urt_categories IS 'URT v5.1 Tier-2 categories'; + + +-- ============================================================================= +-- SECTION 3: STAGE 1 - RAW & ENRICHED REVIEWS +-- ============================================================================= + +-- Raw reviews table (immutable audit log) +CREATE TABLE IF NOT EXISTS pipeline.reviews_raw ( + id BIGSERIAL PRIMARY KEY, + + -- Link to scraper job (soft FK to public.jobs) + job_id UUID, + + source VARCHAR(20) NOT NULL DEFAULT 'google', + review_id VARCHAR(255) NOT NULL, + place_id VARCHAR(255) NOT NULL, + raw_payload JSONB NOT NULL DEFAULT '{}', + review_text TEXT, + rating SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5), + review_time TIMESTAMP WITH TIME ZONE NOT NULL, + reviewer_name VARCHAR(255) NOT NULL, + reviewer_id VARCHAR(255), + review_version INTEGER NOT NULL DEFAULT 1, + pulled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + CONSTRAINT reviews_raw_unique UNIQUE (source, review_id, review_version) +); + +-- Indexes for reviews_raw +CREATE INDEX IF NOT EXISTS idx_reviews_raw_job_id ON pipeline.reviews_raw(job_id) WHERE job_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_reviews_raw_place_id ON pipeline.reviews_raw(place_id); +CREATE INDEX IF NOT EXISTS idx_reviews_raw_review_time ON pipeline.reviews_raw(review_time); +CREATE INDEX IF NOT EXISTS idx_reviews_raw_pulled_at ON pipeline.reviews_raw(pulled_at); + +COMMENT ON TABLE pipeline.reviews_raw IS 'Immutable raw review data as scraped from source'; +COMMENT ON COLUMN pipeline.reviews_raw.job_id IS 'Optional link to public.jobs.job_id for traceability'; + + +-- Enriched reviews table (mutable, updated by classification) +CREATE TABLE IF NOT EXISTS pipeline.reviews_enriched ( + id BIGSERIAL PRIMARY KEY, + source VARCHAR(20) NOT NULL DEFAULT 'google', + review_id VARCHAR(255) NOT NULL, + review_version INTEGER NOT NULL DEFAULT 1, + is_latest BOOLEAN NOT NULL DEFAULT TRUE, + raw_id BIGINT REFERENCES pipeline.reviews_raw(id), + + -- Tenant context + business_id VARCHAR(255) NOT NULL, + place_id VARCHAR(255) NOT NULL, + + -- Content + text TEXT NOT NULL, + text_normalized TEXT NOT NULL, + rating SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5), + review_time TIMESTAMP WITH TIME ZONE NOT NULL, + + -- Normalization fields + language VARCHAR(10) NOT NULL DEFAULT 'en', + taxonomy_version VARCHAR(20) NOT NULL DEFAULT 'v5.1', + + -- Classification fields (NULL until Stage 2) + urt_primary VARCHAR(10), + urt_secondary VARCHAR(10)[] DEFAULT '{}', + valence VARCHAR(5), + intensity VARCHAR(5), + comparative VARCHAR(10), + staff_mentions VARCHAR(255)[] DEFAULT '{}', + quotes JSONB DEFAULT '{}', + embedding REAL[] DEFAULT '{}', + trust_score REAL, + classification_model VARCHAR(100), + classification_confidence JSONB DEFAULT '{}', + processed_at TIMESTAMP WITH TIME ZONE, + + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + CONSTRAINT reviews_enriched_unique UNIQUE (source, review_id, review_version) +); + +-- Indexes for reviews_enriched +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_business_id ON pipeline.reviews_enriched(business_id); +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_place_id ON pipeline.reviews_enriched(place_id); +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_review_time ON pipeline.reviews_enriched(review_time); +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_urt_primary ON pipeline.reviews_enriched(urt_primary) WHERE urt_primary IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_unclassified ON pipeline.reviews_enriched(review_time DESC) WHERE urt_primary IS NULL AND is_latest = TRUE; +CREATE INDEX IF NOT EXISTS idx_reviews_enriched_valence ON pipeline.reviews_enriched(valence) WHERE valence IS NOT NULL; + +COMMENT ON TABLE pipeline.reviews_enriched IS 'Enriched reviews with normalization and classification'; + + +-- ============================================================================= +-- SECTION 4: STAGE 2 - REVIEW SPANS +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS pipeline.review_spans ( + id BIGSERIAL PRIMARY KEY, + span_id VARCHAR(50) NOT NULL UNIQUE, + + -- Context + business_id VARCHAR(255) NOT NULL, + place_id VARCHAR(255) NOT NULL, + source VARCHAR(20) NOT NULL DEFAULT 'google', + review_id VARCHAR(255) NOT NULL, + review_version INTEGER NOT NULL DEFAULT 1, + + -- Position + span_index INTEGER NOT NULL CHECK (span_index >= 0), + span_text TEXT NOT NULL, + span_start INTEGER NOT NULL CHECK (span_start >= 0), + span_end INTEGER NOT NULL CHECK (span_end > span_start), + + -- Classification profile + profile VARCHAR(20) NOT NULL DEFAULT 'standard', + + -- Core URT classification + urt_primary VARCHAR(10) NOT NULL, + urt_secondary VARCHAR(10)[] DEFAULT '{}', + valence VARCHAR(5) NOT NULL, + intensity VARCHAR(5) NOT NULL, + comparative VARCHAR(10) NOT NULL DEFAULT 'CR-N', + + -- Extended classification (standard/full profile) + specificity VARCHAR(5), + actionability VARCHAR(5), + temporal VARCHAR(5), + evidence VARCHAR(5), + + -- Entity extraction + entity VARCHAR(255), + entity_type VARCHAR(20), + entity_normalized VARCHAR(255), + + -- Causal relations (full profile) + relation_type VARCHAR(20), + related_span_id VARCHAR(50), + causal_chain JSONB, + + -- Flags + is_primary BOOLEAN NOT NULL DEFAULT FALSE, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + + -- Time reference + review_time TIMESTAMP WITH TIME ZONE NOT NULL, + + -- Metadata + confidence VARCHAR(10) NOT NULL DEFAULT 'medium', + usn VARCHAR(100) NOT NULL, + taxonomy_version VARCHAR(20) NOT NULL, + model_version VARCHAR(100) NOT NULL, + ingest_batch_id VARCHAR(50) NOT NULL, + + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + -- Foreign key to review + CONSTRAINT fk_review FOREIGN KEY (source, review_id, review_version) + REFERENCES pipeline.reviews_enriched(source, review_id, review_version) +); + +-- Indexes for review_spans +CREATE INDEX IF NOT EXISTS idx_spans_business_id ON pipeline.review_spans(business_id); +CREATE INDEX IF NOT EXISTS idx_spans_place_id ON pipeline.review_spans(place_id); +CREATE INDEX IF NOT EXISTS idx_spans_review_time ON pipeline.review_spans(review_time); +CREATE INDEX IF NOT EXISTS idx_spans_urt_primary ON pipeline.review_spans(urt_primary); +CREATE INDEX IF NOT EXISTS idx_spans_valence ON pipeline.review_spans(valence); +CREATE INDEX IF NOT EXISTS idx_spans_intensity ON pipeline.review_spans(intensity); +CREATE INDEX IF NOT EXISTS idx_spans_is_active ON pipeline.review_spans(is_active) WHERE is_active = TRUE; +CREATE INDEX IF NOT EXISTS idx_spans_is_primary ON pipeline.review_spans(is_primary) WHERE is_primary = TRUE; +CREATE INDEX IF NOT EXISTS idx_spans_entity_normalized ON pipeline.review_spans(entity_normalized) WHERE entity_normalized IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_spans_batch ON pipeline.review_spans(ingest_batch_id); + +-- Index for unrouted negative spans (Stage 3 query) +CREATE INDEX IF NOT EXISTS idx_spans_unrouted_negative ON pipeline.review_spans(review_time DESC) + WHERE is_active = TRUE AND valence IN ('V-', 'V±'); + +COMMENT ON TABLE pipeline.review_spans IS 'Extracted semantic spans with URT classification from reviews'; + + +-- ============================================================================= +-- SECTION 5: STAGE 3 - ISSUES +-- ============================================================================= + +-- Issues table +CREATE TABLE IF NOT EXISTS pipeline.issues ( + id BIGSERIAL PRIMARY KEY, + issue_id VARCHAR(50) NOT NULL UNIQUE, + + -- Context + business_id VARCHAR(255) NOT NULL, + place_id VARCHAR(255) NOT NULL, + + -- Classification + primary_subcode VARCHAR(10) NOT NULL, + domain CHAR(1) NOT NULL, + + -- State + state pipeline.issue_state NOT NULL DEFAULT 'open', + priority_score REAL NOT NULL DEFAULT 1.0, + confidence_score REAL NOT NULL DEFAULT 1.0, + + -- Aggregates + span_count INTEGER NOT NULL DEFAULT 1, + max_intensity VARCHAR(5) NOT NULL DEFAULT 'I1', + + -- Entity (optional - for entity-specific issues) + entity VARCHAR(255), + entity_normalized VARCHAR(255), + + -- Metadata + taxonomy_version VARCHAR(20) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +-- Issue-span links (1:1 - each span routes to exactly one issue) +CREATE TABLE IF NOT EXISTS pipeline.issue_spans ( + id BIGSERIAL PRIMARY KEY, + issue_id VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id), + span_id VARCHAR(50) NOT NULL UNIQUE, + + -- Review reference + source VARCHAR(20) NOT NULL DEFAULT 'google', + review_id VARCHAR(255) NOT NULL, + review_version INTEGER NOT NULL DEFAULT 1, + + -- Match info + is_primary_match BOOLEAN NOT NULL DEFAULT TRUE, + intensity VARCHAR(5) NOT NULL, + review_time TIMESTAMP WITH TIME ZONE NOT NULL, + + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +-- Issue events (audit log) +CREATE TABLE IF NOT EXISTS pipeline.issue_events ( + id BIGSERIAL PRIMARY KEY, + issue_id VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id), + event_type VARCHAR(50) NOT NULL, + span_id VARCHAR(50), + old_value TEXT, + new_value TEXT, + metadata JSONB, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +-- Indexes for issues +CREATE INDEX IF NOT EXISTS idx_issues_business_id ON pipeline.issues(business_id); +CREATE INDEX IF NOT EXISTS idx_issues_place_id ON pipeline.issues(place_id); +CREATE INDEX IF NOT EXISTS idx_issues_state ON pipeline.issues(state); +CREATE INDEX IF NOT EXISTS idx_issues_primary_subcode ON pipeline.issues(primary_subcode); +CREATE INDEX IF NOT EXISTS idx_issues_domain ON pipeline.issues(domain); +CREATE INDEX IF NOT EXISTS idx_issues_entity_normalized ON pipeline.issues(entity_normalized) WHERE entity_normalized IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_issues_priority ON pipeline.issues(priority_score DESC) WHERE state = 'open'; +CREATE INDEX IF NOT EXISTS idx_issues_created ON pipeline.issues(created_at); +CREATE INDEX IF NOT EXISTS idx_issues_updated ON pipeline.issues(updated_at); + +-- Indexes for issue_spans +CREATE INDEX IF NOT EXISTS idx_issue_spans_issue_id ON pipeline.issue_spans(issue_id); +CREATE INDEX IF NOT EXISTS idx_issue_spans_review_time ON pipeline.issue_spans(review_time); + +-- Indexes for issue_events +CREATE INDEX IF NOT EXISTS idx_issue_events_issue_id ON pipeline.issue_events(issue_id); +CREATE INDEX IF NOT EXISTS idx_issue_events_created ON pipeline.issue_events(created_at); +CREATE INDEX IF NOT EXISTS idx_issue_events_type ON pipeline.issue_events(event_type); + +COMMENT ON TABLE pipeline.issues IS 'Aggregated issues derived from negative/mixed spans'; +COMMENT ON TABLE pipeline.issue_spans IS 'Links between issues and their source spans'; +COMMENT ON TABLE pipeline.issue_events IS 'Audit log for issue state changes'; + + +-- ============================================================================= +-- SECTION 6: STAGE 4 - FACT TIMESERIES +-- ============================================================================= + +CREATE TABLE IF NOT EXISTS pipeline.fact_timeseries ( + id BIGSERIAL PRIMARY KEY, + + -- Dimension keys + business_id VARCHAR(255) NOT NULL, + place_id VARCHAR(255) NOT NULL, -- Or 'ALL' for rollup + period_date DATE NOT NULL, + bucket_type pipeline.bucket_type NOT NULL DEFAULT 'day', + subject_type pipeline.subject_type NOT NULL DEFAULT 'urt_code', + subject_id VARCHAR(50) NOT NULL, -- URT code, domain letter, or issue_id + taxonomy_version VARCHAR(20) NOT NULL, + + -- Core counts + review_count INTEGER NOT NULL DEFAULT 0, + span_count INTEGER NOT NULL DEFAULT 0, + + -- Valence counts + negative_count INTEGER NOT NULL DEFAULT 0, + positive_count INTEGER NOT NULL DEFAULT 0, + neutral_count INTEGER NOT NULL DEFAULT 0, + mixed_count INTEGER NOT NULL DEFAULT 0, + + -- Strength scores + strength_score REAL NOT NULL DEFAULT 0.0, + negative_strength REAL NOT NULL DEFAULT 0.0, + positive_strength REAL NOT NULL DEFAULT 0.0, + + -- Rating + avg_rating REAL, + + -- Intensity counts + i1_count INTEGER NOT NULL DEFAULT 0, + i2_count INTEGER NOT NULL DEFAULT 0, + i3_count INTEGER NOT NULL DEFAULT 0, + + -- Comparative counts + cr_better INTEGER NOT NULL DEFAULT 0, + cr_worse INTEGER NOT NULL DEFAULT 0, + cr_same INTEGER NOT NULL DEFAULT 0, + + -- Trust-weighted metrics + trust_weighted_strength REAL NOT NULL DEFAULT 0.0, + trust_weighted_negative REAL NOT NULL DEFAULT 0.0, + + -- Metadata + computed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + -- Unique constraint for upsert + CONSTRAINT fact_timeseries_unique UNIQUE ( + business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version + ) +); + +-- Indexes for fact_timeseries +CREATE INDEX IF NOT EXISTS idx_facts_business_id ON pipeline.fact_timeseries(business_id); +CREATE INDEX IF NOT EXISTS idx_facts_place_id ON pipeline.fact_timeseries(place_id); +CREATE INDEX IF NOT EXISTS idx_facts_period ON pipeline.fact_timeseries(period_date); +CREATE INDEX IF NOT EXISTS idx_facts_bucket ON pipeline.fact_timeseries(bucket_type); +CREATE INDEX IF NOT EXISTS idx_facts_subject_type ON pipeline.fact_timeseries(subject_type); +CREATE INDEX IF NOT EXISTS idx_facts_subject_id ON pipeline.fact_timeseries(subject_id); + +-- Composite index for common dashboard queries +CREATE INDEX IF NOT EXISTS idx_facts_dashboard ON pipeline.fact_timeseries( + business_id, place_id, bucket_type, period_date DESC +); + +-- Index for specific code trends +CREATE INDEX IF NOT EXISTS idx_facts_code_trend ON pipeline.fact_timeseries( + business_id, subject_id, bucket_type, period_date DESC +) WHERE subject_type = 'urt_code'; + +-- Index for domain aggregates +CREATE INDEX IF NOT EXISTS idx_facts_domain ON pipeline.fact_timeseries( + business_id, subject_id, bucket_type, period_date DESC +) WHERE subject_type = 'domain'; + +COMMENT ON TABLE pipeline.fact_timeseries IS 'Pre-aggregated time series facts for dashboard queries'; + + +-- ============================================================================= +-- SECTION 7: HELPER VIEWS +-- ============================================================================= + +-- View for latest enriched reviews only +CREATE OR REPLACE VIEW pipeline.reviews_latest AS +SELECT * FROM pipeline.reviews_enriched WHERE is_latest = TRUE; + +-- View for open issues with span counts +CREATE OR REPLACE VIEW pipeline.issues_open AS +SELECT + i.*, + COUNT(s.id) as total_spans +FROM pipeline.issues i +LEFT JOIN pipeline.issue_spans s ON i.issue_id = s.issue_id +WHERE i.state = 'open' +GROUP BY i.id; + +COMMENT ON VIEW pipeline.reviews_latest IS 'Latest version of each review'; +COMMENT ON VIEW pipeline.issues_open IS 'Open issues with total span counts'; diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/db/repositories.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/db/repositories.py index 88e5a6b..c2f8afc 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/db/repositories.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/db/repositories.py @@ -1,4 +1,8 @@ -"""Data access layer for pipeline operations.""" +"""Data access layer for pipeline operations. + +All tables live in the 'pipeline' schema, keeping them decoupled from the +main scraper schema while sharing the same database. +""" from __future__ import annotations @@ -20,6 +24,9 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Schema prefix for all pipeline tables +SCHEMA = "pipeline" + class ReviewRepository: """Repository for review data operations.""" @@ -35,7 +42,7 @@ class ReviewRepository: ) -> int: """Insert a raw review and return its ID.""" query = """ - INSERT INTO reviews_raw ( + INSERT INTO pipeline.reviews_raw ( source, review_id, place_id, raw_payload, review_text, rating, review_time, reviewer_name, reviewer_id, review_version, pulled_at @@ -66,7 +73,7 @@ class ReviewRepository: ) -> int: """Insert an enriched review stub (pre-classification).""" query = """ - INSERT INTO reviews_enriched ( + INSERT INTO pipeline.reviews_enriched ( source, review_id, review_version, is_latest, raw_id, business_id, place_id, text, text_normalized, rating, review_time, language, taxonomy_version @@ -101,7 +108,7 @@ class ReviewRepository: ) -> None: """Update an enriched review with classification results.""" query = """ - UPDATE reviews_enriched SET + UPDATE pipeline.reviews_enriched SET urt_primary = $1, urt_secondary = $2, valence = $3, @@ -147,7 +154,7 @@ class ReviewRepository: SELECT source, review_id, review_version, business_id, place_id, text, text_normalized, rating, review_time - FROM reviews_enriched + FROM pipeline.reviews_enriched WHERE urt_primary IS NULL AND is_latest = TRUE ORDER BY review_time DESC @@ -164,7 +171,7 @@ class ReviewRepository: ) -> dict[str, Any] | None: """Get a specific review by its composite key.""" query = """ - SELECT * FROM reviews_enriched + SELECT * FROM pipeline.reviews_enriched WHERE source = $1 AND review_id = $2 AND review_version = $3 """ row = await self.db.fetchrow(query, source, review_id, review_version) @@ -179,7 +186,7 @@ class ReviewRepository: # For now, we check by querying the first occurrence # A proper dedup table would be better for production query = """ - SELECT review_id FROM reviews_enriched + SELECT review_id FROM pipeline.reviews_enriched WHERE business_id = $1 AND text_normalized IS NOT NULL LIMIT 1 @@ -209,7 +216,7 @@ class SpanRepository: ) -> None: """Insert a span into the database.""" query = """ - INSERT INTO review_spans ( + INSERT INTO pipeline.review_spans ( span_id, business_id, place_id, source, review_id, review_version, span_index, span_text, span_start, span_end, profile, urt_primary, urt_secondary, valence, intensity, comparative, @@ -282,8 +289,8 @@ class SpanRepository: rs.urt_primary, rs.valence, rs.intensity, rs.entity_normalized, rs.review_time, rs.confidence, re.trust_score - FROM review_spans rs - JOIN reviews_enriched re ON ( + FROM pipeline.review_spans rs + JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version @@ -291,7 +298,7 @@ class SpanRepository: WHERE rs.is_active = TRUE AND rs.valence IN ('V-', 'V±') AND NOT EXISTS ( - SELECT 1 FROM issue_spans iss WHERE iss.span_id = rs.span_id + SELECT 1 FROM pipeline.issue_spans iss WHERE iss.span_id = rs.span_id ) ORDER BY rs.review_time DESC LIMIT $1 @@ -301,7 +308,7 @@ class SpanRepository: async def get_span_by_id(self, span_id: str) -> dict[str, Any] | None: """Get a span by its ID.""" - query = "SELECT * FROM review_spans WHERE span_id = $1" + query = "SELECT * FROM pipeline.review_spans WHERE span_id = $1" row = await self.db.fetchrow(query, span_id) return dict(row) if row else None @@ -326,7 +333,7 @@ class IssueRepository: """Create or update an issue. Returns True if newly created.""" # First check if exists existing = await self.db.fetchval( - "SELECT 1 FROM issues WHERE issue_id = $1", + "SELECT 1 FROM pipeline.issues WHERE issue_id = $1", issue_id, ) @@ -334,7 +341,7 @@ class IssueRepository: # Update await self.db.execute( """ - UPDATE issues SET + UPDATE pipeline.issues SET span_count = span_count + 1, max_intensity = CASE WHEN $1 = 'I3' THEN 'I3' @@ -353,7 +360,7 @@ class IssueRepository: domain = primary_subcode[0] if primary_subcode else "O" await self.db.execute( """ - INSERT INTO issues ( + INSERT INTO pipeline.issues ( issue_id, business_id, place_id, primary_subcode, domain, state, priority_score, confidence_score, span_count, max_intensity, entity, entity_normalized, taxonomy_version @@ -388,7 +395,7 @@ class IssueRepository: """Link a span to an issue.""" await self.db.execute( """ - INSERT INTO issue_spans ( + INSERT INTO pipeline.issue_spans ( issue_id, span_id, source, review_id, review_version, is_primary_match, intensity, review_time ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) @@ -416,7 +423,7 @@ class IssueRepository: """Log an issue event for audit trail.""" await self.db.execute( """ - INSERT INTO issue_events ( + INSERT INTO pipeline.issue_events ( issue_id, event_type, span_id, old_value, new_value, metadata ) VALUES ($1, $2, $3, $4, $5, $6) """, @@ -430,14 +437,14 @@ class IssueRepository: async def get_issue_by_id(self, issue_id: str) -> dict[str, Any] | None: """Get an issue by its ID.""" - query = "SELECT * FROM issues WHERE issue_id = $1" + query = "SELECT * FROM pipeline.issues WHERE issue_id = $1" row = await self.db.fetchrow(query, issue_id) return dict(row) if row else None async def check_span_already_linked(self, span_id: str) -> str | None: """Check if a span is already linked to an issue.""" return await self.db.fetchval( - "SELECT issue_id FROM issue_spans WHERE span_id = $1", + "SELECT issue_id FROM pipeline.issue_spans WHERE span_id = $1", span_id, ) @@ -452,7 +459,7 @@ class FactRepository: """Insert or update a fact record.""" await self.db.execute( """ - INSERT INTO fact_timeseries ( + INSERT INTO pipeline.fact_timeseries ( business_id, place_id, period_date, bucket_type, subject_type, subject_id, taxonomy_version, review_count, span_count, negative_count, positive_count, @@ -534,8 +541,8 @@ class FactRepository: rs.comparative, re.trust_score, re.rating - FROM review_spans rs - JOIN reviews_enriched re ON ( + FROM pipeline.review_spans rs + JOIN pipeline.reviews_enriched re ON ( re.source = rs.source AND re.review_id = rs.review_id AND re.review_version = rs.review_version @@ -554,7 +561,7 @@ class FactRepository: """Get all place IDs for a business.""" rows = await self.db.fetch( """ - SELECT DISTINCT place_id FROM reviews_enriched + SELECT DISTINCT place_id FROM pipeline.reviews_enriched WHERE business_id = $1 """, business_id, diff --git a/tools/run_migrations.py b/tools/run_migrations.py new file mode 100644 index 0000000..0dd2834 --- /dev/null +++ b/tools/run_migrations.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +""" +CLI tool to run database migrations. + +Usage: + python tools/run_migrations.py --database-url $DATABASE_URL + + # Or with environment variable + export DATABASE_URL=postgresql://user:pass@localhost/db + python tools/run_migrations.py +""" + +import asyncio +import os +import sys +import argparse +import logging + +# Add project root to path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from core.database import DatabaseManager + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + + +async def main(database_url: str, migrations_dir: str = "migrations/versions"): + """Run migrations against the database.""" + db = DatabaseManager(database_url) + + try: + await db.connect() + + # First initialize base schema (jobs table, etc.) + print("Initializing base schema...") + await db.initialize_schema() + + # Then run versioned migrations + print(f"\nRunning migrations from {migrations_dir}...") + count = await db.run_migrations(migrations_dir) + + if count > 0: + print(f"\n✓ Applied {count} migration(s)") + else: + print("\n✓ No pending migrations") + + except Exception as e: + print(f"\n✗ Migration failed: {e}", file=sys.stderr) + sys.exit(1) + finally: + await db.disconnect() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run database migrations") + parser.add_argument( + "--database-url", + default=os.environ.get("DATABASE_URL"), + help="PostgreSQL connection string (default: $DATABASE_URL)", + ) + parser.add_argument( + "--migrations-dir", + default="migrations/versions", + help="Directory containing .sql migration files", + ) + + args = parser.parse_args() + + if not args.database_url: + print("Error: --database-url required or set DATABASE_URL environment variable", file=sys.stderr) + sys.exit(1) + + asyncio.run(main(args.database_url, args.migrations_dir))