ReviewIQ Pipeline Implementation Checklist
Purpose: Quick reference for agents to verify stage completion
Reference: ReviewIQ-Pipeline-Contracts-v1.md for full specs
Pipeline Overview
```text
Stage 0 ──▶ Stage 1 ──▶ Stage 2 ──▶ Stage 3 ──▶ Stage 4
Scrape      Normalize   Classify    Route       Aggregate
✅ DONE     ❌ TODO     ❌ TODO     ❌ TODO     ❌ TODO
```
Stage 0: Raw Ingestion ✅ COMPLETE
No action needed. Already implemented in scrapers/google_reviews/v1_0_0.py.
Output Location: jobs.reviews_data (JSONB)
Stage 1: Normalization ❌ TODO
Files to Create
- `pipeline/stage1_normalize.py` - Main normalization logic
- `pipeline/tests/test_stage1.py` - Unit tests
- `migrations/005_create_reviews_tables.sql` - Schema migration
Database Schema Required
```sql
-- Must exist before Stage 1 can write
CREATE TABLE reviews_raw (...);
CREATE TABLE reviews_enriched (...);
```
Implementation Checklist
- Read from `jobs.reviews_data` where `status = 'completed'`
- Filter out empty/null review texts
- Normalize text (lowercase, whitespace, emoji)
- Detect language (ISO 639-1)
- Compute content hash (SHA-256)
- Check for duplicates within business
- Insert into `reviews_raw` (immutable)
- Insert stub into `reviews_enriched` (classification fields NULL)
- Return `Stage1Output` with stats
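The normalization, hashing and duplicate-check steps above can be sketched with the standard library alone. This is a minimal sketch that rests on several assumptions. The exact normalization rules live in ReviewIQ-Pipeline-Contracts-v1.md, so the rules shown here are illustrative: NFC, control characters replaced with spaces, whitespace collapsed, lowercasing, emoji left intact. The hash is assumed to be computed over the normalized text, and the `text`/`business_id` field names are hypothetical. Language detection is omitted because it needs a third-party detector.

```python
import hashlib
import re
import unicodedata

# Runs of whitespace (spaces, tabs, newlines) collapse to a single space.
_WHITESPACE_RE = re.compile(r"\s+")


def normalize_text(text: str) -> str:
    """Illustrative Stage 1 normalization (assumed rules, see lead-in)."""
    # NFC keeps composed characters (accents, emoji) in one canonical form.
    text = unicodedata.normalize("NFC", text)
    # Control characters (Unicode category Cc) become spaces so V1.2 holds.
    text = "".join(" " if unicodedata.category(ch) == "Cc" else ch for ch in text)
    # Collapse whitespace, trim, then lowercase. Emoji are not Cc and survive.
    return _WHITESPACE_RE.sub(" ", text).strip().lower()


def content_hash(normalized: str) -> str:
    """SHA-256 hex digest, always 64 lowercase hex characters (V1.3)."""
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def dedupe(reviews: list[dict]) -> list[dict]:
    """Drop empty texts and per-business duplicates (hypothetical field names)."""
    seen: set[tuple[str, str]] = set()
    out = []
    for review in reviews:
        normalized = normalize_text(review.get("text") or "")
        if not normalized:
            continue  # empty/null texts are filtered out
        key = (review["business_id"], content_hash(normalized))
        if key in seen:
            continue  # same normalized text already seen for this business
        seen.add(key)
        out.append({**review, "text_normalized": normalized, "content_hash": key[1]})
    return out
```

Hashing the normalized text, not the raw text, means that differences only in case or spacing are caught as duplicates.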
Validation (run after implementation)
```bash
python -m pytest pipeline/tests/test_stage1.py -v
```
Definition of Done
- All V1.1-V1.6 validation rules pass
- `reviews_raw` populated with immutable records
- `reviews_enriched` has stubs ready for Stage 2
- Integration test: Stage 0 output → Stage 1 input passes
- No empty texts in output
- Duplicate detection working
Stage 2: LLM Classification ❌ TODO
Files to Create
- `pipeline/stage2_classify.py` - LLM classification logic
- `pipeline/llm_client.py` - LLM provider abstraction
- `pipeline/span_extractor.py` - Span boundary detection
- `pipeline/tests/test_stage2.py` - Unit tests
- `migrations/006_create_spans_table.sql` - Schema migration
- `migrations/007_create_urt_enums.sql` - ENUM types
Database Schema Required
```sql
-- ENUM types
CREATE TYPE urt_valence AS ENUM (...);
CREATE TYPE urt_intensity AS ENUM (...);
-- ... all 12 ENUMs from v3.2 spec

-- Spans table
CREATE TABLE review_spans (...);
```
Implementation Checklist
- Query unclassified reviews from `reviews_enriched`
- Build LLM prompt per `LLM-Classification-Contract-v1.md`
- Call LLM API (support GPT-4o-mini, Claude-3-haiku)
- Parse structured JSON response
- Extract spans with character offsets
- Validate that `span_text` matches the original text substring
- Check that spans don't overlap
- Select primary span (I3 > I2 > I1, V- > V± > V0 > V+)
- Generate embeddings (384-dim)
- Compute trust score (0.2 floor)
- Build USN string per profile
- Update `reviews_enriched` with classification
- Insert spans into `review_spans`
- Return `Stage2Output` with stats
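The primary-span rule and the trust-score floor above are small enough to sketch directly. The checklist lists an intensity ordering and a valence ordering without stating which takes precedence. This sketch assumes intensity is compared first and valence only breaks ties, with any remaining tie going to the earliest span. The `Span` dataclass is a hypothetical stand-in for a `review_spans` row.

```python
from dataclasses import dataclass

# Orderings from the checklist: a higher rank wins.
INTENSITY_RANK = {"I3": 3, "I2": 2, "I1": 1}
VALENCE_RANK = {"V-": 4, "V±": 3, "V0": 2, "V+": 1}


@dataclass
class Span:  # hypothetical stand-in for a review_spans row
    span_start: int
    span_end: int
    valence: str
    intensity: str
    is_primary: bool = False


def select_primary(spans: list[Span]) -> Span:
    """Flag exactly one span as primary (intensity first, then valence: assumed)."""
    if not spans:
        raise ValueError("a review needs at least one span")
    best = max(
        spans,
        key=lambda s: (
            INTENSITY_RANK[s.intensity],
            VALENCE_RANK[s.valence],
            -s.span_start,  # assumed tie-break: the earliest span wins
        ),
    )
    for span in spans:
        span.is_primary = span is best  # guarantees exactly one primary (V2.8)
    return best


def clamp_trust(raw_score: float) -> float:
    """Clamp a raw trust score into [0.2, 1.0], the 0.2 floor from the checklist."""
    return min(1.0, max(0.2, raw_score))
```

Setting `is_primary` on every span in one pass, rather than only on the winner, also clears any stale flag from an earlier classification run.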
Validation (run after implementation)
```bash
python -m pytest pipeline/tests/test_stage2.py -v
```
Definition of Done
- All V2.1-V2.12 validation rules pass
- LLM calls working with retry logic
- Span offsets correct (text substring matches)
- No overlapping spans
- Exactly one primary span per review
- Embeddings are 384-dim vectors
- Trust scores clamped to [0.2, 1.0]
- USN format valid per profile
- Integration test: Stage 1 output → Stage 2 input passes
Stage 3: Issue Routing ❌ TODO
Files to Create
- `pipeline/stage3_route.py` - Issue routing logic
- `pipeline/issue_manager.py` - Issue create/update logic
- `pipeline/tests/test_stage3.py` - Unit tests
- `migrations/008_create_issues_tables.sql` - Schema migration
Database Schema Required
```sql
CREATE TABLE issues (...);
CREATE TABLE issue_spans (...);
CREATE TABLE issue_events (...);
```
Implementation Checklist
- Query unrouted spans where `valence IN ('V-', 'V±')`
- Generate deterministic `issue_id` from routing key
- Check if issue exists
- Create new issue OR update existing counters
- Insert `issue_spans` link (enforce 1:1 with UNIQUE)
- Log event to `issue_events`
- Recalculate priority score
- Return `Stage3Output` with stats
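A deterministic `issue_id` can be derived by hashing the routing key. This sketch assumes a SHA-256 digest truncated to 16 hex characters, chosen so the result matches the V3.1 pattern `^ISS-[a-f0-9]{16}$`. How the routing key is composed is not specified here, so the `"biz-1|J1.02"` key in the example is hypothetical. In-memory dicts stand in for the `issues` and `issue_spans` tables, and `route_span` is an illustrative helper, not the planned `issue_manager.py` API.

```python
import hashlib
from typing import Dict, Optional

ROUTABLE_VALENCES = {"V-", "V±"}  # only these create issues (V3.5)


def make_issue_id(routing_key: str) -> str:
    """Same routing key -> same ID; SHA-256 truncation is an assumption."""
    if not routing_key:
        raise ValueError("routing_key must be non-empty")  # V3.2
    digest = hashlib.sha256(routing_key.encode("utf-8")).hexdigest()
    return f"ISS-{digest[:16]}"  # matches ^ISS-[a-f0-9]{16}$ (V3.1)


def route_span(
    issues: Dict[str, dict],
    links: Dict[str, str],
    span_id: str,
    valence: str,
    routing_key: str,
) -> Optional[str]:
    """Create or update an issue for one span; dicts mimic the tables (hypothetical)."""
    if valence not in ROUTABLE_VALENCES:
        return None  # V0/V+ spans are not routed
    if span_id in links:
        raise ValueError(f"span {span_id} is already linked")  # 1:1 mapping (V3.3)
    issue_id = make_issue_id(routing_key)
    # Get-or-create, then bump the counter on the existing or new issue.
    issue = issues.setdefault(issue_id, {"routing_key": routing_key, "span_count": 0})
    issue["span_count"] += 1
    links[span_id] = issue_id
    return issue_id
```

Because the ID depends only on the key, re-running Stage 3 finds the existing issue instead of creating a second one.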
Validation (run after implementation)
```bash
python -m pytest pipeline/tests/test_stage3.py -v
```
Definition of Done
- All V3.1-V3.5 validation rules pass
- Issue IDs are deterministic (same key = same ID)
- 1:1 span-to-issue mapping enforced
- Only V-/V± spans create issues
- Issue counters updated correctly
- Events logged for audit
- Integration test: Stage 2 output → Stage 3 input passes
Stage 4: Fact Aggregation ❌ TODO
Files to Create
- `pipeline/stage4_aggregate.py` - Fact aggregation logic
- `pipeline/tests/test_stage4.py` - Unit tests
- `migrations/009_create_facts_table.sql` - Schema migration
Database Schema Required
```sql
CREATE TABLE fact_timeseries (...);
```
Implementation Checklist
- Accept `business_id`, `date`, `bucket_types`
- Query spans joined with reviews for the period
- Aggregate by URT code (per location + 'ALL' rollup)
- Compute: review_count, span_count, valence counts
- Compute: strength_score, negative_strength, positive_strength
- Compute: intensity distribution (I1/I2/I3)
- Compute: CR counts (better/worse/same)
- Compute: trust-weighted metrics
- UPSERT into `fact_timeseries`
- Return `Stage4Output` with stats
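The grouping step can be sketched in memory. The sketch assumes span rows have already been joined with their reviews and filtered to the period. Each span counts toward its own location and, if that location is owned, toward the 'ALL' rollup. Field names (`review_id`, `place_id`, `urt_code`) and the flat `valence_*`/`intensity_*` keys are hypothetical. Strength scores, CR counts and trust weighting are left out because their formulas live in the contracts document.

```python
from collections import defaultdict

VALENCES = ("V+", "V-", "V0", "V±")
INTENSITIES = ("I1", "I2", "I3")


def aggregate(spans: list[dict], owned_place_ids: set[str]) -> dict:
    """Count spans per (place_id, urt_code), plus an 'ALL' rollup of owned places."""
    facts = defaultdict(lambda: {
        "review_ids": set(),  # distinct reviews, turned into review_count below
        "span_count": 0,
        **{f"valence_{v}": 0 for v in VALENCES},
        **{f"intensity_{i}": 0 for i in INTENSITIES},
    })
    for span in spans:
        places = [span["place_id"]]
        if span["place_id"] in owned_place_ids:
            places.append("ALL")  # the rollup includes owned locations only
        for place in places:
            row = facts[(place, span["urt_code"])]
            row["review_ids"].add(span["review_id"])
            row["span_count"] += 1
            row[f"valence_{span['valence']}"] += 1
            row[f"intensity_{span['intensity']}"] += 1
    # Replace each review-id set with a distinct review count.
    return {
        key: {**{k: v for k, v in row.items() if k != "review_ids"},
              "review_count": len(row["review_ids"])}
        for key, row in facts.items()
    }
```

Every span increments exactly one valence counter and one intensity counter, so both sets of counts sum to `span_count` by construction (V4.4, V4.5). Counting distinct reviews keeps `span_count >= review_count` (V4.3). Recomputing from scratch for a period, rather than incrementing stored rows, is what makes the UPSERT idempotent.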
Validation (run after implementation)
```bash
python -m pytest pipeline/tests/test_stage4.py -v
```
Definition of Done
- All V4.1-V4.7 validation rules pass
- Valence counts sum to span_count
- Intensity counts sum to span_count
- 'ALL' rollup includes owned locations only
- Facts are idempotent (re-run produces same result)
- Integration test: Full pipeline E2E passes
Integration Tests
Handoff Tests (run after each stage)
```bash
# Stage 0 → 1
python -m pytest pipeline/tests/integration/test_stage0_to_1.py

# Stage 1 → 2
python -m pytest pipeline/tests/integration/test_stage1_to_2.py

# Stage 2 → 3
python -m pytest pipeline/tests/integration/test_stage2_to_3.py

# Full E2E
python -m pytest pipeline/tests/integration/test_e2e.py
```
E2E Validation Command
```bash
# Run full pipeline validation
python -m pipeline.validate --job-id <JOB_ID> --verbose
```
Expected output:
```text
Stage 0: ✅ PASS (5/5 rules)
Stage 1: ✅ PASS (6/6 rules)
Stage 2: ✅ PASS (12/12 rules)
Stage 3: ✅ PASS (5/5 rules)
Stage 4: ✅ PASS (7/7 rules)
E2E Integration: ✅ PASS
- Reviews scraped: 47
- Reviews normalized: 45 (2 empty filtered)
- Spans extracted: 127
- Issues created: 23
- Facts written: 156
```
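Since `pipeline/validate.py` is still TODO, here is a sketch of the CLI surface implied by the command above (`--job-id`, `--verbose`) and of the per-stage summary line. The function names and the "❌ FAIL" wording for a failing stage are assumptions, not part of the spec.

```python
import argparse


def format_stage(stage: int, passed: int, total: int) -> str:
    """One summary line per stage; the FAIL wording is an assumption."""
    status = "✅ PASS" if passed == total else "❌ FAIL"
    return f"Stage {stage}: {status} ({passed}/{total} rules)"


def build_parser() -> argparse.ArgumentParser:
    """Argument parser for the hypothetical `python -m pipeline.validate` entry point."""
    parser = argparse.ArgumentParser(prog="python -m pipeline.validate")
    parser.add_argument("--job-id", required=True, help="job whose pipeline output is checked")
    parser.add_argument("--verbose", action="store_true", help="print per-rule details")
    return parser
```

argparse exposes `--job-id` as `args.job_id`, so a `main()` would load that job's rows, run each stage's rule set, and print one `format_stage` line per stage.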
Quick Reference: Validation Rules
Stage 1
| Rule | Description |
|---|---|
| V1.1 | text is non-empty |
| V1.2 | text_normalized has no control chars |
| V1.3 | content_hash is 64-char hex |
| V1.4 | review_version >= 1 |
| V1.5 | text_language is valid ISO 639-1 |
| V1.6 | raw_id references valid row |
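The Stage 1 rules translate into per-row checks. This sketch assumes rows are plain dicts keyed by the column names in the table. `raw_ids` is a hypothetical set of known `reviews_raw` IDs standing in for the foreign-key lookup behind V1.6. V1.5 is checked for format only (two lowercase letters), not against the full ISO 639-1 list.

```python
import re
import unicodedata

_HEX64 = re.compile(r"[0-9a-f]{64}")  # hexdigest() output is lowercase
_ISO639_1_FORMAT = re.compile(r"[a-z]{2}")  # format check only


def validate_stage1_row(row: dict, raw_ids: set) -> list[str]:
    """Return the IDs of failed Stage 1 rules for one row (empty list = pass)."""
    failures = []
    if not (row.get("text") or "").strip():
        failures.append("V1.1")
    if any(unicodedata.category(ch) == "Cc" for ch in row.get("text_normalized") or ""):
        failures.append("V1.2")
    if not _HEX64.fullmatch(row.get("content_hash") or ""):
        failures.append("V1.3")
    version = row.get("review_version")
    if not isinstance(version, int) or version < 1:
        failures.append("V1.4")
    if not _ISO639_1_FORMAT.fullmatch(row.get("text_language") or ""):
        failures.append("V1.5")
    if row.get("raw_id") not in raw_ids:  # stand-in for the FK lookup (V1.6)
        failures.append("V1.6")
    return failures
```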
Stage 2
| Rule | Description |
|---|---|
| V2.1 | urt_primary matches ^[OPJEAVR][1-4]\.[0-9]{2}$ |
| V2.2 | urt_secondary max 2 elements |
| V2.3 | valence is valid enum |
| V2.4 | intensity is valid enum |
| V2.5 | span_end > span_start |
| V2.6 | span_text == text[start:end] |
| V2.7 | Spans don't overlap |
| V2.8 | Exactly one is_primary = true |
| V2.9 | trust_score in [0.2, 1.0] |
| V2.10 | embedding is 384-dim |
| V2.11 | usn matches profile regex |
| V2.12 | related_span_index valid if set |
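V2.5 through V2.8 can be checked for one review at a time. V2.6 compares `span_text` with `text[start:end]`, so this sketch treats spans as half-open intervals: two adjacent spans, one ending where the next starts, do not count as overlapping. The dict-based span rows are hypothetical.

```python
def validate_spans(text: str, spans: list[dict]) -> list[str]:
    """Check V2.5-V2.8 for one review's spans; returns failed rule IDs."""
    failures = set()
    for span in spans:
        start, end = span["span_start"], span["span_end"]
        if not end > start:
            failures.add("V2.5")
        if span["span_text"] != text[start:end]:
            failures.add("V2.6")
    # After sorting by start, any overlap shows up between neighbours.
    ordered = sorted(spans, key=lambda s: s["span_start"])
    for prev, cur in zip(ordered, ordered[1:]):
        if cur["span_start"] < prev["span_end"]:  # half-open [start, end)
            failures.add("V2.7")
    if sum(1 for s in spans if s["is_primary"]) != 1:
        failures.add("V2.8")
    # Sort numerically so "V2.10" would not sort before "V2.5".
    return sorted(failures, key=lambda rule: int(rule.split(".")[1]))
```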
Stage 3
| Rule | Description |
|---|---|
| V3.1 | issue_id matches ^ISS-[a-f0-9]{16}$ |
| V3.2 | routing_key non-empty |
| V3.3 | Span not already linked elsewhere |
| V3.4 | Issue exists in issues table |
| V3.5 | Only V-/V± spans routed |
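V3.3 and V3.4 are easiest to guarantee in the database itself, as the Stage 3 checklist's UNIQUE constraint does. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL. The column layout of `issues` and `issue_spans` is hypothetical because the real DDL is elided above. SQLite only enforces `REFERENCES` when `PRAGMA foreign_keys` is on, so the sketch enables it.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for the Stage 3 tables (hypothetical columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite needs this to enforce REFERENCES
    conn.executescript(
        """
        CREATE TABLE issues (issue_id TEXT PRIMARY KEY);
        CREATE TABLE issue_spans (
            issue_id TEXT NOT NULL REFERENCES issues(issue_id),  -- V3.4
            span_id  TEXT NOT NULL UNIQUE                        -- V3.3: one issue per span
        );
        """
    )
    return conn


def link_span(conn: sqlite3.Connection, issue_id: str, span_id: str) -> bool:
    """Insert a link; return False if a constraint rejects it."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO issue_spans (issue_id, span_id) VALUES (?, ?)",
                (issue_id, span_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

With the constraints in the schema, a concurrent or repeated Stage 3 run cannot double-link a span even if application-side checks are skipped.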
Stage 4
| Rule | Description |
|---|---|
| V4.1 | place_id valid or 'ALL' |
| V4.2 | period_date matches bucket |
| V4.3 | span_count >= review_count |
| V4.4 | Valence counts sum correctly |
| V4.5 | Intensity counts sum correctly |
| V4.6 | strength_score >= 0 |
| V4.7 | avg_rating in [1.0, 5.0] or NULL |
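Most Stage 4 rules are direct checks on one `fact_timeseries` row. The row layout here is hypothetical: `valence_counts` and `intensity_counts` are dicts, and `period_date` is a `datetime.date`. `known_place_ids` stands in for a lookup of valid places. For V4.2 the sketch assumes bucket names `day`, `week` and `month`, with weeks starting on Monday and months on the 1st. Adjust these to the contract's actual bucket definitions.

```python
def validate_fact_row(row: dict, known_place_ids: set) -> list[str]:
    """Return failed V4.x rule IDs for one fact row (hypothetical layout)."""
    failures = []
    if row["place_id"] != "ALL" and row["place_id"] not in known_place_ids:
        failures.append("V4.1")
    d = row["period_date"]
    bucket_ok = {
        "day": True,
        "week": d.weekday() == 0,  # assumed: weekly buckets start on Monday
        "month": d.day == 1,       # assumed: monthly buckets start on the 1st
    }.get(row["bucket_type"], False)  # unknown bucket types fail
    if not bucket_ok:
        failures.append("V4.2")
    if row["span_count"] < row["review_count"]:
        failures.append("V4.3")
    if sum(row["valence_counts"].values()) != row["span_count"]:
        failures.append("V4.4")
    if sum(row["intensity_counts"].values()) != row["span_count"]:
        failures.append("V4.5")
    if row["strength_score"] < 0:
        failures.append("V4.6")
    rating = row.get("avg_rating")
    if rating is not None and not 1.0 <= rating <= 5.0:
        failures.append("V4.7")
    return failures
```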
Migration Execution Order
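```bash
# Run in sequence
psql "$DATABASE_URL" -f migrations/005_create_reviews_tables.sql
psql "$DATABASE_URL" -f migrations/006_create_spans_table.sql
psql "$DATABASE_URL" -f migrations/007_create_urt_enums.sql
psql "$DATABASE_URL" -f migrations/008_create_issues_tables.sql
psql "$DATABASE_URL" -f migrations/009_create_facts_table.sql
```

Note: the Stage 2 schema creates the URT ENUM types before `review_spans`. If `review_spans` columns use those types, `006_create_spans_table.sql` will fail unless `007_create_urt_enums.sql` has already run. In that case, swap the order or renumber the two files.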
Environment Variables Required
```bash
# LLM Provider (Stage 2)
OPENAI_API_KEY=sk-...
# OR
ANTHROPIC_API_KEY=sk-ant-...

# Embedding Model (Stage 2)
EMBEDDING_MODEL=all-MiniLM-L6-v2

# Database
DATABASE_URL=postgresql://...

# Taxonomy
DEFAULT_TAXONOMY_VERSION=v5.1
```
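A small loader can fail fast when these variables are missing. This sketch makes several assumptions. `PipelineConfig` and `load_config` are hypothetical names. OpenAI is preferred when both LLM keys are set. The example values above double as defaults. The default model, `all-MiniLM-L6-v2`, produces 384-dimensional sentence embeddings, which matches the 384-dim requirement in V2.10.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class PipelineConfig:  # hypothetical container for the variables above
    database_url: str
    llm_provider: str  # "openai" or "anthropic"
    llm_api_key: str
    embedding_model: str
    taxonomy_version: str


def load_config(env: Mapping[str, str] = os.environ) -> PipelineConfig:
    """Read pipeline settings from the environment, failing fast on gaps."""
    database_url = env.get("DATABASE_URL")
    if not database_url:
        raise RuntimeError("DATABASE_URL is required")
    # Either LLM key works; preferring OpenAI when both are set is an assumption.
    if env.get("OPENAI_API_KEY"):
        provider, key = "openai", env["OPENAI_API_KEY"]
    elif env.get("ANTHROPIC_API_KEY"):
        provider, key = "anthropic", env["ANTHROPIC_API_KEY"]
    else:
        raise RuntimeError("set OPENAI_API_KEY or ANTHROPIC_API_KEY for Stage 2")
    return PipelineConfig(
        database_url=database_url,
        llm_provider=provider,
        llm_api_key=key,
        # Defaults mirror the example values above (assumed, not mandated).
        embedding_model=env.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2"),
        taxonomy_version=env.get("DEFAULT_TAXONOMY_VERSION", "v5.1"),
    )
```

Passing `env` explicitly keeps the loader testable without touching the real process environment.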
File Structure After Implementation
```text
pipeline/
├── __init__.py
├── stage1_normalize.py      # ❌ TODO
├── stage2_classify.py       # ❌ TODO
├── stage3_route.py          # ❌ TODO
├── stage4_aggregate.py      # ❌ TODO
├── llm_client.py            # ❌ TODO
├── span_extractor.py        # ❌ TODO
├── issue_manager.py         # ❌ TODO
├── validate.py              # ❌ TODO (CLI validator)
├── contracts.py             # ❌ TODO (TypedDict definitions)
└── tests/
    ├── __init__.py
    ├── test_stage1.py       # ❌ TODO
    ├── test_stage2.py       # ❌ TODO
    ├── test_stage3.py       # ❌ TODO
    ├── test_stage4.py       # ❌ TODO
    ├── fixtures/
    │   ├── stage0_output.json
    │   ├── stage1_output.json
    │   ├── stage2_output.json
    │   ├── stage3_output.json
    │   └── stage4_output.json
    └── integration/
        ├── test_stage0_to_1.py
        ├── test_stage1_to_2.py
        ├── test_stage2_to_3.py
        └── test_e2e.py

migrations/
├── 001_add_job_platform_fields.sql   # ✅ EXISTS
├── 002_create_batches_table.sql      # ✅ EXISTS
├── 003_create_scraper_registry.sql   # ✅ EXISTS
├── 004_create_api_keys.sql           # ✅ EXISTS
├── 005_create_reviews_tables.sql     # ❌ TODO
├── 006_create_spans_table.sql        # ❌ TODO
├── 007_create_urt_enums.sql          # ❌ TODO
├── 008_create_issues_tables.sql      # ❌ TODO
└── 009_create_facts_table.sql        # ❌ TODO
```
Last Updated: 2026-01-24