Files
whyrating-engine-legacy/.artifacts/ReviewIQ-Pipeline-Contracts-v1.md
Alejandro Gutiérrez acd3b22e88 docs: Add pipeline development artifacts for parallel implementation
New artifacts:
- ReviewIQ-Pipeline-DevGuide.md: Entry point for pipeline work
- ReviewIQ-Pipeline-Contracts-v1.md: Stage I/O specs, validation rules, test fixtures
- ReviewIQ-Pipeline-Checklist.md: Per-stage implementation checklists
- ReviewIQ-Codebase-Overview.md: File structure, integration points
- ReviewIQ-v3.2.1-Taxonomy-Versioning.md: Taxonomy versioning addendum

Updated:
- ReviewIQ-v32-Decisions.md: Added B2 audit findings, taxonomy versioning decisions, pipeline status

These artifacts enable parallel development of pipeline stages 1-4 with:
- Independent validation (35 rules across stages)
- Clear input/output contracts
- Test fixtures for each stage
- Definition of done criteria

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 17:08:40 +00:00

36 KiB

ReviewIQ Pipeline Contracts v1.0

Version: 1.0.0 Status: Implementation Blueprint Date: 2026-01-24 Purpose: Enable parallel development with independent stage validation


Executive Summary

This document defines the contract for each pipeline stage: inputs, outputs, validation rules, and test fixtures. Each stage can be developed and tested independently. Integration tests verify stage-to-stage compatibility.

┌─────────────────────────────────────────────────────────────────────────────┐
│                         REVIEWIQ PIPELINE STAGES                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  STAGE 0        STAGE 1         STAGE 2         STAGE 3         STAGE 4     │
│  ════════       ════════        ════════        ════════        ════════    │
│                                                                              │
│  [Scrape] ──▶ [Normalize] ──▶ [Classify] ──▶ [Route] ──▶ [Aggregate]       │
│                                                                              │
│  Google        Raw → Clean     LLM → Spans    Spans → Issues   Facts        │
│  Maps API      Text            + URT codes    Dedup + Link     Timeseries   │
│                                                                              │
│  ✅ DONE       ❌ TODO         ❌ TODO        ❌ TODO          ❌ TODO       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Table of Contents

  1. Stage 0: Raw Ingestion (Complete)
  2. Stage 1: Normalization
  3. Stage 2: LLM Classification
  4. Stage 3: Issue Routing
  5. Stage 4: Fact Aggregation
  6. Integration Tests
  7. Test Fixtures

Stage 0: Raw Ingestion

Status: COMPLETE Owner: Scraper Team Code: scrapers/google_reviews/v1_0_0.py

Input Contract

interface Stage0Input {
  job_id: string;              // UUID
  url: string;                 // Google Maps URL
  max_reviews?: number;        // Default: 100
  business_id: string;         // Tenant identifier
  place_id?: string;           // Google Place ID (extracted from URL if missing)
}

Output Contract

interface Stage0Output {
  job_id: string;
  status: 'completed' | 'failed' | 'partial';
  business_id: string;
  place_id: string;

  // Business metadata
  business_info: {
    name: string;
    address: string;
    category: string;
    total_reviews: number;
    average_rating: number;
  };

  // Raw reviews array
  reviews: RawReview[];

  // Execution metadata
  scrape_time_ms: number;
  reviews_scraped: number;
  scraper_version: string;
}

interface RawReview {
  review_id: string;           // Google's review identifier
  author_name: string;
  author_id?: string;
  rating: number;              // 1-5
  text: string | null;         // May be null for rating-only reviews
  review_time: string;         // ISO 8601
  response_text?: string;      // Owner response
  response_time?: string;
  photos?: string[];

  // Raw payload for audit
  raw_payload: object;
}

Validation Rules

Rule Check Error Code
V0.1 reviews is array STAGE0_INVALID_OUTPUT
V0.2 Each review has review_id STAGE0_MISSING_REVIEW_ID
V0.3 Each review has rating 1-5 STAGE0_INVALID_RATING
V0.4 review_time is valid ISO 8601 STAGE0_INVALID_TIMESTAMP
V0.5 business_info.name is non-empty STAGE0_MISSING_BUSINESS

Storage Location

-- Currently stored as JSONB blob
jobs.reviews_data -> Stage0Output

Handoff to Stage 1

Stage 1 reads from jobs.reviews_data where status = 'completed'.


Stage 1: Normalization

Status: NOT IMPLEMENTED Owner: TBD Depends On: Stage 0

Purpose

