#!/usr/bin/env python3
"""
End-to-end test for the ReviewIQ Pipeline with actual database.

Usage:
    python tools/test_pipeline_e2e.py --database-url $DATABASE_URL

This test:
1. Connects to the database
2. Inserts sample review data into pipeline.reviews_raw
3. Runs Stage 1 normalization
4. Runs Stage 2 classification (mocked LLM)
5. Runs Stage 3 routing
6. Runs Stage 4 aggregation
7. Verifies data in all tables

The rows created by the test are deleted afterwards, whether the run
succeeded or failed.
"""

import asyncio
import os
import sys
import argparse
import json
import logging
from datetime import datetime, timezone
from uuid import uuid4

# Add project root and package to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "packages/reviewiq-pipeline/src"
))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Values written to the taxonomy_version / model columns by every step below.
TAXONOMY_VERSION = "v5.1"
MOCK_MODEL_NAME = "mocked-gpt-4o-mini"


# Sample test data - matches scraper output format
def generate_sample_reviews():
    """Generate sample reviews with unique IDs.

    Returns:
        A list of three review dicts (ratings 1, 5 and 3). Each call produces
        fresh ``review_id`` values so repeated runs do not collide.
    """
    return [
        {
            "review_id": f"test-review-{uuid4().hex[:8]}",
            "text": "The food was absolutely terrible! We waited 45 minutes for cold pasta. Mike the waiter was rude and dismissive. Never coming back.",
            "rating": 1,
            "author_name": "John Doe",
            "author_id": "author-001",
            "review_time": datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).isoformat(),
            "time": int(datetime(2026, 1, 15, 14, 30, tzinfo=timezone.utc).timestamp()),
            "relative_time": "1 week ago",
        },
        {
            "review_id": f"test-review-{uuid4().hex[:8]}",
            "text": "Great experience! The steak was cooked to perfection and Sarah provided excellent service. A bit pricey but worth it.",
            "rating": 5,
            "author_name": "Jane Smith",
            "author_id": "author-002",
            "review_time": datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).isoformat(),
            "time": int(datetime(2026, 1, 18, 19, 0, tzinfo=timezone.utc).timestamp()),
            "relative_time": "4 days ago",
        },
        {
            "review_id": f"test-review-{uuid4().hex[:8]}",
            "text": "Average food, nothing special. The ambiance was nice though. Wait time was reasonable.",
            "rating": 3,
            "author_name": "Bob Wilson",
            "author_id": "author-003",
            "review_time": datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).isoformat(),
            "time": int(datetime(2026, 1, 20, 12, 0, tzinfo=timezone.utc).timestamp()),
            "relative_time": "2 days ago",
        },
    ]


async def _cleanup_test_data(pool, business_id: str, place_id: str) -> None:
    """Delete every row this test created, children before parents."""
    logger.info("")
    logger.info("Cleaning up test data...")

    async with pool.acquire() as conn:
        # Delete in reverse dependency order
        await conn.execute(
            "DELETE FROM pipeline.fact_timeseries WHERE business_id = $1",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.issues WHERE business_id = $1",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.review_spans WHERE business_id = $1",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.reviews_enriched WHERE business_id = $1",
            business_id,
        )
        await conn.execute(
            "DELETE FROM pipeline.reviews_raw WHERE place_id = $1",
            place_id,
        )

    logger.info("Test data cleaned up successfully")


