# ReviewIQ Pipeline Contracts v1.0

**Version**: 1.0.0
**Status**: Implementation Blueprint
**Date**: 2026-01-24
**Purpose**: Enable parallel development with independent stage validation

---

## Executive Summary

This document defines the **contract** for each pipeline stage: inputs, outputs, validation rules, and test fixtures. Each stage can be developed and tested independently. Integration tests verify stage-to-stage compatibility.

```
┌───────────────────────────────────────────────────────────────────────────────┐
│                           REVIEWIQ PIPELINE STAGES                            │
├───────────────────────────────────────────────────────────────────────────────┤
│                                                                               │
│  STAGE 0      STAGE 1         STAGE 2         STAGE 3            STAGE 4      │
│  ════════     ════════        ════════        ════════           ════════     │
│                                                                               │
│  [Scrape] ──▶ [Normalize] ──▶ [Classify] ───▶ [Route] ─────────▶ [Aggregate]  │
│                                                                               │
│  Google       Raw → Clean     LLM → Spans     Spans → Issues     Facts        │
│  Maps API     Text            + URT codes     Dedup + Link       Timeseries   │
│                                                                               │
│  ✅ DONE      ❌ TODO         ❌ TODO         ❌ TODO            ❌ TODO      │
│                                                                               │
└───────────────────────────────────────────────────────────────────────────────┘
```

---

## Table of Contents

1. [Stage 0: Raw Ingestion](#stage-0-raw-ingestion) (Complete)
2. [Stage 1: Normalization](#stage-1-normalization)
3. [Stage 2: LLM Classification](#stage-2-llm-classification)
4. [Stage 3: Issue Routing](#stage-3-issue-routing)
5. [Stage 4: Fact Aggregation](#stage-4-fact-aggregation)
6. [Integration Tests](#integration-tests)
7. [Test Fixtures](#test-fixtures)

---

## Stage 0: Raw Ingestion

**Status**: ✅ COMPLETE
**Owner**: Scraper Team
**Code**: `scrapers/google_reviews/v1_0_0.py`

### Input Contract

```typescript
interface Stage0Input {
  job_id: string;        // UUID
  url: string;           // Google Maps URL
  max_reviews?: number;  // Default: 100
  business_id: string;   // Tenant identifier
  place_id?: string;     // Google Place ID (extracted from URL if missing)
}
```

### Output Contract

```typescript
interface Stage0Output {
  job_id: string;
  status: 'completed' | 'failed' | 'partial';
  business_id: string;
  place_id: string;

  // Business metadata
  business_info: {
    name: string;
    address: string;
    category: string;
    total_reviews: number;
    average_rating: number;
  };

  // Raw reviews array
  reviews: RawReview[];

  // Execution metadata
  scrape_time_ms: number;
  reviews_scraped: number;
  scraper_version: string;
}

interface RawReview {
  review_id: string;       // Google's review identifier
  author_name: string;
  author_id?: string;
  rating: number;          // 1-5
  text: string | null;     // May be null for rating-only reviews
  review_time: string;     // ISO 8601
  response_text?: string;  // Owner response
  response_time?: string;
  photos?: string[];

  // Raw payload for audit
  raw_payload: object;
}
```

### Validation Rules

| Rule | Check | Error Code |
|------|-------|------------|
| V0.1 | `reviews` is array | `STAGE0_INVALID_OUTPUT` |
| V0.2 | Each review has `review_id` | `STAGE0_MISSING_REVIEW_ID` |
| V0.3 | Each review has `rating` 1-5 | `STAGE0_INVALID_RATING` |
| V0.4 | `review_time` is valid ISO 8601 | `STAGE0_INVALID_TIMESTAMP` |
| V0.5 | `business_info.name` is non-empty | `STAGE0_MISSING_BUSINESS` |

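The rules above could be checked with something like the following. This is a minimal sketch rather than the scraper team's implementation: it takes the Stage 0 output as a plain `dict`, returns `(rule, error_code)` pairs instead of the `ValidationResult` used by later stages, and relies on `datetime.fromisoformat`, which accepts only a subset of ISO 8601 on older Python versions.

```python
from datetime import datetime


def validate_stage0_output(output: dict) -> list:
    """Return (rule, error_code) pairs for every violated Stage 0 rule."""
    reviews = output.get("reviews")
    if not isinstance(reviews, list):  # V0.1
        return [("V0.1", "STAGE0_INVALID_OUTPUT")]

    errors = []
    for review in reviews:
        if not review.get("review_id"):  # V0.2
            errors.append(("V0.2", "STAGE0_MISSING_REVIEW_ID"))
        if review.get("rating") not in (1, 2, 3, 4, 5):  # V0.3
            errors.append(("V0.3", "STAGE0_INVALID_RATING"))
        try:  # V0.4: a trailing 'Z' is rewritten for older Python versions
            datetime.fromisoformat(str(review.get("review_time")).replace("Z", "+00:00"))
        except ValueError:
            errors.append(("V0.4", "STAGE0_INVALID_TIMESTAMP"))

    name = (output.get("business_info") or {}).get("name") or ""
    if not str(name).strip():  # V0.5
        errors.append(("V0.5", "STAGE0_MISSING_BUSINESS"))
    return errors
```

An empty list means all five rules passed. If `reviews` is not a list, the function stops after V0.1 because the per-review rules cannot be evaluated.
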
### Storage Location

```sql
-- Currently stored as JSONB blob
jobs.reviews_data -> Stage0Output
```

### Handoff to Stage 1

Stage 1 reads from `jobs.reviews_data` where `status = 'completed'`.

---

## Stage 1: Normalization

**Status**: ❌ NOT IMPLEMENTED
**Owner**: TBD
**Depends On**: Stage 0

### Purpose

Transform raw scraped reviews into clean, versioned records ready for LLM classification.

### Input Contract

```typescript
// Reads Stage0Output from jobs.reviews_data
interface Stage1Input {
  job_id: string;
  business_id: string;
  place_id: string;
  reviews: RawReview[];  // From Stage 0
}
```

### Output Contract

```typescript
interface Stage1Output {
  job_id: string;
  business_id: string;
  place_id: string;

  // Normalized reviews ready for classification
  reviews_normalized: NormalizedReview[];

  // Processing stats
  stats: {
    input_count: number;
    output_count: number;
    skipped_empty: number;
    skipped_duplicate: number;
  };
}

interface NormalizedReview {
  // Identity (composite key)
  source: 'google';
  review_id: string;
  review_version: number;   // Increments if review edited

  // Tenant context
  business_id: string;
  place_id: string;

  // Content
  text: string;             // Original text (NOT NULL - empty reviews filtered)
  text_normalized: string;  // Cleaned: lowercase, whitespace normalized, emoji standardized
  text_language: string;    // ISO 639-1 code
  text_length: number;      // Character count
  word_count: number;

  // Metadata
  rating: number;
  review_time: string;      // ISO 8601
  author_name: string;
  author_id?: string;

  // Dedup
  content_hash: string;     // SHA256 of normalized text
  dedup_group_id?: string;  // If duplicate detected

  // Reference
  raw_id: number;           // FK to reviews_raw.id
}
```

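As an illustration of how `text_normalized` and `content_hash` relate, here is a minimal sketch. The function names are hypothetical, and the normalization steps are an assumption inferred from the contract comments and from Fixture 2 (which also drops punctuation). The contract does not define "emoji standardized", so this sketch simply removes emoji along with other symbols.

```python
import hashlib
import re
import unicodedata


def normalize_text(text: str) -> str:
    """Lowercase, drop punctuation/symbols/control chars, collapse whitespace."""
    text = unicodedata.normalize("NFKC", text).lower()
    # Keep letters, digits, and whitespace; replace everything else with a space.
    text = "".join(ch if ch.isalnum() or ch.isspace() else " " for ch in text)
    # Collapse runs of whitespace (including tabs and newlines) to one space.
    return re.sub(r"\s+", " ", text).strip()


def content_hash(text_normalized: str) -> str:
    """SHA-256 of the normalized text as a 64-character lowercase hex string."""
    return hashlib.sha256(text_normalized.encode("utf-8")).hexdigest()
```

Hashing the normalized text rather than the original means two reviews that differ only in case, punctuation, or spacing produce the same `content_hash`, which is what makes the hash usable for duplicate detection.
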
### Database Schema

```sql
-- Stage 1 writes to reviews_raw (immutable) and reviews_enriched (initial)
INSERT INTO reviews_raw (
    source, review_id, place_id, raw_payload,
    review_text, rating, review_time, reviewer_name, reviewer_id,
    review_version, pulled_at
) VALUES (...);

-- Then creates reviews_enriched stub (no classification yet)
INSERT INTO reviews_enriched (
    source, review_id, review_version, is_latest, raw_id,
    business_id, place_id, text, text_normalized, rating, review_time,
    language, taxonomy_version,
    -- Classification fields NULL until Stage 2
    urt_primary, valence, intensity, embedding, trust_score
) VALUES (
    ...,
    NULL, NULL, NULL, NULL, NULL  -- Filled by Stage 2
);
```

### Validation Rules

| Rule | Check | Error Code |
|------|-------|------------|
| V1.1 | `text` is non-empty string | `STAGE1_EMPTY_TEXT` |
| V1.2 | `text_normalized` contains no control chars | `STAGE1_INVALID_NORMALIZATION` |
| V1.3 | `content_hash` is 64-char hex | `STAGE1_INVALID_HASH` |
| V1.4 | `review_version` >= 1 | `STAGE1_INVALID_VERSION` |
| V1.5 | `text_language` is valid ISO 639-1 | `STAGE1_INVALID_LANGUAGE` |
| V1.6 | `raw_id` references valid reviews_raw row | `STAGE1_ORPHAN_ENRICHED` |

### Validation Script

```python
def validate_stage1_output(output: Stage1Output) -> ValidationResult:
    """
    Run after Stage 1 completes.
    Returns: ValidationResult with passed/failed rules and error details.
    """
    errors = []

    for review in output.reviews_normalized:
        # V1.1: Non-empty text
        if not review.text or not review.text.strip():
            errors.append(ValidationError('V1.1', review.review_id, 'Empty text'))

        # V1.2: No control characters
        if has_control_chars(review.text_normalized):
            errors.append(ValidationError('V1.2', review.review_id, 'Control chars in normalized'))

        # V1.3: Valid hash
        if not is_valid_sha256(review.content_hash):
            errors.append(ValidationError('V1.3', review.review_id, 'Invalid content hash'))

        # V1.4: Version >= 1
        if review.review_version < 1:
            errors.append(ValidationError('V1.4', review.review_id, 'Invalid version'))

        # V1.5: Valid language code
        if not is_valid_iso639(review.text_language):
            errors.append(ValidationError('V1.5', review.review_id, 'Invalid language'))

        # V1.6 (raw_id references a reviews_raw row) needs a database lookup
        # and is not checked by this function.

    return ValidationResult(
        stage='stage1',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )
```

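The validation scripts in this document use `ValidationError`, `ValidationResult`, and three helper predicates without defining them. The following is one possible shape for them, offered as a sketch: the field names are assumptions inferred from the call sites above, and `is_valid_iso639` only checks the two-letter shape, not membership in the ISO 639-1 code list.

```python
import re
import unicodedata
from dataclasses import dataclass, field


@dataclass
class ValidationError:
    rule: str        # e.g. 'V1.3'
    subject_id: str  # review_id, span_id, or similar
    message: str


@dataclass
class ValidationResult:
    stage: str
    passed: bool
    error_count: int
    errors: list = field(default_factory=list)


def has_control_chars(text: str) -> bool:
    # Unicode category 'Cc' covers C0/C1 controls, including tab and newline.
    return any(unicodedata.category(ch) == "Cc" for ch in text)


def is_valid_sha256(value: str) -> bool:
    # Assumes lowercase hex, matching hashlib's hexdigest() output.
    return re.fullmatch(r"[0-9a-f]{64}", value) is not None


def is_valid_iso639(code: str) -> bool:
    # Shape check only: two lowercase ASCII letters.
    return re.fullmatch(r"[a-z]{2}", code) is not None
```

A stricter `is_valid_iso639` would compare against the actual list of assigned codes; the shape check is enough to catch three-letter codes and empty strings.
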
### Handoff to Stage 2

Stage 2 queries:
```sql
SELECT * FROM reviews_enriched
WHERE urt_primary IS NULL  -- Not yet classified
  AND is_latest = TRUE
ORDER BY review_time DESC
LIMIT 100;  -- Batch size
```

---

## Stage 2: LLM Classification

**Status**: ❌ NOT IMPLEMENTED
**Owner**: TBD
**Depends On**: Stage 1
**Contract Reference**: `LLM-Classification-Contract-v1.md`

### Purpose

Classify normalized reviews into URT codes with span-level extraction.

### Input Contract

```typescript
interface Stage2Input {
  // Batch of reviews to classify
  reviews: ReviewToClassify[];

  // Configuration
  config: {
    model: string;                 // 'gpt-4o-mini', 'claude-3-haiku', etc.
    taxonomy_version: string;      // 'v5.1'
    profile: 'lite' | 'core' | 'standard' | 'full';
    max_spans_per_review: number;  // Default: 10
  };
}

interface ReviewToClassify {
  source: string;
  review_id: string;
  review_version: number;
  business_id: string;
  place_id: string;
  text: string;             // Original text (offsets reference this)
  text_normalized: string;  // For LLM input
  rating: number;
  review_time: string;
}
```

### Output Contract

```typescript
interface Stage2Output {
  // Batch metadata
  batch_id: string;
  taxonomy_version: string;
  model_version: string;
  prompt_version: string;

  // Classified reviews
  reviews_classified: ClassifiedReview[];

  // Stats
  stats: {
    input_count: number;
    success_count: number;
    error_count: number;
    total_spans: number;
    avg_spans_per_review: number;
    llm_tokens_used: number;
    llm_cost_usd: number;
  };
}

interface ClassifiedReview {
  // Identity
  source: string;
  review_id: string;
  review_version: number;

  // Review-level classification (derived from primary span)
  urt_primary: string;      // Tier-3 code: 'J1.01'
  urt_secondary: string[];  // Max 2
  valence: 'V+' | 'V-' | 'V0' | 'V±';
  intensity: 'I1' | 'I2' | 'I3';
  comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S';

  // Extracted entities (from spans)
  staff_mentions: string[];
  quotes: Record<string, string>;  // code -> quote

  // Trust score
  trust_score: number;  // 0.2 to 1.0

  // Embedding
  embedding: number[];  // 384-dim vector

  // Spans
  spans: ExtractedSpan[];

  // Processing metadata
  classification_confidence: Record<string, number>;
  processing_time_ms: number;
}

interface ExtractedSpan {
  // Identity
  span_id: string;     // Deterministic: 'SPN-{hash}'
  span_index: number;  // 0-based within review

  // Position (offsets into original text, NOT normalized)
  span_text: string;   // Exact substring
  span_start: number;  // Character offset
  span_end: number;    // Character offset (exclusive)

  // Classification
  profile: 'lite' | 'core' | 'standard' | 'full';
  urt_primary: string;
  urt_secondary: string[];
  valence: 'V+' | 'V-' | 'V0' | 'V±';
  intensity: 'I1' | 'I2' | 'I3';
  comparative: 'CR-N' | 'CR-B' | 'CR-W' | 'CR-S';

  // Extended (standard/full profile)
  specificity: 'S1' | 'S2' | 'S3';
  actionability: 'A1' | 'A2' | 'A3';
  temporal: 'TC' | 'TR' | 'TH' | 'TF';
  evidence: 'ES' | 'EI' | 'EC';

  // Entity
  entity?: string;
  entity_type?: 'location' | 'staff' | 'product' | 'process' | 'time' | 'other';
  entity_normalized?: string;

  // Causal (full profile)
  relation_type?: 'cause_of' | 'effect_of' | 'contrast' | 'resolution';
  related_span_index?: number;
  causal_chain?: CausalLink[];

  // Metadata
  confidence: 'high' | 'medium' | 'low';
  usn: string;  // URT Semantic Notation

  // Flags
  is_primary: boolean;  // Primary span for this review
}

interface CausalLink {
  code: string;
  role: 'cause' | 'effect' | 'context' | 'outcome';
  order: number;
}
```

### Database Writes

```sql
-- Update reviews_enriched with classification
UPDATE reviews_enriched SET
    urt_primary = $urt_primary,
    urt_secondary = $urt_secondary,
    valence = $valence,
    intensity = $intensity,
    comparative = $comparative,
    staff_mentions = $staff_mentions,
    quotes = $quotes,
    embedding = $embedding,
    trust_score = $trust_score,
    classification_model = $model_version,
    classification_confidence = $confidence,
    taxonomy_version = $taxonomy_version,
    processed_at = NOW()
WHERE source = $source
  AND review_id = $review_id
  AND review_version = $review_version;

-- Insert spans
INSERT INTO review_spans (
    span_id, business_id, place_id, source, review_id, review_version,
    span_index, span_text, span_start, span_end,
    profile, urt_primary, urt_secondary, valence, intensity, comparative,
    specificity, actionability, temporal, evidence,
    entity, entity_type, entity_normalized,
    relation_type, related_span_id, causal_chain,
    is_primary, is_active, review_time,
    confidence, usn, taxonomy_version, model_version, ingest_batch_id
) VALUES (...);
```

### Validation Rules

| Rule | Check | Error Code |
|------|-------|------------|
| V2.1 | `urt_primary` matches `^[OPJEAVR][1-4]\.[0-9]{2}$` | `STAGE2_INVALID_URT_CODE` |
| V2.2 | `urt_secondary` has max 2 elements | `STAGE2_TOO_MANY_SECONDARY` |
| V2.3 | `valence` is valid enum value | `STAGE2_INVALID_VALENCE` |
| V2.4 | `intensity` is valid enum value | `STAGE2_INVALID_INTENSITY` |
| V2.5 | `span_end > span_start` | `STAGE2_INVALID_SPAN_BOUNDS` |
| V2.6 | `span_text == text[span_start:span_end]` | `STAGE2_SPAN_TEXT_MISMATCH` |
| V2.7 | Spans do not overlap | `STAGE2_OVERLAPPING_SPANS` |
| V2.8 | Exactly one `is_primary = true` per review | `STAGE2_PRIMARY_SPAN_COUNT` |
| V2.9 | `trust_score` >= 0.2 and <= 1.0 | `STAGE2_INVALID_TRUST` |
| V2.10 | `embedding` is 384-dim array | `STAGE2_INVALID_EMBEDDING` |
| V2.11 | `usn` matches profile-specific regex | `STAGE2_INVALID_USN` |
| V2.12 | `related_span_index` references valid span | `STAGE2_INVALID_RELATION` |

### Validation Script

```python
import re


def validate_stage2_output(output: Stage2Output, input_reviews: dict) -> ValidationResult:
    """
    Validate Stage 2 classification output.
    input_reviews: dict mapping (source, review_id, version) -> review record
    with a 'text' key holding the original text.
    Rules V2.3, V2.4, V2.11, and V2.12 are not checked by this function.
    """
    errors = []

    for review in output.reviews_classified:
        # V2.1: Valid URT code
        if not re.match(r'^[OPJEAVR][1-4]\.[0-9]{2}$', review.urt_primary):
            errors.append(ValidationError('V2.1', review.review_id, f'Invalid code: {review.urt_primary}'))

        # V2.2: Max 2 secondary codes
        if len(review.urt_secondary) > 2:
            errors.append(ValidationError('V2.2', review.review_id, 'Too many secondary codes'))

        # Get original text for span validation
        key = (review.source, review.review_id, review.review_version)
        original_text = input_reviews.get(key, {}).get('text', '')

        primary_count = 0
        span_ranges = []

        for span in review.spans:
            # V2.5: Valid bounds
            if span.span_end <= span.span_start:
                errors.append(ValidationError('V2.5', span.span_id, 'Invalid bounds'))

            # V2.6: Text matches
            expected_text = original_text[span.span_start:span.span_end]
            if span.span_text != expected_text:
                errors.append(ValidationError('V2.6', span.span_id,
                    f'Text mismatch: expected "{expected_text[:30]}..." got "{span.span_text[:30]}..."'))

            # V2.7: No overlap (check against previous spans)
            for prev_start, prev_end in span_ranges:
                if not (span.span_end <= prev_start or span.span_start >= prev_end):
                    errors.append(ValidationError('V2.7', span.span_id, 'Overlapping span'))
            span_ranges.append((span.span_start, span.span_end))

            # V2.8: Count primaries
            if span.is_primary:
                primary_count += 1

        if primary_count != 1:
            errors.append(ValidationError('V2.8', review.review_id, f'Primary count: {primary_count}'))

        # V2.9: Trust score bounds
        if not (0.2 <= review.trust_score <= 1.0):
            errors.append(ValidationError('V2.9', review.review_id, f'Trust: {review.trust_score}'))

        # V2.10: Embedding dimension
        if len(review.embedding) != 384:
            errors.append(ValidationError('V2.10', review.review_id, f'Embedding dim: {len(review.embedding)}'))

    return ValidationResult(
        stage='stage2',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )
```

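The script above does not cover the enum rules (V2.3, V2.4) or the span relation rule (V2.12). A minimal sketch of those checks follows. The function name is hypothetical, spans are represented as plain dicts, and the reading of V2.12 (the index must point at a different span in the same review) is an assumption, since the rules table only says "references valid span".

```python
VALENCES = {"V+", "V-", "V0", "V±"}
INTENSITIES = {"I1", "I2", "I3"}


def check_enums_and_relations(spans: list) -> list:
    """Return (rule, span_id) pairs for V2.3, V2.4, and V2.12 violations."""
    errors = []
    indexes = {s["span_index"] for s in spans}

    for span in spans:
        # V2.3: valence must be one of the contract's enum values
        if span["valence"] not in VALENCES:
            errors.append(("V2.3", span["span_id"]))

        # V2.4: intensity must be one of the contract's enum values
        if span["intensity"] not in INTENSITIES:
            errors.append(("V2.4", span["span_id"]))

        # V2.12 (assumed reading): related_span_index, when present, must
        # refer to another span of the same review, not to the span itself.
        related = span.get("related_span_index")
        if related is not None and related not in indexes - {span["span_index"]}:
            errors.append(("V2.12", span["span_id"]))

    return errors
```

V2.11 is left out because the profile-specific USN regexes are not given in this document.
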
### Handoff to Stage 3

Stage 3 queries:
```sql
SELECT rs.*
FROM review_spans rs
WHERE rs.is_active = TRUE
  AND rs.valence IN ('V-', 'V±')  -- Only negative/mixed create issues
  AND NOT EXISTS (
    SELECT 1 FROM issue_spans iss WHERE iss.span_id = rs.span_id
  )
ORDER BY rs.review_time DESC;
```

---

## Stage 3: Issue Routing

**Status**: ❌ NOT IMPLEMENTED
**Owner**: TBD
**Depends On**: Stage 2

### Purpose

Route classified spans to issues (create new or aggregate to existing).

### Input Contract

```typescript
interface Stage3Input {
  // Spans to route (from Stage 2)
  spans: SpanToRoute[];
}

interface SpanToRoute {
  span_id: string;
  business_id: string;
  place_id: string;
  urt_primary: string;
  valence: string;
  intensity: string;
  entity_normalized?: string;
  review_time: string;
  confidence: string;

  // For trust weighting
  trust_score: number;
}
```

### Output Contract

```typescript
interface Stage3Output {
  // Routing results
  routed_spans: RoutedSpan[];

  // Issues created/updated
  issues_created: string[];  // issue_ids
  issues_updated: string[];  // issue_ids

  // Stats
  stats: {
    spans_processed: number;
    spans_routed: number;
    spans_skipped: number;  // Positive valence, etc.
    issues_created: number;
    issues_updated: number;
  };
}

interface RoutedSpan {
  span_id: string;
  issue_id: string;
  routing_key: string;  // business|place|code|entity
  is_new_issue: boolean;
}
```

### Issue ID Generation

```typescript
// Deterministic issue ID from routing key
function generateIssueId(
  business_id: string,
  place_id: string,
  urt_primary: string,
  entity_normalized: string | null
): string {
  const key = `${business_id}|${place_id}|${urt_primary}|${entity_normalized || ''}`;
  const hash = sha256(key);
  return `ISS-${hash.substring(0, 16)}`;
}
```

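Since the validation scripts in this document are written in Python, a Python port of the ID function may be useful for tests. The sketch below assumes that the `sha256` helper in the TypeScript version returns a lowercase hex digest of the UTF-8 encoded key; the document does not state this, but rule V3.1 (`^ISS-[a-f0-9]{16}$`) is consistent with it.

```python
import hashlib
import re


def generate_issue_id(business_id, place_id, urt_primary, entity_normalized=None):
    """Build the routing key and derive a deterministic 'ISS-' identifier."""
    # A missing entity contributes an empty segment, as in the TypeScript version.
    key = f"{business_id}|{place_id}|{urt_primary}|{entity_normalized or ''}"
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return f"ISS-{digest[:16]}"


issue_id = generate_issue_id("acme-corp", "ChIJN1t_tDeuEmsRUsoyG83frY4", "J1.01")
assert re.fullmatch(r"ISS-[a-f0-9]{16}", issue_id)
```

Because the ID depends only on the routing key, every span with the same business, place, code, and entity maps to the same issue, which is what lets the `ON CONFLICT (issue_id)` upsert aggregate spans instead of creating duplicates.
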
### Database Writes

```sql
-- Create or update issue
INSERT INTO issues (
    issue_id, business_id, place_id, primary_subcode, domain,
    state, priority_score, confidence_score, span_count, max_intensity,
    entity, entity_normalized, taxonomy_version
) VALUES (...)
ON CONFLICT (issue_id) DO UPDATE SET
    span_count = issues.span_count + 1,
    max_intensity = GREATEST(issues.max_intensity, EXCLUDED.max_intensity),
    updated_at = NOW();

-- Link span to issue
INSERT INTO issue_spans (
    issue_id, span_id, source, review_id, review_version,
    is_primary_match, intensity, review_time
) VALUES (...)
ON CONFLICT (span_id) DO NOTHING;  -- 1:1 mapping

-- Log event
INSERT INTO issue_events (issue_id, event_type, span_id, ...)
VALUES (...);
```

### Validation Rules

| Rule | Check | Error Code |
|------|-------|------------|
| V3.1 | `issue_id` matches `^ISS-[a-f0-9]{16}$` | `STAGE3_INVALID_ISSUE_ID` |
| V3.2 | `routing_key` is non-empty | `STAGE3_EMPTY_ROUTING_KEY` |
| V3.3 | Span not already linked to different issue | `STAGE3_DUPLICATE_ROUTING` |
| V3.4 | Issue exists in `issues` table | `STAGE3_ORPHAN_SPAN_LINK` |
| V3.5 | Only V-/V± spans create issues | `STAGE3_POSITIVE_ROUTED` |

### Validation Script

```python
import re


def validate_stage3_output(output: Stage3Output, db: Database) -> ValidationResult:
    # Note: V3.5 (only V-/V± spans create issues) is not checked here because
    # RoutedSpan does not carry the span's valence.
    errors = []

    for routed in output.routed_spans:
        # V3.1: Valid issue ID format
        if not re.match(r'^ISS-[a-f0-9]{16}$', routed.issue_id):
            errors.append(ValidationError('V3.1', routed.span_id, 'Invalid issue_id'))

        # V3.2: Non-empty routing key
        if not routed.routing_key:
            errors.append(ValidationError('V3.2', routed.span_id, 'Empty routing key'))

        # V3.3: Check no duplicate routing
        existing = db.query(
            "SELECT issue_id FROM issue_spans WHERE span_id = %s",
            [routed.span_id]
        )
        if existing and existing['issue_id'] != routed.issue_id:
            errors.append(ValidationError('V3.3', routed.span_id, 'Already routed elsewhere'))

        # V3.4: Issue exists
        issue_exists = db.query(
            "SELECT 1 FROM issues WHERE issue_id = %s",
            [routed.issue_id]
        )
        if not issue_exists:
            errors.append(ValidationError('V3.4', routed.span_id, 'Orphan issue_id'))

    return ValidationResult(
        stage='stage3',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )
```

### Handoff to Stage 4

Stage 4 runs as a scheduled job and reads from the tables written by Stages 1 through 3.

---

## Stage 4: Fact Aggregation

**Status**: ❌ NOT IMPLEMENTED
**Owner**: TBD
**Depends On**: Stages 1, 2, 3
**Schedule**: Daily batch job

### Purpose

Pre-aggregate span/review data into `fact_timeseries` for fast dashboard queries.

### Input Contract

```typescript
interface Stage4Input {
  business_id: string;
  date: string;  // YYYY-MM-DD
  bucket_types: ('day' | 'week' | 'month')[];
  taxonomy_version: string;
}
```

### Output Contract

```typescript
interface Stage4Output {
  facts_written: FactRecord[];

  stats: {
    business_id: string;
    date: string;
    locations_processed: number;
    codes_aggregated: number;
    facts_upserted: number;
  };
}

interface FactRecord {
  // Keys
  business_id: string;
  place_id: string;  // Or 'ALL' for rollup
  period_date: string;
  bucket_type: string;
  subject_type: 'overall' | 'urt_code' | 'domain' | 'issue';
  subject_id: string;
  taxonomy_version: string;

  // Metrics
  review_count: number;
  span_count: number;
  negative_count: number;
  positive_count: number;
  neutral_count: number;
  mixed_count: number;
  strength_score: number;
  negative_strength: number;
  positive_strength: number;
  avg_rating: number;
  i1_count: number;
  i2_count: number;
  i3_count: number;
  cr_better: number;
  cr_worse: number;
  cr_same: number;
  trust_weighted_strength: number;
  trust_weighted_negative: number;
}
```

### Database Writes

```sql
INSERT INTO fact_timeseries (
    business_id, place_id, period_date, bucket_type,
    subject_type, subject_id, taxonomy_version,
    review_count, span_count, negative_count, positive_count, ...
) VALUES (...)
ON CONFLICT (business_id, place_id, period_date, bucket_type,
             subject_type, subject_id, taxonomy_version)
DO UPDATE SET
    review_count = EXCLUDED.review_count,
    span_count = EXCLUDED.span_count,
    ...
    computed_at = NOW();
```

### Validation Rules

| Rule | Check | Error Code |
|------|-------|------------|
| V4.1 | `place_id` is valid or 'ALL' | `STAGE4_INVALID_PLACE` |
| V4.2 | `period_date` matches bucket | `STAGE4_DATE_BUCKET_MISMATCH` |
| V4.3 | `span_count >= review_count` (reviews have >=1 span) | `STAGE4_COUNT_MISMATCH` |
| V4.4 | `negative_count + positive_count + neutral_count + mixed_count == span_count` | `STAGE4_VALENCE_SUM` |
| V4.5 | `i1_count + i2_count + i3_count == span_count` | `STAGE4_INTENSITY_SUM` |
| V4.6 | `strength_score >= 0` | `STAGE4_NEGATIVE_STRENGTH` |
| V4.7 | `avg_rating` between 1.0 and 5.0 (or NULL) | `STAGE4_INVALID_RATING` |

### Validation Script

```python
def validate_stage4_output(output: Stage4Output) -> ValidationResult:
    # Note: V4.1 (place_id) and V4.2 (period_date vs. bucket) are not checked here.
    errors = []

    for fact in output.facts_written:
        # V4.3: Span count >= review count
        if fact.span_count < fact.review_count:
            errors.append(ValidationError('V4.3', f'{fact.subject_id}', 'span < review count'))

        # V4.4: Valence sum
        valence_sum = fact.negative_count + fact.positive_count + fact.neutral_count + fact.mixed_count
        if valence_sum != fact.span_count:
            errors.append(ValidationError('V4.4', f'{fact.subject_id}',
                f'Valence sum {valence_sum} != {fact.span_count}'))

        # V4.5: Intensity sum
        intensity_sum = fact.i1_count + fact.i2_count + fact.i3_count
        if intensity_sum != fact.span_count:
            errors.append(ValidationError('V4.5', f'{fact.subject_id}',
                f'Intensity sum {intensity_sum} != {fact.span_count}'))

        # V4.6: Strength score is non-negative
        if fact.strength_score < 0:
            errors.append(ValidationError('V4.6', f'{fact.subject_id}',
                f'Negative strength: {fact.strength_score}'))

        # V4.7: Rating bounds
        if fact.avg_rating is not None and not (1.0 <= fact.avg_rating <= 5.0):
            errors.append(ValidationError('V4.7', f'{fact.subject_id}',
                f'Invalid rating: {fact.avg_rating}'))

    return ValidationResult(
        stage='stage4',
        passed=len(errors) == 0,
        error_count=len(errors),
        errors=errors
    )
```

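Rule V4.2 (`period_date` matches bucket) is not spelled out in this document. One plausible reading, shown here as a sketch, is that `period_date` must be the first day of its bucket. The function name is hypothetical, and both conventions used (weeks start on Monday, months on day 1) are assumptions to confirm before relying on this check.

```python
from datetime import date


def period_matches_bucket(period_date: str, bucket_type: str) -> bool:
    """Check that a YYYY-MM-DD period_date is the first day of its bucket."""
    d = date.fromisoformat(period_date)
    if bucket_type == "day":
        return True  # every calendar date is a valid day bucket
    if bucket_type == "week":
        return d.weekday() == 0  # assumed convention: weeks start on Monday
    if bucket_type == "month":
        return d.day == 1  # assumed convention: month buckets keyed by day 1
    raise ValueError(f"Unknown bucket_type: {bucket_type}")
```

Inside `validate_stage4_output`, a failing check would append a `ValidationError` for rule `V4.2`.
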
---

## Integration Tests

### Test 1: Stage 0 → Stage 1 Handoff

```python
def test_stage0_to_stage1():
    """Verify Stage 0 output is valid Stage 1 input."""

    # Get Stage 0 output
    stage0_output = db.query("SELECT reviews_data FROM jobs WHERE job_id = %s", [JOB_ID])

    # Validate as Stage 1 input
    stage1_input = Stage1Input.parse(stage0_output)

    # Check all required fields present
    assert stage1_input.job_id is not None
    assert stage1_input.business_id is not None
    assert len(stage1_input.reviews) > 0

    for review in stage1_input.reviews:
        assert review.review_id is not None
        assert review.rating in [1, 2, 3, 4, 5]
        assert review.review_time is not None
```

### Test 2: Stage 1 → Stage 2 Handoff

```python
def test_stage1_to_stage2():
    """Verify Stage 1 output can be classified."""

    # Get Stage 1 output from DB
    reviews = db.query("""
        SELECT * FROM reviews_enriched
        WHERE urt_primary IS NULL AND is_latest = TRUE
        LIMIT 10
    """)

    # Build Stage 2 input
    stage2_input = Stage2Input(
        reviews=[ReviewToClassify.from_db(r) for r in reviews],
        config=ClassificationConfig(
            model='gpt-4o-mini',
            taxonomy_version='v5.1',
            profile='standard'
        )
    )

    # Validate input
    for review in stage2_input.reviews:
        assert review.text is not None
        assert len(review.text) > 0
        assert review.text_normalized is not None
```

### Test 3: Stage 2 → Stage 3 Handoff

```python
def test_stage2_to_stage3():
    """Verify classified spans can be routed."""

    # Get unrouted negative spans
    spans = db.query("""
        SELECT * FROM review_spans
        WHERE is_active = TRUE
          AND valence IN ('V-', 'V±')
          AND NOT EXISTS (SELECT 1 FROM issue_spans WHERE span_id = review_spans.span_id)
        LIMIT 10
    """)

    # Build Stage 3 input
    stage3_input = Stage3Input(
        spans=[SpanToRoute.from_db(s) for s in spans]
    )

    # Validate
    for span in stage3_input.spans:
        assert span.span_id is not None
        assert span.business_id is not None
        assert span.urt_primary is not None
        assert span.valence in ['V-', 'V±']
```

### Test 4: End-to-End Pipeline

```python
def test_e2e_pipeline():
    """Full pipeline integration test."""

    # Stage 0: Scrape
    job_id = submit_scrape_job(TEST_URL)
    wait_for_job(job_id)
    stage0_result = validate_stage0_output(get_job(job_id))
    assert stage0_result.passed

    # Stage 1: Normalize
    stage1_output = run_stage1(job_id)
    stage1_result = validate_stage1_output(stage1_output)
    assert stage1_result.passed

    # Stage 2: Classify
    stage2_output = run_stage2(stage1_output.reviews_normalized)
    # validate_stage2_output looks up original text by (source, review_id, version)
    input_reviews = {
        (r.source, r.review_id, r.review_version): {'text': r.text}
        for r in stage1_output.reviews_normalized
    }
    stage2_result = validate_stage2_output(stage2_output, input_reviews)
    assert stage2_result.passed

    # Stage 3: Route
    negative_spans = [s for r in stage2_output.reviews_classified
                      for s in r.spans if s.valence in ['V-', 'V±']]
    stage3_output = run_stage3(negative_spans)
    stage3_result = validate_stage3_output(stage3_output, db)
    assert stage3_result.passed

    # Stage 4: Aggregate
    stage4_output = run_stage4(TEST_BUSINESS_ID, today(), ['day'])
    stage4_result = validate_stage4_output(stage4_output)
    assert stage4_result.passed

    # Final assertion: Facts exist
    facts = db.query("""
        SELECT COUNT(*) FROM fact_timeseries
        WHERE business_id = %s AND period_date = %s
    """, [TEST_BUSINESS_ID, today()])
    assert facts['count'] > 0
```

---

## Test Fixtures

### Fixture 1: Sample Raw Review (Stage 0 Output)

```json
{
  "job_id": "test-job-001",
  "status": "completed",
  "business_id": "acme-corp",
  "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
  "business_info": {
    "name": "Acme Restaurant",
    "address": "123 Main St, Anytown, USA",
    "category": "Restaurant",
    "total_reviews": 1247,
    "average_rating": 4.2
  },
  "reviews": [
    {
      "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
      "author_name": "John Smith",
      "author_id": "103456789012345678901",
      "rating": 2,
      "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained. However, the steak was cooked perfectly and the dessert was amazing.",
      "review_time": "2026-01-20T14:30:00Z",
      "response_text": null,
      "photos": [],
      "raw_payload": {}
    }
  ],
  "scrape_time_ms": 12500,
  "reviews_scraped": 1,
  "scraper_version": "v1.0.0"
}
```

### Fixture 2: Normalized Review (Stage 1 Output)

```json
{
  "source": "google",
  "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
  "review_version": 1,
  "business_id": "acme-corp",
  "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
  "text": "The food was great but the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. The server Mike was rude and dismissive when we complained. However, the steak was cooked perfectly and the dessert was amazing.",
  "text_normalized": "the food was great but the wait was absolutely terrible we waited 45 minutes just to be seated and another 30 minutes for our appetizers the server mike was rude and dismissive when we complained however the steak was cooked perfectly and the dessert was amazing",
  "text_language": "en",
  "text_length": 268,
  "word_count": 46,
  "rating": 2,
  "review_time": "2026-01-20T14:30:00Z",
  "author_name": "John Smith",
  "content_hash": "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd",
  "raw_id": 12345
}
```

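The relationship between `text` and `text_normalized` in Fixture 2 can be reproduced with a small helper. The sketch below is only an illustration: the normalization rules (lowercase, strip punctuation, collapse whitespace) are inferred from the fixture values, and the use of SHA-256 for `content_hash` is an assumption based on the 64-hex-character placeholder. The function names `normalize_text` and `content_hash` are hypothetical.

```python
import hashlib
import re


def normalize_text(text: str) -> str:
    """Lowercase, drop punctuation, and collapse whitespace (rules inferred from the fixture)."""
    lowered = text.lower()
    no_punct = re.sub(r"[^\w\s]", "", lowered)
    return " ".join(no_punct.split())


def content_hash(text: str) -> str:
    """SHA-256 hex digest of the UTF-8 text (assumed scheme, not confirmed by the contract)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    sample = "The server Mike was rude and dismissive when we complained."
    print(normalize_text(sample))
    print(len(content_hash(sample)))  # a SHA-256 hex digest is always 64 characters
```

A Stage 1 unit test could apply `normalize_text` to the fixture's `text` and compare the result with `text_normalized`, which keeps the fixture and the implementation from drifting apart.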
### Fixture 3: Classified Review with Spans (Stage 2 Output)

```json
{
  "source": "google",
  "review_id": "ChdDSUhNMG9nS0VJQ0FnSURBdWJQX3h3RRAB",
  "review_version": 1,
  "urt_primary": "J1.01",
  "urt_secondary": ["P1.02"],
  "valence": "V±",
  "intensity": "I3",
  "comparative": "CR-N",
  "staff_mentions": ["Mike"],
  "quotes": {
    "J1.01": "waited 45 minutes just to be seated",
    "P1.02": "rude and dismissive"
  },
  "trust_score": 0.85,
  "embedding": [0.123, -0.456, 0.789, "... 384 values ..."],
  "spans": [
    {
      "span_id": "SPN-a1b2c3d4e5f67890",
      "span_index": 0,
      "span_text": "The food was great",
      "span_start": 0,
      "span_end": 18,
      "profile": "standard",
      "urt_primary": "O1.01",
      "urt_secondary": [],
      "valence": "V+",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S1",
      "actionability": "A1",
      "temporal": "TC",
      "evidence": "ES",
      "confidence": "high",
      "usn": "URT:S:O1.01:+2:11TC.ES.N",
      "is_primary": false
    },
    {
      "span_id": "SPN-b2c3d4e5f6789012",
      "span_index": 1,
      "span_text": "the wait was absolutely terrible. We waited 45 minutes just to be seated, and another 30 minutes for our appetizers",
      "span_start": 23,
      "span_end": 138,
      "profile": "standard",
      "urt_primary": "J1.01",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I3",
      "comparative": "CR-N",
      "specificity": "S3",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "EC",
      "confidence": "high",
      "usn": "URT:S:J1.01:-3:32TC.EC.N",
      "is_primary": true
    },
    {
      "span_id": "SPN-c3d4e5f678901234",
      "span_index": 2,
      "span_text": "The server Mike was rude and dismissive when we complained",
      "span_start": 140,
      "span_end": 198,
      "profile": "standard",
      "urt_primary": "P1.02",
      "urt_secondary": [],
      "valence": "V-",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A2",
      "temporal": "TC",
      "evidence": "ES",
      "entity": "Mike",
      "entity_type": "staff",
      "entity_normalized": "mike",
      "confidence": "high",
      "usn": "URT:S:P1.02:-2:22TC.ES.N",
      "is_primary": false
    },
    {
      "span_id": "SPN-d4e5f67890123456",
      "span_index": 3,
      "span_text": "the steak was cooked perfectly and the dessert was amazing",
      "span_start": 209,
      "span_end": 267,
      "profile": "standard",
      "urt_primary": "O1.01",
      "urt_secondary": [],
      "valence": "V+",
      "intensity": "I2",
      "comparative": "CR-N",
      "specificity": "S2",
      "actionability": "A1",
      "temporal": "TC",
      "evidence": "ES",
      "confidence": "high",
      "usn": "URT:S:O1.01:+2:21TC.ES.N",
      "is_primary": false
    }
  ]
}
```

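Two properties of Fixture 3 lend themselves to automated checks: each `span_text` should equal the slice `text[span_start:span_end]` of the review text, and each `usn` appears to be derivable from the other span fields. The sketch below illustrates both. The USN layout is inferred only from the four fixture values, so it should be read as an assumption rather than the authoritative format; `check_span_offsets` and `build_usn` are hypothetical helper names.

```python
REVIEW_TEXT = (
    "The food was great but the wait was absolutely terrible. "
    "We waited 45 minutes just to be seated, and another 30 minutes for our appetizers. "
    "The server Mike was rude and dismissive when we complained. "
    "However, the steak was cooked perfectly and the dessert was amazing."
)

# Span 2 from Fixture 3, reduced to the fields used below.
SPAN = {
    "span_id": "SPN-c3d4e5f678901234",
    "span_text": "The server Mike was rude and dismissive when we complained",
    "span_start": 140,
    "span_end": 198,
    "profile": "standard",
    "urt_primary": "P1.02",
    "valence": "V-",
    "intensity": "I2",
    "comparative": "CR-N",
    "specificity": "S2",
    "actionability": "A2",
    "temporal": "TC",
    "evidence": "ES",
}


def check_span_offsets(text: str, spans: list[dict]) -> list[str]:
    """Return the span_ids whose span_text differs from text[span_start:span_end]."""
    return [
        s["span_id"]
        for s in spans
        if text[s["span_start"]:s["span_end"]] != s["span_text"]
    ]


def build_usn(span: dict) -> str:
    """Rebuild the USN string from span fields (layout inferred from the fixture)."""
    profile = span["profile"][0].upper()            # "standard" -> "S"
    sign = {"V+": "+", "V-": "-"}[span["valence"]]  # other valences are not covered here
    intensity = span["intensity"][1:]               # "I2" -> "2"
    specificity = span["specificity"][1:]           # "S2" -> "2"
    actionability = span["actionability"][1:]       # "A2" -> "2"
    comparative = span["comparative"].split("-")[1] # "CR-N" -> "N"
    return (
        f"URT:{profile}:{span['urt_primary']}:{sign}{intensity}:"
        f"{specificity}{actionability}{span['temporal']}.{span['evidence']}.{comparative}"
    )


if __name__ == "__main__":
    print(check_span_offsets(REVIEW_TEXT, [SPAN]))  # empty list means offsets agree
    print(build_usn(SPAN))
```

Running both checks over every span in the fixture is a cheap way to catch off-by-one offsets before Stage 3 consumes the spans.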
### Fixture 4: Routed Spans (Stage 3 Output)

```json
{
  "routed_spans": [
    {
      "span_id": "SPN-b2c3d4e5f6789012",
      "issue_id": "ISS-7a8b9c0d1e2f3a4b",
      "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|J1.01|",
      "is_new_issue": true
    },
    {
      "span_id": "SPN-c3d4e5f678901234",
      "issue_id": "ISS-2b3c4d5e6f7a8b9c",
      "routing_key": "acme-corp|ChIJN1t_tDeuEmsRUsoyG83frY4|P1.02|mike",
      "is_new_issue": true
    }
  ],
  "issues_created": ["ISS-7a8b9c0d1e2f3a4b", "ISS-2b3c4d5e6f7a8b9c"],
  "issues_updated": [],
  "stats": {
    "spans_processed": 4,
    "spans_routed": 2,
    "spans_skipped": 2,
    "issues_created": 2,
    "issues_updated": 0
  }
}
```

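The `routing_key` values in Fixture 4 look like a pipe-joined tuple of business, place, URT code, and normalized entity, with an empty trailing segment when a span has no entity. The following sketch reproduces the two keys under that assumption; the function name `routing_key` and the empty-string fallback are inferred from the fixture, not taken from the routing implementation.

```python
def routing_key(business_id: str, place_id: str, span: dict) -> str:
    """Join the routing dimensions with '|'; a missing entity yields an empty last segment."""
    entity = span.get("entity_normalized") or ""
    return "|".join([business_id, place_id, span["urt_primary"], entity])


if __name__ == "__main__":
    business_id = "acme-corp"
    place_id = "ChIJN1t_tDeuEmsRUsoyG83frY4"
    wait_span = {"urt_primary": "J1.01"}
    staff_span = {"urt_primary": "P1.02", "entity_normalized": "mike"}
    print(routing_key(business_id, place_id, wait_span))
    print(routing_key(business_id, place_id, staff_span))
```

In the fixture, the two routed spans are the negative ones and the two skipped spans are the positive ones. Whether valence is the actual skip criterion is not stated here, so the sketch leaves filtering out.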
### Fixture 5: Aggregated Facts (Stage 4 Output)

```json
{
  "facts_written": [
    {
      "business_id": "acme-corp",
      "place_id": "ChIJN1t_tDeuEmsRUsoyG83frY4",
      "period_date": "2026-01-20",
      "bucket_type": "day",
      "subject_type": "urt_code",
      "subject_id": "J1.01",
      "taxonomy_version": "v5.1",
      "review_count": 1,
      "span_count": 1,
      "negative_count": 1,
      "positive_count": 0,
      "neutral_count": 0,
      "mixed_count": 0,
      "strength_score": 4.0,
      "negative_strength": 4.0,
      "positive_strength": 0.0,
      "avg_rating": 2.0,
      "i1_count": 0,
      "i2_count": 0,
      "i3_count": 1,
      "cr_better": 0,
      "cr_worse": 0,
      "cr_same": 0,
      "trust_weighted_strength": 3.4,
      "trust_weighted_negative": 3.4
    }
  ],
  "stats": {
    "business_id": "acme-corp",
    "date": "2026-01-20",
    "locations_processed": 1,
    "codes_aggregated": 3,
    "facts_upserted": 5
  }
}
```

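The trust-weighted values in Fixture 5 are consistent with multiplying the strength by the review's `trust_score` from Fixture 3 (4.0 × 0.85 = 3.4). The sketch below encodes that relationship as a fixture check. It is an inference from the sample numbers, not a statement of how Stage 4 computes the field, and `trust_weighted` is a hypothetical helper name.

```python
def trust_weighted(strength: float, trust_score: float) -> float:
    """Scale a strength value by the review trust score (relationship inferred from fixtures)."""
    return round(strength * trust_score, 4)


if __name__ == "__main__":
    fact = {"strength_score": 4.0, "trust_weighted_strength": 3.4}
    trust_score = 0.85  # review-level trust_score from Fixture 3
    print(trust_weighted(fact["strength_score"], trust_score) == fact["trust_weighted_strength"])
```

If the aggregation sums over several spans, the same check would apply per span before summing, assuming each span inherits its review's trust score.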
---

## Validation Summary Table

| Stage | Input From | Output To | Validation Rules | Test Coverage |
|-------|------------|-----------|------------------|---------------|
| 0 | Google Maps | jobs.reviews_data | V0.1-V0.5 | Unit + E2E |
| 1 | Stage 0 | reviews_raw, reviews_enriched | V1.1-V1.6 | Unit + Integration |
| 2 | Stage 1 | reviews_enriched, review_spans | V2.1-V2.12 | Unit + Integration |
| 3 | Stage 2 | issues, issue_spans, issue_events | V3.1-V3.5 | Unit + Integration |
| 4 | All stages | fact_timeseries | V4.1-V4.7 | Unit + E2E |

---

## Document Control

| Field | Value |
|-------|-------|
| **Document** | ReviewIQ Pipeline Contracts v1.0 |
| **Status** | Implementation Blueprint |
| **Date** | 2026-01-24 |
| **Purpose** | Enable parallel development with stage validation |

---

*End of Pipeline Contracts Document*