Transform raw scraped reviews into clean, versioned records ready for LLM classification.

Input Contract

// Reads Stage0Output from jobs.reviews_data
interface Stage1Input {
  job_id: string;
  business_id: string;
  place_id: string;
  reviews: RawReview[];        // From Stage 0
}

Output Contract

interface Stage1Output {
  job_id: string;
  business_id: string;
  place_id: string;

  // Normalized reviews ready for classification
  reviews_normalized: NormalizedReview[];

  // Processing stats
  stats: {
    input_count: number;
    output_count: number;
    skipped_empty: number;
    skipped_duplicate: number;
  };
}

interface NormalizedReview {
  // Identity (composite key)
  source: 'google';
  review_id: string;
  review_version: number;      // Increments if review edited

  // Tenant context
  business_id: string;
  place_id: string;

  // Content
  text: string;                // Original text (NOT NULL - empty reviews filtered)
  text_normalized: string;     // Cleaned: lowercase, whitespace normalized, emoji standardized
  text_language: string;       // ISO 639-1 code
  text_length: number;         // Character count
  word_count: number;

  // Metadata
  rating: number;
  review_time: string;         // ISO 8601
  author_name: string;
  author_id?: string;

  // Dedup
  content_hash: string;        // SHA256 of normalized text
  dedup_group_id?: string;     // If duplicate detected

  // Reference
  raw_id: number;              // FK to reviews_raw.id
}

Database Schema

-- Stage 1 writes to reviews_raw (immutable) and reviews_enriched (initial)
INSERT INTO reviews_raw (
  source, review_id, place_id, raw_payload,
  review_text, rating, review_time, reviewer_name, reviewer_id,
  review_version, pulled_at
) VALUES (...);

-- Then creates reviews_enriched stub (no classification yet)
INSERT INTO reviews_enriched (
  source, review_id, review_version, is_latest, raw_id,
  business_id, place_id, text, text_normalized, rating, review_time,
  language, taxonomy_version,
  -- Classification fields NULL until Stage 2
  urt_primary, valence, intensity, embedding, trust_score
) VALUES (
  ...,
  NULL, NULL, NULL, NULL, NULL  -- Filled by Stage 2
);

Validation Rules

Rule Check Error Code
V1.1 text is non-empty string STAGE1_EMPTY_TEXT
V1.2 text_normalized contains no control chars STAGE1_INVALID_NORMALIZATION
V1.3 content_hash is 64-char hex STAGE1_INVALID_HASH
V1.4 review_version >= 1 STAGE1_INVALID_VERSION
V1.5 text_language is valid ISO 639-1 STAGE1_INVALID_LANGUAGE
V1.6 raw_id references valid reviews_raw row STAGE1_ORPHAN_ENRICHED

Validation Script

def validate_stage1_output(output: Stage1Output) -> ValidationResult:
    """
    Run after Stage 1 completes.
    Returns: ValidationResult with passed/failed rules and error details.
    """
    errors = []

    for review in output.reviews_normalized:
        # V1.1: Non-empty text
        if not review.text or not review.text.strip():
            errors.append(ValidationError('V1.1', review.review_id, 'Empty text'))

        # V1.2: No control characters
        if has_control_chars(review.text_normalized):
            errors.append(ValidationError('V1.2', review.review_id, 'Control chars in normalized'))

        # V1.3: Valid hash
        if not is_valid_sha256(review.content_hash):
            errors.append(ValidationError('V1.3', review.review_id, 'Invalid content hash'))

        # V1.4: Version >= 1
        if review.review_version < 1:
            errors.append(ValidationError('V1.4', review.review_id, 'Invalid version'))

        # V1.5: Valid language code
        if not is_valid_iso639(review.text_language):
            errors.append(ValidationError('V1.5', review.review_id, 'Invalid language'))

    return ValidationResult(
        stage='stage1',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )

Handoff to Stage 2

Stage 2 queries:

SELECT * FROM reviews_enriched
WHERE urt_primary IS NULL  -- Not yet classified
  AND is_latest = TRUE
ORDER BY review_time DESC
LIMIT 100;  -- Batch size

Stage 2: LLM Classification

Status: NOT IMPLEMENTED Owner: TBD Depends On: Stage 1 Contract Reference: LLM-Classification-Contract-v1.md

Purpose

Classify normalized reviews into URT codes with span-level extraction.

Input Contract

interface Stage2Input {
  // Batch of reviews to classify
  reviews: ReviewToClassify[];