async def run_e2e_test(database_url: str):
    """Run the full E2E pipeline test.

    Inserts three sample reviews, pushes them through normalization, mocked
    classification, routing and aggregation, then asserts on the row counts
    in each pipeline table.

    Args:
        database_url: PostgreSQL connection string handed to asyncpg.

    Raises:
        AssertionError: If a table does not contain the expected row count.

    The test rows are removed and the pool is closed in ``finally`` blocks,
    so a failing step does not leave ``test-business-*`` data behind.
    """
    import asyncpg

    # Generate unique test identifiers
    test_id = uuid4().hex[:8]
    business_id = f"test-business-{test_id}"
    place_id = f"test-place-{test_id}"
    job_id = uuid4()

    logger.info(f"Starting E2E test with business_id={business_id}")

    # Generate sample reviews
    sample_reviews = generate_sample_reviews()

    # Connect to database
    pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5)

    try:
        # ========== STEP 1: Insert raw reviews ==========
        logger.info("Step 1: Inserting raw reviews into pipeline.reviews_raw...")

        async with pool.acquire() as conn:
            for review in sample_reviews:
                review_time = datetime.fromtimestamp(review["time"], tz=timezone.utc)
                await conn.execute(
                    """
                    INSERT INTO pipeline.reviews_raw (
                        job_id, source, review_id, place_id, raw_payload,
                        review_text, rating, review_time, reviewer_name, reviewer_id
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    """,
                    job_id,
                    "google",
                    review["review_id"],
                    place_id,
                    json.dumps(review),
                    review["text"],
                    review["rating"],
                    review_time,
                    review["author_name"],
                    review.get("author_id"),
                )

            raw_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
                place_id,
            )
            logger.info(f" Inserted {raw_count} raw reviews")

        # ========== STEP 2: Run Stage 1 - Normalization ==========
        logger.info("Step 2: Running Stage 1 normalization...")

        from reviewiq_pipeline.config import Config
        from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer

        config = Config(database_url=database_url)
        normalizer = Stage1Normalizer(config)

        normalized_reviews = normalizer.normalize_batch(
            sample_reviews,
            business_id,
            place_id,
        )
        logger.info(f" Normalized {len(normalized_reviews)} reviews")

        # Insert into reviews_enriched
        async with pool.acquire() as conn:
            for norm in normalized_reviews:
                # Get raw_id
                raw_id = await conn.fetchval(
                    "SELECT id FROM pipeline.reviews_raw WHERE review_id = $1",
                    norm["review_id"],
                )

                # fromisoformat() on older Pythons rejects a trailing "Z",
                # so map it to an explicit UTC offset first.
                review_time = datetime.fromisoformat(
                    norm["review_time"].replace("Z", "+00:00")
                )

                await conn.execute(
                    """
                    INSERT INTO pipeline.reviews_enriched (
                        source, review_id, review_version, is_latest, raw_id,
                        business_id, place_id, text, text_normalized,
                        rating, review_time, language, taxonomy_version
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
                    ON CONFLICT (source, review_id, review_version) DO UPDATE
                    SET is_latest = EXCLUDED.is_latest
                    """,
                    norm["source"],
                    norm["review_id"],
                    norm["review_version"],
                    True,
                    raw_id,
                    business_id,
                    place_id,
                    norm["text"],
                    norm["text_normalized"],
                    norm["rating"],
                    review_time,
                    norm["text_language"],
                    TAXONOMY_VERSION,
                )

            enriched_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
                business_id,
            )
            logger.info(f" Inserted {enriched_count} enriched reviews")

        # ========== STEP 3: Run Stage 2 - Classification (mocked) ==========
        logger.info("Step 3: Running Stage 2 classification (mocked LLM)...")

        # Use mocked classification results
        mock_classifications = [
            {
                "review_id": normalized_reviews[0]["review_id"],
                "urt_primary": "O1.01",  # Core product issue
                "urt_secondary": ["J1.01", "A1.01"],  # Wait time, friendliness
                "valence": "V-",
                "intensity": "I3",
                "comparative": "CR-N",
                "trust_score": 0.85,
                "spans": [
                    {
                        "span_index": 0,
                        "span_text": "food was absolutely terrible",
                        "span_start": 4,
                        "span_end": 32,
                        "urt_primary": "O1.01",
                        "valence": "V-",
                        "intensity": "I3",
                        "is_primary": True,
                    },
                    {
                        "span_index": 1,
                        "span_text": "waited 45 minutes",
                        "span_start": 38,
                        "span_end": 55,
                        "urt_primary": "J1.01",
                        "valence": "V-",
                        "intensity": "I2",
                        "is_primary": False,
                    },
                ],
            },
            {
                "review_id": normalized_reviews[1]["review_id"],
                "urt_primary": "O1.01",
                "urt_secondary": ["A2.01"],
                "valence": "V+",
                "intensity": "I3",
                "comparative": "CR-N",
                "trust_score": 0.90,
                "spans": [
                    {
                        "span_index": 0,
                        "span_text": "steak was cooked to perfection",
                        "span_start": 22,
                        "span_end": 52,
                        "urt_primary": "O1.01",
                        "valence": "V+",
                        "intensity": "I3",
                        "is_primary": True,
                    },
                ],
            },
            {
                "review_id": normalized_reviews[2]["review_id"],
                "urt_primary": "O1.01",
                "urt_secondary": [],
                "valence": "V0",
                "intensity": "I1",
                "comparative": "CR-N",
                "trust_score": 0.70,
                "spans": [
                    {
                        "span_index": 0,
                        "span_text": "Average food, nothing special",
                        "span_start": 0,
                        "span_end": 29,
                        "urt_primary": "O1.01",
                        "valence": "V0",
                        "intensity": "I1",
                        "is_primary": True,
                    },
                ],
            },
        ]

        # Update enriched reviews with classification and insert spans
        batch_id = f"batch-{test_id}"

        async with pool.acquire() as conn:
            for classification in mock_classifications:
                # Update classification in reviews_enriched
                await conn.execute(
                    """
                    UPDATE pipeline.reviews_enriched
                    SET urt_primary = $1,
                        urt_secondary = $2,
                        valence = $3,
                        intensity = $4,
                        comparative = $5,
                        trust_score = $6,
                        classification_model = $7,
                        processed_at = NOW()
                    WHERE review_id = $8 AND business_id = $9
                    """,
                    classification["urt_primary"],
                    classification["urt_secondary"],
                    classification["valence"],
                    classification["intensity"],
                    classification["comparative"],
                    classification["trust_score"],
                    MOCK_MODEL_NAME,
                    classification["review_id"],
                    business_id,
                )

                # Get review info for spans
                review_row = await conn.fetchrow(
                    "SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1",
                    classification["review_id"],
                )

                # Insert spans
                for span in classification["spans"]:
                    span_id = f"SPN-{uuid4().hex[:12]}"
                    usn = f"google:{classification['review_id']}:1:{span['span_index']}"

                    await conn.execute(
                        """
                        INSERT INTO pipeline.review_spans (
                            span_id, business_id, place_id, source, review_id,
                            review_version, span_index, span_text, span_start, span_end,
                            profile, urt_primary, valence, intensity, comparative,
                            is_primary, is_active, review_time, confidence, usn,
                            taxonomy_version, model_version, ingest_batch_id
                        ) VALUES (
                            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                            $21, $22, $23
                        )
                        """,
                        span_id,
                        business_id,
                        place_id,
                        "google",
                        classification["review_id"],
                        1,
                        span["span_index"],
                        span["span_text"],
                        span["span_start"],
                        span["span_end"],
                        "standard",
                        span["urt_primary"],
                        span["valence"],
                        span["intensity"],
                        "CR-N",
                        span["is_primary"],
                        True,
                        review_row["review_time"],
                        "high",
                        usn,
                        TAXONOMY_VERSION,
                        MOCK_MODEL_NAME,
                        batch_id,
                    )

            span_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1",
                business_id,
            )
            logger.info(f" Inserted {span_count} spans")

        # ========== STEP 4: Run Stage 3 - Routing ==========
        logger.info("Step 4: Running Stage 3 routing...")

        from reviewiq_pipeline.stages.stage3_route import Stage3Router

        router = Stage3Router(config)

        # Priority score assigned to a newly created issue, by span intensity.
        intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0}

        async with pool.acquire() as conn:
            # Get negative/mixed spans to route
            spans_to_route = await conn.fetch(
                """
                SELECT span_id, business_id, place_id, urt_primary, valence,
                       intensity, entity_normalized, review_time, confidence,
                       source, review_id, review_version
                FROM pipeline.review_spans
                WHERE business_id = $1 AND valence IN ('V-', 'V±')
                """,
                business_id,
            )

            routed_count = 0
            for span_row in spans_to_route:
                span_payload = {
                    "span_id": span_row["span_id"],
                    "business_id": span_row["business_id"],
                    "place_id": span_row["place_id"],
                    "urt_primary": span_row["urt_primary"],
                    "valence": span_row["valence"],
                    "intensity": span_row["intensity"],
                    "entity_normalized": span_row["entity_normalized"],
                    "review_time": span_row["review_time"].isoformat(),
                    "confidence": span_row["confidence"],
                    "trust_score": 0.85,
                }

                routed = router.route_span_sync(span_payload)

                # Check if issue exists
                existing = await conn.fetchval(
                    "SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
                    routed["issue_id"],
                )

                if not existing:
                    # Create issue - extract domain from urt_primary (e.g., "O1.01" -> "O")
                    domain = span_row["urt_primary"][0]
                    # Compute priority score based on intensity
                    priority_score = intensity_scores.get(span_row["intensity"], 1.0)

                    await conn.execute(
                        """
                        INSERT INTO pipeline.issues (
                            issue_id, business_id, place_id, primary_subcode,
                            domain, state, priority_score, span_count,
                            max_intensity, taxonomy_version
                        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                        """,
                        routed["issue_id"],
                        business_id,
                        place_id,
                        span_row["urt_primary"],
                        domain,
                        "open",
                        priority_score,
                        1,
                        span_row["intensity"],
                        TAXONOMY_VERSION,
                    )
                else:
                    # Update span count
                    await conn.execute(
                        """
                        UPDATE pipeline.issues
                        SET span_count = span_count + 1, updated_at = NOW()
                        WHERE issue_id = $1
                        """,
                        routed["issue_id"],
                    )

                # Link span to issue
                await conn.execute(
                    """
                    INSERT INTO pipeline.issue_spans (
                        issue_id, span_id, source, review_id, review_version,
                        is_primary_match, intensity, review_time
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    ON CONFLICT (span_id) DO NOTHING
                    """,
                    routed["issue_id"],
                    span_row["span_id"],
                    span_row["source"],
                    span_row["review_id"],
                    span_row["review_version"],
                    True,
                    span_row["intensity"],
                    span_row["review_time"],
                )

                # Log event
                await conn.execute(
                    """
                    INSERT INTO pipeline.issue_events (issue_id, event_type, span_id)
                    VALUES ($1, $2, $3)
                    """,
                    routed["issue_id"],
                    "span_linked",
                    span_row["span_id"],
                )

                routed_count += 1

            issue_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1",
                business_id,
            )
            logger.info(f" Routed {routed_count} spans to {issue_count} issues")

        # ========== STEP 5: Run Stage 4 - Aggregation ==========
        logger.info("Step 5: Running Stage 4 aggregation...")

        from collections import defaultdict

        async with pool.acquire() as conn:
            # Get span data for aggregation. review_id is selected so that
            # review_count can count distinct reviews rather than spans.
            span_rows = await conn.fetch(
                """
                SELECT
                    rs.business_id, rs.place_id, rs.review_id,
                    DATE(rs.review_time) as review_date,
                    rs.urt_primary, rs.valence, rs.intensity, rs.comparative,
                    re.trust_score, re.rating
                FROM pipeline.review_spans rs
                JOIN pipeline.reviews_enriched re ON (
                    re.source = rs.source
                    AND re.review_id = rs.review_id
                    AND re.review_version = rs.review_version
                )
                WHERE rs.business_id = $1 AND rs.is_active = TRUE
                """,
                business_id,
            )

            # Group by date and URT code
            daily_data = defaultdict(lambda: defaultdict(list))
            for row in span_rows:
                daily_data[row["review_date"]][row["urt_primary"]].append(dict(row))

            facts_inserted = 0
            for period_date, urt_groups in daily_data.items():
                for urt_code, code_spans in urt_groups.items():
                    # Compute simple metrics
                    review_count = len({s["review_id"] for s in code_spans})
                    negative_count = sum(1 for s in code_spans if s["valence"] == "V-")
                    positive_count = sum(1 for s in code_spans if s["valence"] == "V+")
                    neutral_count = sum(1 for s in code_spans if s["valence"] == "V0")
                    mixed_count = sum(1 for s in code_spans if s["valence"] == "V±")

                    i1_count = sum(1 for s in code_spans if s["intensity"] == "I1")
                    i2_count = sum(1 for s in code_spans if s["intensity"] == "I2")
                    i3_count = sum(1 for s in code_spans if s["intensity"] == "I3")

                    # Simple strength score
                    strength_score = (positive_count - negative_count) / max(len(code_spans), 1)
                    negative_strength = negative_count * 1.0
                    positive_strength = positive_count * 1.0

                    await conn.execute(
                        """
                        INSERT INTO pipeline.fact_timeseries (
                            business_id, place_id, period_date, bucket_type,
                            subject_type, subject_id, taxonomy_version,
                            review_count, span_count,
                            negative_count, positive_count, neutral_count, mixed_count,
                            strength_score, negative_strength, positive_strength,
                            i1_count, i2_count, i3_count,
                            cr_better, cr_worse, cr_same,
                            trust_weighted_strength, trust_weighted_negative
                        ) VALUES (
                            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
                            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
                            $21, $22, $23, $24
                        )
                        ON CONFLICT (business_id, place_id, period_date, bucket_type,
                                     subject_type, subject_id, taxonomy_version)
                        DO UPDATE SET span_count = EXCLUDED.span_count, computed_at = NOW()
                        """,
                        business_id,
                        place_id,
                        period_date,
                        "day",
                        "urt_code",
                        urt_code,
                        TAXONOMY_VERSION,
                        review_count,
                        len(code_spans),
                        negative_count,
                        positive_count,
                        neutral_count,
                        mixed_count,
                        strength_score,
                        negative_strength,
                        positive_strength,
                        i1_count,
                        i2_count,
                        i3_count,
                        0,  # cr_better
                        0,  # cr_worse
                        0,  # cr_same
                        strength_score,  # trust_weighted_strength
                        negative_strength,  # trust_weighted_negative
                    )
                    facts_inserted += 1

            logger.info(f" Inserted {facts_inserted} fact records")

        # ========== STEP 6: Verify Results ==========
        logger.info("Step 6: Verifying results...")

        async with pool.acquire() as conn:
            results = {}

            # Count records in each table
            results["reviews_raw"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
                place_id
            )
            results["reviews_enriched"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
                business_id
            )
            results["review_spans"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1",
                business_id
            )
            results["issues"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1",
                business_id
            )
            results["issue_spans"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
                business_id
            )
            results["issue_events"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
                business_id
            )
            results["fact_timeseries"] = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1",
                business_id
            )

            logger.info("")
            logger.info("=" * 50)
            logger.info("E2E TEST RESULTS")
            logger.info("=" * 50)
            logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}")
            logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}")
            logger.info(f" pipeline.review_spans: {results['review_spans']}")
            logger.info(f" pipeline.issues: {results['issues']}")
            logger.info(f" pipeline.issue_spans: {results['issue_spans']}")
            logger.info(f" pipeline.issue_events: {results['issue_events']}")
            logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}")
            logger.info("=" * 50)

            # Validate counts
            assert results["reviews_raw"] == 3, f"Expected 3 raw reviews, got {results['reviews_raw']}"
            assert results["reviews_enriched"] == 3, f"Expected 3 enriched reviews, got {results['reviews_enriched']}"
            assert results["review_spans"] >= 3, f"Expected at least 3 spans, got {results['review_spans']}"
            assert results["issues"] >= 1, f"Expected at least 1 issue, got {results['issues']}"
            assert results["fact_timeseries"] >= 1, f"Expected at least 1 fact, got {results['fact_timeseries']}"

            logger.info("")
            logger.info("ALL ASSERTIONS PASSED!")
            logger.info("")

            # Show sample data
            logger.info("Sample issue:")
            issue = await conn.fetchrow(
                "SELECT issue_id, primary_subcode, state, span_count, max_intensity FROM pipeline.issues WHERE business_id = $1 LIMIT 1",
                business_id
            )
            if issue:
                logger.info(f" ID: {issue['issue_id']}")
                logger.info(f" Code: {issue['primary_subcode']}")
                logger.info(f" State: {issue['state']}")
                logger.info(f" Spans: {issue['span_count']}")
                logger.info(f" Max Intensity: {issue['max_intensity']}")

    finally:
        # ========== CLEANUP ==========
        # Runs on failure too, so a broken step or a failed assertion does
        # not leave test rows behind. The pool is closed even if a DELETE
        # raises.
        try:
            await _cleanup_test_data(pool, business_id, place_id)
        finally:
            await pool.close()

    logger.info("")
    logger.info("E2E TEST COMPLETED SUCCESSFULLY!")


def main() -> None:
    """Parse command-line arguments and run the E2E test."""
    parser = argparse.ArgumentParser(description="Run E2E pipeline test")
    parser.add_argument(
        "--database-url",
        default=os.environ.get("DATABASE_URL"),
        help="PostgreSQL connection string (default: $DATABASE_URL)",
    )
    args = parser.parse_args()

    if not args.database_url:
        print("Error: --database-url required or set DATABASE_URL environment variable", file=sys.stderr)
        sys.exit(1)

    asyncio.run(run_e2e_test(args.database_url))


if __name__ == "__main__":
    main()