feat: Add decoupled pipeline schema with separate PostgreSQL namespace

- Create consolidated migration (005_create_pipeline_schema.sql) with
  'pipeline' schema for all classification tables
- Update pipeline repositories to use schema prefix (pipeline.*)
- Add run_migrations() method to DatabaseManager
- Add CLI tool for running versioned migrations

Tables created in pipeline schema:
- reviews_raw, reviews_enriched (Stage 1)
- review_spans (Stage 2)
- issues, issue_spans, issue_events (Stage 3)
- fact_timeseries (Stage 4)
- urt_domains, urt_categories (taxonomy lookup)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 18:17:20 +00:00
parent 7d720f5378
commit 03ed7029e2
4 changed files with 710 additions and 23 deletions

View File

@@ -182,6 +182,66 @@ class DatabaseManager:
log.info("Database schema initialized")
async def run_migrations(self, migrations_dir: str = "migrations/versions"):
"""
Run versioned migrations from SQL files.
Args:
migrations_dir: Path to directory containing .sql migration files.
Files are run in sorted order.
Returns:
Number of migrations applied.
"""
from pathlib import Path
migrations_path = Path(migrations_dir)
if not migrations_path.exists():
log.warning(f"Migrations directory not found: {migrations_dir}")
return 0
async with self.pool.acquire() as conn:
# Create migrations tracking table
await conn.execute("""
CREATE TABLE IF NOT EXISTS _migrations (
id SERIAL PRIMARY KEY,
filename VARCHAR(255) UNIQUE NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)
""")
# Get already applied migrations
applied = await conn.fetch("SELECT filename FROM _migrations")
applied_set = {r["filename"] for r in applied}
# Find and run pending migrations
migration_files = sorted(migrations_path.glob("*.sql"))
migrations_run = 0
for migration_file in migration_files:
filename = migration_file.name
if filename in applied_set:
continue
log.info(f"Running migration: {filename}")
async with conn.transaction():
try:
sql = migration_file.read_text()
await conn.execute(sql)
await conn.execute(
"INSERT INTO _migrations (filename) VALUES ($1)",
filename,
)
migrations_run += 1
log.info(f"Migration {filename} applied successfully")
except Exception as e:
log.error(f"Migration {filename} failed: {e}")
raise
log.info(f"Ran {migrations_run} migrations")
return migrations_run
# ==================== Job Operations ====================
async def create_job(

View File

@@ -0,0 +1,544 @@
-- =============================================================================
-- Migration: 005_create_pipeline_schema.sql
-- ReviewIQ Pipeline - Decoupled Schema
-- =============================================================================
--
-- Creates a separate 'pipeline' schema for all review classification tables.
-- This keeps the pipeline tables decoupled from the main scraper schema while
-- sharing the same database.
--
-- Tables created:
-- pipeline.reviews_raw - Immutable audit log of scraped reviews
-- pipeline.reviews_enriched - Normalized/classified reviews
-- pipeline.review_spans - Extracted semantic spans
-- pipeline.issues - Aggregated issues from negative spans
-- pipeline.issue_spans - Issue-to-span linking
-- pipeline.issue_events - Audit log for issue changes
-- pipeline.fact_timeseries - Pre-aggregated metrics for dashboards
-- pipeline.urt_domains - URT taxonomy domains
-- pipeline.urt_categories - URT taxonomy categories
--
-- Soft FK: pipeline.reviews_raw.job_id -> public.jobs.job_id (optional)
--
-- Date: 2026-01-24
-- =============================================================================
-- Create the pipeline schema
CREATE SCHEMA IF NOT EXISTS pipeline;
COMMENT ON SCHEMA pipeline IS 'ReviewIQ Pipeline - LLM-powered review classification and aggregation';
-- =============================================================================
-- SECTION 1: ENUM TYPES (in pipeline schema)
-- =============================================================================
-- Valence enum
DO $$ BEGIN
CREATE TYPE pipeline.valence_type AS ENUM ('V+', 'V-', 'V0', '');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Intensity enum
DO $$ BEGIN
CREATE TYPE pipeline.intensity_type AS ENUM ('I1', 'I2', 'I3');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Specificity enum
DO $$ BEGIN
CREATE TYPE pipeline.specificity_type AS ENUM ('S1', 'S2', 'S3');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Actionability enum
DO $$ BEGIN
CREATE TYPE pipeline.actionability_type AS ENUM ('A1', 'A2', 'A3');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Temporal enum
DO $$ BEGIN
CREATE TYPE pipeline.temporal_type AS ENUM ('TC', 'TR', 'TH', 'TF');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Evidence enum
DO $$ BEGIN
CREATE TYPE pipeline.evidence_type AS ENUM ('ES', 'EI', 'EC');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Comparative enum
DO $$ BEGIN
CREATE TYPE pipeline.comparative_type AS ENUM ('CR-N', 'CR-B', 'CR-W', 'CR-S');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Issue state enum
DO $$ BEGIN
CREATE TYPE pipeline.issue_state AS ENUM ('open', 'resolved', 'ignored', 'merged');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Subject type enum (for facts)
DO $$ BEGIN
CREATE TYPE pipeline.subject_type AS ENUM ('overall', 'urt_code', 'domain', 'issue');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- Bucket type enum (for facts)
DO $$ BEGIN
CREATE TYPE pipeline.bucket_type AS ENUM ('day', 'week', 'month');
EXCEPTION
WHEN duplicate_object THEN NULL;
END $$;
-- =============================================================================
-- SECTION 2: URT TAXONOMY LOOKUP TABLES
-- =============================================================================
-- URT Domain lookup table
CREATE TABLE IF NOT EXISTS pipeline.urt_domains (
code CHAR(1) PRIMARY KEY,
name VARCHAR(50) NOT NULL,
description TEXT
);
INSERT INTO pipeline.urt_domains (code, name, description) VALUES
('O', 'Offering', 'Product/service quality, features, variety'),
('P', 'Price', 'Value, pricing, promotions, payment'),
('J', 'Journey', 'Timing, process, convenience, accessibility'),
('E', 'Environment', 'Physical space, ambiance, cleanliness, digital UX'),
('A', 'Attitude', 'Staff behavior, helpfulness, professionalism'),
('V', 'Voice', 'Brand, communication, marketing, transparency'),
('R', 'Relationship', 'Loyalty, trust, consistency, personalization')
ON CONFLICT (code) DO NOTHING;
-- URT Tier-2 categories lookup table
CREATE TABLE IF NOT EXISTS pipeline.urt_categories (
code VARCHAR(5) PRIMARY KEY,
domain_code CHAR(1) NOT NULL REFERENCES pipeline.urt_domains(code),
name VARCHAR(100) NOT NULL,
description TEXT
);
INSERT INTO pipeline.urt_categories (code, domain_code, name) VALUES
('O1', 'O', 'Core Product/Service'),
('O2', 'O', 'Product Features'),
('O3', 'O', 'Variety & Selection'),
('O4', 'O', 'Customization'),
('P1', 'P', 'Value Perception'),
('P2', 'P', 'Pricing Structure'),
('P3', 'P', 'Promotions & Deals'),
('P4', 'P', 'Payment Process'),
('J1', 'J', 'Wait Times'),
('J2', 'J', 'Booking & Reservations'),
('J3', 'J', 'Navigation & Convenience'),
('J4', 'J', 'Accessibility'),
('E1', 'E', 'Physical Environment'),
('E2', 'E', 'Ambiance & Atmosphere'),
('E3', 'E', 'Cleanliness'),
('E4', 'E', 'Digital Experience'),
('A1', 'A', 'Friendliness'),
('A2', 'A', 'Helpfulness'),
('A3', 'A', 'Professionalism'),
('A4', 'A', 'Knowledge & Expertise'),
('V1', 'V', 'Brand Identity'),
('V2', 'V', 'Communication'),
('V3', 'V', 'Marketing'),
('V4', 'V', 'Transparency'),
('R1', 'R', 'Loyalty'),
('R2', 'R', 'Trust'),
('R3', 'R', 'Consistency'),
('R4', 'R', 'Personalization')
ON CONFLICT (code) DO NOTHING;
COMMENT ON TABLE pipeline.urt_domains IS 'URT v5.1 top-level domains';
COMMENT ON TABLE pipeline.urt_categories IS 'URT v5.1 Tier-2 categories';
-- =============================================================================
-- SECTION 3: STAGE 1 - RAW & ENRICHED REVIEWS
-- =============================================================================
-- Raw reviews table (immutable audit log)
CREATE TABLE IF NOT EXISTS pipeline.reviews_raw (
id BIGSERIAL PRIMARY KEY,
-- Link to scraper job (soft FK to public.jobs)
job_id UUID,
source VARCHAR(20) NOT NULL DEFAULT 'google',
review_id VARCHAR(255) NOT NULL,
place_id VARCHAR(255) NOT NULL,
raw_payload JSONB NOT NULL DEFAULT '{}',
review_text TEXT,
rating SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5),
review_time TIMESTAMP WITH TIME ZONE NOT NULL,
reviewer_name VARCHAR(255) NOT NULL,
reviewer_id VARCHAR(255),
review_version INTEGER NOT NULL DEFAULT 1,
pulled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT reviews_raw_unique UNIQUE (source, review_id, review_version)
);
-- Indexes for reviews_raw
CREATE INDEX IF NOT EXISTS idx_reviews_raw_job_id ON pipeline.reviews_raw(job_id) WHERE job_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_reviews_raw_place_id ON pipeline.reviews_raw(place_id);
CREATE INDEX IF NOT EXISTS idx_reviews_raw_review_time ON pipeline.reviews_raw(review_time);
CREATE INDEX IF NOT EXISTS idx_reviews_raw_pulled_at ON pipeline.reviews_raw(pulled_at);
COMMENT ON TABLE pipeline.reviews_raw IS 'Immutable raw review data as scraped from source';
COMMENT ON COLUMN pipeline.reviews_raw.job_id IS 'Optional link to public.jobs.job_id for traceability';
-- Enriched reviews table (mutable, updated by classification)
CREATE TABLE IF NOT EXISTS pipeline.reviews_enriched (
id BIGSERIAL PRIMARY KEY,
source VARCHAR(20) NOT NULL DEFAULT 'google',
review_id VARCHAR(255) NOT NULL,
review_version INTEGER NOT NULL DEFAULT 1,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
raw_id BIGINT REFERENCES pipeline.reviews_raw(id),
-- Tenant context
business_id VARCHAR(255) NOT NULL,
place_id VARCHAR(255) NOT NULL,
-- Content
text TEXT NOT NULL,
text_normalized TEXT NOT NULL,
rating SMALLINT NOT NULL CHECK (rating BETWEEN 1 AND 5),
review_time TIMESTAMP WITH TIME ZONE NOT NULL,
-- Normalization fields
language VARCHAR(10) NOT NULL DEFAULT 'en',
taxonomy_version VARCHAR(20) NOT NULL DEFAULT 'v5.1',
-- Classification fields (NULL until Stage 2)
urt_primary VARCHAR(10),
urt_secondary VARCHAR(10)[] DEFAULT '{}',
valence VARCHAR(5),
intensity VARCHAR(5),
comparative VARCHAR(10),
staff_mentions VARCHAR(255)[] DEFAULT '{}',
quotes JSONB DEFAULT '{}',
embedding REAL[] DEFAULT '{}',
trust_score REAL,
classification_model VARCHAR(100),
classification_confidence JSONB DEFAULT '{}',
processed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT reviews_enriched_unique UNIQUE (source, review_id, review_version)
);
-- Indexes for reviews_enriched
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_business_id ON pipeline.reviews_enriched(business_id);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_place_id ON pipeline.reviews_enriched(place_id);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_review_time ON pipeline.reviews_enriched(review_time);
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_urt_primary ON pipeline.reviews_enriched(urt_primary) WHERE urt_primary IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_unclassified ON pipeline.reviews_enriched(review_time DESC) WHERE urt_primary IS NULL AND is_latest = TRUE;
CREATE INDEX IF NOT EXISTS idx_reviews_enriched_valence ON pipeline.reviews_enriched(valence) WHERE valence IS NOT NULL;
COMMENT ON TABLE pipeline.reviews_enriched IS 'Enriched reviews with normalization and classification';
-- =============================================================================
-- SECTION 4: STAGE 2 - REVIEW SPANS
-- =============================================================================
CREATE TABLE IF NOT EXISTS pipeline.review_spans (
id BIGSERIAL PRIMARY KEY,
span_id VARCHAR(50) NOT NULL UNIQUE,
-- Context
business_id VARCHAR(255) NOT NULL,
place_id VARCHAR(255) NOT NULL,
source VARCHAR(20) NOT NULL DEFAULT 'google',
review_id VARCHAR(255) NOT NULL,
review_version INTEGER NOT NULL DEFAULT 1,
-- Position
span_index INTEGER NOT NULL CHECK (span_index >= 0),
span_text TEXT NOT NULL,
span_start INTEGER NOT NULL CHECK (span_start >= 0),
span_end INTEGER NOT NULL CHECK (span_end > span_start),
-- Classification profile
profile VARCHAR(20) NOT NULL DEFAULT 'standard',
-- Core URT classification
urt_primary VARCHAR(10) NOT NULL,
urt_secondary VARCHAR(10)[] DEFAULT '{}',
valence VARCHAR(5) NOT NULL,
intensity VARCHAR(5) NOT NULL,
comparative VARCHAR(10) NOT NULL DEFAULT 'CR-N',
-- Extended classification (standard/full profile)
specificity VARCHAR(5),
actionability VARCHAR(5),
temporal VARCHAR(5),
evidence VARCHAR(5),
-- Entity extraction
entity VARCHAR(255),
entity_type VARCHAR(20),
entity_normalized VARCHAR(255),
-- Causal relations (full profile)
relation_type VARCHAR(20),
related_span_id VARCHAR(50),
causal_chain JSONB,
-- Flags
is_primary BOOLEAN NOT NULL DEFAULT FALSE,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
-- Time reference
review_time TIMESTAMP WITH TIME ZONE NOT NULL,
-- Metadata
confidence VARCHAR(10) NOT NULL DEFAULT 'medium',
usn VARCHAR(100) NOT NULL,
taxonomy_version VARCHAR(20) NOT NULL,
model_version VARCHAR(100) NOT NULL,
ingest_batch_id VARCHAR(50) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
-- Foreign key to review
CONSTRAINT fk_review FOREIGN KEY (source, review_id, review_version)
REFERENCES pipeline.reviews_enriched(source, review_id, review_version)
);
-- Indexes for review_spans
CREATE INDEX IF NOT EXISTS idx_spans_business_id ON pipeline.review_spans(business_id);
CREATE INDEX IF NOT EXISTS idx_spans_place_id ON pipeline.review_spans(place_id);
CREATE INDEX IF NOT EXISTS idx_spans_review_time ON pipeline.review_spans(review_time);
CREATE INDEX IF NOT EXISTS idx_spans_urt_primary ON pipeline.review_spans(urt_primary);
CREATE INDEX IF NOT EXISTS idx_spans_valence ON pipeline.review_spans(valence);
CREATE INDEX IF NOT EXISTS idx_spans_intensity ON pipeline.review_spans(intensity);
CREATE INDEX IF NOT EXISTS idx_spans_is_active ON pipeline.review_spans(is_active) WHERE is_active = TRUE;
CREATE INDEX IF NOT EXISTS idx_spans_is_primary ON pipeline.review_spans(is_primary) WHERE is_primary = TRUE;
CREATE INDEX IF NOT EXISTS idx_spans_entity_normalized ON pipeline.review_spans(entity_normalized) WHERE entity_normalized IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_spans_batch ON pipeline.review_spans(ingest_batch_id);
-- Index for unrouted negative spans (Stage 3 query)
CREATE INDEX IF NOT EXISTS idx_spans_unrouted_negative ON pipeline.review_spans(review_time DESC)
WHERE is_active = TRUE AND valence IN ('V-', '');
COMMENT ON TABLE pipeline.review_spans IS 'Extracted semantic spans with URT classification from reviews';
-- =============================================================================
-- SECTION 5: STAGE 3 - ISSUES
-- =============================================================================
-- Issues table
CREATE TABLE IF NOT EXISTS pipeline.issues (
id BIGSERIAL PRIMARY KEY,
issue_id VARCHAR(50) NOT NULL UNIQUE,
-- Context
business_id VARCHAR(255) NOT NULL,
place_id VARCHAR(255) NOT NULL,
-- Classification
primary_subcode VARCHAR(10) NOT NULL,
domain CHAR(1) NOT NULL,
-- State
state pipeline.issue_state NOT NULL DEFAULT 'open',
priority_score REAL NOT NULL DEFAULT 1.0,
confidence_score REAL NOT NULL DEFAULT 1.0,
-- Aggregates
span_count INTEGER NOT NULL DEFAULT 1,
max_intensity VARCHAR(5) NOT NULL DEFAULT 'I1',
-- Entity (optional - for entity-specific issues)
entity VARCHAR(255),
entity_normalized VARCHAR(255),
-- Metadata
taxonomy_version VARCHAR(20) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Issue-span links (1:1 - each span routes to exactly one issue)
CREATE TABLE IF NOT EXISTS pipeline.issue_spans (
id BIGSERIAL PRIMARY KEY,
issue_id VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id),
span_id VARCHAR(50) NOT NULL UNIQUE,
-- Review reference
source VARCHAR(20) NOT NULL DEFAULT 'google',
review_id VARCHAR(255) NOT NULL,
review_version INTEGER NOT NULL DEFAULT 1,
-- Match info
is_primary_match BOOLEAN NOT NULL DEFAULT TRUE,
intensity VARCHAR(5) NOT NULL,
review_time TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Issue events (audit log)
CREATE TABLE IF NOT EXISTS pipeline.issue_events (
id BIGSERIAL PRIMARY KEY,
issue_id VARCHAR(50) NOT NULL REFERENCES pipeline.issues(issue_id),
event_type VARCHAR(50) NOT NULL,
span_id VARCHAR(50),
old_value TEXT,
new_value TEXT,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Indexes for issues
CREATE INDEX IF NOT EXISTS idx_issues_business_id ON pipeline.issues(business_id);
CREATE INDEX IF NOT EXISTS idx_issues_place_id ON pipeline.issues(place_id);
CREATE INDEX IF NOT EXISTS idx_issues_state ON pipeline.issues(state);
CREATE INDEX IF NOT EXISTS idx_issues_primary_subcode ON pipeline.issues(primary_subcode);
CREATE INDEX IF NOT EXISTS idx_issues_domain ON pipeline.issues(domain);
CREATE INDEX IF NOT EXISTS idx_issues_entity_normalized ON pipeline.issues(entity_normalized) WHERE entity_normalized IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_issues_priority ON pipeline.issues(priority_score DESC) WHERE state = 'open';
CREATE INDEX IF NOT EXISTS idx_issues_created ON pipeline.issues(created_at);
CREATE INDEX IF NOT EXISTS idx_issues_updated ON pipeline.issues(updated_at);
-- Indexes for issue_spans
CREATE INDEX IF NOT EXISTS idx_issue_spans_issue_id ON pipeline.issue_spans(issue_id);
CREATE INDEX IF NOT EXISTS idx_issue_spans_review_time ON pipeline.issue_spans(review_time);
-- Indexes for issue_events
CREATE INDEX IF NOT EXISTS idx_issue_events_issue_id ON pipeline.issue_events(issue_id);
CREATE INDEX IF NOT EXISTS idx_issue_events_created ON pipeline.issue_events(created_at);
CREATE INDEX IF NOT EXISTS idx_issue_events_type ON pipeline.issue_events(event_type);
COMMENT ON TABLE pipeline.issues IS 'Aggregated issues derived from negative/mixed spans';
COMMENT ON TABLE pipeline.issue_spans IS 'Links between issues and their source spans';
COMMENT ON TABLE pipeline.issue_events IS 'Audit log for issue state changes';
-- =============================================================================
-- SECTION 6: STAGE 4 - FACT TIMESERIES
-- =============================================================================
CREATE TABLE IF NOT EXISTS pipeline.fact_timeseries (
id BIGSERIAL PRIMARY KEY,
-- Dimension keys
business_id VARCHAR(255) NOT NULL,
place_id VARCHAR(255) NOT NULL, -- Or 'ALL' for rollup
period_date DATE NOT NULL,
bucket_type pipeline.bucket_type NOT NULL DEFAULT 'day',
subject_type pipeline.subject_type NOT NULL DEFAULT 'urt_code',
subject_id VARCHAR(50) NOT NULL, -- URT code, domain letter, or issue_id
taxonomy_version VARCHAR(20) NOT NULL,
-- Core counts
review_count INTEGER NOT NULL DEFAULT 0,
span_count INTEGER NOT NULL DEFAULT 0,
-- Valence counts
negative_count INTEGER NOT NULL DEFAULT 0,
positive_count INTEGER NOT NULL DEFAULT 0,
neutral_count INTEGER NOT NULL DEFAULT 0,
mixed_count INTEGER NOT NULL DEFAULT 0,
-- Strength scores
strength_score REAL NOT NULL DEFAULT 0.0,
negative_strength REAL NOT NULL DEFAULT 0.0,
positive_strength REAL NOT NULL DEFAULT 0.0,
-- Rating
avg_rating REAL,
-- Intensity counts
i1_count INTEGER NOT NULL DEFAULT 0,
i2_count INTEGER NOT NULL DEFAULT 0,
i3_count INTEGER NOT NULL DEFAULT 0,
-- Comparative counts
cr_better INTEGER NOT NULL DEFAULT 0,
cr_worse INTEGER NOT NULL DEFAULT 0,
cr_same INTEGER NOT NULL DEFAULT 0,
-- Trust-weighted metrics
trust_weighted_strength REAL NOT NULL DEFAULT 0.0,
trust_weighted_negative REAL NOT NULL DEFAULT 0.0,
-- Metadata
computed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
-- Unique constraint for upsert
CONSTRAINT fact_timeseries_unique UNIQUE (
business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version
)
);
-- Indexes for fact_timeseries
CREATE INDEX IF NOT EXISTS idx_facts_business_id ON pipeline.fact_timeseries(business_id);
CREATE INDEX IF NOT EXISTS idx_facts_place_id ON pipeline.fact_timeseries(place_id);
CREATE INDEX IF NOT EXISTS idx_facts_period ON pipeline.fact_timeseries(period_date);
CREATE INDEX IF NOT EXISTS idx_facts_bucket ON pipeline.fact_timeseries(bucket_type);
CREATE INDEX IF NOT EXISTS idx_facts_subject_type ON pipeline.fact_timeseries(subject_type);
CREATE INDEX IF NOT EXISTS idx_facts_subject_id ON pipeline.fact_timeseries(subject_id);
-- Composite index for common dashboard queries
CREATE INDEX IF NOT EXISTS idx_facts_dashboard ON pipeline.fact_timeseries(
business_id, place_id, bucket_type, period_date DESC
);
-- Index for specific code trends
CREATE INDEX IF NOT EXISTS idx_facts_code_trend ON pipeline.fact_timeseries(
business_id, subject_id, bucket_type, period_date DESC
) WHERE subject_type = 'urt_code';
-- Index for domain aggregates
CREATE INDEX IF NOT EXISTS idx_facts_domain ON pipeline.fact_timeseries(
business_id, subject_id, bucket_type, period_date DESC
) WHERE subject_type = 'domain';
COMMENT ON TABLE pipeline.fact_timeseries IS 'Pre-aggregated time series facts for dashboard queries';
-- =============================================================================
-- SECTION 7: HELPER VIEWS
-- =============================================================================
-- View for latest enriched reviews only
CREATE OR REPLACE VIEW pipeline.reviews_latest AS
SELECT * FROM pipeline.reviews_enriched WHERE is_latest = TRUE;
-- View for open issues with span counts
CREATE OR REPLACE VIEW pipeline.issues_open AS
SELECT
i.*,
COUNT(s.id) as total_spans
FROM pipeline.issues i
LEFT JOIN pipeline.issue_spans s ON i.issue_id = s.issue_id
WHERE i.state = 'open'
GROUP BY i.id;
COMMENT ON VIEW pipeline.reviews_latest IS 'Latest version of each review';
COMMENT ON VIEW pipeline.issues_open IS 'Open issues with total span counts';

View File

@@ -1,4 +1,8 @@
"""Data access layer for pipeline operations."""
"""Data access layer for pipeline operations.
All tables live in the 'pipeline' schema, keeping them decoupled from the
main scraper schema while sharing the same database.
"""
from __future__ import annotations
@@ -20,6 +24,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# Schema prefix for all pipeline tables
SCHEMA = "pipeline"
class ReviewRepository:
"""Repository for review data operations."""
@@ -35,7 +42,7 @@ class ReviewRepository:
) -> int:
"""Insert a raw review and return its ID."""
query = """
INSERT INTO reviews_raw (
INSERT INTO pipeline.reviews_raw (
source, review_id, place_id, raw_payload,
review_text, rating, review_time, reviewer_name, reviewer_id,
review_version, pulled_at
@@ -66,7 +73,7 @@ class ReviewRepository:
) -> int:
"""Insert an enriched review stub (pre-classification)."""
query = """
INSERT INTO reviews_enriched (
INSERT INTO pipeline.reviews_enriched (
source, review_id, review_version, is_latest, raw_id,
business_id, place_id, text, text_normalized, rating, review_time,
language, taxonomy_version
@@ -101,7 +108,7 @@ class ReviewRepository:
) -> None:
"""Update an enriched review with classification results."""
query = """
UPDATE reviews_enriched SET
UPDATE pipeline.reviews_enriched SET
urt_primary = $1,
urt_secondary = $2,
valence = $3,
@@ -147,7 +154,7 @@ class ReviewRepository:
SELECT
source, review_id, review_version, business_id, place_id,
text, text_normalized, rating, review_time
FROM reviews_enriched
FROM pipeline.reviews_enriched
WHERE urt_primary IS NULL
AND is_latest = TRUE
ORDER BY review_time DESC
@@ -164,7 +171,7 @@ class ReviewRepository:
) -> dict[str, Any] | None:
"""Get a specific review by its composite key."""
query = """
SELECT * FROM reviews_enriched
SELECT * FROM pipeline.reviews_enriched
WHERE source = $1 AND review_id = $2 AND review_version = $3
"""
row = await self.db.fetchrow(query, source, review_id, review_version)
@@ -179,7 +186,7 @@ class ReviewRepository:
# For now, we check by querying the first occurrence
# A proper dedup table would be better for production
query = """
SELECT review_id FROM reviews_enriched
SELECT review_id FROM pipeline.reviews_enriched
WHERE business_id = $1
AND text_normalized IS NOT NULL
LIMIT 1
@@ -209,7 +216,7 @@ class SpanRepository:
) -> None:
"""Insert a span into the database."""
query = """
INSERT INTO review_spans (
INSERT INTO pipeline.review_spans (
span_id, business_id, place_id, source, review_id, review_version,
span_index, span_text, span_start, span_end,
profile, urt_primary, urt_secondary, valence, intensity, comparative,
@@ -282,8 +289,8 @@ class SpanRepository:
rs.urt_primary, rs.valence, rs.intensity,
rs.entity_normalized, rs.review_time, rs.confidence,
re.trust_score
FROM review_spans rs
JOIN reviews_enriched re ON (
FROM pipeline.review_spans rs
JOIN pipeline.reviews_enriched re ON (
re.source = rs.source
AND re.review_id = rs.review_id
AND re.review_version = rs.review_version
@@ -291,7 +298,7 @@ class SpanRepository:
WHERE rs.is_active = TRUE
AND rs.valence IN ('V-', '')
AND NOT EXISTS (
SELECT 1 FROM issue_spans iss WHERE iss.span_id = rs.span_id
SELECT 1 FROM pipeline.issue_spans iss WHERE iss.span_id = rs.span_id
)
ORDER BY rs.review_time DESC
LIMIT $1
@@ -301,7 +308,7 @@ class SpanRepository:
async def get_span_by_id(self, span_id: str) -> dict[str, Any] | None:
"""Get a span by its ID."""
query = "SELECT * FROM review_spans WHERE span_id = $1"
query = "SELECT * FROM pipeline.review_spans WHERE span_id = $1"
row = await self.db.fetchrow(query, span_id)
return dict(row) if row else None
@@ -326,7 +333,7 @@ class IssueRepository:
"""Create or update an issue. Returns True if newly created."""
# First check if exists
existing = await self.db.fetchval(
"SELECT 1 FROM issues WHERE issue_id = $1",
"SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
issue_id,
)
@@ -334,7 +341,7 @@ class IssueRepository:
# Update
await self.db.execute(
"""
UPDATE issues SET
UPDATE pipeline.issues SET
span_count = span_count + 1,
max_intensity = CASE
WHEN $1 = 'I3' THEN 'I3'
@@ -353,7 +360,7 @@ class IssueRepository:
domain = primary_subcode[0] if primary_subcode else "O"
await self.db.execute(
"""
INSERT INTO issues (
INSERT INTO pipeline.issues (
issue_id, business_id, place_id, primary_subcode, domain,
state, priority_score, confidence_score, span_count, max_intensity,
entity, entity_normalized, taxonomy_version
@@ -388,7 +395,7 @@ class IssueRepository:
"""Link a span to an issue."""
await self.db.execute(
"""
INSERT INTO issue_spans (
INSERT INTO pipeline.issue_spans (
issue_id, span_id, source, review_id, review_version,
is_primary_match, intensity, review_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
@@ -416,7 +423,7 @@ class IssueRepository:
"""Log an issue event for audit trail."""
await self.db.execute(
"""
INSERT INTO issue_events (
INSERT INTO pipeline.issue_events (
issue_id, event_type, span_id, old_value, new_value, metadata
) VALUES ($1, $2, $3, $4, $5, $6)
""",
@@ -430,14 +437,14 @@ class IssueRepository:
async def get_issue_by_id(self, issue_id: str) -> dict[str, Any] | None:
"""Get an issue by its ID."""
query = "SELECT * FROM issues WHERE issue_id = $1"
query = "SELECT * FROM pipeline.issues WHERE issue_id = $1"
row = await self.db.fetchrow(query, issue_id)
return dict(row) if row else None
async def check_span_already_linked(self, span_id: str) -> str | None:
"""Check if a span is already linked to an issue."""
return await self.db.fetchval(
"SELECT issue_id FROM issue_spans WHERE span_id = $1",
"SELECT issue_id FROM pipeline.issue_spans WHERE span_id = $1",
span_id,
)
@@ -452,7 +459,7 @@ class FactRepository:
"""Insert or update a fact record."""
await self.db.execute(
"""
INSERT INTO fact_timeseries (
INSERT INTO pipeline.fact_timeseries (
business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version,
review_count, span_count, negative_count, positive_count,
@@ -534,8 +541,8 @@ class FactRepository:
rs.comparative,
re.trust_score,
re.rating
FROM review_spans rs
JOIN reviews_enriched re ON (
FROM pipeline.review_spans rs
JOIN pipeline.reviews_enriched re ON (
re.source = rs.source
AND re.review_id = rs.review_id
AND re.review_version = rs.review_version
@@ -554,7 +561,7 @@ class FactRepository:
"""Get all place IDs for a business."""
rows = await self.db.fetch(
"""
SELECT DISTINCT place_id FROM reviews_enriched
SELECT DISTINCT place_id FROM pipeline.reviews_enriched
WHERE business_id = $1
""",
business_id,

76
tools/run_migrations.py Normal file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""
CLI tool to run database migrations.
Usage:
python tools/run_migrations.py --database-url $DATABASE_URL
# Or with environment variable
export DATABASE_URL=postgresql://user:pass@localhost/db
python tools/run_migrations.py
"""
import asyncio
import os
import sys
import argparse
import logging
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.database import DatabaseManager
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
async def main(database_url: str, migrations_dir: str = "migrations/versions"):
"""Run migrations against the database."""
db = DatabaseManager(database_url)
try:
await db.connect()
# First initialize base schema (jobs table, etc.)
print("Initializing base schema...")
await db.initialize_schema()
# Then run versioned migrations
print(f"\nRunning migrations from {migrations_dir}...")
count = await db.run_migrations(migrations_dir)
if count > 0:
print(f"\n✓ Applied {count} migration(s)")
else:
print("\n✓ No pending migrations")
except Exception as e:
print(f"\n✗ Migration failed: {e}", file=sys.stderr)
sys.exit(1)
finally:
await db.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run database migrations")
parser.add_argument(
"--database-url",
default=os.environ.get("DATABASE_URL"),
help="PostgreSQL connection string (default: $DATABASE_URL)",
)
parser.add_argument(
"--migrations-dir",
default="migrations/versions",
help="Directory containing .sql migration files",
)
args = parser.parse_args()
if not args.database_url:
print("Error: --database-url required or set DATABASE_URL environment variable", file=sys.stderr)
sys.exit(1)
asyncio.run(main(args.database_url, args.migrations_dir))