  // Configuration
  config: {
    model: string;              // 'gpt-4o-mini', 'claude-3-haiku', etc.
    taxonomy_version: string;   // 'v5.1'
    profile: 'lite' | 'core' | 'standard' | 'full';
    max_spans_per_review: number;  // Default: 10
  };
}

interface ReviewToClassify {
  source: string;
  review_id: string;
  review_version: number;
  business_id: string;
  place_id: string;
  text: string;                 // Original text (offsets reference this)
  text_normalized: string;      // For LLM input
  rating: number;
  review_time: string;
}

Output Contract

interface Stage2Output {
  // Batch metadata
  batch_id: string;
  taxonomy_version: string;
  model_version: string;
  prompt_version: string;

  // Classified reviews
  reviews_classified: ClassifiedReview[];

  // Stats
  stats: {
    input_count: number;
    success_count: number;
    error_count: number;
    total_spans: number;
    avg_spans_per_review: number;
    llm_tokens_used: number;
    llm_cost_usd: number;
  };
}

interface ClassifiedReview {
  // Identity
  source: string;
  review_id: string;
  review_version: number;

  // Review-level classification (derived from primary span)
  urt_primary: string;          // Tier-3 code: 'J1.01'
  urt_secondary: string[];      // Max 2
  valence: 'V+' | 'V-' | 'V0' | 'V±';
  intensity: 'I1' | 'I2' | 'I3';
  comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S';

  // Extracted entities (from spans)
  staff_mentions: string[];
  quotes: Record<string, string>;  // code -> quote

  // Trust score
  trust_score: number;          // 0.2 to 1.0

  // Embedding
  embedding: number[];          // 384-dim vector

  // Spans
  spans: ExtractedSpan[];

  // Processing metadata
  classification_confidence: Record<string, number>;
  processing_time_ms: number;
}

interface ExtractedSpan {
  // Identity
  span_id: string;              // Deterministic: 'SPN-{hash}'
  span_index: number;           // 0-based within review

  // Position (offsets into original text, NOT normalized)
  span_text: string;            // Exact substring
  span_start: number;           // Character offset
  span_end: number;             // Character offset (exclusive)

  // Classification
  profile: 'lite' | 'core' | 'standard' | 'full';
  urt_primary: string;
  urt_secondary: string[];
  valence: 'V+' | 'V-' | 'V0' | 'V±';
  intensity: 'I1' | 'I2' | 'I3';
  comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S';

  // Extended (standard/full profile)
  specificity: 'S1' | 'S2' | 'S3';
  actionability: 'A1' | 'A2' | 'A3';
  temporal: 'TC' | 'TR' | 'TH' | 'TF';
  evidence: 'ES' | 'EI' | 'EC';

  // Entity
  entity?: string;
  entity_type?: 'location' | 'staff' | 'product' | 'process' | 'time' | 'other';
  entity_normalized?: string;

  // Causal (full profile)
  relation_type?: 'cause_of' | 'effect_of' | 'contrast' | 'resolution';
  related_span_index?: number;
  causal_chain?: CausalLink[];

  // Metadata
  confidence: 'high' | 'medium' | 'low';
  usn: string;                  // URT Semantic Notation

  // Flags
  is_primary: boolean;          // Primary span for this review
}

interface CausalLink {
  code: string;
  role: 'cause' | 'effect' | 'context' | 'outcome';
  order: number;
}

Database Writes

-- Update reviews_enriched with classification
UPDATE reviews_enriched SET
  urt_primary = $urt_primary,
  urt_secondary = $urt_secondary,
  valence = $valence,
  intensity = $intensity,
  comparative = $comparative,
  staff_mentions = $staff_mentions,
  quotes = $quotes,
  embedding = $embedding,
  trust_score = $trust_score,
  classification_model = $model_version,
  classification_confidence = $confidence,
  taxonomy_version = $taxonomy_version,
  processed_at = NOW()
WHERE source = $source
  AND review_id = $review_id
  AND review_version = $review_version;

-- Insert spans
INSERT INTO review_spans (
  span_id, business_id, place_id, source, review_id, review_version,
  span_index, span_text, span_start, span_end,
  profile, urt_primary, urt_secondary, valence, intensity, comparative,
  specificity, actionability, temporal, evidence,
  entity, entity_type, entity_normalized,
  relation_type, related_span_id, causal_chain,
  is_primary, is_active, review_time,
  confidence, usn, taxonomy_version, model_version, ingest_batch_id
) VALUES (...);

Validation Rules

