From acd3b22e88fd213d70c2d59c6ec7b1cd9cbe1e4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 17:08:40 +0000 Subject: [PATCH] docs: Add pipeline development artifacts for parallel implementation New artifacts: - ReviewIQ-Pipeline-DevGuide.md: Entry point for pipeline work - ReviewIQ-Pipeline-Contracts-v1.md: Stage I/O specs, validation rules, test fixtures - ReviewIQ-Pipeline-Checklist.md: Per-stage implementation checklists - ReviewIQ-Codebase-Overview.md: File structure, integration points - ReviewIQ-v3.2.1-Taxonomy-Versioning.md: Taxonomy versioning addendum Updated: - ReviewIQ-v32-Decisions.md: Added B2 audit findings, taxonomy versioning decisions, pipeline status These artifacts enable parallel development of pipeline stages 1-4 with: - Independent validation (35 rules across stages) - Clear input/output contracts - Test fixtures for each stage - Definition of done criteria Co-Authored-By: Claude Opus 4.5 --- .artifacts/ReviewIQ-Codebase-Overview.md | 450 ++++++ .artifacts/ReviewIQ-Pipeline-Checklist.md | 371 +++++ .artifacts/ReviewIQ-Pipeline-Contracts-v1.md | 1255 +++++++++++++++++ .artifacts/ReviewIQ-Pipeline-DevGuide.md | 312 ++++ .../ReviewIQ-v3.2.1-Taxonomy-Versioning.md | 1107 +++++++++++++++ .artifacts/ReviewIQ-v32-Decisions.md | 109 +- 6 files changed, 3600 insertions(+), 4 deletions(-) create mode 100644 .artifacts/ReviewIQ-Codebase-Overview.md create mode 100644 .artifacts/ReviewIQ-Pipeline-Checklist.md create mode 100644 .artifacts/ReviewIQ-Pipeline-Contracts-v1.md create mode 100644 .artifacts/ReviewIQ-Pipeline-DevGuide.md create mode 100644 .artifacts/ReviewIQ-v3.2.1-Taxonomy-Versioning.md diff --git a/.artifacts/ReviewIQ-Codebase-Overview.md b/.artifacts/ReviewIQ-Codebase-Overview.md new file mode 100644 index 0000000..c24475f --- /dev/null +++ b/.artifacts/ReviewIQ-Codebase-Overview.md @@ -0,0 +1,450 @@ +# ReviewIQ Codebase Overview + +**Purpose**: Map 
existing code for agents starting fresh +**Last Updated**: 2026-01-24 +**Status**: ~55% implemented (scraping done, enrichment pipeline missing) + +--- + +## Quick Start for New Agents + +1. **Read first**: `ReviewIQ-v32-Decisions.md` (context recovery) +2. **For implementation**: `ReviewIQ-Pipeline-Contracts-v1.md` + `ReviewIQ-Pipeline-Checklist.md` +3. **For schema**: `ReviewIQ-Architecture-v3.2.md` § Part 2 +4. **For LLM prompts**: `LLM-Classification-Contract-v1.md` + +--- + +## Implementation Status Summary + +``` +PIPELINE COMPLETION: ~55% + +✅ COMPLETE (Working in Production) +├── Google Maps scraping (v1.0.0) +├── Job orchestration & queuing +├── Chrome worker pool +├── Webhook delivery +├── SSE real-time streaming +├── Frontend job management +└── Basic analytics dashboard + +❌ NOT IMPLEMENTED (Spec'd Only) +├── Stage 1: Normalization +├── Stage 2: LLM Classification +├── Stage 3: Issue Routing +├── Stage 4: Fact Aggregation +├── Enrichment database schema +└── Advanced analytics UI +``` + +--- + +## Directory Structure + +``` +google-reviews-scraper-pro/ +│ +├── .artifacts/ # Design documents (YOU ARE HERE) +│ ├── ReviewIQ-v32-Decisions.md # START HERE - context recovery +│ ├── ReviewIQ-Architecture-v3.2.md # Full v3.2 spec +│ ├── ReviewIQ-Pipeline-Contracts-v1.md # Stage I/O contracts +│ ├── ReviewIQ-Pipeline-Checklist.md # Implementation checklist +│ ├── LLM-Classification-Contract-v1.md # LLM prompt spec +│ ├── URT-v5.1-Reference.md # URT dimension codes +│ └── ReviewIQ-Codebase-Overview.md # THIS FILE +│ +├── api/ # ✅ API routes (FastAPI) +│ ├── routes/ +│ │ ├── admin.py # Scraper management endpoints +│ │ ├── dashboard.py # Analytics endpoints +│ │ └── batches.py # Batch job endpoints +│ └── __init__.py +│ +├── core/ # ✅ Core services +│ ├── database.py # AsyncPG database layer (~1200 lines) +│ ├── config.py # Configuration management +│ └── logging.py # Structured logging +│ +├── services/ # ✅ Background services +│ ├── webhook_service.py # Async 
webhook delivery +│ └── job_callback_service.py # Callback handling +│ +├── workers/ # ✅ Worker pool +│ └── chrome_pool.py # Chrome instance pooling +│ +├── scrapers/ # ✅ Scraper implementations +│ └── google_reviews/ +│ └── v1_0_0.py # Main scraper (~2000 lines) +│ +├── pipeline/ # ❌ TO BE CREATED +│ ├── stage1_normalize.py # TODO +│ ├── stage2_classify.py # TODO +│ ├── stage3_route.py # TODO +│ ├── stage4_aggregate.py # TODO +│ └── tests/ # TODO +│ +├── migrations/ # Database migrations +│ ├── 001_add_job_platform_fields.sql # ✅ Deployed +│ ├── 002_create_batches_table.sql # ✅ Deployed +│ ├── 003_create_scraper_registry.sql # ✅ Deployed +│ ├── 004_create_api_keys.sql # ✅ Deployed +│ ├── 005_create_reviews_tables.sql # ❌ TODO +│ ├── 006_create_spans_table.sql # ❌ TODO +│ ├── 007_create_urt_enums.sql # ❌ TODO +│ ├── 008_create_issues_tables.sql # ❌ TODO +│ └── 009_create_facts_table.sql # ❌ TODO +│ +├── frontend/ # ✅ Next.js frontend +│ ├── app/ +│ │ ├── dashboard/ # System overview +│ │ ├── jobs/ # Job list & detail +│ │ ├── analytics/ # Basic charts +│ │ └── new/ # Job submission forms +│ └── components/ +│ +├── api_server_production.py # ✅ Main API server (~1920 lines) +├── Dockerfile # ✅ Production container +├── docker-compose.production.yml # ✅ Docker orchestration +├── requirements-production.txt # ✅ Python dependencies +└── package.json # ✅ Node.js dependencies +``` + +--- + +## Key Files to Understand + +### 1. API Server Entry Point +**File**: `api_server_production.py` +**Lines**: ~1920 +**What it does**: +- FastAPI application setup +- All endpoint definitions +- Job submission and management +- SSE streaming +- Health checks + +**Key endpoints**: +```python +POST /api/scrape/google-reviews # Submit scrape job +GET /jobs/{job_id} # Get job status +GET /jobs/{job_id}/reviews # Get scraped reviews +GET /jobs/{job_id}/stream # SSE real-time updates +``` + +### 2. 
Database Layer +**File**: `core/database.py` +**Lines**: ~1200 +**What it does**: +- AsyncPG connection pooling +- Job CRUD operations +- Review storage (currently JSONB blob) +- Webhook tracking + +**Key functions**: +```python +async def create_job(job_data: dict) -> str +async def update_job_status(job_id: str, status: str, ...) +async def get_job(job_id: str) -> dict +async def store_reviews(job_id: str, reviews: list) -> int +``` + +**Note**: Currently stores reviews as JSONB in `jobs.reviews_data`. +The enrichment pipeline will need to: +1. Read from `jobs.reviews_data` +2. Write to `reviews_raw` and `reviews_enriched` tables + +### 3. Google Scraper +**File**: `scrapers/google_reviews/v1_0_0.py` +**Lines**: ~2000 +**What it does**: +- SeleniumBase Chrome automation +- DOM scraping + API interception +- Review extraction (text, rating, author, date) +- Business metadata extraction +- Pagination handling + +**Output format** (stored in `jobs.reviews_data`): +```json +{ + "business_info": {...}, + "reviews": [ + { + "review_id": "...", + "author_name": "...", + "rating": 4, + "text": "...", + "review_time": "2026-01-20T14:30:00Z" + } + ] +} +``` + +### 4. Chrome Worker Pool +**File**: `workers/chrome_pool.py` +**What it does**: +- Pre-warms Chrome instances +- Manages concurrent scraping jobs +- Handles resource cleanup + +--- + +## Database Schema (Current State) + +### Deployed Tables + +```sql +-- Core job tracking +CREATE TABLE jobs ( + job_id UUID PRIMARY KEY, + status VARCHAR(50), + url TEXT, + reviews_data JSONB, -- Raw scraped reviews live here + reviews_count INTEGER, + started_at TIMESTAMP, + completed_at TIMESTAMP, + -- ... 
20+ more columns +); + +-- Batch processing +CREATE TABLE batches (...); + +-- Webhook tracking +CREATE TABLE webhook_attempts (...); + +-- Scraper versioning +CREATE TABLE scraper_registry (...); + +-- API authentication (not enforced) +CREATE TABLE api_keys (...); +``` + +### NOT Deployed (Defined in v3.2 Spec) + +```sql +-- These tables need to be created via migrations 005-009 + +CREATE TABLE locations (...); -- Multi-tenant locations +CREATE TABLE reviews_raw (...); -- Immutable raw storage +CREATE TABLE reviews_enriched (...); -- Classified reviews +CREATE TABLE review_spans (...); -- Span-level classification +CREATE TABLE urt_codes (...); -- URT reference data +CREATE TABLE issues (...); -- Aggregated issues +CREATE TABLE issue_spans (...); -- Issue-span links +CREATE TABLE issue_events (...); -- Audit log +CREATE TABLE fact_timeseries (...); -- Pre-aggregated analytics +``` + +--- + +## Integration Points + +### Where New Pipeline Code Connects + +``` + EXISTING CODE NEW CODE + ════════════ ════════ + +api_server_production.py + │ + ▼ + jobs table + (reviews_data JSONB) ──────────────────▶ pipeline/stage1_normalize.py + │ + ▼ + reviews_raw table + reviews_enriched table + │ + ▼ + pipeline/stage2_classify.py + │ + ▼ + review_spans table + │ + ▼ + pipeline/stage3_route.py + │ + ▼ + issues table + issue_spans table + │ + ▼ + pipeline/stage4_aggregate.py + │ + ▼ + fact_timeseries table +``` + +### How to Trigger Pipeline + +**Option A: Post-scrape hook** (recommended) +```python +# In api_server_production.py, after job completes: +async def on_job_complete(job_id: str): + # Existing: send webhook + await webhook_service.dispatch(job_id) + + # NEW: trigger enrichment pipeline + await pipeline.stage1.process_job(job_id) +``` + +**Option B: Background worker** +```python +# New file: workers/enrichment_worker.py +async def enrichment_loop(): + while True: + jobs = await db.query(""" + SELECT job_id FROM jobs + WHERE status = 'completed' + AND enrichment_status 
IS NULL
+            LIMIT 10
+        """)
+        for job in jobs:
+            await pipeline.process(job['job_id'])
+        await asyncio.sleep(60)
+```
+
+**Option C: Manual trigger via API**
+```python
+# New endpoint in api_server_production.py
+@app.post("/api/jobs/{job_id}/enrich")
+async def trigger_enrichment(job_id: str):
+    await pipeline.process(job_id)
+    return {"status": "processing"}
+```
+
+---
+
+## Environment Setup
+
+### Required Environment Variables
+
+```bash
+# Database (required)
+DATABASE_URL=postgresql://user:pass@localhost:5432/reviewiq
+
+# LLM Provider (for Stage 2)
+OPENAI_API_KEY=sk-...
+# OR
+ANTHROPIC_API_KEY=sk-ant-...
+
+# Embedding model (for Stage 2)
+EMBEDDING_MODEL=all-MiniLM-L6-v2
+
+# Taxonomy version
+DEFAULT_TAXONOMY_VERSION=v5.1
+```
+
+### Local Development
+
+```bash
+# Start database
+docker-compose -f docker-compose.production.yml up -d postgres
+
+# Run migrations
+psql $DATABASE_URL -f migrations/001_add_job_platform_fields.sql
+# ... repeat for all migrations
+
+# Start API server
+python api_server_production.py
+
+# Start frontend (separate terminal)
+cd frontend && npm run dev
+```
+
+### Running Tests
+
+```bash
+# Unit tests
+pytest pipeline/tests/ -v
+
+# Integration tests
+pytest pipeline/tests/integration/ -v
+
+# Full E2E validation
+python -m pipeline.validate --job-id <job_id>
+```
+
+---
+
+## Tech Stack
+
+| Component | Technology | Version |
+|-----------|------------|---------|
+| API | FastAPI | 0.100+ |
+| Database | PostgreSQL | 15+ |
+| DB Driver | asyncpg | 0.28+ |
+| Scraping | SeleniumBase | 4.20+ |
+| Browser | Chrome (headless) | 120+ |
+| Frontend | Next.js | 16.1.3 |
+| UI | React | 19.2.3 |
+| Charts | Recharts | 2.x |
+| LLM | OpenAI / Anthropic | Latest |
+| Embeddings | sentence-transformers | 2.x |
+| Vectors | pgvector | 0.5+ |
+
+---
+
+## Common Patterns
+
+### Database Queries (asyncpg)
+
+```python
+# In core/database.py pattern:
+async def get_job(job_id: str) -> dict:
+    async with pool.acquire() as conn:
+        row = await
conn.fetchrow(
+            "SELECT * FROM jobs WHERE job_id = $1",
+            job_id
+        )
+        return dict(row) if row else None
+```
+
+### API Endpoints (FastAPI)
+
+```python
+# In api_server_production.py pattern:
+@app.get("/jobs/{job_id}")
+async def get_job(job_id: str):
+    job = await db.get_job(job_id)
+    if not job:
+        raise HTTPException(404, "Job not found")
+    return job
+```
+
+### Background Tasks
+
+```python
+# Pattern for async processing:
+_background_tasks: set[asyncio.Task] = set()  # keep references so running tasks are not garbage-collected
+
+async def process_in_background(job_id: str):
+    task = asyncio.create_task(do_heavy_work(job_id))
+    _background_tasks.add(task)
+    task.add_done_callback(_background_tasks.discard)  # drop the reference once the task finishes
+    return {"status": "processing"}
+```
+
+---
+
+## Gotchas & Notes
+
+1. **Reviews are JSONB blobs** - Currently in `jobs.reviews_data`, not normalized tables
+2. **No auth enforcement** - `api_keys` table exists but is not used
+3. **CORS is wide open** - Set to `*` in production (fix before launch)
+4. **Scraper is single-threaded per job** - Chrome pool handles concurrency
+5. **Webhooks have retry logic** - 3 attempts with exponential backoff
+6. **SSE streaming works** - Real-time job updates via `/jobs/{job_id}/stream`
+
+---
+
+## Next Steps for Implementation
+
+1. **Create migrations 005-009** - Deploy enrichment schema
+2. **Create `pipeline/` directory** - New code goes here
+3. **Implement Stage 1** - Read from jobs.reviews_data, write to reviews_raw/enriched
+4. **Implement Stage 2** - LLM classification with span extraction
+5. **Implement Stage 3** - Issue routing
+6. **Implement Stage 4** - Fact aggregation
+7. **Add pipeline trigger** - Hook into job completion or create worker
+8.
**Update frontend** - Add enrichment views + +--- + +*This document should be updated when significant code changes occur.* diff --git a/.artifacts/ReviewIQ-Pipeline-Checklist.md b/.artifacts/ReviewIQ-Pipeline-Checklist.md new file mode 100644 index 0000000..88b3cf6 --- /dev/null +++ b/.artifacts/ReviewIQ-Pipeline-Checklist.md @@ -0,0 +1,371 @@ +# ReviewIQ Pipeline Implementation Checklist + +**Purpose**: Quick reference for agents to verify stage completion +**Reference**: `ReviewIQ-Pipeline-Contracts-v1.md` for full specs + +--- + +## Pipeline Overview + +``` +Stage 0 ──▶ Stage 1 ──▶ Stage 2 ──▶ Stage 3 ──▶ Stage 4 +Scrape Normalize Classify Route Aggregate +✅ DONE ❌ TODO ❌ TODO ❌ TODO ❌ TODO +``` + +--- + +## Stage 0: Raw Ingestion ✅ COMPLETE + +No action needed. Already implemented in `scrapers/google_reviews/v1_0_0.py`. + +**Output Location**: `jobs.reviews_data` (JSONB) + +--- + +## Stage 1: Normalization ❌ TODO + +### Files to Create +- [ ] `pipeline/stage1_normalize.py` - Main normalization logic +- [ ] `pipeline/tests/test_stage1.py` - Unit tests +- [ ] `migrations/005_create_reviews_tables.sql` - Schema migration + +### Database Schema Required +```sql +-- Must exist before Stage 1 can write +CREATE TABLE reviews_raw (...); +CREATE TABLE reviews_enriched (...); +``` + +### Implementation Checklist +- [ ] Read from `jobs.reviews_data` where `status = 'completed'` +- [ ] Filter out empty/null review texts +- [ ] Normalize text (lowercase, whitespace, emoji) +- [ ] Detect language (ISO 639-1) +- [ ] Compute content hash (SHA256) +- [ ] Check for duplicates within business +- [ ] Insert into `reviews_raw` (immutable) +- [ ] Insert stub into `reviews_enriched` (classification fields NULL) +- [ ] Return `Stage1Output` with stats + +### Validation (run after implementation) +```bash +python -m pytest pipeline/tests/test_stage1.py -v +``` + +### Definition of Done +- [ ] All V1.1-V1.6 validation rules pass +- [ ] `reviews_raw` populated with immutable records +- 
[ ] `reviews_enriched` has stubs ready for Stage 2 +- [ ] Integration test: Stage 0 output → Stage 1 input passes +- [ ] No empty texts in output +- [ ] Duplicate detection working + +--- + +## Stage 2: LLM Classification ❌ TODO + +### Files to Create +- [ ] `pipeline/stage2_classify.py` - LLM classification logic +- [ ] `pipeline/llm_client.py` - LLM provider abstraction +- [ ] `pipeline/span_extractor.py` - Span boundary detection +- [ ] `pipeline/tests/test_stage2.py` - Unit tests +- [ ] `migrations/006_create_spans_table.sql` - Schema migration +- [ ] `migrations/007_create_urt_enums.sql` - ENUM types + +### Database Schema Required +```sql +-- ENUM types +CREATE TYPE urt_valence AS ENUM (...); +CREATE TYPE urt_intensity AS ENUM (...); +-- ... all 12 ENUMs from v3.2 spec + +-- Spans table +CREATE TABLE review_spans (...); +``` + +### Implementation Checklist +- [ ] Query unclassified reviews from `reviews_enriched` +- [ ] Build LLM prompt per `LLM-Classification-Contract-v1.md` +- [ ] Call LLM API (support GPT-4o-mini, Claude-3-haiku) +- [ ] Parse structured JSON response +- [ ] Extract spans with character offsets +- [ ] Validate span_text matches original text substring +- [ ] Check spans don't overlap +- [ ] Select primary span (I3 > I2 > I1, V- > V± > V0 > V+) +- [ ] Generate embeddings (384-dim) +- [ ] Compute trust score (0.2 floor) +- [ ] Build USN string per profile +- [ ] Update `reviews_enriched` with classification +- [ ] Insert spans into `review_spans` +- [ ] Return `Stage2Output` with stats + +### Validation (run after implementation) +```bash +python -m pytest pipeline/tests/test_stage2.py -v +``` + +### Definition of Done +- [ ] All V2.1-V2.12 validation rules pass +- [ ] LLM calls working with retry logic +- [ ] Span offsets correct (text substring matches) +- [ ] No overlapping spans +- [ ] Exactly one primary span per review +- [ ] Embeddings are 384-dim vectors +- [ ] Trust scores clamped to [0.2, 1.0] +- [ ] USN format valid per profile +- 
[ ] Integration test: Stage 1 output → Stage 2 input passes + +--- + +## Stage 3: Issue Routing ❌ TODO + +### Files to Create +- [ ] `pipeline/stage3_route.py` - Issue routing logic +- [ ] `pipeline/issue_manager.py` - Issue create/update logic +- [ ] `pipeline/tests/test_stage3.py` - Unit tests +- [ ] `migrations/008_create_issues_tables.sql` - Schema migration + +### Database Schema Required +```sql +CREATE TABLE issues (...); +CREATE TABLE issue_spans (...); +CREATE TABLE issue_events (...); +``` + +### Implementation Checklist +- [ ] Query unrouted spans where `valence IN ('V-', 'V±')` +- [ ] Generate deterministic `issue_id` from routing key +- [ ] Check if issue exists +- [ ] Create new issue OR update existing counters +- [ ] Insert `issue_spans` link (enforce 1:1 with UNIQUE) +- [ ] Log event to `issue_events` +- [ ] Recalculate priority score +- [ ] Return `Stage3Output` with stats + +### Validation (run after implementation) +```bash +python -m pytest pipeline/tests/test_stage3.py -v +``` + +### Definition of Done +- [ ] All V3.1-V3.5 validation rules pass +- [ ] Issue IDs are deterministic (same key = same ID) +- [ ] 1:1 span-to-issue mapping enforced +- [ ] Only V-/V± spans create issues +- [ ] Issue counters updated correctly +- [ ] Events logged for audit +- [ ] Integration test: Stage 2 output → Stage 3 input passes + +--- + +## Stage 4: Fact Aggregation ❌ TODO + +### Files to Create +- [ ] `pipeline/stage4_aggregate.py` - Fact aggregation logic +- [ ] `pipeline/tests/test_stage4.py` - Unit tests +- [ ] `migrations/009_create_facts_table.sql` - Schema migration + +### Database Schema Required +```sql +CREATE TABLE fact_timeseries (...); +``` + +### Implementation Checklist +- [ ] Accept business_id, date, bucket_types +- [ ] Query spans joined with reviews for the period +- [ ] Aggregate by URT code (per location + 'ALL' rollup) +- [ ] Compute: review_count, span_count, valence counts +- [ ] Compute: strength_score, negative_strength, 
positive_strength
+- [ ] Compute: intensity distribution (I1/I2/I3)
+- [ ] Compute: CR counts (better/worse/same)
+- [ ] Compute: trust-weighted metrics
+- [ ] UPSERT into `fact_timeseries`
+- [ ] Return `Stage4Output` with stats
+
+### Validation (run after implementation)
+```bash
+python -m pytest pipeline/tests/test_stage4.py -v
+```
+
+### Definition of Done
+- [ ] All V4.1-V4.7 validation rules pass
+- [ ] Valence counts sum to span_count
+- [ ] Intensity counts sum to span_count
+- [ ] 'ALL' rollup includes owned locations only
+- [ ] Facts are idempotent (re-run produces same result)
+- [ ] Integration test: Full pipeline E2E passes
+
+---
+
+## Integration Tests
+
+### Handoff Tests (run after each stage)
+```bash
+# Stage 0 → 1
+python -m pytest pipeline/tests/integration/test_stage0_to_1.py
+
+# Stage 1 → 2
+python -m pytest pipeline/tests/integration/test_stage1_to_2.py
+
+# Stage 2 → 3
+python -m pytest pipeline/tests/integration/test_stage2_to_3.py
+
+# Full E2E
+python -m pytest pipeline/tests/integration/test_e2e.py
+```
+
+### E2E Validation Command
+```bash
+# Run full pipeline validation
+python -m pipeline.validate --job-id <job_id> --verbose
+```
+
+Expected output:
+```
+Stage 0: ✅ PASS (5/5 rules)
+Stage 1: ✅ PASS (6/6 rules)
+Stage 2: ✅ PASS (12/12 rules)
+Stage 3: ✅ PASS (5/5 rules)
+Stage 4: ✅ PASS (7/7 rules)
+
+E2E Integration: ✅ PASS
+- Reviews scraped: 47
+- Reviews normalized: 45 (2 empty filtered)
+- Spans extracted: 127
+- Issues created: 23
+- Facts written: 156
+```
+
+---
+
+## Quick Reference: Validation Rules
+
+### Stage 1
+| Rule | Description |
+|------|-------------|
+| V1.1 | `text` is non-empty |
+| V1.2 | `text_normalized` has no control chars |
+| V1.3 | `content_hash` is 64-char hex |
+| V1.4 | `review_version` >= 1 |
+| V1.5 | `text_language` is valid ISO 639-1 |
+| V1.6 | `raw_id` references valid row |
+
+### Stage 2
+| Rule | Description |
+|------|-------------|
+| V2.1 | `urt_primary` matches `^[OPJEAVR][1-4]\.[0-9]{2}$`
| +| V2.2 | `urt_secondary` max 2 elements | +| V2.3 | `valence` is valid enum | +| V2.4 | `intensity` is valid enum | +| V2.5 | `span_end > span_start` | +| V2.6 | `span_text == text[start:end]` | +| V2.7 | Spans don't overlap | +| V2.8 | Exactly one `is_primary = true` | +| V2.9 | `trust_score` in [0.2, 1.0] | +| V2.10 | `embedding` is 384-dim | +| V2.11 | `usn` matches profile regex | +| V2.12 | `related_span_index` valid if set | + +### Stage 3 +| Rule | Description | +|------|-------------| +| V3.1 | `issue_id` matches `^ISS-[a-f0-9]{16}$` | +| V3.2 | `routing_key` non-empty | +| V3.3 | Span not already linked elsewhere | +| V3.4 | Issue exists in `issues` table | +| V3.5 | Only V-/V± spans routed | + +### Stage 4 +| Rule | Description | +|------|-------------| +| V4.1 | `place_id` valid or 'ALL' | +| V4.2 | `period_date` matches bucket | +| V4.3 | `span_count >= review_count` | +| V4.4 | Valence counts sum correctly | +| V4.5 | Intensity counts sum correctly | +| V4.6 | `strength_score >= 0` | +| V4.7 | `avg_rating` in [1.0, 5.0] or NULL | + +--- + +## Migration Execution Order + +```bash +# Run in sequence +psql $DATABASE_URL -f migrations/005_create_reviews_tables.sql +psql $DATABASE_URL -f migrations/006_create_spans_table.sql +psql $DATABASE_URL -f migrations/007_create_urt_enums.sql +psql $DATABASE_URL -f migrations/008_create_issues_tables.sql +psql $DATABASE_URL -f migrations/009_create_facts_table.sql +``` + +--- + +## Environment Variables Required + +```bash +# LLM Provider (Stage 2) +OPENAI_API_KEY=sk-... +# OR +ANTHROPIC_API_KEY=sk-ant-... + +# Embedding Model (Stage 2) +EMBEDDING_MODEL=all-MiniLM-L6-v2 + +# Database +DATABASE_URL=postgresql://... 
+ +# Taxonomy +DEFAULT_TAXONOMY_VERSION=v5.1 +``` + +--- + +## File Structure After Implementation + +``` +pipeline/ +├── __init__.py +├── stage1_normalize.py # ❌ TODO +├── stage2_classify.py # ❌ TODO +├── stage3_route.py # ❌ TODO +├── stage4_aggregate.py # ❌ TODO +├── llm_client.py # ❌ TODO +├── span_extractor.py # ❌ TODO +├── issue_manager.py # ❌ TODO +├── validate.py # ❌ TODO (CLI validator) +├── contracts.py # ❌ TODO (TypedDict definitions) +└── tests/ + ├── __init__.py + ├── test_stage1.py # ❌ TODO + ├── test_stage2.py # ❌ TODO + ├── test_stage3.py # ❌ TODO + ├── test_stage4.py # ❌ TODO + ├── fixtures/ + │ ├── stage0_output.json + │ ├── stage1_output.json + │ ├── stage2_output.json + │ ├── stage3_output.json + │ └── stage4_output.json + └── integration/ + ├── test_stage0_to_1.py + ├── test_stage1_to_2.py + ├── test_stage2_to_3.py + └── test_e2e.py + +migrations/ +├── 001_add_job_platform_fields.sql # ✅ EXISTS +├── 002_create_batches_table.sql # ✅ EXISTS +├── 003_create_scraper_registry.sql # ✅ EXISTS +├── 004_create_api_keys.sql # ✅ EXISTS +├── 005_create_reviews_tables.sql # ❌ TODO +├── 006_create_spans_table.sql # ❌ TODO +├── 007_create_urt_enums.sql # ❌ TODO +├── 008_create_issues_tables.sql # ❌ TODO +└── 009_create_facts_table.sql # ❌ TODO +``` + +--- + +*Last Updated: 2026-01-24* diff --git a/.artifacts/ReviewIQ-Pipeline-Contracts-v1.md b/.artifacts/ReviewIQ-Pipeline-Contracts-v1.md new file mode 100644 index 0000000..f7191f3 --- /dev/null +++ b/.artifacts/ReviewIQ-Pipeline-Contracts-v1.md @@ -0,0 +1,1255 @@ +# ReviewIQ Pipeline Contracts v1.0 + +**Version**: 1.0.0 +**Status**: Implementation Blueprint +**Date**: 2026-01-24 +**Purpose**: Enable parallel development with independent stage validation + +--- + +## Executive Summary + +This document defines the **contract** for each pipeline stage: inputs, outputs, validation rules, and test fixtures. Each stage can be developed and tested independently. Integration tests verify stage-to-stage compatibility. 
+ +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ REVIEWIQ PIPELINE STAGES │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ STAGE 0 STAGE 1 STAGE 2 STAGE 3 STAGE 4 │ +│ ════════ ════════ ════════ ════════ ════════ │ +│ │ +│ [Scrape] ──▶ [Normalize] ──▶ [Classify] ──▶ [Route] ──▶ [Aggregate] │ +│ │ +│ Google Raw → Clean LLM → Spans Spans → Issues Facts │ +│ Maps API Text + URT codes Dedup + Link Timeseries │ +│ │ +│ ✅ DONE ❌ TODO ❌ TODO ❌ TODO ❌ TODO │ +│ │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Table of Contents + +1. [Stage 0: Raw Ingestion](#stage-0-raw-ingestion) (Complete) +2. [Stage 1: Normalization](#stage-1-normalization) +3. [Stage 2: LLM Classification](#stage-2-llm-classification) +4. [Stage 3: Issue Routing](#stage-3-issue-routing) +5. [Stage 4: Fact Aggregation](#stage-4-fact-aggregation) +6. [Integration Tests](#integration-tests) +7. 
[Test Fixtures](#test-fixtures) + +--- + +## Stage 0: Raw Ingestion + +**Status**: ✅ COMPLETE +**Owner**: Scraper Team +**Code**: `scrapers/google_reviews/v1_0_0.py` + +### Input Contract + +```typescript +interface Stage0Input { + job_id: string; // UUID + url: string; // Google Maps URL + max_reviews?: number; // Default: 100 + business_id: string; // Tenant identifier + place_id?: string; // Google Place ID (extracted from URL if missing) +} +``` + +### Output Contract + +```typescript +interface Stage0Output { + job_id: string; + status: 'completed' | 'failed' | 'partial'; + business_id: string; + place_id: string; + + // Business metadata + business_info: { + name: string; + address: string; + category: string; + total_reviews: number; + average_rating: number; + }; + + // Raw reviews array + reviews: RawReview[]; + + // Execution metadata + scrape_time_ms: number; + reviews_scraped: number; + scraper_version: string; +} + +interface RawReview { + review_id: string; // Google's review identifier + author_name: string; + author_id?: string; + rating: number; // 1-5 + text: string | null; // May be null for rating-only reviews + review_time: string; // ISO 8601 + response_text?: string; // Owner response + response_time?: string; + photos?: string[]; + + // Raw payload for audit + raw_payload: object; +} +``` + +### Validation Rules + +| Rule | Check | Error Code | +|------|-------|------------| +| V0.1 | `reviews` is array | `STAGE0_INVALID_OUTPUT` | +| V0.2 | Each review has `review_id` | `STAGE0_MISSING_REVIEW_ID` | +| V0.3 | Each review has `rating` 1-5 | `STAGE0_INVALID_RATING` | +| V0.4 | `review_time` is valid ISO 8601 | `STAGE0_INVALID_TIMESTAMP` | +| V0.5 | `business_info.name` is non-empty | `STAGE0_MISSING_BUSINESS` | + +### Storage Location + +```sql +-- Currently stored as JSONB blob +jobs.reviews_data -> Stage0Output +``` + +### Handoff to Stage 1 + +Stage 1 reads from `jobs.reviews_data` where `status = 'completed'`. 
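The Stage 0 validation rules above can be sketched as a small pre-handoff check. This is an illustrative sketch only, not the project's validator: the function name `validate_stage0_output`, the plain-dict input, and the list-of-tuples return shape are assumptions, while the rule IDs and error codes are taken from the table. It also assumes `datetime.fromisoformat` is an acceptable stand-in for ISO 8601 parsing, which it only approximates on older Python versions.

```python
from datetime import datetime


def _is_iso8601(value: object) -> bool:
    """Return True if value parses as an ISO 8601 timestamp (approximation)."""
    if not isinstance(value, str):
        return False
    try:
        # Python < 3.11 rejects a trailing "Z", so map it to an explicit offset.
        datetime.fromisoformat(value.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


def validate_stage0_output(output: dict) -> list[tuple[str, str]]:
    """Check rules V0.1-V0.5; return (rule, error_code) pairs, empty if all pass.

    Hypothetical helper: the name and return shape are assumptions.
    """
    errors: list[tuple[str, str]] = []

    reviews = output.get("reviews")
    if not isinstance(reviews, list):
        # V0.1: without an array, the per-review rules cannot be evaluated.
        errors.append(("V0.1", "STAGE0_INVALID_OUTPUT"))
        reviews = []

    for review in reviews:
        if not isinstance(review, dict):
            review = {}  # treat malformed entries as missing every field
        if not review.get("review_id"):
            errors.append(("V0.2", "STAGE0_MISSING_REVIEW_ID"))
        rating = review.get("rating")
        is_number = isinstance(rating, (int, float)) and not isinstance(rating, bool)
        if not is_number or not 1 <= rating <= 5:
            errors.append(("V0.3", "STAGE0_INVALID_RATING"))
        if not _is_iso8601(review.get("review_time")):
            errors.append(("V0.4", "STAGE0_INVALID_TIMESTAMP"))

    info = output.get("business_info")
    name = info.get("name") if isinstance(info, dict) else None
    if not isinstance(name, str) or not name.strip():
        errors.append(("V0.5", "STAGE0_MISSING_BUSINESS"))

    return errors
```

A check like this could run against `jobs.reviews_data` before Stage 1 picks up a completed job, so that malformed scraper output is rejected at the boundary rather than inside normalization.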
+ +--- + +## Stage 1: Normalization + +**Status**: ❌ NOT IMPLEMENTED +**Owner**: TBD +**Depends On**: Stage 0 + +### Purpose + +Transform raw scraped reviews into clean, versioned records ready for LLM classification. + +### Input Contract + +```typescript +// Reads Stage0Output from jobs.reviews_data +interface Stage1Input { + job_id: string; + business_id: string; + place_id: string; + reviews: RawReview[]; // From Stage 0 +} +``` + +### Output Contract + +```typescript +interface Stage1Output { + job_id: string; + business_id: string; + place_id: string; + + // Normalized reviews ready for classification + reviews_normalized: NormalizedReview[]; + + // Processing stats + stats: { + input_count: number; + output_count: number; + skipped_empty: number; + skipped_duplicate: number; + }; +} + +interface NormalizedReview { + // Identity (composite key) + source: 'google'; + review_id: string; + review_version: number; // Increments if review edited + + // Tenant context + business_id: string; + place_id: string; + + // Content + text: string; // Original text (NOT NULL - empty reviews filtered) + text_normalized: string; // Cleaned: lowercase, whitespace normalized, emoji standardized + text_language: string; // ISO 639-1 code + text_length: number; // Character count + word_count: number; + + // Metadata + rating: number; + review_time: string; // ISO 8601 + author_name: string; + author_id?: string; + + // Dedup + content_hash: string; // SHA256 of normalized text + dedup_group_id?: string; // If duplicate detected + + // Reference + raw_id: number; // FK to reviews_raw.id +} +``` + +### Database Schema + +```sql +-- Stage 1 writes to reviews_raw (immutable) and reviews_enriched (initial) +INSERT INTO reviews_raw ( + source, review_id, place_id, raw_payload, + review_text, rating, review_time, reviewer_name, reviewer_id, + review_version, pulled_at +) VALUES (...); + +-- Then creates reviews_enriched stub (no classification yet) +INSERT INTO reviews_enriched ( + 
source, review_id, review_version, is_latest, raw_id, + business_id, place_id, text, text_normalized, rating, review_time, + language, taxonomy_version, + -- Classification fields NULL until Stage 2 + urt_primary, valence, intensity, embedding, trust_score +) VALUES ( + ..., + NULL, NULL, NULL, NULL, NULL -- Filled by Stage 2 +); +``` + +### Validation Rules + +| Rule | Check | Error Code | +|------|-------|------------| +| V1.1 | `text` is non-empty string | `STAGE1_EMPTY_TEXT` | +| V1.2 | `text_normalized` contains no control chars | `STAGE1_INVALID_NORMALIZATION` | +| V1.3 | `content_hash` is 64-char hex | `STAGE1_INVALID_HASH` | +| V1.4 | `review_version` >= 1 | `STAGE1_INVALID_VERSION` | +| V1.5 | `text_language` is valid ISO 639-1 | `STAGE1_INVALID_LANGUAGE` | +| V1.6 | `raw_id` references valid reviews_raw row | `STAGE1_ORPHAN_ENRICHED` | + +### Validation Script + +```python +def validate_stage1_output(output: Stage1Output) -> ValidationResult: + """ + Run after Stage 1 completes. + Returns: ValidationResult with passed/failed rules and error details. 
+ """ + errors = [] + + for review in output.reviews_normalized: + # V1.1: Non-empty text + if not review.text or not review.text.strip(): + errors.append(ValidationError('V1.1', review.review_id, 'Empty text')) + + # V1.2: No control characters + if has_control_chars(review.text_normalized): + errors.append(ValidationError('V1.2', review.review_id, 'Control chars in normalized')) + + # V1.3: Valid hash + if not is_valid_sha256(review.content_hash): + errors.append(ValidationError('V1.3', review.review_id, 'Invalid content hash')) + + # V1.4: Version >= 1 + if review.review_version < 1: + errors.append(ValidationError('V1.4', review.review_id, 'Invalid version')) + + # V1.5: Valid language code + if not is_valid_iso639(review.text_language): + errors.append(ValidationError('V1.5', review.review_id, 'Invalid language')) + + return ValidationResult( + stage='stage1', + passed=len(errors) == 0, + error_count=len(errors), + errors=errors + ) +``` + +### Handoff to Stage 2 + +Stage 2 queries: +```sql +SELECT * FROM reviews_enriched +WHERE urt_primary IS NULL -- Not yet classified + AND is_latest = TRUE +ORDER BY review_time DESC +LIMIT 100; -- Batch size +``` + +--- + +## Stage 2: LLM Classification + +**Status**: ❌ NOT IMPLEMENTED +**Owner**: TBD +**Depends On**: Stage 1 +**Contract Reference**: `LLM-Classification-Contract-v1.md` + +### Purpose + +Classify normalized reviews into URT codes with span-level extraction. + +### Input Contract + +```typescript +interface Stage2Input { + // Batch of reviews to classify + reviews: ReviewToClassify[]; + + // Configuration + config: { + model: string; // 'gpt-4o-mini', 'claude-3-haiku', etc. 
+ taxonomy_version: string; // 'v5.1' + profile: 'lite' | 'core' | 'standard' | 'full'; + max_spans_per_review: number; // Default: 10 + }; +} + +interface ReviewToClassify { + source: string; + review_id: string; + review_version: number; + business_id: string; + place_id: string; + text: string; // Original text (offsets reference this) + text_normalized: string; // For LLM input + rating: number; + review_time: string; +} +``` + +### Output Contract + +```typescript +interface Stage2Output { + // Batch metadata + batch_id: string; + taxonomy_version: string; + model_version: string; + prompt_version: string; + + // Classified reviews + reviews_classified: ClassifiedReview[]; + + // Stats + stats: { + input_count: number; + success_count: number; + error_count: number; + total_spans: number; + avg_spans_per_review: number; + llm_tokens_used: number; + llm_cost_usd: number; + }; +} + +interface ClassifiedReview { + // Identity + source: string; + review_id: string; + review_version: number; + + // Review-level classification (derived from primary span) + urt_primary: string; // Tier-3 code: 'J1.01' + urt_secondary: string[]; // Max 2 + valence: 'V+' | 'V-' | 'V0' | 'V±'; + intensity: 'I1' | 'I2' | 'I3'; + comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S'; + + // Extracted entities (from spans) + staff_mentions: string[]; + quotes: Record<string, string>; // code -> quote + + // Trust score + trust_score: number; // 0.2 to 1.0 + + // Embedding + embedding: number[]; // 384-dim vector + + // Spans + spans: ExtractedSpan[]; + + // Processing metadata + classification_confidence: Record; + processing_time_ms: number; +} + +interface ExtractedSpan { + // Identity + span_id: string; // Deterministic: 'SPN-{hash}' + span_index: number; // 0-based within review + + // Position (offsets into original text, NOT normalized) + span_text: string; // Exact substring + span_start: number; // Character offset + span_end: number; // Character offset (exclusive) + + // Classification + profile:
'lite' | 'core' | 'standard' | 'full'; + urt_primary: string; + urt_secondary: string[]; + valence: 'V+' | 'V-' | 'V0' | 'V±'; + intensity: 'I1' | 'I2' | 'I3'; + comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S'; + + // Extended (standard/full profile) + specificity: 'S1' | 'S2' | 'S3'; + actionability: 'A1' | 'A2' | 'A3'; + temporal: 'TC' | 'TR' | 'TH' | 'TF'; + evidence: 'ES' | 'EI' | 'EC'; + + // Entity + entity?: string; + entity_type?: 'location' | 'staff' | 'product' | 'process' | 'time' | 'other'; + entity_normalized?: string; + + // Causal (full profile) + relation_type?: 'cause_of' | 'effect_of' | 'contrast' | 'resolution'; + related_span_index?: number; + causal_chain?: CausalLink[]; + + // Metadata + confidence: 'high' | 'medium' | 'low'; + usn: string; // URT Semantic Notation + + // Flags + is_primary: boolean; // Primary span for this review +} + +interface CausalLink { + code: string; + role: 'cause' | 'effect' | 'context' | 'outcome'; + order: number; +} +``` + +### Database Writes + +```sql +-- Update reviews_enriched with classification +UPDATE reviews_enriched SET + urt_primary = $urt_primary, + urt_secondary = $urt_secondary, + valence = $valence, + intensity = $intensity, + comparative = $comparative, + staff_mentions = $staff_mentions, + quotes = $quotes, + embedding = $embedding, + trust_score = $trust_score, + classification_model = $model_version, + classification_confidence = $confidence, + taxonomy_version = $taxonomy_version, + processed_at = NOW() +WHERE source = $source + AND review_id = $review_id + AND review_version = $review_version; + +-- Insert spans +INSERT INTO review_spans ( + span_id, business_id, place_id, source, review_id, review_version, + span_index, span_text, span_start, span_end, + profile, urt_primary, urt_secondary, valence, intensity, comparative, + specificity, actionability, temporal, evidence, + entity, entity_type, entity_normalized, + relation_type, related_span_id, causal_chain, + is_primary, is_active, 
review_time, + confidence, usn, taxonomy_version, model_version, ingest_batch_id +) VALUES (...); +``` + +### Validation Rules + +| Rule | Check | Error Code | +|------|-------|------------| +| V2.1 | `urt_primary` matches `^[OPJEAVR][1-4]\.[0-9]{2}$` | `STAGE2_INVALID_URT_CODE` | +| V2.2 | `urt_secondary` has max 2 elements | `STAGE2_TOO_MANY_SECONDARY` | +| V2.3 | `valence` is valid enum value | `STAGE2_INVALID_VALENCE` | +| V2.4 | `intensity` is valid enum value | `STAGE2_INVALID_INTENSITY` | +| V2.5 | `span_end > span_start` | `STAGE2_INVALID_SPAN_BOUNDS` | +| V2.6 | `span_text == text[span_start:span_end]` | `STAGE2_SPAN_TEXT_MISMATCH` | +| V2.7 | Spans do not overlap | `STAGE2_OVERLAPPING_SPANS` | +| V2.8 | Exactly one `is_primary = true` per review | `STAGE2_PRIMARY_SPAN_COUNT` | +| V2.9 | `trust_score` >= 0.2 and <= 1.0 | `STAGE2_INVALID_TRUST` | +| V2.10 | `embedding` is 384-dim array | `STAGE2_INVALID_EMBEDDING` | +| V2.11 | `usn` matches profile-specific regex | `STAGE2_INVALID_USN` | +| V2.12 | `related_span_index` references valid span | `STAGE2_INVALID_RELATION` | + +### Validation Script + +```python +def validate_stage2_output(output: Stage2Output, input_reviews: dict) -> ValidationResult: + """ + Validate Stage 2 classification output. 
+ input_reviews: dict mapping (source, review_id, version) -> original text + """ + errors = [] + + for review in output.reviews_classified: + # V2.1: Valid URT code + if not re.match(r'^[OPJEAVR][1-4]\.[0-9]{2}$', review.urt_primary): + errors.append(ValidationError('V2.1', review.review_id, f'Invalid code: {review.urt_primary}')) + + # V2.2: Max 2 secondary codes + if len(review.urt_secondary) > 2: + errors.append(ValidationError('V2.2', review.review_id, 'Too many secondary codes')) + + # Get original text for span validation + key = (review.source, review.review_id, review.review_version) + original_text = input_reviews.get(key, {}).get('text', '') + + primary_count = 0 + span_ranges = [] + + for span in review.spans: + # V2.5: Valid bounds + if span.span_end <= span.span_start: + errors.append(ValidationError('V2.5', span.span_id, 'Invalid bounds')) + + # V2.6: Text matches + expected_text = original_text[span.span_start:span.span_end] + if span.span_text != expected_text: + errors.append(ValidationError('V2.6', span.span_id, + f'Text mismatch: expected "{expected_text[:30]}..." 
got "{span.span_text[:30]}..."')) + + # V2.7: No overlap (check against previous spans) + for prev_start, prev_end in span_ranges: + if not (span.span_end <= prev_start or span.span_start >= prev_end): + errors.append(ValidationError('V2.7', span.span_id, 'Overlapping span')) + span_ranges.append((span.span_start, span.span_end)) + + # V2.8: Count primaries + if span.is_primary: + primary_count += 1 + + if primary_count != 1: + errors.append(ValidationError('V2.8', review.review_id, f'Primary count: {primary_count}')) + + # V2.9: Trust score bounds + if not (0.2 <= review.trust_score <= 1.0): + errors.append(ValidationError('V2.9', review.review_id, f'Trust: {review.trust_score}')) + + # V2.10: Embedding dimension + if len(review.embedding) != 384: + errors.append(ValidationError('V2.10', review.review_id, f'Embedding dim: {len(review.embedding)}')) + + return ValidationResult( + stage='stage2', + passed=len(errors) == 0, + error_count=len(errors), + errors=errors + ) +``` + +### Handoff to Stage 3 + +Stage 3 queries: +```sql +SELECT rs.* +FROM review_spans rs +WHERE rs.is_active = TRUE + AND rs.valence IN ('V-', 'V±') -- Only negative/mixed create issues + AND NOT EXISTS ( + SELECT 1 FROM issue_spans iss WHERE iss.span_id = rs.span_id + ) +ORDER BY rs.review_time DESC; +``` + +--- + +## Stage 3: Issue Routing + +**Status**: ❌ NOT IMPLEMENTED +**Owner**: TBD +**Depends On**: Stage 2 + +### Purpose + +Route classified spans to issues (create new or aggregate to existing). 
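
As a minimal sketch of the create-or-aggregate decision (not the actual Stage 3 implementation, which does not exist yet), the Python below builds a routing key of the form `business|place|code|entity`, hashes it into an `ISS-` identifier with 16 hex characters, and either opens a new issue or increments the span count of an existing one. The `Span` dataclass, `route_spans`, and the in-memory `issues` dict are hypothetical stand-ins for `SpanToRoute` and the `issues` table. Only negative (`V-`) and mixed (`V±`) spans are routed, as this contract requires.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class Span:
    """Hypothetical, trimmed-down stand-in for SpanToRoute."""
    span_id: str
    business_id: str
    place_id: str
    urt_primary: str
    valence: str
    entity_normalized: Optional[str] = None


def routing_key(span: Span) -> str:
    # business|place|code|entity, with an empty entity when none was extracted
    return "|".join([span.business_id, span.place_id,
                     span.urt_primary, span.entity_normalized or ""])


def issue_id_for(key: str) -> str:
    # Deterministic ID: 'ISS-' plus the first 16 hex chars of SHA-256(key)
    return "ISS-" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]


def route_spans(spans, issues=None):
    """Route spans to issues; `issues` maps issue_id -> span_count (assumed in-memory store)."""
    issues = {} if issues is None else issues
    routed, skipped = [], 0
    for span in spans:
        if span.valence not in ("V-", "V±"):
            skipped += 1  # positive/neutral spans never create issues
            continue
        key = routing_key(span)
        issue_id = issue_id_for(key)
        is_new = issue_id not in issues
        issues[issue_id] = issues.get(issue_id, 0) + 1
        routed.append({"span_id": span.span_id, "issue_id": issue_id,
                       "routing_key": key, "is_new_issue": is_new})
    return routed, issues, skipped
```

Because the ID is a pure function of the routing key, two spans that share business, place, code, and entity always land on the same issue, so re-running the routing step cannot create a duplicate issue for the same key.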
+ +### Input Contract + +```typescript +interface Stage3Input { + // Spans to route (from Stage 2) + spans: SpanToRoute[]; +} + +interface SpanToRoute { + span_id: string; + business_id: string; + place_id: string; + urt_primary: string; + valence: string; + intensity: string; + entity_normalized?: string; + review_time: string; + confidence: string; + + // For trust weighting + trust_score: number; +} +``` + +### Output Contract + +```typescript +interface Stage3Output { + // Routing results + routed_spans: RoutedSpan[]; + + // Issues created/updated + issues_created: string[]; // issue_ids + issues_updated: string[]; // issue_ids + + // Stats + stats: { + spans_processed: number; + spans_routed: number; + spans_skipped: number; // Positive valence, etc. + issues_created: number; + issues_updated: number; + }; +} + +interface RoutedSpan { + span_id: string; + issue_id: string; + routing_key: string; // business|place|code|entity + is_new_issue: boolean; +} +``` + +### Issue ID Generation + +```typescript +// Deterministic issue ID from routing key +function generateIssueId( + business_id: string, + place_id: string, + urt_primary: string, + entity_normalized: string | null +): string { + const key = `${business_id}|${place_id}|${urt_primary}|${entity_normalized || ''}`; + const hash = sha256(key); + return `ISS-${hash.substring(0, 16)}`; +} +``` + +### Database Writes + +```sql +-- Create or update issue +INSERT INTO issues ( + issue_id, business_id, place_id, primary_subcode, domain, + state, priority_score, confidence_score, span_count, max_intensity, + entity, entity_normalized, taxonomy_version +) VALUES (...) +ON CONFLICT (issue_id) DO UPDATE SET + span_count = issues.span_count + 1, + max_intensity = GREATEST(issues.max_intensity, EXCLUDED.max_intensity), + updated_at = NOW(); + +-- Link span to issue +INSERT INTO issue_spans ( + issue_id, span_id, source, review_id, review_version, + is_primary_match, intensity, review_time +) VALUES (...) 
+ON CONFLICT (span_id) DO NOTHING; -- 1:1 mapping + +-- Log event +INSERT INTO issue_events (issue_id, event_type, span_id, ...) +VALUES (...); +``` + +### Validation Rules + +| Rule | Check | Error Code | +|------|-------|------------| +| V3.1 | `issue_id` matches `^ISS-[a-f0-9]{16}$` | `STAGE3_INVALID_ISSUE_ID` | +| V3.2 | `routing_key` is non-empty | `STAGE3_EMPTY_ROUTING_KEY` | +| V3.3 | Span not already linked to different issue | `STAGE3_DUPLICATE_ROUTING` | +| V3.4 | Issue exists in `issues` table | `STAGE3_ORPHAN_SPAN_LINK` | +| V3.5 | Only V-/V± spans create issues | `STAGE3_POSITIVE_ROUTED` | + +### Validation Script + +```python +def validate_stage3_output(output: Stage3Output, db: Database) -> ValidationResult: + errors = [] + + for routed in output.routed_spans: + # V3.1: Valid issue ID format + if not re.match(r'^ISS-[a-f0-9]{16}$', routed.issue_id): + errors.append(ValidationError('V3.1', routed.span_id, 'Invalid issue_id')) + + # V3.2: Non-empty routing key + if not routed.routing_key: + errors.append(ValidationError('V3.2', routed.span_id, 'Empty routing key')) + + # V3.3: Check no duplicate routing + existing = db.query( + "SELECT issue_id FROM issue_spans WHERE span_id = %s", + [routed.span_id] + ) + if existing and existing['issue_id'] != routed.issue_id: + errors.append(ValidationError('V3.3', routed.span_id, 'Already routed elsewhere')) + + # V3.4: Issue exists + issue_exists = db.query( + "SELECT 1 FROM issues WHERE issue_id = %s", + [routed.issue_id] + ) + if not issue_exists: + errors.append(ValidationError('V3.4', routed.span_id, 'Orphan issue_id')) + + return ValidationResult( + stage='stage3', + passed=len(errors) == 0, + error_count=len(errors), + errors=errors + ) +``` + +### Handoff to Stage 4 + +Stage 4 runs as scheduled job, reads from all tables. 
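
To make the scheduled job a little more concrete, here is a rough Python sketch of the counting it might perform: spans are grouped by `(place_id, urt_primary)` and tallied by valence and intensity. The function name `aggregate_spans`, the dict-shaped span rows, and the in-memory grouping are assumptions for illustration only. The real job would read `review_spans` from the database, and strength scoring is left out because its formula is not defined in this section.

```python
from collections import defaultdict

# Map span-level enum values to counter names (named after the fact record fields).
VALENCE_FIELD = {"V-": "negative_count", "V+": "positive_count",
                 "V0": "neutral_count", "V±": "mixed_count"}
INTENSITY_FIELD = {"I1": "i1_count", "I2": "i2_count", "I3": "i3_count"}


def _empty_counts():
    # Start every counter at zero so absent categories still appear in the fact
    names = ["span_count", *VALENCE_FIELD.values(), *INTENSITY_FIELD.values()]
    return {name: 0 for name in names}


def aggregate_spans(spans):
    """Group span rows by (place_id, urt_primary) and count valence and intensity."""
    counts = defaultdict(_empty_counts)
    review_ids = defaultdict(set)
    for span in spans:
        key = (span["place_id"], span["urt_primary"])
        counts[key]["span_count"] += 1
        counts[key][VALENCE_FIELD[span["valence"]]] += 1
        counts[key][INTENSITY_FIELD[span["intensity"]]] += 1
        review_ids[key].add(span["review_id"])  # distinct reviews behind the spans
    return {key: {**c, "review_count": len(review_ids[key])}
            for key, c in counts.items()}
```

With this shape, the valence counters and the intensity counters each sum to `span_count` for every fact, and `span_count` can never be smaller than `review_count`.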
+ +--- + +## Stage 4: Fact Aggregation + +**Status**: ❌ NOT IMPLEMENTED +**Owner**: TBD +**Depends On**: Stages 1, 2, 3 +**Schedule**: Daily batch job + +### Purpose + +Pre-aggregate span/review data into `fact_timeseries` for fast dashboard queries. + +### Input Contract + +```typescript +interface Stage4Input { + business_id: string; + date: string; // YYYY-MM-DD + bucket_types: ('day' | 'week' | 'month')[]; + taxonomy_version: string; +} +``` + +### Output Contract + +```typescript +interface Stage4Output { + facts_written: FactRecord[]; + + stats: { + business_id: string; + date: string; + locations_processed: number; + codes_aggregated: number; + facts_upserted: number; + }; +} + +interface FactRecord { + // Keys + business_id: string; + place_id: string; // Or 'ALL' for rollup + period_date: string; + bucket_type: string; + subject_type: 'overall' | 'urt_code' | 'domain' | 'issue'; + subject_id: string; + taxonomy_version: string; + + // Metrics + review_count: number; + span_count: number; + negative_count: number; + positive_count: number; + neutral_count: number; + mixed_count: number; + strength_score: number; + negative_strength: number; + positive_strength: number; + avg_rating: number; + i1_count: number; + i2_count: number; + i3_count: number; + cr_better: number; + cr_worse: number; + cr_same: number; + trust_weighted_strength: number; + trust_weighted_negative: number; +} +``` + +### Database Writes + +```sql +INSERT INTO fact_timeseries ( + business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version, + review_count, span_count, negative_count, positive_count, ... +) VALUES (...) +ON CONFLICT (business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version) +DO UPDATE SET + review_count = EXCLUDED.review_count, + span_count = EXCLUDED.span_count, + ... 
+ computed_at = NOW(); +``` + +### Validation Rules + +| Rule | Check | Error Code | +|------|-------|------------| +| V4.1 | `place_id` is valid or 'ALL' | `STAGE4_INVALID_PLACE` | +| V4.2 | `period_date` matches bucket | `STAGE4_DATE_BUCKET_MISMATCH` | +| V4.3 | `span_count >= review_count` (reviews have >=1 span) | `STAGE4_COUNT_MISMATCH` | +| V4.4 | `negative_count + positive_count + neutral_count + mixed_count == span_count` | `STAGE4_VALENCE_SUM` | +| V4.5 | `i1_count + i2_count + i3_count == span_count` | `STAGE4_INTENSITY_SUM` | +| V4.6 | `strength_score >= 0` | `STAGE4_NEGATIVE_STRENGTH` | +| V4.7 | `avg_rating` between 1.0 and 5.0 (or NULL) | `STAGE4_INVALID_RATING` | + +### Validation Script + +```python +def validate_stage4_output(output: Stage4Output) -> ValidationResult: + errors = [] + + for fact in output.facts_written: + # V4.3: Span count >= review count + if fact.span_count < fact.review_count: + errors.append(ValidationError('V4.3', f'{fact.subject_id}', 'span < review count')) + + # V4.4: Valence sum + valence_sum = fact.negative_count + fact.positive_count + fact.neutral_count + fact.mixed_count + if valence_sum != fact.span_count: + errors.append(ValidationError('V4.4', f'{fact.subject_id}', + f'Valence sum {valence_sum} != {fact.span_count}')) + + # V4.5: Intensity sum + intensity_sum = fact.i1_count + fact.i2_count + fact.i3_count + if intensity_sum != fact.span_count: + errors.append(ValidationError('V4.5', f'{fact.subject_id}', + f'Intensity sum {intensity_sum} != {fact.span_count}')) + + # V4.7: Rating bounds + if fact.avg_rating is not None and not (1.0 <= fact.avg_rating <= 5.0): + errors.append(ValidationError('V4.7', f'{fact.subject_id}', + f'Invalid rating: {fact.avg_rating}')) + + return ValidationResult( + stage='stage4', + passed=len(errors) == 0, + error_count=len(errors), + errors=errors + ) +``` + +--- + +## Integration Tests + +### Test 1: Stage 0 → Stage 1 Handoff + +```python +def test_stage0_to_stage1(): + """Verify Stage 
0 output is valid Stage 1 input.""" + + # Get Stage 0 output + stage0_output = db.query("SELECT reviews_data FROM jobs WHERE job_id = %s", [JOB_ID]) + + # Validate as Stage 1 input + stage1_input = Stage1Input.parse(stage0_output) + + # Check all required fields present + assert stage1_input.job_id is not None + assert stage1_input.business_id is not None + assert len(stage1_input.reviews) > 0 + + for review in stage1_input.reviews: + assert review.review_id is not None + assert review.rating in [1, 2, 3, 4, 5] + assert review.review_time is not None +``` + +### Test 2: Stage 1 → Stage 2 Handoff + +```python +def test_stage1_to_stage2(): + """Verify Stage 1 output can be classified.""" + + # Get Stage 1 output from DB + reviews = db.query(""" + SELECT * FROM reviews_enriched + WHERE urt_primary IS NULL AND is_latest = TRUE + LIMIT 10 + """) + + # Build Stage 2 input + stage2_input = Stage2Input( + reviews=[ReviewToClassify.from_db(r) for r in reviews], + config=ClassificationConfig( + model='gpt-4o-mini', + taxonomy_version='v5.1', + profile='standard' + ) + ) + + # Validate input + for review in stage2_input.reviews: + assert review.text is not None + assert len(review.text) > 0 + assert review.text_normalized is not None +``` + +### Test 3: Stage 2 → Stage 3 Handoff + +```python +def test_stage2_to_stage3(): + """Verify classified spans can be routed.""" + + # Get unrouted negative spans + spans = db.query(""" + SELECT * FROM review_spans + WHERE is_active = TRUE + AND valence IN ('V-', 'V±') + AND NOT EXISTS (SELECT 1 FROM issue_spans WHERE span_id = review_spans.span_id) + LIMIT 10 + """) + + # Build Stage 3 input + stage3_input = Stage3Input( + spans=[SpanToRoute.from_db(s) for s in spans] + ) + + # Validate + for span in stage3_input.spans: + assert span.span_id is not None + assert span.business_id is not None + assert span.urt_primary is not None + assert span.valence in ['V-', 'V±'] +``` + +### Test 4: End-to-End Pipeline + +```python +def 
test_e2e_pipeline(): + """Full pipeline integration test.""" + + # Stage 0: Scrape + job_id = submit_scrape_job(TEST_URL) + wait_for_job(job_id) + stage0_result = validate_stage0_output(get_job(job_id)) + assert stage0_result.passed + + # Stage 1: Normalize + stage1_output = run_stage1(job_id) + stage1_result = validate_stage1_output(stage1_output) + assert stage1_result.passed + + # Stage 2: Classify + stage2_output = run_stage2(stage1_output.reviews_normalized) + # validate_stage2_output expects a dict keyed by (source, review_id, review_version) + input_reviews = {(r.source, r.review_id, r.review_version): {'text': r.text} + for r in stage1_output.reviews_normalized} + stage2_result = validate_stage2_output(stage2_output, input_reviews) + assert stage2_result.passed + + # Stage 3: Route + negative_spans = [s for r in stage2_output.reviews_classified + for s in r.spans if s.valence in ['V-', 'V±']] + stage3_output = run_stage3(negative_spans) + stage3_result = validate_stage3_output(stage3_output, db) + assert stage3_result.passed + + # Stage 4: Aggregate + stage4_output = run_stage4(TEST_BUSINESS_ID, today(), ['day']) + stage4_result = validate_stage4_output(stage4_output) + assert stage4_result.passed + + # Final assertion: Facts exist + facts = db.query(""" + SELECT COUNT(*) FROM fact_timeseries + WHERE business_id = %s AND period_date = %s + """, [TEST_BUSINESS_ID, today()]) + assert facts['count'] > 0 +``` + +--- + +## Test Fixtures + +### Fixture 1: Sample Raw Review (Stage 0 Output) + +```json +{ + "job_id": "test-job-001", + "status": "completed", + "business_id": "acme-corp", + "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4", + "business_info": { + "name": "Acme Restaurant", + "address": "123 Main St, Anytown, USA", + "category": "Restaurant", + "total_reviews": 1247, + "average_rating": 4.2 + }, + "reviews": [ + { + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB", + "author_name": "John Smith", + "author_id": "103456789012345678901", + "rating": 2, + "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained.
However, the steak was cooked perfectly and the dessert was amazing.", + "review_time": "2026-01-20T14:30:00Z", + "response_text": null, + "photos": [], + "raw_payload": {} + } + ], + "scrape_time_ms": 12500, + "reviews_scraped": 1, + "scraper_version": "v1.0.0" +} +``` + +### Fixture 2: Normalized Review (Stage 1 Output) + +```json +{ + "source": "google", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB", + "review_version": 1, + "business_id": "acme-corp", + "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4", + "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained. However, the steak was cooked perfectly and the dessert was amazing.", + "text_normalized": "the food was great but the wait was absolutely terrible we waited 45 minutes just to be seated and another 30 minutes for our appetizers the server mike was rude and dismissive when we complained however the steak was cooked perfectly and the dessert was amazing", + "text_language": "en", + "text_length": 268, + "word_count": 46, + "rating": 2, + "review_time": "2026-01-20T14:30:00Z", + "author_name": "John Smith", + "content_hash": "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd", + "raw_id": 12345 +} +``` + +### Fixture 3: Classified Review with Spans (Stage 2 Output) + +```json +{ + "source": "google", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB", + "review_version": 1, + "urt_primary": "J1.01", + "urt_secondary": ["P1.02"], + "valence": "V±", + "intensity": "I3", + "comparative": "CR-N", + "staff_mentions": ["Mike"], + "quotes": { + "J1.01": "waited 45 minutes just to be seated", + "P1.02": "rude and dismissive" + }, + "trust_score": 0.85, + "embedding": [0.123, -0.456, 0.789, "...
384 values ..."], + "spans": [ + { + "span_id": "SPN-a1b2c3d4e5f67890", + "span_index": 0, + "span_text": "The food was great", + "span_start": 0, + "span_end": 18, + "profile": "standard", + "urt_primary": "O1.01", + "urt_secondary": [], + "valence": "V+", + "intensity": "I2", + "comparative": "CR-N", + "specificity": "S1", + "actionability": "A1", + "temporal": "TC", + "evidence": "ES", + "confidence": "high", + "usn": "URT:S:O1.01:+2:11TC.ES.N", + "is_primary": false + }, + { + "span_id": "SPN-b2c3d4e5f6789012", + "span_index": 1, + "span_text": "the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers", + "span_start": 23, + "span_end": 138, + "profile": "standard", + "urt_primary": "J1.01", + "urt_secondary": [], + "valence": "V-", + "intensity": "I3", + "comparative": "CR-N", + "specificity": "S3", + "actionability": "A2", + "temporal": "TC", + "evidence": "EC", + "confidence": "high", + "usn": "URT:S:J1.01:-3:32TC.EC.N", + "is_primary": true + }, + { + "span_id": "SPN-c3d4e5f678901234", + "span_index": 2, + "span_text": "The server Mike was rude and dismissive when we complained", + "span_start": 140, + "span_end": 198, + "profile": "standard", + "urt_primary": "P1.02", + "urt_secondary": [], + "valence": "V-", + "intensity": "I2", + "comparative": "CR-N", + "specificity": "S2", + "actionability": "A2", + "temporal": "TC", + "evidence": "ES", + "entity": "Mike", + "entity_type": "staff", + "entity_normalized": "mike", + "confidence": "high", + "usn": "URT:S:P1.02:-2:22TC.ES.N", + "is_primary": false + }, + { + "span_id": "SPN-d4e5f67890123456", + "span_index": 3, + "span_text": "the steak was cooked perfectly and the dessert was amazing", + "span_start": 209, + "span_end": 267, + "profile": "standard", + "urt_primary": "O1.01", + "urt_secondary": [], + "valence": "V+", + "intensity": "I2", + "comparative": "CR-N", + "specificity": "S2", + "actionability": "A1", + "temporal": "TC", + "evidence": "ES", 
+ "confidence": "high", + "usn": "URT:S:O1.01:+2:21TC.ES.N", + "is_primary": false + } + ] +} +``` + +### Fixture 4: Routed Spans (Stage 3 Output) + +```json +{ + "routed_spans": [ + { + "span_id": "SPN-b2c3d4e5f6789012", + "issue_id": "ISS-7a8b9c0d1e2f3a4b", + "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|J1.01|", + "is_new_issue": true + }, + { + "span_id": "SPN-c3d4e5f678901234", + "issue_id": "ISS-2b3c4d5e6f7a8b9c", + "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|P1.02|mike", + "is_new_issue": true + } + ], + "issues_created": ["ISS-7a8b9c0d1e2f3a4b", "ISS-2b3c4d5e6f7a8b9c"], + "issues_updated": [], + "stats": { + "spans_processed": 4, + "spans_routed": 2, + "spans_skipped": 2, + "issues_created": 2, + "issues_updated": 0 + } +} +``` + +### Fixture 5: Aggregated Facts (Stage 4 Output) + +```json +{ + "facts_written": [ + { + "business_id": "acme-corp", + "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4", + "period_date": "2026-01-20", + "bucket_type": "day", + "subject_type": "urt_code", + "subject_id": "J1.01", + "taxonomy_version": "v5.1", + "review_count": 1, + "span_count": 1, + "negative_count": 1, + "positive_count": 0, + "neutral_count": 0, + "mixed_count": 0, + "strength_score": 4.0, + "negative_strength": 4.0, + "positive_strength": 0.0, + "avg_rating": 2.0, + "i1_count": 0, + "i2_count": 0, + "i3_count": 1, + "cr_better": 0, + "cr_worse": 0, + "cr_same": 0, + "trust_weighted_strength": 3.4, + "trust_weighted_negative": 3.4 + } + ], + "stats": { + "business_id": "acme-corp", + "date": "2026-01-20", + "locations_processed": 1, + "codes_aggregated": 3, + "facts_upserted": 5 + } +} +``` + +--- + +## Validation Summary Table + +| Stage | Input From | Output To | Validation Rules | Test Coverage | +|-------|------------|-----------|------------------|---------------| +| 0 | Google Maps | jobs.reviews_data | V0.1-V0.5 | Unit + E2E | +| 1 | Stage 0 | reviews_raw, reviews_enriched | V1.1-V1.6 | Unit + Integration | +| 2 | Stage 1 | reviews_enriched, 
review_spans | V2.1-V2.12 | Unit + Integration | +| 3 | Stage 2 | issues, issue_spans, issue_events | V3.1-V3.5 | Unit + Integration | +| 4 | All stages | fact_timeseries | V4.1-V4.7 | Unit + E2E | + +--- + +## Document Control + +| Field | Value | +|-------|-------| +| **Document** | ReviewIQ Pipeline Contracts v1.0 | +| **Status** | Implementation Blueprint | +| **Date** | 2026-01-24 | +| **Purpose** | Enable parallel development with stage validation | + +--- + +*End of Pipeline Contracts Document* diff --git a/.artifacts/ReviewIQ-Pipeline-DevGuide.md b/.artifacts/ReviewIQ-Pipeline-DevGuide.md new file mode 100644 index 0000000..1b47f31 --- /dev/null +++ b/.artifacts/ReviewIQ-Pipeline-DevGuide.md @@ -0,0 +1,312 @@ +# ReviewIQ Pipeline Development Guide + +**Purpose**: Entry point for agents implementing the enrichment pipeline +**Last Updated**: 2026-01-24 + +--- + +## TL;DR - Current State + +**Pipeline Implementation: ~55% complete** + +``` +✅ WORKING ❌ NOT IMPLEMENTED +────────── ────────────────── +Google Maps scraping Stage 1: Normalization +Job orchestration Stage 2: LLM Classification +Chrome worker pool Stage 3: Issue Routing +Webhook delivery Stage 4: Fact Aggregation +SSE streaming Enrichment database schema +Frontend (job management) Advanced analytics UI +``` + +**Estimated effort to 100%**: 6-8 weeks + +--- + +## Cold Start Instructions + +A new agent should: + +| Step | Action | Time | +|------|--------|------| +| 1 | Read this file (`ReviewIQ-Pipeline-DevGuide.md`) | 2 min | +| 2 | Read `ReviewIQ-v32-Decisions.md` | 5 min | +| 3 | Read `ReviewIQ-Codebase-Overview.md` | 10 min | +| 4 | Read assigned stage in `ReviewIQ-Pipeline-Contracts-v1.md` | 15 min | +| 5 | Use `ReviewIQ-Pipeline-Checklist.md` to verify completion | Reference | + +--- + +## Document Map + +``` + ┌─────────────────────────────────────┐ + │ ReviewIQ-Pipeline-DevGuide.md │ + │ (YOU ARE HERE) │ + └─────────────────┬───────────────────┘ + │ + 
┌─────────────────────────────┼─────────────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌─────────────────────┐ ┌─────────────────────────┐ ┌─────────────────────┐ +│ CONTEXT RECOVERY │ │ IMPLEMENTATION │ │ REFERENCE │ +├─────────────────────┤ ├─────────────────────────┤ ├─────────────────────┤ +│ │ │ │ │ │ +│ ReviewIQ-v32- │ │ Pipeline-Contracts-v1 │ │ Architecture-v3.2 │ +│ Decisions.md │ │ (I/O specs, validation) │ │ (full DDL spec) │ +│ (key decisions, │ │ │ │ │ +│ markpoint) │ │ Pipeline-Checklist │ │ v3.2.1-Taxonomy- │ +│ │ │ (implementation tasks) │ │ Versioning │ +│ Codebase-Overview │ │ │ │ (versioning spec) │ +│ (file structure, │ │ LLM-Classification- │ │ │ +│ integration points) │ │ Contract-v1 │ │ URT-v5.1-Reference │ +│ │ │ (prompt engineering) │ │ (dimension codes) │ +└─────────────────────┘ └─────────────────────────┘ └─────────────────────┘ +``` + +--- + +## Core Documents + +### Context & Status (Read First) + +| File | Purpose | Est. Read Time | +|------|---------|----------------| +| `ReviewIQ-Pipeline-DevGuide.md` | Entry point, document map | 2 min | +| `ReviewIQ-v32-Decisions.md` | Key decisions, current markpoint | 5 min | +| `ReviewIQ-Codebase-Overview.md` | File structure, what code exists, integration points | 10 min | + +### Implementation Guides (For Building) + +| File | Purpose | Est. 
Read Time | +|------|---------|----------------| +| `ReviewIQ-Pipeline-Contracts-v1.md` | Stage I/O specs, validation rules, test fixtures | 15 min | +| `ReviewIQ-Pipeline-Checklist.md` | Per-stage implementation checklist, definition of done | 5 min | +| `LLM-Classification-Contract-v1.md` | LLM prompt engineering spec (Stage 2) | 10 min | + +### Full Specifications (Reference) + +| File | Purpose | When to Read | +|------|---------|--------------| +| `ReviewIQ-Architecture-v3.2.md` | Complete v3.2 spec with DDL | Schema details | +| `ReviewIQ-v3.2.1-Taxonomy-Versioning.md` | Taxonomy versioning addendum | Future-proofing | +| `URT-v5.1-Reference.md` | URT dimension codes reference | Classification reference | + +### Legacy (Superseded - Reference Only) + +| File | Note | +|------|------| +| `ReviewIQ-Architecture-v2.md` | Superseded by v3.2 | +| `ReviewIQ-Architecture-v3.md` | Superseded by v3.2 | +| `ReviewIQ-Architecture-v3.1.md` | Superseded by v3.2 | +| `CONTEXT-KEEPER.md` | Use `ReviewIQ-v32-Decisions.md` instead | + +--- + +## What's Captured in Artifacts + +| Context | Document | +|---------|----------| +| Key architectural decisions | `ReviewIQ-v32-Decisions.md` | +| Current implementation status (~55%) | `ReviewIQ-Codebase-Overview.md` | +| Existing file structure | `ReviewIQ-Codebase-Overview.md` | +| Integration points (where new code connects) | `ReviewIQ-Codebase-Overview.md` | +| Stage input/output contracts | `ReviewIQ-Pipeline-Contracts-v1.md` | +| Validation rules (35 total across stages) | `ReviewIQ-Pipeline-Contracts-v1.md` | +| Test fixtures (5 sample JSON payloads) | `ReviewIQ-Pipeline-Contracts-v1.md` | +| Implementation checklists | `ReviewIQ-Pipeline-Checklist.md` | +| Definition of done per stage | `ReviewIQ-Pipeline-Checklist.md` | +| LLM prompt specification | `LLM-Classification-Contract-v1.md` | +| URT taxonomy codes | `URT-v5.1-Reference.md` | +| Full database DDL | `ReviewIQ-Architecture-v3.2.md` | +| Taxonomy versioning schema | 
`ReviewIQ-v3.2.1-Taxonomy-Versioning.md` | + +--- + +## Pipeline Stages + +| Stage | Name | Status | Contract Section | Validation Rules | +|-------|------|--------|------------------|------------------| +| 0 | Raw Ingestion | ✅ Done | Pipeline-Contracts § Stage 0 | V0.1-V0.5 | +| 1 | Normalization | ❌ TODO | Pipeline-Contracts § Stage 1 | V1.1-V1.6 | +| 2 | LLM Classification | ❌ TODO | Pipeline-Contracts § Stage 2 | V2.1-V2.12 | +| 3 | Issue Routing | ❌ TODO | Pipeline-Contracts § Stage 3 | V3.1-V3.5 | +| 4 | Fact Aggregation | ❌ TODO | Pipeline-Contracts § Stage 4 | V4.1-V4.7 | + +--- + +## Parallel Development Assignment + +### Agent 1 - Stage 1 (Normalization) +``` +Read: + - ReviewIQ-Pipeline-Contracts-v1.md § Stage 1 + - ReviewIQ-Codebase-Overview.md (integration points) + +Create: + - pipeline/stage1_normalize.py + - migrations/005_create_reviews_tables.sql + - pipeline/tests/test_stage1.py + +Validate: + - V1.1-V1.6 rules pass + - Integration test: Stage 0 → Stage 1 passes +``` + +### Agent 2 - Stage 2 (LLM Classification) +``` +Read: + - ReviewIQ-Pipeline-Contracts-v1.md § Stage 2 + - LLM-Classification-Contract-v1.md + - URT-v5.1-Reference.md + +Create: + - pipeline/stage2_classify.py + - pipeline/llm_client.py + - pipeline/span_extractor.py + - migrations/006_create_spans_table.sql + - migrations/007_create_urt_enums.sql + - pipeline/tests/test_stage2.py + +Validate: + - V2.1-V2.12 rules pass + - Integration test: Stage 1 → Stage 2 passes +``` + +### Agent 3 - Stage 3 (Issue Routing) +``` +Read: + - ReviewIQ-Pipeline-Contracts-v1.md § Stage 3 + - ReviewIQ-Architecture-v3.2.md § Part 5 (issue lifecycle) + +Create: + - pipeline/stage3_route.py + - pipeline/issue_manager.py + - migrations/008_create_issues_tables.sql + - pipeline/tests/test_stage3.py + +Validate: + - V3.1-V3.5 rules pass + - Integration test: Stage 2 → Stage 3 passes +``` + +### Agent 4 - Stage 4 (Fact Aggregation) +``` +Read: + - ReviewIQ-Pipeline-Contracts-v1.md § Stage 4 + - 
ReviewIQ-Architecture-v3.2.md § Part 6 (analytics) + +Create: + - pipeline/stage4_aggregate.py + - migrations/009_create_facts_table.sql + - pipeline/tests/test_stage4.py + +Validate: + - V4.1-V4.7 rules pass + - E2E pipeline test passes +``` + +--- + +## Success Criteria + +Pipeline is complete when: + +```bash +python -m pipeline.validate --job-id --verbose + +# Expected output: +Stage 0: ✅ PASS (5/5 rules) +Stage 1: ✅ PASS (6/6 rules) +Stage 2: ✅ PASS (12/12 rules) +Stage 3: ✅ PASS (5/5 rules) +Stage 4: ✅ PASS (7/7 rules) +E2E Integration: ✅ PASS +``` + +--- + +## Quick Commands + +```bash +# Check current branch +git branch --show-current +# Expected: feature/platform-restructure + +# View recent commits +git log --oneline -5 + +# Start database +docker-compose -f docker-compose.production.yml up -d postgres + +# Run API server +python api_server_production.py + +# Run frontend +cd frontend && npm run dev + +# Run migrations (when created) +psql $DATABASE_URL -f migrations/005_create_reviews_tables.sql + +# Run tests +pytest pipeline/tests/ -v + +# Validate pipeline +python -m pipeline.validate --job-id +``` + +--- + +## Environment Variables + +```bash +# Database (required) +DATABASE_URL=postgresql://user:pass@localhost:5432/reviewiq + +# LLM Provider (Stage 2) +OPENAI_API_KEY=sk-... +# OR +ANTHROPIC_API_KEY=sk-ant-... + +# Embedding model (Stage 2) +EMBEDDING_MODEL=all-MiniLM-L6-v2 + +# Taxonomy version +DEFAULT_TAXONOMY_VERSION=v5.1 +``` + +--- + +## File Structure After Implementation + +``` +google-reviews-scraper-pro/ +├── .artifacts/ # ← Design documents +│ ├── ReviewIQ-Pipeline-DevGuide.md # ← START HERE (for pipeline work) +│ ├── ReviewIQ-v32-Decisions.md +│ ├── ReviewIQ-Codebase-Overview.md +│ ├── ReviewIQ-Pipeline-Contracts-v1.md +│ ├── ReviewIQ-Pipeline-Checklist.md +│ └── ... 
+│ +├── api_server_production.py # ✅ Exists - Main API +├── core/database.py # ✅ Exists - DB layer +├── scrapers/google_reviews/ # ✅ Exists - Scraper +│ +├── pipeline/ # ❌ TO CREATE +│ ├── stage1_normalize.py +│ ├── stage2_classify.py +│ ├── stage3_route.py +│ ├── stage4_aggregate.py +│ ├── llm_client.py +│ └── tests/ +│ +└── migrations/ + ├── 001-004 # ✅ Exists + └── 005-009 # ❌ TO CREATE +``` + +--- + +*Keep this guide updated when adding new artifacts or completing stages.* diff --git a/.artifacts/ReviewIQ-v3.2.1-Taxonomy-Versioning.md b/.artifacts/ReviewIQ-v3.2.1-Taxonomy-Versioning.md new file mode 100644 index 0000000..5221127 --- /dev/null +++ b/.artifacts/ReviewIQ-v3.2.1-Taxonomy-Versioning.md @@ -0,0 +1,1107 @@ +# ReviewIQ v3.2.1 Addendum: Taxonomy Versioning + +**Version**: 3.2.1 +**Status**: Draft Specification +**Date**: 2026-01-24 +**Extends**: ReviewIQ Architecture v3.2.0 + +--- + +## Executive Summary + +This addendum introduces **taxonomy versioning** to ReviewIQ, enabling the system to track how URT classifications evolve over time. Classifications become fully contextualized facts that include the taxonomy version used, allowing accurate historical analysis, safe taxonomy evolution, and cross-version trend normalization. + +**Key Additions**: +- `taxonomy_version` column on classification tables +- `urt_taxonomy_versions` table for version metadata +- `urt_codes_versioned` table for versioned code definitions +- `urt_code_mappings` table for cross-version translation +- Translation functions for normalized queries +- Lineage tracking for code evolution + +**Design Principle**: Facts are immutable. A span classified as `J1.01` in taxonomy v5.1 stays that way forever. Translation between versions is explicit and auditable. + +--- + +## Part 1: Problem Statement + +### Why Taxonomy Versioning? 
+ +Without versioning, these scenarios cause data integrity issues: + +| Scenario | Problem | +|----------|---------| +| Code renamed | Historical reports show wrong label | +| Definition changed | Same code means different things over time | +| Code split | Old data can't distinguish new subcategories | +| Code merged | Trend analysis shows artificial drop | +| Code deprecated | Orphaned data with no valid reference | + +### The Three Dimensions of Classification Context + +A classification is only meaningful when you know: + +1. **Taxonomy Version** — What do the codes mean? +2. **Model Version** — How was the text processed? (already in v3.2) +3. **Prompt Version** — What instructions were given? (optional) + +``` +┌─────────────────────────────────────────────────────────────┐ +│ CLASSIFICATION FACT │ +│ │ +│ "At time T, using taxonomy V and model M, │ +│ this text was classified as code C with confidence X" │ +│ │ +│ This fact is IMMUTABLE. │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## Part 2: Schema Additions + +### 2.1 Taxonomy Version Registry + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- TAXONOMY VERSION REGISTRY +-- Tracks taxonomy releases like git tags +-- ═══════════════════════════════════════════════════════════════ + +CREATE TABLE urt_taxonomy_versions ( + version_id TEXT PRIMARY KEY, -- 'v5.1', 'v5.2', 'v6.0' + semver TEXT NOT NULL, -- '5.1.0' for programmatic comparison + + -- Validity period + effective_from DATE NOT NULL, + effective_to DATE, -- NULL = current/open-ended + is_current BOOLEAN NOT NULL DEFAULT FALSE, + + -- Metadata + release_notes TEXT, + changelog_url TEXT, + + -- Statistics (populated on release) + domain_count SMALLINT, + category_count SMALLINT, + subcode_count SMALLINT, + + -- Migration hints + breaking_changes BOOLEAN NOT NULL DEFAULT FALSE, + migration_guide_url TEXT, + predecessor_version TEXT REFERENCES urt_taxonomy_versions(version_id), + + -- 
Audit + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT, + + -- Ensure semver format + CONSTRAINT chk_semver_format + CHECK (semver ~ '^\d+\.\d+\.\d+(-[a-zA-Z0-9]+)?$') +); + +-- Only one current version allowed +CREATE UNIQUE INDEX uq_taxonomy_current + ON urt_taxonomy_versions(is_current) + WHERE is_current = TRUE; + +CREATE INDEX idx_taxonomy_versions_effective + ON urt_taxonomy_versions(effective_from, effective_to); + +COMMENT ON TABLE urt_taxonomy_versions IS + 'Registry of URT taxonomy versions with validity periods and migration metadata'; +``` + +### 2.2 Versioned Code Definitions + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- VERSIONED CODE DEFINITIONS +-- Full code definitions per taxonomy version (SCD Type 2 style) +-- ═══════════════════════════════════════════════════════════════ + +CREATE TABLE urt_codes_versioned ( + -- Composite primary key + code TEXT NOT NULL, + version_id TEXT NOT NULL REFERENCES urt_taxonomy_versions(version_id), + + -- Classification hierarchy + domain CHAR(1) NOT NULL, + category TEXT NOT NULL, -- 'J1', 'P2', etc. + subcategory TEXT, -- 'J1.01', 'P2.03', etc. 
+ tier SMALLINT NOT NULL, -- 1=domain, 2=category, 3=subcode + + -- Semantics + display_name TEXT NOT NULL, + definition TEXT NOT NULL, + keywords TEXT[] DEFAULT '{}', + + -- Examples (for classifier training/validation) + examples JSONB, + -- Format: { + -- "positive": ["example text 1", "example text 2"], + -- "negative": ["counter-example 1"], + -- "boundary": ["edge case 1"] + -- } + + -- Disambiguation + dont_confuse_with TEXT, -- Another code + dont_confuse_reason TEXT, + + -- Hierarchy links (within same version) + parent_code TEXT, -- Category code if this is subcode + + -- Cross-version lineage + change_type TEXT NOT NULL DEFAULT 'unchanged', + predecessor_codes TEXT[] DEFAULT '{}', -- Codes in previous version this evolved from + deprecation_reason TEXT, + successor_hint TEXT, -- Suggested replacement if deprecated + + -- Ownership + default_owner TEXT, -- Team responsible for this domain + + -- Ordering + display_order SMALLINT NOT NULL DEFAULT 0, + + -- Audit + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + PRIMARY KEY (code, version_id), + + CONSTRAINT chk_tier_valid + CHECK (tier IN (1, 2, 3)), + + CONSTRAINT chk_change_type_valid + CHECK (change_type IN ( + 'new', -- First appearance in this version + 'unchanged', -- Identical to previous version + 'renamed', -- Display name changed, same meaning + 'redefined', -- Definition changed (semantic drift) + 'deprecated', -- Marked for removal + 'split', -- This code split from a previous code + 'merged' -- This code merged from multiple previous codes + )), + + CONSTRAINT chk_code_format + CHECK ( + (tier = 1 AND code ~ '^[OPJEAVR]$') OR + (tier = 2 AND code ~ '^[OPJEAVR][1-4]$') OR + (tier = 3 AND code ~ '^[OPJEAVR][1-4]\.[0-9]{2}$') + ), + + CONSTRAINT chk_deprecated_has_reason + CHECK (change_type != 'deprecated' OR deprecation_reason IS NOT NULL), + + -- Parent must be in same version + CONSTRAINT fk_parent_code + FOREIGN KEY (parent_code, version_id) + REFERENCES urt_codes_versioned(code, 
version_id) +); + +CREATE INDEX idx_codes_versioned_domain + ON urt_codes_versioned(version_id, domain); + +CREATE INDEX idx_codes_versioned_category + ON urt_codes_versioned(version_id, category); + +CREATE INDEX idx_codes_versioned_change + ON urt_codes_versioned(version_id, change_type) + WHERE change_type != 'unchanged'; + +CREATE INDEX idx_codes_versioned_deprecated + ON urt_codes_versioned(version_id) + WHERE change_type = 'deprecated'; + +COMMENT ON TABLE urt_codes_versioned IS + 'URT code definitions with full semantics, versioned per taxonomy release'; +``` + +### 2.3 Cross-Version Code Mappings + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- CROSS-VERSION CODE MAPPINGS +-- Explicit translation rules between taxonomy versions +-- ═══════════════════════════════════════════════════════════════ + +CREATE TABLE urt_code_mappings ( + id SERIAL PRIMARY KEY, + + -- Source (older version) + from_code TEXT NOT NULL, + from_version TEXT NOT NULL, + + -- Target (newer version) + to_code TEXT NOT NULL, + to_version TEXT NOT NULL, + + -- Mapping semantics + mapping_type TEXT NOT NULL, + + -- Confidence and applicability + confidence FLOAT NOT NULL DEFAULT 1.0, + bidirectional BOOLEAN NOT NULL DEFAULT FALSE, + + -- Context + notes TEXT, + effective_from DATE, -- When this mapping became valid + + -- Audit + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT, + + UNIQUE (from_code, from_version, to_code, to_version), + + CONSTRAINT fk_from_code + FOREIGN KEY (from_code, from_version) + REFERENCES urt_codes_versioned(code, version_id), + + CONSTRAINT fk_to_code + FOREIGN KEY (to_code, to_version) + REFERENCES urt_codes_versioned(code, version_id), + + CONSTRAINT chk_mapping_type_valid + CHECK (mapping_type IN ( + 'equivalent', -- Same meaning, safe 1:1 translation + 'broader', -- to_code is more general (result of merge) + 'narrower', -- to_code is more specific (result of split) + 'related', -- Conceptually similar but not 
equivalent + 'superseded' -- from_code deprecated, to_code is replacement + )), + + CONSTRAINT chk_confidence_range + CHECK (confidence > 0 AND confidence <= 1.0), + + CONSTRAINT chk_different_versions + CHECK (from_version != to_version), + + CONSTRAINT chk_version_order + CHECK (from_version < to_version) -- Mappings go forward in time; TEXT comparison, so 'v5.10' sorts before 'v5.2' +); + +CREATE INDEX idx_mappings_from + ON urt_code_mappings(from_code, from_version); + +CREATE INDEX idx_mappings_to + ON urt_code_mappings(to_code, to_version); + +CREATE INDEX idx_mappings_type + ON urt_code_mappings(mapping_type); + +COMMENT ON TABLE urt_code_mappings IS + 'Explicit translation rules between taxonomy versions for query normalization'; +``` + +### 2.4 Schema Modifications to Existing Tables + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- MODIFICATIONS TO EXISTING TABLES +-- Add taxonomy_version to classification tables +-- ═══════════════════════════════════════════════════════════════ + +-- review_spans: Add taxonomy version +ALTER TABLE review_spans + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +ALTER TABLE review_spans + ADD COLUMN prompt_version TEXT; -- Optional: hash or ID of prompt template + +-- FK to versioned codes (replaces existing FK to urt_codes) +ALTER TABLE review_spans + DROP CONSTRAINT IF EXISTS fk_spans_urt_primary; + +ALTER TABLE review_spans + ADD CONSTRAINT fk_spans_urt_versioned + FOREIGN KEY (urt_primary, taxonomy_version) + REFERENCES urt_codes_versioned(code, version_id); + +-- Index for version-aware queries +CREATE INDEX idx_spans_taxonomy_version + ON review_spans(taxonomy_version); + +CREATE INDEX idx_spans_code_version + ON review_spans(urt_primary, taxonomy_version); + + +-- reviews_enriched: Add taxonomy version for review-level classification +ALTER TABLE reviews_enriched + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +CREATE INDEX idx_enriched_taxonomy_version + ON reviews_enriched(taxonomy_version); + + +--
issues: Track which taxonomy version the issue was created under +ALTER TABLE issues + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +ALTER TABLE issues + ADD CONSTRAINT fk_issues_urt_versioned + FOREIGN KEY (primary_subcode, taxonomy_version) + REFERENCES urt_codes_versioned(code, version_id); + +CREATE INDEX idx_issues_taxonomy_version + ON issues(taxonomy_version); + + +-- fact_timeseries: Track taxonomy version for aggregated facts +ALTER TABLE fact_timeseries + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +-- Update unique constraint to include taxonomy version +ALTER TABLE fact_timeseries + DROP CONSTRAINT IF EXISTS fact_timeseries_business_id_place_id_period_date_bucket_typ_key; + +ALTER TABLE fact_timeseries + ADD CONSTRAINT uq_fact_timeseries + UNIQUE (business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version); +``` + +--- + +## Part 3: Functions + +### 3.1 Code Translation + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- TRANSLATION FUNCTIONS +-- ═══════════════════════════════════════════════════════════════ + +-- Translate a single code between versions +CREATE OR REPLACE FUNCTION translate_urt_code( + p_code TEXT, + p_from_version TEXT, + p_to_version TEXT +) RETURNS TABLE ( + translated_code TEXT, + mapping_type TEXT, + confidence FLOAT, + notes TEXT +) AS $$ +BEGIN + -- If same version, return as-is + IF p_from_version = p_to_version THEN + RETURN QUERY SELECT p_code, 'identity'::TEXT, 1.0::FLOAT, NULL::TEXT; + RETURN; + END IF; + + -- Check for direct mapping + RETURN QUERY + SELECT m.to_code, m.mapping_type, m.confidence, m.notes + FROM urt_code_mappings m + WHERE m.from_code = p_code + AND m.from_version = p_from_version + AND m.to_version = p_to_version; + + -- If no direct mapping, check if code exists unchanged in target + IF NOT FOUND THEN + RETURN QUERY + SELECT p_code, 'unchanged'::TEXT, 1.0::FLOAT, + 'Code exists in both versions without 
explicit mapping'::TEXT + FROM urt_codes_versioned + WHERE code = p_code + AND version_id = p_to_version + AND change_type = 'unchanged'; + END IF; + + -- If still not found, check transitive mappings (one hop) + IF NOT FOUND THEN + RETURN QUERY + SELECT m2.to_code, + 'transitive'::TEXT, + m1.confidence * m2.confidence, + 'Via ' || m1.to_version + FROM urt_code_mappings m1 + JOIN urt_code_mappings m2 + ON m1.to_code = m2.from_code + AND m1.to_version = m2.from_version + WHERE m1.from_code = p_code + AND m1.from_version = p_from_version + AND m2.to_version = p_to_version; + END IF; +END; +$$ LANGUAGE plpgsql STABLE; + +COMMENT ON FUNCTION translate_urt_code IS + 'Translate a URT code from one taxonomy version to another'; + + +-- Get full lineage for a code (all historical versions) +CREATE OR REPLACE FUNCTION get_code_lineage( + p_code TEXT, + p_version TEXT DEFAULT NULL +) RETURNS TABLE ( + code TEXT, + version_id TEXT, + display_name TEXT, + relationship TEXT, + confidence FLOAT, + depth INT +) AS $$ +DECLARE + v_target_version TEXT; +BEGIN + -- Default to current version + v_target_version := COALESCE(p_version, + (SELECT tv.version_id FROM urt_taxonomy_versions tv WHERE tv.is_current = TRUE)); + + RETURN QUERY + WITH RECURSIVE lineage AS ( + -- Base: the code itself + SELECT + cv.code, + cv.version_id, + cv.display_name, + 'self'::TEXT as relationship, + 1.0::FLOAT as confidence, + 0 as depth + FROM urt_codes_versioned cv + WHERE cv.code = p_code AND cv.version_id = v_target_version + + UNION ALL + + -- Recursive: predecessors via mappings + SELECT + m.from_code, + m.from_version, + cv.display_name, + m.mapping_type, + l.confidence * m.confidence, + l.depth + 1 + FROM lineage l + JOIN urt_code_mappings m + ON m.to_code = l.code + AND m.to_version = l.version_id + JOIN urt_codes_versioned cv + ON cv.code = m.from_code + AND cv.version_id = m.from_version + WHERE l.depth < 10 -- Prevent infinite loops + ) + SELECT * FROM lineage + ORDER BY depth, version_id; 
+END; +$$ LANGUAGE plpgsql STABLE; + +COMMENT ON FUNCTION get_code_lineage IS + 'Get full historical lineage for a code across all taxonomy versions'; + + +-- Get current version of taxonomy +CREATE OR REPLACE FUNCTION get_current_taxonomy_version() +RETURNS TEXT AS $$ + SELECT version_id + FROM urt_taxonomy_versions + WHERE is_current = TRUE; +$$ LANGUAGE sql STABLE; + +COMMENT ON FUNCTION get_current_taxonomy_version IS + 'Returns the current/active taxonomy version ID'; +``` + +### 3.2 Normalized Aggregation + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- NORMALIZED AGGREGATION +-- Query spans with automatic translation to target version +-- ═══════════════════════════════════════════════════════════════ + +-- View: Spans normalized to current taxonomy version +CREATE OR REPLACE VIEW v_spans_normalized AS +SELECT + rs.*, + COALESCE( + (SELECT translated_code FROM translate_urt_code( + rs.urt_primary, + rs.taxonomy_version, + get_current_taxonomy_version() + ) LIMIT 1), + rs.urt_primary + ) as urt_primary_normalized, + COALESCE( + (SELECT confidence FROM translate_urt_code( + rs.urt_primary, + rs.taxonomy_version, + get_current_taxonomy_version() + ) LIMIT 1), + 1.0 + ) as translation_confidence, + get_current_taxonomy_version() as normalized_to_version +FROM review_spans rs +WHERE rs.is_active = TRUE; + +COMMENT ON VIEW v_spans_normalized IS + 'Review spans with URT codes translated to current taxonomy version'; + + +-- Function: Aggregate facts with version normalization +CREATE OR REPLACE FUNCTION aggregate_spans_normalized( + p_business_id TEXT, + p_place_id TEXT, + p_start_date DATE, + p_end_date DATE, + p_target_version TEXT DEFAULT NULL +) RETURNS TABLE ( + urt_code TEXT, + span_count BIGINT, + negative_count BIGINT, + positive_count BIGINT, + avg_confidence FLOAT, + source_versions TEXT[] +) AS $$ +DECLARE + v_target TEXT; +BEGIN + v_target := COALESCE(p_target_version, get_current_taxonomy_version()); + + RETURN QUERY + 
SELECT + COALESCE( + (SELECT translated_code FROM translate_urt_code( + rs.urt_primary, rs.taxonomy_version, v_target + ) LIMIT 1), + rs.urt_primary + ) as urt_code, + COUNT(*) as span_count, + COUNT(*) FILTER (WHERE rs.valence = 'V-') as negative_count, + COUNT(*) FILTER (WHERE rs.valence = 'V+') as positive_count, + AVG(COALESCE( + (SELECT confidence FROM translate_urt_code( + rs.urt_primary, rs.taxonomy_version, v_target + ) LIMIT 1), + 1.0 + )) as avg_confidence, + array_agg(DISTINCT rs.taxonomy_version) as source_versions + FROM review_spans rs + WHERE rs.business_id = p_business_id + AND (rs.place_id = p_place_id OR p_place_id = 'ALL') + AND rs.review_time >= p_start_date + AND rs.review_time < p_end_date + AND rs.is_active = TRUE + GROUP BY 1 + ORDER BY span_count DESC; +END; +$$ LANGUAGE plpgsql STABLE; + +COMMENT ON FUNCTION aggregate_spans_normalized IS + 'Aggregate span counts with automatic translation to target taxonomy version'; +``` + +### 3.3 Drift Detection + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- DRIFT DETECTION +-- Analyze classification changes between taxonomy versions +-- ═══════════════════════════════════════════════════════════════ + +-- Function: Detect potential drift when upgrading taxonomy +CREATE OR REPLACE FUNCTION detect_taxonomy_drift( + p_from_version TEXT, + p_to_version TEXT, + p_business_id TEXT DEFAULT NULL +) RETURNS TABLE ( + from_code TEXT, + from_display_name TEXT, + to_code TEXT, + to_display_name TEXT, + mapping_type TEXT, + affected_spans BIGINT, + sample_span_ids TEXT[] +) AS $$ +BEGIN + RETURN QUERY + SELECT + m.from_code, + cv_from.display_name as from_display_name, + m.to_code, + cv_to.display_name as to_display_name, + m.mapping_type, + COUNT(rs.span_id) as affected_spans, + (array_agg(rs.span_id ORDER BY rs.review_time DESC))[1:5] as sample_span_ids + FROM urt_code_mappings m + JOIN urt_codes_versioned cv_from + ON cv_from.code = m.from_code AND cv_from.version_id = 
m.from_version + JOIN urt_codes_versioned cv_to + ON cv_to.code = m.to_code AND cv_to.version_id = m.to_version + LEFT JOIN review_spans rs + ON rs.urt_primary = m.from_code + AND rs.taxonomy_version = m.from_version + AND rs.is_active = TRUE + AND (p_business_id IS NULL OR rs.business_id = p_business_id) + WHERE m.from_version = p_from_version + AND m.to_version = p_to_version + AND m.mapping_type NOT IN ('equivalent', 'unchanged') + GROUP BY m.from_code, cv_from.display_name, m.to_code, cv_to.display_name, m.mapping_type + HAVING COUNT(rs.span_id) > 0 + ORDER BY affected_spans DESC; +END; +$$ LANGUAGE plpgsql STABLE; + +COMMENT ON FUNCTION detect_taxonomy_drift IS + 'Identify spans affected by non-equivalent mappings between taxonomy versions'; + + +-- Function: Get deprecated codes still in use +CREATE OR REPLACE FUNCTION get_deprecated_codes_in_use( + p_version TEXT, + p_business_id TEXT DEFAULT NULL +) RETURNS TABLE ( + code TEXT, + display_name TEXT, + deprecation_reason TEXT, + successor_hint TEXT, + span_count BIGINT, + latest_span_date TIMESTAMPTZ +) AS $$ +BEGIN + RETURN QUERY + SELECT + cv.code, + cv.display_name, + cv.deprecation_reason, + cv.successor_hint, + COUNT(rs.span_id) as span_count, + MAX(rs.review_time) as latest_span_date + FROM urt_codes_versioned cv + LEFT JOIN review_spans rs + ON rs.urt_primary = cv.code + AND rs.taxonomy_version = cv.version_id + AND rs.is_active = TRUE + AND (p_business_id IS NULL OR rs.business_id = p_business_id) + WHERE cv.version_id = p_version + AND cv.change_type = 'deprecated' + GROUP BY cv.code, cv.display_name, cv.deprecation_reason, cv.successor_hint + HAVING COUNT(rs.span_id) > 0 + ORDER BY span_count DESC; +END; +$$ LANGUAGE plpgsql STABLE; + +COMMENT ON FUNCTION get_deprecated_codes_in_use IS + 'Find deprecated codes that still have active spans'; +``` + +--- + +## Part 4: Seed Data + +### 4.1 Initial Version Registration + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- 
SEED DATA: v5.1 Initial Version +-- ═══════════════════════════════════════════════════════════════ + +INSERT INTO urt_taxonomy_versions ( + version_id, semver, effective_from, is_current, + release_notes, domain_count, category_count, subcode_count, + breaking_changes, created_by +) VALUES ( + 'v5.1', '5.1.0', '2026-01-01', TRUE, + 'Initial URT v5.1 release. 7 domains, 28 categories, 138 subcodes.', + 7, 28, 138, + FALSE, 'system' +); + +-- Note: urt_codes_versioned should be populated from B1-urt-codes.yaml +-- using a separate seed script. See: scripts/seed-urt-codes.py +``` + +### 4.2 Sample Versioned Codes (Subset) + +```sql +-- Sample: Domain-level codes for v5.1 +INSERT INTO urt_codes_versioned ( + code, version_id, domain, category, tier, + display_name, definition, default_owner, display_order, change_type +) VALUES +('O', 'v5.1', 'O', 'O', 1, 'Offering', + 'The core product, service, or outcome delivered', 'Product / Operations', 1, 'new'), +('P', 'v5.1', 'P', 'P', 1, 'People', + 'Human interactions and personnel behavior', 'HR / Training', 2, 'new'), +('J', 'v5.1', 'J', 'J', 1, 'Journey', + 'The process, timing, and operational flow', 'Operations / Process', 3, 'new'), +('E', 'v5.1', 'E', 'E', 1, 'Environment', + 'Physical, digital, and ambient context', 'Facilities / IT', 4, 'new'), +('A', 'v5.1', 'A', 'A', 1, 'Access', + 'Availability, accessibility, and inclusivity', 'Compliance / Design', 5, 'new'), +('V', 'v5.1', 'V', 'V', 1, 'Value', + 'Cost, pricing, and worth of the exchange', 'Finance / Pricing', 6, 'new'), +('R', 'v5.1', 'R', 'R', 1, 'Relationship', + 'Trust, reliability, and ongoing connection', 'Leadership / CX', 7, 'new'); + +-- Sample: Category-level codes for Journey domain +INSERT INTO urt_codes_versioned ( + code, version_id, domain, category, tier, parent_code, + display_name, definition, display_order, change_type +) VALUES +('J1', 'v5.1', 'J', 'J1', 2, 'J', 'Timing', + 'Speed, punctuality, and time management', 1, 'new'), +('J2', 
'v5.1', 'J', 'J2', 2, 'J', 'Ease', + 'Effort required and friction encountered', 2, 'new'), +('J3', 'v5.1', 'J', 'J3', 2, 'J', 'Reliability', + 'Consistency and predictability of process', 3, 'new'), +('J4', 'v5.1', 'J', 'J4', 2, 'J', 'Resolution', + 'How problems are handled when they arise', 4, 'new'); + +-- Sample: Subcode-level codes for J1 (Timing) +INSERT INTO urt_codes_versioned ( + code, version_id, domain, category, subcategory, tier, parent_code, + display_name, definition, + dont_confuse_with, dont_confuse_reason, + display_order, change_type +) VALUES +('J1.01', 'v5.1', 'J', 'J1', 'J1.01', 3, 'J1', + 'Wait Time', 'Duration of waiting before service begins or between steps', + 'J1.03', 'J1.03 is total duration, J1.01 is specifically idle waiting', 1, 'new'), +('J1.02', 'v5.1', 'J', 'J1', 'J1.02', 3, 'J1', + 'Punctuality', 'On-time arrival, start, or delivery vs. scheduled time', + 'J3.01', 'J3.01 is consistency over time, J1.02 is single-instance timeliness', 2, 'new'), +('J1.03', 'v5.1', 'J', 'J1', 'J1.03', 3, 'J1', + 'Service Speed', 'Overall pace of active service delivery', + 'O1.02', 'O1.02 is product performance, J1.03 is service delivery speed', 3, 'new'), +('J1.04', 'v5.1', 'J', 'J1', 'J1.04', 3, 'J1', + 'Response Time', 'Speed of response to customer requests or inquiries', + 'P3.01', 'P3.01 is attentiveness, J1.04 is measured response time', 4, 'new'), +('J1.05', 'v5.1', 'J', 'J1', 'J1.05', 3, 'J1', + 'Time Respect', 'Respect for customer''s time and schedule constraints', + 'P1.01', 'P1.01 is general attitude, J1.05 is specifically about time', 5, 'new'); +``` + +--- + +## Part 5: Migration Guide + +### 5.1 Migration Steps (v3.2 → v3.2.1) + +```sql +-- ═══════════════════════════════════════════════════════════════ +-- MIGRATION SCRIPT: v3.2 to v3.2.1 +-- Run in a transaction, test on staging first +-- ═══════════════════════════════════════════════════════════════ + +BEGIN; + +-- Step 1: Create new tables +-- (Run DDL from Part 2 above) + +-- 
Step 2: Register initial taxonomy version +INSERT INTO urt_taxonomy_versions ( + version_id, semver, effective_from, is_current, + release_notes, breaking_changes +) VALUES ( + 'v5.1', '5.1.0', '2026-01-01', TRUE, + 'Initial URT v5.1 - baseline for versioning', FALSE +); + +-- Step 3: Populate urt_codes_versioned from existing urt_codes +INSERT INTO urt_codes_versioned ( + code, version_id, domain, category, subcategory, tier, + display_name, definition, keywords, display_order, change_type +) +SELECT + code, + 'v5.1', + domain, + category, + CASE WHEN code ~ '^[OPJEAVR][1-4]\.[0-9]{2}$' THEN code ELSE NULL END, + CASE + WHEN code ~ '^[OPJEAVR]$' THEN 1 + WHEN code ~ '^[OPJEAVR][1-4]$' THEN 2 + ELSE 3 + END, + display_name, + description, + keywords, + 0, + 'new' +FROM urt_codes; + +-- Step 4: Add taxonomy_version columns with default +ALTER TABLE review_spans + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +ALTER TABLE reviews_enriched + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +ALTER TABLE issues + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +ALTER TABLE fact_timeseries + ADD COLUMN taxonomy_version TEXT NOT NULL DEFAULT 'v5.1'; + +-- Step 5: Add FK constraints +ALTER TABLE review_spans + ADD CONSTRAINT fk_spans_urt_versioned + FOREIGN KEY (urt_primary, taxonomy_version) + REFERENCES urt_codes_versioned(code, version_id); + +ALTER TABLE issues + ADD CONSTRAINT fk_issues_urt_versioned + FOREIGN KEY (primary_subcode, taxonomy_version) + REFERENCES urt_codes_versioned(code, version_id); + +-- Step 6: Create indexes +CREATE INDEX idx_spans_taxonomy_version ON review_spans(taxonomy_version); +CREATE INDEX idx_enriched_taxonomy_version ON reviews_enriched(taxonomy_version); +CREATE INDEX idx_issues_taxonomy_version ON issues(taxonomy_version); + +-- Step 7: Update unique constraint on fact_timeseries +ALTER TABLE fact_timeseries + DROP CONSTRAINT IF EXISTS fact_timeseries_business_id_place_id_period_date_bucket_typ_key; 
+ +ALTER TABLE fact_timeseries + ADD CONSTRAINT uq_fact_timeseries + UNIQUE (business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version); + +COMMIT; +``` + +### 5.2 Application Code Changes + +```python +# Classification pipeline: Include taxonomy version +async def store_review_spans(enriched: dict, spans: list[dict], batch_id: str): + """Store extracted spans with taxonomy version.""" + + # Get current taxonomy version + taxonomy_version = await get_current_taxonomy_version() + + for idx, span in enumerate(spans): + await db.execute(""" + INSERT INTO review_spans ( + span_id, business_id, place_id, source, review_id, review_version, + span_index, span_text, span_start, span_end, + profile, urt_primary, valence, intensity, + taxonomy_version, model_version, -- NEW: taxonomy_version + ... + ) VALUES (...) + """, [ + ..., + taxonomy_version, + 'gpt-4o-mini-2024-07-18', + ... + ]) + + +# Trend queries: Support version normalization +async def get_timeline( + business_id: str, + place_id: str, + subject_id: str, + start: date, + end: date, + normalize_to_version: str = None # NEW: optional normalization +) -> list[dict]: + """Query timeline with optional version normalization.""" + + if normalize_to_version: + # Use normalized aggregation + return await db.query(""" + SELECT * FROM aggregate_spans_normalized(%s, %s, %s, %s, %s) + WHERE urt_code = %s + """, [business_id, place_id, start, end, normalize_to_version, subject_id]) + else: + # Standard query (preserves original versions) + return await db.query(""" + SELECT * FROM fact_timeseries + WHERE business_id = %s AND place_id = %s + AND subject_type = 'urt_code' AND subject_id = %s + AND period_date BETWEEN %s AND %s + """, [business_id, place_id, subject_id, start, end]) +``` + +--- + +## Part 6: Query Patterns + +### 6.1 Point-in-Time Query (Historical Context) + +```sql +-- "What were our J1 issues in Q3 2025, as classified then?" 
+-- Returns data with original taxonomy context +SELECT + rs.span_id, + rs.span_text, + rs.urt_primary, + rs.taxonomy_version, + cv.display_name, + cv.definition +FROM review_spans rs +JOIN urt_codes_versioned cv + ON rs.urt_primary = cv.code + AND rs.taxonomy_version = cv.version_id +WHERE rs.business_id = 'acme' + AND rs.urt_primary LIKE 'J1%' + AND rs.review_time >= '2025-07-01' AND rs.review_time < '2025-10-01' -- half-open range keeps all of Sept 30 + AND rs.is_active = TRUE +ORDER BY rs.review_time DESC; +``` + +### 6.2 Normalized Trend Query + +```sql +-- "Show all historical data mapped to current J1.01" +-- Translates across taxonomy versions +SELECT + DATE_TRUNC('month', rs.review_time) as month, + rs.taxonomy_version as original_version, + COUNT(*) as span_count +FROM review_spans rs +JOIN get_code_lineage('J1.01') lineage + ON rs.urt_primary = lineage.code + AND rs.taxonomy_version = lineage.version_id +WHERE rs.business_id = 'acme' + AND rs.is_active = TRUE +GROUP BY 1, 2 +ORDER BY 1, 2; +``` + +### 6.3 Drift Analysis + +```sql +-- "What spans would be affected by upgrading to v5.2?" +SELECT * FROM detect_taxonomy_drift('v5.1', 'v5.2', 'acme'); + +-- "Are we still using deprecated codes?"
+SELECT * FROM get_deprecated_codes_in_use('v5.2', 'acme'); +``` + +### 6.4 Cross-Version Comparison + +```sql +-- "Compare classification distribution between taxonomy versions" +SELECT + rs.taxonomy_version, + LEFT(rs.urt_primary, 1) as domain, + COUNT(*) as span_count, + ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (PARTITION BY rs.taxonomy_version), 1) as pct +FROM review_spans rs +WHERE rs.business_id = 'acme' + AND rs.is_active = TRUE +GROUP BY rs.taxonomy_version, LEFT(rs.urt_primary, 1) +ORDER BY rs.taxonomy_version, span_count DESC; +``` + +--- + +## Part 7: Operational Procedures + +### 7.1 Releasing a New Taxonomy Version + +```sql +-- Step 1: Register new version (not current yet) +INSERT INTO urt_taxonomy_versions ( + version_id, semver, effective_from, is_current, + release_notes, predecessor_version, breaking_changes +) VALUES ( + 'v5.2', '5.2.0', '2026-04-01', FALSE, + 'Split J1.01 into J1.01a/J1.01b for queue vs service wait', + 'v5.1', TRUE +); + +-- Step 2: Populate versioned codes (J1.01a/J1.01b fail chk_code_format from 2.2 unless it is relaxed first) +INSERT INTO urt_codes_versioned (code, version_id, ...) +SELECT ... FROM urt_codes_v52_staging; + +-- Step 3: Define mappings from v5.1 to v5.2 +INSERT INTO urt_code_mappings ( + from_code, from_version, to_code, to_version, + mapping_type, confidence, notes +) VALUES +('J1.01', 'v5.1', 'J1.01a', 'v5.2', 'narrower', 0.7, + 'J1.01 split: queue wait'), +('J1.01', 'v5.1', 'J1.01b', 'v5.2', 'narrower', 0.3, + 'J1.01 split: service wait'); + +-- Step 4: Run drift analysis +SELECT * FROM detect_taxonomy_drift('v5.1', 'v5.2'); + +-- Step 5: Activate new version (run in one transaction so a current version always exists) +UPDATE urt_taxonomy_versions SET is_current = FALSE WHERE is_current = TRUE; +UPDATE urt_taxonomy_versions SET is_current = TRUE WHERE version_id = 'v5.2'; +UPDATE urt_taxonomy_versions SET effective_to = CURRENT_DATE WHERE version_id = 'v5.1'; +``` + +### 7.2 Reclassification Workflow + +```sql +-- Reclassify historical spans with new taxonomy (creates parallel records) +-- Uses soft-switch pattern from v3.2 + +-- 1.
Create new batch with new taxonomy version +INSERT INTO review_spans ( + span_id, ..., taxonomy_version, ingest_batch_id, is_active +) +SELECT + generate_span_id(...), -- New span ID + ..., + 'v5.2', -- New taxonomy version + 'reclassify-batch-001', + FALSE -- Not active yet +FROM review_spans_to_reclassify; + +-- 2. Run LLM reclassification on batch +-- (Application code) + +-- 3. Atomic switch (deactivate old, activate new) +BEGIN; +UPDATE review_spans SET is_active = FALSE +WHERE taxonomy_version = 'v5.1' AND business_id = 'acme'; + +UPDATE review_spans SET is_active = TRUE +WHERE ingest_batch_id = 'reclassify-batch-001'; +COMMIT; +``` + +--- + +## Part 8: Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Version ID format | `v{major}.{minor}` | Human-readable, matches URT releases | +| Semver column | Separate TEXT field | Enables programmatic version comparison | +| Code FK strategy | Composite `(code, version_id)` | Prevents orphaned classifications | +| Mapping direction | Forward only (old→new) | Simpler model, matches time flow | +| Transitive mappings | Single hop in function | Balances accuracy vs complexity | +| Default version | `'v5.1'` hardcoded | Safe baseline, explicit upgrade path | +| Fact table versioning | Per-row `taxonomy_version` | Enables version-specific aggregation | + +--- + +## Part 9: Future Considerations (v3.3+) + +| Feature | Description | +|---------|-------------| +| **Prompt versioning** | Track prompt templates used for classification | +| **A/B testing** | Compare classifier accuracy across versions | +| **Automatic mapping suggestion** | LLM-powered mapping recommendations | +| **Version-aware dashboards** | UI toggle for normalized vs point-in-time | +| **Bulk reclassification pipeline** | Scheduled jobs for taxonomy upgrades | + +--- + +## Document Control + +| Field | Value | +|-------|-------| +| **Document** | ReviewIQ v3.2.1 Addendum: Taxonomy Versioning | +| **Status** | Draft 
Specification | +| **Date** | 2026-01-24 | +| **Extends** | ReviewIQ Architecture v3.2.0 | +| **Author** | Architecture Team | + +### Changelog + +| Version | Changes | +|---------|---------| +| v3.2.1-draft | Initial taxonomy versioning specification | + +--- + +*End of ReviewIQ v3.2.1 Addendum* diff --git a/.artifacts/ReviewIQ-v32-Decisions.md b/.artifacts/ReviewIQ-v32-Decisions.md index b3ce44e..2f94abc 100644 --- a/.artifacts/ReviewIQ-v32-Decisions.md +++ b/.artifacts/ReviewIQ-v32-Decisions.md @@ -7,9 +7,18 @@ ## 1. Markpoint ``` -ID: reviewiq-v32-span-layer-2026-01-24-001 -Status: v3.2 span layer complete -Based on: v3.1.2 (commit f998277) +ID: reviewiq-v32-span-layer-2026-01-24-004 +Status: Pipeline contracts defined, ready for parallel implementation +Based on: v3.2 (commit 43fd151) + +START HERE: ReviewIQ-Pipeline-DevGuide.md (for pipeline implementation) + +Key Documents: + - ReviewIQ-Pipeline-DevGuide.md (entry point for pipeline work) + - ReviewIQ-Codebase-Overview.md (file structure, what exists) + - ReviewIQ-Pipeline-Contracts-v1.md (stage I/O contracts, validation) + - ReviewIQ-Pipeline-Checklist.md (implementation checklist) + - ReviewIQ-v3.2.1-Taxonomy-Versioning.md (taxonomy versioning spec) ``` --- @@ -152,6 +161,98 @@ Full: URT:F:{codes}:{V}{I}:{S}{A}{T}.{E}.{CR}:{causal} | Offsets nullable for LLM-inferred? | **No** — required, NOT NULL | | Reprocessing strategy? | **Soft-switch** with is_active flag | | TEXT vs ENUM for dimensions? | **ENUMs** — committed to Postgres | +| Taxonomy evolution tracking? | **Yes** — versioned codes with explicit mappings (v3.2.1) | +| B2 schema vs v3.2 divergence? | **Documented** — B2 is canonical URT, v3.2 is app layer | +| Taxonomy versioning? | **Yes** — `taxonomy_version` column on spans, versioned code tables | + +--- + +## 13. 
B2 Schema Audit Findings + +**Audit Date**: 2026-01-24 + +The B2-database-schema.sql (canonical URT v5.1) and ReviewIQ v3.2 spec have deliberate divergences: + +| Aspect | B2 (URT v5.1) | v3.2 (ReviewIQ) | Resolution | +|--------|---------------|-----------------|------------| +| Purpose | Source-agnostic taxonomy | Google Reviews app layer | Keep both | +| ID strategy | UUIDs + sequential | Deterministic SHA256 | v3.2 choice | +| Type safety | VARCHAR + CHECK | Postgres ENUMs | v3.2 choice | +| Span table | `spans` | `review_spans` | v3.2 naming | +| Offset columns | `char_start/char_end` | `span_start/span_end` | Document divergence | +| Tenant model | Single-tenant | Multi-tenant (business_id) | v3.2 requirement | +| Issue-span mapping | Many-to-many | One-to-one | v3.2 choice | +| Causal chain | Normalized table | JSONB column | v3.2 flexibility | +| Reprocessing | Not supported | Soft-switch pattern | v3.2 innovation | + +**Action Items**: +1. Import reference data (domains, categories, subcodes) from B2 INSERTs +2. Seed `urt_codes` / `urt_codes_versioned` from B1-urt-codes.yaml +3. Do NOT adopt B2 structure directly — v3.2 has specific app requirements + +--- + +## 14. 
Taxonomy Versioning (v3.2.1) + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Track taxonomy version | Required column on spans | Classifications only meaningful in version context | +| Version ID format | `v{major}.{minor}` | Human-readable, matches URT releases | +| Code FK strategy | Composite `(code, version_id)` | Prevents orphaned classifications | +| Cross-version mappings | Explicit mapping table | Enables normalized trend queries | +| Mapping direction | Forward only (old→new) | Simpler model, matches time flow | +| Default version | `'v5.1'` hardcoded | Safe baseline, explicit upgrade path | +| Fact table versioning | Per-row `taxonomy_version` | Enables version-specific aggregation | + +**Key Tables Added**: +- `urt_taxonomy_versions` — Version registry with validity periods +- `urt_codes_versioned` — Full code definitions per version (SCD Type 2) +- `urt_code_mappings` — Cross-version translation rules + +**Key Functions Added**: +- `translate_urt_code(code, from_version, to_version)` — Single code translation +- `get_code_lineage(code, version)` — Full historical lineage +- `detect_taxonomy_drift(from_version, to_version)` — Impact analysis +- `aggregate_spans_normalized(...)` — Version-normalized aggregation + +**Principle**: Facts are immutable. A span classified as `J1.01` in v5.1 stays that way forever. Translation is explicit and auditable. + +See: `.artifacts/ReviewIQ-v3.2.1-Taxonomy-Versioning.md` + +--- + +## 15. 
Pipeline Implementation Status + +**Overall: ~55% Complete** (as of 2026-01-24) + +| Stage | Name | Status | Owner | +|-------|------|--------|-------| +| 0 | Raw Ingestion | ✅ DONE | Scraper Team | +| 1 | Normalization | ❌ TODO | TBD | +| 2 | LLM Classification | ❌ TODO | TBD | +| 3 | Issue Routing | ❌ TODO | TBD | +| 4 | Fact Aggregation | ❌ TODO | TBD | + +**What's Working**: +- Google Maps scraping (v1.0.0) +- Job orchestration & queuing +- Webhook delivery +- Frontend job management +- Real-time SSE streaming + +**What's Missing**: +- Entire enrichment pipeline (Stages 1-4) +- LLM integration +- Span extraction +- Issue routing +- Analytics aggregation + +**Parallel Development**: +Each stage can be implemented independently using the contracts defined in: +- `ReviewIQ-Pipeline-Contracts-v1.md` — Full I/O specs, validation rules, test fixtures +- `ReviewIQ-Pipeline-Checklist.md` — Implementation checklist, definition of done + +**Estimated Effort to 100%**: 6-8 weeks --- @@ -180,4 +281,4 @@ GREATEST(0.2, base_trust * modifiers) -- Floor prevents collapse --- -*Last updated: 2026-01-24* +*Last updated: 2026-01-24 (pipeline contracts + codebase overview)*