From 3e57c887e97895b31ca8ccfb796a1ad62b4381ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 18:28:53 +0000 Subject: [PATCH] test: Add E2E pipeline test with real database Tests the full pipeline flow: - Stage 1: Insert raw reviews, normalize text - Stage 2: Mock LLM classification, insert spans - Stage 3: Route negative spans to issues - Stage 4: Aggregate facts by URT code and date Validates all pipeline.* tables are populated correctly. Co-Authored-By: Claude Opus 4.5 --- tools/test_pipeline_e2e.py | 649 +++++++++++++++++++++++++++++++++++++ 1 file changed, 649 insertions(+) create mode 100644 tools/test_pipeline_e2e.py diff --git a/tools/test_pipeline_e2e.py b/tools/test_pipeline_e2e.py new file mode 100644 index 0000000..e65d60a --- /dev/null +++ b/tools/test_pipeline_e2e.py @@ -0,0 +1,649 @@ +#!/usr/bin/env python3 +""" +End-to-end test for the ReviewIQ Pipeline with actual database. + +Usage: + python tools/test_pipeline_e2e.py --database-url $DATABASE_URL + +This test: +1. Connects to the database +2. Inserts sample review data into pipeline.reviews_raw +3. Runs Stage 1 normalization +4. Runs Stage 2 classification (mocked LLM) +5. Runs Stage 3 routing +6. Runs Stage 4 aggregation +7. Verifies data in all tables +""" + +import asyncio +import os +import sys +import argparse +import json +import logging +from datetime import datetime, timezone +from uuid import uuid4 + +# Add project root and package to path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "packages/reviewiq-pipeline/src" +)) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +# Sample test data - matches scraper output format +def generate_sample_reviews(): + """Generate sample reviews with unique IDs.""" + return [ + { + "review_id": f"test-review-{uuid4().hex[:8]}", + "text": "The food was absolutely terrible! We waited 45 minutes for cold pasta. Mike the waiter was rude and dismissive. Never coming back.", + "rating": 1, + "author_name": "John Doe", + "author_id": "author-001", + "review_time": datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).isoformat(), + "time": int(datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).timestamp()), + "relative_time": "1 week ago", + }, + { + "review_id": f"test-review-{uuid4().hex[:8]}", + "text": "Great experience! The steak was cooked to perfection and Sarah provided excellent service. A bit pricey but worth it.", + "rating": 5, + "author_name": "Jane Smith", + "author_id": "author-002", + "review_time": datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).isoformat(), + "time": int(datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).timestamp()), + "relative_time": "4 days ago", + }, + { + "review_id": f"test-review-{uuid4().hex[:8]}", + "text": "Average food, nothing special. The ambiance was nice though. Wait time was reasonable.", + "rating": 3, + "author_name": "Bob Wilson", + "author_id": "author-003", + "review_time": datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).isoformat(), + "time": int(datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).timestamp()), + "relative_time": "2 days ago", + }, + ] + + +async def run_e2e_test(database_url: str): + """Run the full E2E pipeline test.""" + import asyncpg + + # Generate unique test identifiers + test_id = uuid4().hex[:8] + business_id = f"test-business-{test_id}" + place_id = f"test-place-{test_id}" + job_id = uuid4() + + logger.info(f"Starting E2E test with business_id={business_id}") + + # Generate sample reviews + sample_reviews = generate_sample_reviews() + + # Connect to database + pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5) + + try: + # ========== STEP 1: Insert raw reviews ========== + logger.info("Step 1: Inserting raw reviews into pipeline.reviews_raw...") + + async with pool.acquire() as conn: + for review in sample_reviews: + review_time = datetime.fromtimestamp(review["time"], tz=timezone.utc) + await conn.execute(""" + INSERT INTO pipeline.reviews_raw ( + job_id, source, review_id, place_id, raw_payload, + review_text, rating, review_time, reviewer_name, reviewer_id + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + """, + job_id, + "google", + review["review_id"], + place_id, + json.dumps(review), + review["text"], + review["rating"], + review_time, + review["author_name"], + review.get("author_id"), + ) + + raw_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", + place_id, + ) + logger.info(f" Inserted {raw_count} raw reviews") + + # ========== STEP 2: Run Stage 1 - Normalization ========== + logger.info("Step 2: Running Stage 1 normalization...") + + from reviewiq_pipeline.config import Config + from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer + + config = Config(database_url=database_url) + normalizer = Stage1Normalizer(config) + + normalized_reviews = normalizer.normalize_batch( + sample_reviews, + business_id, + place_id, + ) + logger.info(f" Normalized {len(normalized_reviews)} reviews") + + # Insert into reviews_enriched + async with pool.acquire() as conn: + for norm in normalized_reviews: + # Get raw_id + raw_id = await conn.fetchval( + "SELECT id FROM pipeline.reviews_raw WHERE review_id = $1", + norm["review_id"], + ) + + review_time = datetime.fromisoformat(norm["review_time"].replace("Z", "+00:00")) + + await conn.execute(""" + INSERT INTO pipeline.reviews_enriched ( + source, review_id, review_version, is_latest, raw_id, + business_id, place_id, text, text_normalized, rating, review_time, + language, taxonomy_version + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (source, review_id, review_version) DO UPDATE SET + is_latest = EXCLUDED.is_latest + """, + norm["source"], + norm["review_id"], + norm["review_version"], + True, + raw_id, + business_id, + place_id, + norm["text"], + norm["text_normalized"], + norm["rating"], + review_time, + norm["text_language"], + "v5.1", + ) + + enriched_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", + business_id, + ) + logger.info(f" Inserted {enriched_count} enriched reviews") + + # ========== STEP 3: Run Stage 2 - Classification (mocked) ========== + logger.info("Step 3: Running Stage 2 classification (mocked LLM)...") + + # Use mocked classification results + mock_classifications = [ + { + "review_id": normalized_reviews[0]["review_id"], + "urt_primary": "O1.01", # Core product issue + "urt_secondary": ["J1.01", "A1.01"], # Wait time, friendliness + "valence": "V-", + "intensity": "I3", + "comparative": "CR-N", + "trust_score": 0.85, + "spans": [ + { + "span_index": 0, + "span_text": "food was absolutely terrible", + "span_start": 4, + "span_end": 32, + "urt_primary": "O1.01", + "valence": "V-", + "intensity": "I3", + "is_primary": True, + }, + { + "span_index": 1, + "span_text": "waited 45 minutes", + "span_start": 38, + "span_end": 55, + "urt_primary": "J1.01", + "valence": "V-", + "intensity": "I2", + "is_primary": False, + }, + ], + }, + { + "review_id": normalized_reviews[1]["review_id"], + "urt_primary": "O1.01", + "urt_secondary": ["A2.01"], + "valence": "V+", + "intensity": "I3", + "comparative": "CR-N", + "trust_score": 0.90, + "spans": [ + { + "span_index": 0, + "span_text": "steak was cooked to perfection", + "span_start": 22, + "span_end": 52, + "urt_primary": "O1.01", + "valence": "V+", + "intensity": "I3", + "is_primary": True, + }, + ], + }, + { + "review_id": normalized_reviews[2]["review_id"], + "urt_primary": "O1.01", + "urt_secondary": [], + "valence": "V0", + "intensity": "I1", + "comparative": "CR-N", + "trust_score": 0.70, + "spans": [ + { + "span_index": 0, + "span_text": "Average food, nothing special", + "span_start": 0, + "span_end": 29, + "urt_primary": "O1.01", + "valence": "V0", + "intensity": "I1", + "is_primary": True, + }, + ], + }, + ] + + # Update enriched reviews with classification and insert spans + batch_id = f"batch-{test_id}" + async with pool.acquire() as conn: + for cls in mock_classifications: + # Update classification in reviews_enriched + await conn.execute(""" + UPDATE pipeline.reviews_enriched SET + urt_primary = $1, + urt_secondary = $2, + valence = $3, + intensity = $4, + comparative = $5, + trust_score = $6, + classification_model = $7, + processed_at = NOW() + WHERE review_id = $8 AND business_id = $9 + """, + cls["urt_primary"], + cls["urt_secondary"], + cls["valence"], + cls["intensity"], + cls["comparative"], + cls["trust_score"], + "mocked-gpt-4o-mini", + cls["review_id"], + business_id, + ) + + # Get review info for spans + review_row = await conn.fetchrow( + "SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1", + cls["review_id"], + ) + + # Insert spans + for span in cls["spans"]: + span_id = f"SPN-{uuid4().hex[:12]}" + usn = f"google:{cls['review_id']}:1:{span['span_index']}" + + await conn.execute(""" + INSERT INTO pipeline.review_spans ( + span_id, business_id, place_id, source, review_id, review_version, + span_index, span_text, span_start, span_end, + profile, urt_primary, valence, intensity, comparative, + is_primary, is_active, review_time, + confidence, usn, taxonomy_version, model_version, ingest_batch_id + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, + $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23 + ) + """, + span_id, + business_id, + place_id, + "google", + cls["review_id"], + 1, + span["span_index"], + span["span_text"], + span["span_start"], + span["span_end"], + "standard", + span["urt_primary"], + span["valence"], + span["intensity"], + "CR-N", + span["is_primary"], + True, + review_row["review_time"], + "high", + usn, + "v5.1", + "mocked-gpt-4o-mini", + batch_id, + ) + + span_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", + business_id, + ) + logger.info(f" Inserted {span_count} spans") + + # ========== STEP 4: Run Stage 3 - Routing ========== + logger.info("Step 4: Running Stage 3 routing...") + + from reviewiq_pipeline.stages.stage3_route import Stage3Router + + router = Stage3Router(config) + + async with pool.acquire() as conn: + # Get negative/mixed spans to route + spans_to_route = await conn.fetch(""" + SELECT span_id, business_id, place_id, urt_primary, valence, intensity, + entity_normalized, review_time, confidence, source, review_id, review_version + FROM pipeline.review_spans + WHERE business_id = $1 AND valence IN ('V-', 'V±') + """, business_id) + + routed_count = 0 + for span_row in spans_to_route: + span_data = { + "span_id": span_row["span_id"], + "business_id": span_row["business_id"], + "place_id": span_row["place_id"], + "urt_primary": span_row["urt_primary"], + "valence": span_row["valence"], + "intensity": span_row["intensity"], + "entity_normalized": span_row["entity_normalized"], + "review_time": span_row["review_time"].isoformat(), + "confidence": span_row["confidence"], + "trust_score": 0.85, + } + + routed = router.route_span_sync(span_data) + + # Check if issue exists + existing = await conn.fetchval( + "SELECT 1 FROM pipeline.issues WHERE issue_id = $1", + routed["issue_id"], + ) + + if not existing: + # Create issue - extract domain from urt_primary (e.g., "O1.01" -> "O") + domain = span_row["urt_primary"][0] + # Compute priority score based on intensity + intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0} + priority_score = intensity_scores.get(span_row["intensity"], 1.0) + + await conn.execute(""" + INSERT INTO pipeline.issues ( + issue_id, business_id, place_id, primary_subcode, domain, + state, priority_score, span_count, max_intensity, taxonomy_version + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + """, + routed["issue_id"], + business_id, + place_id, + span_row["urt_primary"], + domain, + "open", + priority_score, + 1, + span_row["intensity"], + "v5.1", + ) + else: + # Update span count + await conn.execute(""" + UPDATE pipeline.issues SET span_count = span_count + 1, updated_at = NOW() + WHERE issue_id = $1 + """, routed["issue_id"]) + + # Link span to issue + await conn.execute(""" + INSERT INTO pipeline.issue_spans ( + issue_id, span_id, source, review_id, review_version, + is_primary_match, intensity, review_time + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (span_id) DO NOTHING + """, + routed["issue_id"], + span_row["span_id"], + span_row["source"], + span_row["review_id"], + span_row["review_version"], + True, + span_row["intensity"], + span_row["review_time"], + ) + + # Log event + await conn.execute(""" + INSERT INTO pipeline.issue_events (issue_id, event_type, span_id) + VALUES ($1, $2, $3) + """, routed["issue_id"], "span_linked", span_row["span_id"]) + + routed_count += 1 + + issue_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", + business_id, + ) + logger.info(f" Routed {routed_count} spans to {issue_count} issues") + + # ========== STEP 5: Run Stage 4 - Aggregation ========== + logger.info("Step 5: Running Stage 4 aggregation...") + + from collections import defaultdict + + async with pool.acquire() as conn: + # Get span data for aggregation + span_data = await conn.fetch(""" + SELECT + rs.business_id, + rs.place_id, + DATE(rs.review_time) as review_date, + rs.urt_primary, + rs.valence, + rs.intensity, + rs.comparative, + re.trust_score, + re.rating + FROM pipeline.review_spans rs + JOIN pipeline.reviews_enriched re ON ( + re.source = rs.source + AND re.review_id = rs.review_id + AND re.review_version = rs.review_version + ) + WHERE rs.business_id = $1 AND rs.is_active = TRUE + """, business_id) + + # Group by date and URT code + daily_data = defaultdict(lambda: defaultdict(list)) + for row in span_data: + daily_data[row["review_date"]][row["urt_primary"]].append(dict(row)) + + facts_inserted = 0 + for period_date, urt_groups in daily_data.items(): + for urt_code, code_spans in urt_groups.items(): + # Compute simple metrics + negative_count = sum(1 for s in code_spans if s["valence"] == "V-") + positive_count = sum(1 for s in code_spans if s["valence"] == "V+") + neutral_count = sum(1 for s in code_spans if s["valence"] == "V0") + mixed_count = sum(1 for s in code_spans if s["valence"] == "V±") + + i1_count = sum(1 for s in code_spans if s["intensity"] == "I1") + i2_count = sum(1 for s in code_spans if s["intensity"] == "I2") + i3_count = sum(1 for s in code_spans if s["intensity"] == "I3") + + # Simple strength score + strength_score = (positive_count - negative_count) / max(len(code_spans), 1) + negative_strength = negative_count * 1.0 + positive_strength = positive_count * 1.0 + + await conn.execute(""" + INSERT INTO pipeline.fact_timeseries ( + business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version, + review_count, span_count, + negative_count, positive_count, neutral_count, mixed_count, + strength_score, negative_strength, positive_strength, + i1_count, i2_count, i3_count, + cr_better, cr_worse, cr_same, + trust_weighted_strength, trust_weighted_negative + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, + $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24 + ) + ON CONFLICT (business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version) + DO UPDATE SET + span_count = EXCLUDED.span_count, + computed_at = NOW() + """, + business_id, + place_id, + period_date, + "day", + "urt_code", + urt_code, + "v5.1", + len(code_spans), # review_count approximation + len(code_spans), + negative_count, + positive_count, + neutral_count, + mixed_count, + strength_score, + negative_strength, + positive_strength, + i1_count, + i2_count, + i3_count, + 0, # cr_better + 0, # cr_worse + 0, # cr_same + strength_score, # trust_weighted_strength + negative_strength, # trust_weighted_negative + ) + facts_inserted += 1 + + logger.info(f" Inserted {facts_inserted} fact records") + + # ========== STEP 6: Verify Results ========== + logger.info("Step 6: Verifying results...") + + async with pool.acquire() as conn: + results = {} + + # Count records in each table + results["reviews_raw"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", place_id + ) + results["reviews_enriched"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", business_id + ) + results["review_spans"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", business_id + ) + results["issues"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id + ) + results["issue_spans"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id + ) + results["issue_events"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id + ) + results["fact_timeseries"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1", business_id + ) + + logger.info("") + logger.info("=" * 50) + logger.info("E2E TEST RESULTS") + logger.info("=" * 50) + logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}") + logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}") + logger.info(f" pipeline.review_spans: {results['review_spans']}") + logger.info(f" pipeline.issues: {results['issues']}") + logger.info(f" pipeline.issue_spans: {results['issue_spans']}") + logger.info(f" pipeline.issue_events: {results['issue_events']}") + logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}") + logger.info("=" * 50) + + # Validate counts + assert results["reviews_raw"] == 3, f"Expected 3 raw reviews, got {results['reviews_raw']}" + assert results["reviews_enriched"] == 3, f"Expected 3 enriched reviews, got {results['reviews_enriched']}" + assert results["review_spans"] >= 3, f"Expected at least 3 spans, got {results['review_spans']}" + assert results["issues"] >= 1, f"Expected at least 1 issue, got {results['issues']}" + assert results["fact_timeseries"] >= 1, f"Expected at least 1 fact, got {results['fact_timeseries']}" + + logger.info("") + logger.info("ALL ASSERTIONS PASSED!") + logger.info("") + + # Show sample data + logger.info("Sample issue:") + issue = await conn.fetchrow( + "SELECT issue_id, primary_subcode, state, span_count, max_intensity FROM pipeline.issues WHERE business_id = $1 LIMIT 1", + business_id + ) + if issue: + logger.info(f" ID: {issue['issue_id']}") + logger.info(f" Code: {issue['primary_subcode']}") + logger.info(f" State: {issue['state']}") + logger.info(f" Spans: {issue['span_count']}") + logger.info(f" Max Intensity: {issue['max_intensity']}") + + # ========== CLEANUP ========== + logger.info("") + logger.info("Cleaning up test data...") + + async with pool.acquire() as conn: + # Delete in reverse dependency order + await conn.execute("DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id) + await conn.execute("DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id) + await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id) + + logger.info("Test data cleaned up successfully") + logger.info("") + logger.info("E2E TEST COMPLETED SUCCESSFULLY!") + + finally: + await pool.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run E2E pipeline test") + parser.add_argument( + "--database-url", + default=os.environ.get("DATABASE_URL"), + help="PostgreSQL connection string (default: $DATABASE_URL)", + ) + + args = parser.parse_args() + + if not args.database_url: + print("Error: --database-url required or set DATABASE_URL environment variable", file=sys.stderr) + sys.exit(1) + + asyncio.run(run_e2e_test(args.database_url))