Rule Check Error Code
V2.1 urt_primary matches ^[OPJEAVR][1-4]\.[0-9]{2}$ STAGE2_INVALID_URT_CODE
V2.2 urt_secondary has max 2 elements STAGE2_TOO_MANY_SECONDARY
V2.3 valence is valid enum value STAGE2_INVALID_VALENCE
V2.4 intensity is valid enum value STAGE2_INVALID_INTENSITY
V2.5 span_end > span_start STAGE2_INVALID_SPAN_BOUNDS
V2.6 span_text == text[span_start:span_end] STAGE2_SPAN_TEXT_MISMATCH
V2.7 Spans do not overlap STAGE2_OVERLAPPING_SPANS
V2.8 Exactly one is_primary = true per review STAGE2_PRIMARY_SPAN_COUNT
V2.9 trust_score >= 0.2 and <= 1.0 STAGE2_INVALID_TRUST
V2.10 embedding is 384-dim array STAGE2_INVALID_EMBEDDING
V2.11 usn matches profile-specific regex STAGE2_INVALID_USN
V2.12 related_span_index references valid span STAGE2_INVALID_RELATION

Validation Script

def validate_stage2_output(output: Stage2Output, input_reviews: dict) -> ValidationResult:
    """
    Validate Stage 2 classification output.
    input_reviews: dict mapping (source, review_id, version) -> original text
    """
    errors = []

    for review in output.reviews_classified:
        # V2.1: Valid URT code
        if not re.match(r'^[OPJEAVR][1-4]\.[0-9]{2}$', review.urt_primary):
            errors.append(ValidationError('V2.1', review.review_id, f'Invalid code: {review.urt_primary}'))

        # V2.2: Max 2 secondary codes
        if len(review.urt_secondary) > 2:
            errors.append(ValidationError('V2.2', review.review_id, 'Too many secondary codes'))

        # Get original text for span validation
        key = (review.source, review.review_id, review.review_version)
        original_text = input_reviews.get(key, {}).get('text', '')

        primary_count = 0
        span_ranges = []

        for span in review.spans:
            # V2.5: Valid bounds
            if span.span_end <= span.span_start:
                errors.append(ValidationError('V2.5', span.span_id, 'Invalid bounds'))

            # V2.6: Text matches
            expected_text = original_text[span.span_start:span.span_end]
            if span.span_text != expected_text:
                errors.append(ValidationError('V2.6', span.span_id,
                    f'Text mismatch: expected "{expected_text[:30]}..." got "{span.span_text[:30]}..."'))

            # V2.7: No overlap (check against previous spans)
            for prev_start, prev_end in span_ranges:
                if not (span.span_end <= prev_start or span.span_start >= prev_end):
                    errors.append(ValidationError('V2.7', span.span_id, 'Overlapping span'))
            span_ranges.append((span.span_start, span.span_end))

            # V2.8: Count primaries
            if span.is_primary:
                primary_count += 1

        if primary_count != 1:
            errors.append(ValidationError('V2.8', review.review_id, f'Primary count: {primary_count}'))

        # V2.9: Trust score bounds
        if not (0.2 <= review.trust_score <= 1.0):
            errors.append(ValidationError('V2.9', review.review_id, f'Trust: {review.trust_score}'))

        # V2.10: Embedding dimension
        if len(review.embedding) != 384:
            errors.append(ValidationError('V2.10', review.review_id, f'Embedding dim: {len(review.embedding)}'))

    return ValidationResult(
        stage='stage2',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )

Handoff to Stage 3

Stage 3 queries:

SELECT rs.*
FROM review_spans rs
WHERE rs.is_active = TRUE
  AND rs.valence IN ('V-', 'V±')  -- Only negative/mixed create issues
  AND NOT EXISTS (
    SELECT 1 FROM issue_spans iss WHERE iss.span_id = rs.span_id
  )
ORDER BY rs.review_time DESC;

Stage 3: Issue Routing

Status: NOT IMPLEMENTED Owner: TBD Depends On: Stage 2

Purpose

Route classified spans to issues (create new or aggregate to existing).

Input Contract

interface Stage3Input {
  // Spans to route (from Stage 2)
  spans: SpanToRoute[];
}

interface SpanToRoute {
  span_id: string;
  business_id: string;
  place_id: string;
  urt_primary: string;
  valence: string;
  intensity: string;
  entity_normalized?: string;
  review_time: string;
  confidence: string;

  // For trust weighting
  trust_score: number;
}

Output Contract

interface Stage3Output {
  // Routing results
  routed_spans: RoutedSpan[];

