test: Add E2E pipeline test with real database

Tests the full pipeline flow:
- Stage 1: Insert raw reviews, normalize text
- Stage 2: Mock LLM classification, insert spans
- Stage 3: Route negative spans to issues
- Stage 4: Aggregate facts by URT code and date

Validates all pipeline.* tables are populated correctly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 18:28:53 +00:00
parent 03ed7029e2
commit 3e57c887e9

649
tools/test_pipeline_e2e.py Normal file
View File

@@ -0,0 +1,649 @@
#!/usr/bin/env python3
"""
End-to-end test for the ReviewIQ Pipeline with actual database.
Usage:
python tools/test_pipeline_e2e.py --database-url $DATABASE_URL
This test:
1. Connects to the database
2. Inserts sample review data into pipeline.reviews_raw
3. Runs Stage 1 normalization
4. Runs Stage 2 classification (mocked LLM)
5. Runs Stage 3 routing
6. Runs Stage 4 aggregation
7. Verifies data in all tables
"""
import asyncio
import os
import sys
import argparse
import json
import logging
from datetime import datetime, timezone
from uuid import uuid4
# Add project root and package to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"packages/reviewiq-pipeline/src"
))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Sample test data - matches scraper output format
def generate_sample_reviews():
"""Generate sample reviews with unique IDs."""
return [
{
"review_id": f"test-review-{uuid4().hex[:8]}",
"text": "The food was absolutely terrible! We waited 45 minutes for cold pasta. Mike the waiter was rude and dismissive. Never coming back.",
"rating": 1,
"author_name": "John Doe",
"author_id": "author-001",
"review_time": datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).isoformat(),
"time": int(datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).timestamp()),
"relative_time": "1 week ago",
},
{
"review_id": f"test-review-{uuid4().hex[:8]}",
"text": "Great experience! The steak was cooked to perfection and Sarah provided excellent service. A bit pricey but worth it.",
"rating": 5,
"author_name": "Jane Smith",
"author_id": "author-002",
"review_time": datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).isoformat(),
"time": int(datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).timestamp()),
"relative_time": "4 days ago",
},
{
"review_id": f"test-review-{uuid4().hex[:8]}",
"text": "Average food, nothing special. The ambiance was nice though. Wait time was reasonable.",
"rating": 3,
"author_name": "Bob Wilson",
"author_id": "author-003",
"review_time": datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).isoformat(),
"time": int(datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).timestamp()),
"relative_time": "2 days ago",
},
]
async def run_e2e_test(database_url: str):
"""Run the full E2E pipeline test."""
import asyncpg
# Generate unique test identifiers
test_id = uuid4().hex[:8]
business_id = f"test-business-{test_id}"
place_id = f"test-place-{test_id}"
job_id = uuid4()
logger.info(f"Starting E2E test with business_id={business_id}")
# Generate sample reviews
sample_reviews = generate_sample_reviews()
# Connect to database
pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5)
try:
# ========== STEP 1: Insert raw reviews ==========
logger.info("Step 1: Inserting raw reviews into pipeline.reviews_raw...")
async with pool.acquire() as conn:
for review in sample_reviews:
review_time = datetime.fromtimestamp(review["time"], tz=timezone.utc)
await conn.execute("""
INSERT INTO pipeline.reviews_raw (
job_id, source, review_id, place_id, raw_payload,
review_text, rating, review_time, reviewer_name, reviewer_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
""",
job_id,
"google",
review["review_id"],
place_id,
json.dumps(review),
review["text"],
review["rating"],
review_time,
review["author_name"],
review.get("author_id"),
)
raw_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
place_id,
)
logger.info(f" Inserted {raw_count} raw reviews")
# ========== STEP 2: Run Stage 1 - Normalization ==========
logger.info("Step 2: Running Stage 1 normalization...")
from reviewiq_pipeline.config import Config
from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer
config = Config(database_url=database_url)
normalizer = Stage1Normalizer(config)
normalized_reviews = normalizer.normalize_batch(
sample_reviews,
business_id,
place_id,
)
logger.info(f" Normalized {len(normalized_reviews)} reviews")
# Insert into reviews_enriched
async with pool.acquire() as conn:
for norm in normalized_reviews:
# Get raw_id
raw_id = await conn.fetchval(
"SELECT id FROM pipeline.reviews_raw WHERE review_id = $1",
norm["review_id"],
)
review_time = datetime.fromisoformat(norm["review_time"].replace("Z", "+00:00"))
await conn.execute("""
INSERT INTO pipeline.reviews_enriched (
source, review_id, review_version, is_latest, raw_id,
business_id, place_id, text, text_normalized, rating, review_time,
language, taxonomy_version
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (source, review_id, review_version) DO UPDATE SET
is_latest = EXCLUDED.is_latest
""",
norm["source"],
norm["review_id"],
norm["review_version"],
True,
raw_id,
business_id,
place_id,
norm["text"],
norm["text_normalized"],
norm["rating"],
review_time,
norm["text_language"],
"v5.1",
)
enriched_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
business_id,
)
logger.info(f" Inserted {enriched_count} enriched reviews")
# ========== STEP 3: Run Stage 2 - Classification (mocked) ==========
logger.info("Step 3: Running Stage 2 classification (mocked LLM)...")
# Use mocked classification results
mock_classifications = [
{
"review_id": normalized_reviews[0]["review_id"],
"urt_primary": "O1.01", # Core product issue
"urt_secondary": ["J1.01", "A1.01"], # Wait time, friendliness
"valence": "V-",
"intensity": "I3",
"comparative": "CR-N",
"trust_score": 0.85,
"spans": [
{
"span_index": 0,
"span_text": "food was absolutely terrible",
"span_start": 4,
"span_end": 32,
"urt_primary": "O1.01",
"valence": "V-",
"intensity": "I3",
"is_primary": True,
},
{
"span_index": 1,
"span_text": "waited 45 minutes",
"span_start": 38,
"span_end": 55,
"urt_primary": "J1.01",
"valence": "V-",
"intensity": "I2",
"is_primary": False,
},
],
},
{
"review_id": normalized_reviews[1]["review_id"],
"urt_primary": "O1.01",
"urt_secondary": ["A2.01"],
"valence": "V+",
"intensity": "I3",
"comparative": "CR-N",
"trust_score": 0.90,
"spans": [
{
"span_index": 0,
"span_text": "steak was cooked to perfection",
"span_start": 22,
"span_end": 52,
"urt_primary": "O1.01",
"valence": "V+",
"intensity": "I3",
"is_primary": True,
},
],
},
{
"review_id": normalized_reviews[2]["review_id"],
"urt_primary": "O1.01",
"urt_secondary": [],
"valence": "V0",
"intensity": "I1",
"comparative": "CR-N",
"trust_score": 0.70,
"spans": [
{
"span_index": 0,
"span_text": "Average food, nothing special",
"span_start": 0,
"span_end": 29,
"urt_primary": "O1.01",
"valence": "V0",
"intensity": "I1",
"is_primary": True,
},
],
},
]
# Update enriched reviews with classification and insert spans
batch_id = f"batch-{test_id}"
async with pool.acquire() as conn:
for cls in mock_classifications:
# Update classification in reviews_enriched
await conn.execute("""
UPDATE pipeline.reviews_enriched SET
urt_primary = $1,
urt_secondary = $2,
valence = $3,
intensity = $4,
comparative = $5,
trust_score = $6,
classification_model = $7,
processed_at = NOW()
WHERE review_id = $8 AND business_id = $9
""",
cls["urt_primary"],
cls["urt_secondary"],
cls["valence"],
cls["intensity"],
cls["comparative"],
cls["trust_score"],
"mocked-gpt-4o-mini",
cls["review_id"],
business_id,
)
# Get review info for spans
review_row = await conn.fetchrow(
"SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1",
cls["review_id"],
)
# Insert spans
for span in cls["spans"]:
span_id = f"SPN-{uuid4().hex[:12]}"
usn = f"google:{cls['review_id']}:1:{span['span_index']}"
await conn.execute("""
INSERT INTO pipeline.review_spans (
span_id, business_id, place_id, source, review_id, review_version,
span_index, span_text, span_start, span_end,
profile, urt_primary, valence, intensity, comparative,
is_primary, is_active, review_time,
confidence, usn, taxonomy_version, model_version, ingest_batch_id
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23
)
""",
span_id,
business_id,
place_id,
"google",
cls["review_id"],
1,
span["span_index"],
span["span_text"],
span["span_start"],
span["span_end"],
"standard",
span["urt_primary"],
span["valence"],
span["intensity"],
"CR-N",
span["is_primary"],
True,
review_row["review_time"],
"high",
usn,
"v5.1",
"mocked-gpt-4o-mini",
batch_id,
)
span_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1",
business_id,
)
logger.info(f" Inserted {span_count} spans")
# ========== STEP 4: Run Stage 3 - Routing ==========
logger.info("Step 4: Running Stage 3 routing...")
from reviewiq_pipeline.stages.stage3_route import Stage3Router
router = Stage3Router(config)
async with pool.acquire() as conn:
# Get negative/mixed spans to route
spans_to_route = await conn.fetch("""
SELECT span_id, business_id, place_id, urt_primary, valence, intensity,
entity_normalized, review_time, confidence, source, review_id, review_version
FROM pipeline.review_spans
WHERE business_id = $1 AND valence IN ('V-', '')
""", business_id)
routed_count = 0
for span_row in spans_to_route:
span_data = {
"span_id": span_row["span_id"],
"business_id": span_row["business_id"],
"place_id": span_row["place_id"],
"urt_primary": span_row["urt_primary"],
"valence": span_row["valence"],
"intensity": span_row["intensity"],
"entity_normalized": span_row["entity_normalized"],
"review_time": span_row["review_time"].isoformat(),
"confidence": span_row["confidence"],
"trust_score": 0.85,
}
routed = router.route_span_sync(span_data)
# Check if issue exists
existing = await conn.fetchval(
"SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
routed["issue_id"],
)
if not existing:
# Create issue - extract domain from urt_primary (e.g., "O1.01" -> "O")
domain = span_row["urt_primary"][0]
# Compute priority score based on intensity
intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0}
priority_score = intensity_scores.get(span_row["intensity"], 1.0)
await conn.execute("""
INSERT INTO pipeline.issues (
issue_id, business_id, place_id, primary_subcode, domain,
state, priority_score, span_count, max_intensity, taxonomy_version
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
""",
routed["issue_id"],
business_id,
place_id,
span_row["urt_primary"],
domain,
"open",
priority_score,
1,
span_row["intensity"],
"v5.1",
)
else:
# Update span count
await conn.execute("""
UPDATE pipeline.issues SET span_count = span_count + 1, updated_at = NOW()
WHERE issue_id = $1
""", routed["issue_id"])
# Link span to issue
await conn.execute("""
INSERT INTO pipeline.issue_spans (
issue_id, span_id, source, review_id, review_version,
is_primary_match, intensity, review_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (span_id) DO NOTHING
""",
routed["issue_id"],
span_row["span_id"],
span_row["source"],
span_row["review_id"],
span_row["review_version"],
True,
span_row["intensity"],
span_row["review_time"],
)
# Log event
await conn.execute("""
INSERT INTO pipeline.issue_events (issue_id, event_type, span_id)
VALUES ($1, $2, $3)
""", routed["issue_id"], "span_linked", span_row["span_id"])
routed_count += 1
issue_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1",
business_id,
)
logger.info(f" Routed {routed_count} spans to {issue_count} issues")
# ========== STEP 5: Run Stage 4 - Aggregation ==========
logger.info("Step 5: Running Stage 4 aggregation...")
from collections import defaultdict
async with pool.acquire() as conn:
# Get span data for aggregation
span_data = await conn.fetch("""
SELECT
rs.business_id,
rs.place_id,
DATE(rs.review_time) as review_date,
rs.urt_primary,
rs.valence,
rs.intensity,
rs.comparative,
re.trust_score,
re.rating
FROM pipeline.review_spans rs
JOIN pipeline.reviews_enriched re ON (
re.source = rs.source
AND re.review_id = rs.review_id
AND re.review_version = rs.review_version
)
WHERE rs.business_id = $1 AND rs.is_active = TRUE
""", business_id)
# Group by date and URT code
daily_data = defaultdict(lambda: defaultdict(list))
for row in span_data:
daily_data[row["review_date"]][row["urt_primary"]].append(dict(row))
facts_inserted = 0
for period_date, urt_groups in daily_data.items():
for urt_code, code_spans in urt_groups.items():
# Compute simple metrics
negative_count = sum(1 for s in code_spans if s["valence"] == "V-")
positive_count = sum(1 for s in code_spans if s["valence"] == "V+")
neutral_count = sum(1 for s in code_spans if s["valence"] == "V0")
mixed_count = sum(1 for s in code_spans if s["valence"] == "")
i1_count = sum(1 for s in code_spans if s["intensity"] == "I1")
i2_count = sum(1 for s in code_spans if s["intensity"] == "I2")
i3_count = sum(1 for s in code_spans if s["intensity"] == "I3")
# Simple strength score
strength_score = (positive_count - negative_count) / max(len(code_spans), 1)
negative_strength = negative_count * 1.0
positive_strength = positive_count * 1.0
await conn.execute("""
INSERT INTO pipeline.fact_timeseries (
business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version,
review_count, span_count,
negative_count, positive_count, neutral_count, mixed_count,
strength_score, negative_strength, positive_strength,
i1_count, i2_count, i3_count,
cr_better, cr_worse, cr_same,
trust_weighted_strength, trust_weighted_negative
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24
)
ON CONFLICT (business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version)
DO UPDATE SET
span_count = EXCLUDED.span_count,
computed_at = NOW()
""",
business_id,
place_id,
period_date,
"day",
"urt_code",
urt_code,
"v5.1",
len(code_spans), # review_count approximation
len(code_spans),
negative_count,
positive_count,
neutral_count,
mixed_count,
strength_score,
negative_strength,
positive_strength,
i1_count,
i2_count,
i3_count,
0, # cr_better
0, # cr_worse
0, # cr_same
strength_score, # trust_weighted_strength
negative_strength, # trust_weighted_negative
)
facts_inserted += 1
logger.info(f" Inserted {facts_inserted} fact records")
# ========== STEP 6: Verify Results ==========
logger.info("Step 6: Verifying results...")
async with pool.acquire() as conn:
results = {}
# Count records in each table
results["reviews_raw"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", place_id
)
results["reviews_enriched"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", business_id
)
results["review_spans"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", business_id
)
results["issues"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id
)
results["issue_spans"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id
)
results["issue_events"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id
)
results["fact_timeseries"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1", business_id
)
logger.info("")
logger.info("=" * 50)
logger.info("E2E TEST RESULTS")
logger.info("=" * 50)
logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}")
logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}")
logger.info(f" pipeline.review_spans: {results['review_spans']}")
logger.info(f" pipeline.issues: {results['issues']}")
logger.info(f" pipeline.issue_spans: {results['issue_spans']}")
logger.info(f" pipeline.issue_events: {results['issue_events']}")
logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}")
logger.info("=" * 50)
# Validate counts
assert results["reviews_raw"] == 3, f"Expected 3 raw reviews, got {results['reviews_raw']}"
assert results["reviews_enriched"] == 3, f"Expected 3 enriched reviews, got {results['reviews_enriched']}"
assert results["review_spans"] >= 3, f"Expected at least 3 spans, got {results['review_spans']}"
assert results["issues"] >= 1, f"Expected at least 1 issue, got {results['issues']}"
assert results["fact_timeseries"] >= 1, f"Expected at least 1 fact, got {results['fact_timeseries']}"
logger.info("")
logger.info("ALL ASSERTIONS PASSED!")
logger.info("")
# Show sample data
logger.info("Sample issue:")
issue = await conn.fetchrow(
"SELECT issue_id, primary_subcode, state, span_count, max_intensity FROM pipeline.issues WHERE business_id = $1 LIMIT 1",
business_id
)
if issue:
logger.info(f" ID: {issue['issue_id']}")
logger.info(f" Code: {issue['primary_subcode']}")
logger.info(f" State: {issue['state']}")
logger.info(f" Spans: {issue['span_count']}")
logger.info(f" Max Intensity: {issue['max_intensity']}")
# ========== CLEANUP ==========
logger.info("")
logger.info("Cleaning up test data...")
async with pool.acquire() as conn:
# Delete in reverse dependency order
await conn.execute("DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
await conn.execute("DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id)
logger.info("Test data cleaned up successfully")
logger.info("")
logger.info("E2E TEST COMPLETED SUCCESSFULLY!")
finally:
await pool.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run E2E pipeline test")
parser.add_argument(
"--database-url",
default=os.environ.get("DATABASE_URL"),
help="PostgreSQL connection string (default: $DATABASE_URL)",
)
args = parser.parse_args()
if not args.database_url:
print("Error: --database-url required or set DATABASE_URL environment variable", file=sys.stderr)
sys.exit(1)
asyncio.run(run_e2e_test(args.database_url))