  // Issues created/updated
  issues_created: string[];     // issue_ids
  issues_updated: string[];     // issue_ids

  // Stats
  stats: {
    spans_processed: number;
    spans_routed: number;
    spans_skipped: number;      // Positive valence, etc.
    issues_created: number;
    issues_updated: number;
  };
}

interface RoutedSpan {
  span_id: string;
  issue_id: string;
  routing_key: string;          // business|place|code|entity
  is_new_issue: boolean;
}

Issue ID Generation

// Deterministic issue ID from routing key
function generateIssueId(
  business_id: string,
  place_id: string,
  urt_primary: string,
  entity_normalized: string | null
): string {
  const key = `${business_id}|${place_id}|${urt_primary}|${entity_normalized || ''}`;
  const hash = sha256(key);
  return `ISS-${hash.substring(0, 16)}`;
}

Database Writes

-- Create or update issue
INSERT INTO issues (
  issue_id, business_id, place_id, primary_subcode, domain,
  state, priority_score, confidence_score, span_count, max_intensity,
  entity, entity_normalized, taxonomy_version
) VALUES (...)
ON CONFLICT (issue_id) DO UPDATE SET
  span_count = issues.span_count + 1,
  max_intensity = GREATEST(issues.max_intensity, EXCLUDED.max_intensity),
  updated_at = NOW();

-- Link span to issue
INSERT INTO issue_spans (
  issue_id, span_id, source, review_id, review_version,
  is_primary_match, intensity, review_time
) VALUES (...)
ON CONFLICT (span_id) DO NOTHING;  -- 1:1 mapping

-- Log event
INSERT INTO issue_events (issue_id, event_type, span_id, ...)
VALUES (...);

Validation Rules

Rule Check Error Code
V3.1 issue_id matches ^ISS-[a-f0-9]{16}$ STAGE3_INVALID_ISSUE_ID
V3.2 routing_key is non-empty STAGE3_EMPTY_ROUTING_KEY
V3.3 Span not already linked to different issue STAGE3_DUPLICATE_ROUTING
V3.4 Issue exists in issues table STAGE3_ORPHAN_SPAN_LINK
V3.5 Only V-/V± spans create issues STAGE3_POSITIVE_ROUTED

Validation Script

def validate_stage3_output(output: Stage3Output, db: Database) -> ValidationResult:
    errors = []

    for routed in output.routed_spans:
        # V3.1: Valid issue ID format
        if not re.match(r'^ISS-[a-f0-9]{16}$', routed.issue_id):
            errors.append(ValidationError('V3.1', routed.span_id, 'Invalid issue_id'))

        # V3.2: Non-empty routing key
        if not routed.routing_key:
            errors.append(ValidationError('V3.2', routed.span_id, 'Empty routing key'))

        # V3.3: Check no duplicate routing
        existing = db.query(
            "SELECT issue_id FROM issue_spans WHERE span_id = %s",
            [routed.span_id]
        )
        if existing and existing['issue_id'] != routed.issue_id:
            errors.append(ValidationError('V3.3', routed.span_id, 'Already routed elsewhere'))

        # V3.4: Issue exists
        issue_exists = db.query(
            "SELECT 1 FROM issues WHERE issue_id = %s",
            [routed.issue_id]
        )
        if not issue_exists:
            errors.append(ValidationError('V3.4', routed.span_id, 'Orphan issue_id'))

    return ValidationResult(
        stage='stage3',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )

Handoff to Stage 4

Stage 4 runs as scheduled job, reads from all tables.


Stage 4: Fact Aggregation

Status: NOT IMPLEMENTED Owner: TBD Depends On: Stages 1, 2, 3 Schedule: Daily batch job

Purpose

Pre-aggregate span/review data into fact_timeseries for fast dashboard queries.

Input Contract

interface Stage4Input {
  business_id: string;
  date: string;                 // YYYY-MM-DD
  bucket_types: ('day' | 'week' | 'month')[];
  taxonomy_version: string;
}

Output Contract

interface Stage4Output {
  facts_written: FactRecord[];

  stats: {
    business_id: string;
    date: string;
    locations_processed: number;
    codes_aggregated: number;
    facts_upserted: number;
  };
}

interface FactRecord {
  // Keys
  business_id: string;
  place_id: string;             // Or 'ALL' for rollup
  period_date: string;
  bucket_type: string;
  subject_type: 'overall' | 'urt_code' | 'domain' | 'issue';
  subject_id: string;
  taxonomy_version: string;

  // Metrics
  review_count: number;
  span_count: number;
  negative_count: number;
  positive_count: number;
  neutral_count: number;
  mixed_count: number;
  strength_score: number;
  negative_strength: number;
  positive_strength: number;
  avg_rating: number;
  i1_count: number;
  i2_count: number;
  i3_count: number;
  cr_better: number;
  cr_worse: number;
  cr_same: number;
  trust_weighted_strength: number;
  trust_weighted_negative: number;
}

Database Writes

INSERT INTO fact_timeseries (
  business_id, place_id, period_date, bucket_type,
  subject_type, subject_id, taxonomy_version,
  review_count, span_count, negative_count, positive_count, ...
) VALUES (...)
ON CONFLICT (business_id, place_id, period_date, bucket_type,
             subject_type, subject_id, taxonomy_version)
DO UPDATE SET
  review_count = EXCLUDED.review_count,
  span_count = EXCLUDED.span_count,
  ...
  computed_at = NOW();

Validation Rules

Rule Check Error Code
V4.1 place_id is valid or 'ALL' STAGE4_INVALID_PLACE
V4.2 period_date matches bucket STAGE4_DATE_BUCKET_MISMATCH
V4.3 span_count >= review_count (reviews have >=1 span) STAGE4_COUNT_MISMATCH
V4.4 negative_count + positive_count + neutral_count + mixed_count == span_count STAGE4_VALENCE_SUM
V4.5 i1_count + i2_count + i3_count == span_count STAGE4_INTENSITY_SUM
V4.6 strength_score >= 0 STAGE4_NEGATIVE_STRENGTH
V4.7 avg_rating between 1.0 and 5.0 (or NULL) STAGE4_INVALID_RATING

Validation Script

def validate_stage4_output(output: Stage4Output) -> ValidationResult:
    errors = []

    for fact in output.facts_written:
        # V4.3: Span count >= review count
        if fact.span_count < fact.review_count:
            errors.append(ValidationError('V4.3', f'{fact.subject_id}', 'span < review count'))

        # V4.4: Valence sum
        valence_sum = fact.negative_count + fact.positive_count + fact.neutral_count + fact.mixed_count
        if valence_sum != fact.span_count:
            errors.append(ValidationError('V4.4', f'{fact.subject_id}',
                f'Valence sum {valence_sum} != {fact.span_count}'))

        # V4.5: Intensity sum
        intensity_sum = fact.i1_count + fact.i2_count + fact.i3_count
        if intensity_sum != fact.span_count:
            errors.append(ValidationError('V4.5', f'{fact.subject_id}',
                f'Intensity sum {intensity_sum} != {fact.span_count}'))

        # V4.7: Rating bounds
        if fact.avg_rating is not None and not (1.0 <= fact.avg_rating <= 5.0):
            errors.append(ValidationError('V4.7', f'{fact.subject_id}',
                f'Invalid rating: {fact.avg_rating}'))

    return ValidationResult(
        stage='stage4',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )

Integration Tests

Test 1: Stage 0 → Stage 1 Handoff

def test_stage0_to_stage1():
    """Verify Stage 0 output is valid Stage 1 input."""

    # Get Stage 0 output
    stage0_output = db.query("SELECT reviews_data FROM jobs WHERE job_id = %s", [JOB_ID])

    # Validate as Stage 1 input
    stage1_input = Stage1Input.parse(stage0_output)

    # Check all required fields present
    assert stage1_input.job_id is not None
    assert stage1_input.business_id is not None
    assert len(stage1_input.reviews) > 0

    for review in stage1_input.reviews:
        assert review.review_id is not None
        assert review.rating in [1, 2, 3, 4, 5]
        assert review.review_time is not None

Test 2: Stage 1 → Stage 2 Handoff

def test_stage1_to_stage2():
    """Verify Stage 1 output can be classified."""

    # Get Stage 1 output from DB
    reviews = db.query("""
        SELECT * FROM reviews_enriched
        WHERE urt_primary IS NULL AND is_latest = TRUE
        LIMIT 10
    """)

    # Build Stage 2 input
    stage2_input = Stage2Input(
        reviews=[ReviewToClassify.from_db(r) for r in reviews],
        config=ClassificationConfig(
            model='gpt-4o-mini',
            taxonomy_version='v5.1',
            profile='standard'
        )
    )

    # Validate input
    for review in stage2_input.reviews:
        assert review.text is not None
        assert len(review.text) > 0
        assert review.text_normalized is not None

Test 3: Stage 2 → Stage 3 Handoff

def test_stage2_to_stage3():
    """Verify classified spans can be routed."""

    # Get unrouted negative spans
    spans = db.query("""
        SELECT * FROM review_spans
        WHERE is_active = TRUE
          AND valence IN ('V-', 'V±')
          AND NOT EXISTS (SELECT 1 FROM issue_spans WHERE span_id = review_spans.span_id)
        LIMIT 10
    """)

    # Build Stage 3 input
    stage3_input = Stage3Input(
        spans=[SpanToRoute.from_db(s) for s in spans]
    )

    # Validate
    for span in stage3_input.spans:
        assert span.span_id is not None
        assert span.business_id is not None
        assert span.urt_primary is not None
        assert span.valence in ['V-', 'V±']

Test 4: End-to-End Pipeline

def test_e2e_pipeline():
    """Full pipeline integration test."""

    # Stage 0: Scrape
    job_id = submit_scrape_job(TEST_URL)
    wait_for_job(job_id)
    stage0_result = validate_stage0_output(get_job(job_id))
    assert stage0_result.passed

    # Stage 1: Normalize
    stage1_output = run_stage1(job_id)
    stage1_result = validate_stage1_output(stage1_output)
    assert stage1_result.passed

    # Stage 2: Classify
    stage2_output = run_stage2(stage1_output.reviews_normalized)
    stage2_result = validate_stage2_output(stage2_output, stage1_output)
    assert stage2_result.passed

    # Stage 3: Route
    negative_spans = [s for r in stage2_output.reviews_classified
                      for s in r.spans if s.valence in ['V-', 'V±']]
    stage3_output = run_stage3(negative_spans)
    stage3_result = validate_stage3_output(stage3_output, db)
    assert stage3_result.passed

    # Stage 4: Aggregate
    stage4_output = run_stage4(TEST_BUSINESS_ID, today(), ['day'])
    stage4_result = validate_stage4_output(stage4_output)
    assert stage4_result.passed

    # Final assertion: Facts exist
    facts = db.query("""
        SELECT COUNT(*) FROM fact_timeseries
        WHERE business_id = %s AND period_date = %s
    """, [TEST_BUSINESS_ID, today()])
    assert facts['count'] > 0

Test Fixtures

Fixture 1: Sample Raw Review (Stage 0 Output)

{
  "job_id": "test-job-001",
  "status": "completed",
  "business_id": "acme-corp",
  "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
  "business_info": {
    "name": "Acme Restaurant",
    "address": "123 Main St, Anytown, USA",
    "category": "Restaurant",
    "total_reviews": 1247,
    "average_rating": 4.2
  },
  "reviews": [
    {
      "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
      "author_name": "John Smith",
      "author_id": "103456789012345678901",
      "rating": 2,
      "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained. However, the steak was cooked perfectly and the dessert was amazing.",
      "review_time": "2026-01-20T14:30:00Z",
      "response_text": null,
      "photos": [],
      "raw_payload": {}
    }
  ],
  "scrape_time_ms": 12500,
  "reviews_scraped": 1,
  "scraper_version": "v1.0.0"
}

Fixture 2: Normalized Review (Stage 1 Output)

{
  "source": "google",
  "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
  "review_version": 1,
  "business_id": "acme-corp",
  "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
  "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained. However, the steak was cooked perfectly and the dessert was amazing.",
  "text_normalized": "the food was great but the wait was absolutely terrible we waited 45 minutes just to be seated and another 30 minutes for our appetizers the server mike was rude and dismissive when we complained however the steak was cooked perfectly and the dessert was amazing",
  "text_language": "en",
  "text_length": 267,
  "word_count": 52,
  "rating": 2,
  "review_time": "2026-01-20T14:30:00Z",
  "author_name": "John Smith",
  "content_hash": "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd",
  "raw_id": 12345
}

Fixture 3: Classified Review with Spans (Stage 2 Output)

{
  "source": "google",
  "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
  "review_version": 1,
  "urt_primary": "J1.01",
  "urt_secondary": ["P1.02"],
  "valence": "V±",
  "intensity": "I3",
  "comparative": "CR-N",
  "staff_mentions": ["Mike"],
  "quotes": {
    "J1.01": "waited 45 minutes just to be seated",
    "P1.02": "rude and dismissive"
  },
  "trust_score": 0.85,
  "embedding": [0.123, -0.456, 0.789, "... 384 values ..."],
  "spans": [
    {
      "span_id": "SPN-a1b2c3d4e5f67890",
      "span_index": 0,
      "span_text": "The food was great",
      "span_start": 0,
      "span_end": 18,
      "profile": "standard",
      "urt_primary": "O1.01",
      "urt_secondary": [],
      "valence": "V+",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S1",
      "actionability": "A1",
      "temporal": "TC",
      "evidence": "ES",
      "confidence": "high",
      "usn": "URT:S:O1.01:+2:11TC.ES.N",
      "is_primary": false
    },
    {
      "span_id": "SPN-b2c3d4e5f6789012",
      "span_index": 1,
      "span_text": "the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers",
      "span_start": 23,
      "span_end": 138,
      "profile": "standard",
      "urt_primary": "J1.01",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I3",
      "comparative": "CR-N",
      "specificity": "S3",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "EC",
      "confidence": "high",
      "usn": "URT:S:J1.01:-3:32TC.EC.N",
      "is_primary": true
    },
    {
      "span_id": "SPN-c3d4e5f678901234",
      "span_index": 2,
      "span_text": "The server Mike was rude and dismissive when we complained",
      "span_start": 140,
      "span_end": 198,
      "profile": "standard",
      "urt_primary": "P1.02",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "ES",
      "entity": "Mike",
      "entity_type": "staff",
      "entity_normalized": "mike",
      "confidence": "high",
      "usn": "URT:S:P1.02:-2:22TC.ES.N",
      "is_primary": false
    },
    {
      "span_id": "SPN-d4e5f67890123456",
      "span_index": 3,
      "span_text": "the steak was cooked perfectly and the dessert was amazing",
      "span_start": 209,
      "span_end": 267,
      "profile": "standard",
      "urt_primary": "O1.01",
      "urt_secondary": [],
      "valence": "V+",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A1",
      "temporal": "TC",
      "evidence": "ES",
      "confidence": "high",
      "usn": "URT:S:O1.01:+2:21TC.ES.N",
      "is_primary": false
    }
  ]
}

Fixture 4: Routed Spans (Stage 3 Output)

{
  "routed_spans": [
    {
      "span_id": "SPN-b2c3d4e5f6789012",
      "issue_id": "ISS-7a8b9c0d1e2f3a4b",
      "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|J1.01|",
      "is_new_issue": true
    },
    {
      "span_id": "SPN-c3d4e5f678901234",
      "issue_id": "ISS-2b3c4d5e6f7a8b9c",
      "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|P1.02|mike",
      "is_new_issue": true
    }
  ],
  "issues_created": ["ISS-7a8b9c0d1e2f3a4b", "ISS-2b3c4d5e6f7a8b9c"],
  "issues_updated": [],
  "stats": {
    "spans_processed": 4,
    "spans_routed": 2,
    "spans_skipped": 2,
    "issues_created": 2,
    "issues_updated": 0
  }
}

Fixture 5: Aggregated Facts (Stage 4 Output)

{
  "facts_written": [
    {
      "business_id": "acme-corp",
      "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
      "period_date": "2026-01-20",
      "bucket_type": "day",
      "subject_type": "urt_code",
      "subject_id": "J1.01",
      "taxonomy_version": "v5.1",
      "review_count": 1,
      "span_count": 1,
      "negative_count": 1,
      "positive_count": 0,
      "neutral_count": 0,
      "mixed_count": 0,
      "strength_score": 4.0,
      "negative_strength": 4.0,
      "positive_strength": 0.0,
      "avg_rating": 2.0,
      "i1_count": 0,
      "i2_count": 0,
      "i3_count": 1,
      "cr_better": 0,
      "cr_worse": 0,
      "cr_same": 0,
      "trust_weighted_strength": 3.4,
      "trust_weighted_negative": 3.4
    }
  ],
  "stats": {
    "business_id": "acme-corp",
    "date": "2026-01-20",
    "locations_processed": 1,
    "codes_aggregated": 3,
    "facts_upserted": 5
  }
}

Validation Summary Table

Stage Input From Output To Validation Rules Test Coverage
0 Google Maps jobs.reviews_data V0.1-V0.5 Unit + E2E
1 Stage 0 reviews_raw, reviews_enriched V1.1-V1.6 Unit + Integration
2 Stage 1 reviews_enriched, review_spans V2.1-V2.12 Unit + Integration
3 Stage 2 issues, issue_spans, issue_events V3.1-V3.5 Unit + Integration
4 All stages fact_timeseries V4.1-V4.7 Unit + E2E

Document Control

Field Value
Document ReviewIQ Pipeline Contracts v1.0
Status Implementation Blueprint
Date 2026-01-24
Purpose Enable parallel development with stage validation

End of Pipeline Contracts Document