#!/usr/bin/env python3
"""
Test the pipeline with real scraped review data.

Usage:
    python tools/test_pipeline_real_data.py --database-url $DATABASE_URL
        [--sample-file data/samples/barbershop_reviews.json] [--no-cleanup]

This test:
1. Loads sample barbershop reviews from data/samples/
2. Uses ScraperV1Adapter to transform the data
3. Runs the full pipeline (Stage 1-4) with a mocked LLM classifier
4. Verifies data in all tables
5. Shows summary statistics

Unless --no-cleanup is given, every row the run created is deleted afterwards,
including when a stage fails part-way through.
"""

import asyncio
import os
import sys
import argparse
import json
import logging
from collections import Counter, defaultdict
from datetime import datetime, timezone
from uuid import uuid4
from pathlib import Path

# Add project root and package to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / "packages/reviewiq-pipeline/src"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Labels written into every row this test creates.
TAXONOMY_VERSION = "v5.1"
MOCK_MODEL = "mock-classifier-v1"

# Keyword heuristics for the mocked URT classification. Groups are checked in
# order and the first group with a keyword found in the text wins.
_URT_KEYWORDS = (
    (("servicio", "service", "trato", "atención", "attention"), "A2.01"),  # Helpfulness
    (("profesional", "professional", "expert"), "A4.01"),  # Knowledge/Expertise
    (("precio", "price", "caro", "barato", "expensive", "cheap"), "P1.01"),  # Value Perception
    (("espera", "wait", "tiempo", "time", "rápido", "fast"), "J1.01"),  # Wait Times
    (("ambiente", "ambiance", "local", "lugar", "place"), "E2.01"),  # Ambiance
)
_DEFAULT_URT = "O1.01"  # Core Product/Service


def parse_review_time(value: str) -> datetime:
    """Parse an ISO-8601 timestamp string, accepting a trailing ``Z`` for UTC.

    ``datetime.fromisoformat`` only accepts ``Z`` from Python 3.11 onwards, so
    it is rewritten to an explicit ``+00:00`` offset first.
    """
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def mock_classify(review: dict) -> dict:
    """Classify a normalized review with simple heuristics (stands in for the LLM).

    Args:
        review: A normalized review dict; only ``text_normalized`` and
            ``rating`` are read.

    Returns:
        A dict with ``urt_primary`` (URT code), ``valence`` (``V+``/``V0``/``V-``),
        ``intensity`` (``I1``..``I3``) and ``trust_score`` (in ``[0.7, 1.0]``).
    """
    text = review["text_normalized"].lower()
    rating = review["rating"]

    # Valence from the star rating.
    if rating >= 4:
        valence = "V+"
    elif rating <= 2:
        valence = "V-"
    else:
        valence = "V0"

    # Intensity from rating extremity; long 1- or 5-star reviews are strongest.
    if rating in (1, 5) and len(text) > 100:
        intensity = "I3"
    elif rating in (1, 2, 4, 5):
        intensity = "I2"
    else:
        intensity = "I1"

    urt_primary = next(
        (code for keywords, code in _URT_KEYWORDS if any(w in text for w in keywords)),
        _DEFAULT_URT,
    )

    return {
        "urt_primary": urt_primary,
        "valence": valence,
        "intensity": intensity,
        # Longer = more trustworthy, capped at 1.0: uncapped, any review over
        # 300 characters produced a score above 1.
        "trust_score": min(1.0, 0.7 + len(text) / 1000),
    }


async def cleanup_test_data(pool, business_id: str, place_id: str) -> None:
    """Delete every row created for ``business_id`` / ``place_id``.

    Child tables are cleared before their parents, all inside one transaction
    so a failure part-way through leaves nothing half-deleted.
    """
    logger.info("")
    logger.info("Cleaning up test data...")
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id
            )
            await conn.execute(
                "DELETE FROM pipeline.issue_events WHERE issue_id IN "
                "(SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
                business_id,
            )
            await conn.execute(
                "DELETE FROM pipeline.issue_spans WHERE issue_id IN "
                "(SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
                business_id,
            )
            await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id)
            await conn.execute(
                "DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id
            )
            await conn.execute(
                "DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id
            )
            await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id)
    logger.info("Test data cleaned up")


async def run_pipeline_test(database_url: str, sample_file: str, cleanup: bool = True):
    """Run pipeline Stages 1-4 on real sample reviews and report table counts.

    Args:
        database_url: PostgreSQL connection string for the pipeline database.
        sample_file: Path to the sample JSON file, relative to the project root.
        cleanup: When True, delete every row this run created. Cleanup also
            runs when a stage raises. Review IDs come from the fixed sample file
            and the raw insert skips conflicting IDs, so rows left behind by a
            failed run would otherwise silently skew the next run.

    Exits the process with status 1 if the sample file does not exist.
    """
    import asyncpg
    from reviewiq_pipeline.adapters.scraper_v1 import ScraperV1Adapter
    from reviewiq_pipeline.config import Config
    from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer
    from reviewiq_pipeline.stages.stage3_route import Stage3Router

    # Unique identifiers so this run's rows can be queried and cleaned up alone.
    test_id = uuid4().hex[:8]
    business_id = f"barbershop-fleitas-{test_id}"
    place_id = f"ChIJxxxxxx-{test_id}"
    job_id = uuid4()

    logger.info("=" * 60)
    logger.info("PIPELINE TEST WITH REAL DATA")
    logger.info("=" * 60)
    logger.info(f"Sample file: {sample_file}")
    logger.info(f"Business ID: {business_id}")
    logger.info(f"Job ID: {job_id}")
    logger.info("")

    # ========== LOAD SAMPLE DATA ==========
    logger.info("Step 1: Loading and transforming sample data...")
    sample_path = PROJECT_ROOT / sample_file
    if not sample_path.exists():
        logger.error(f"Sample file not found: {sample_path}")
        sys.exit(1)

    # JSON is UTF-8 by spec; don't depend on the platform locale encoding
    # (reviews may contain non-ASCII text, e.g. Spanish accents).
    raw_data = json.loads(sample_path.read_text(encoding="utf-8"))
    logger.info(f"  Loaded {len(raw_data)} raw reviews from file")

    adapter = ScraperV1Adapter(business_id, place_id)
    scraper_output = adapter.to_scraper_output(raw_data, str(job_id))
    reviews = scraper_output["reviews"]
    logger.info(f"  Transformed {len(reviews)} reviews (skipped empty text)")

    if reviews:
        sample = reviews[0]
        logger.info("  Sample review:")
        logger.info(f"    Author: {sample['author_name']}")
        logger.info(f"    Rating: {sample['rating']}")
        logger.info(f"    Text: {sample['text'][:80]}...")

    pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5)
    config = Config(database_url=database_url)

    try:
        # ========== STAGE 1: Insert raw reviews ==========
        logger.info("")
        logger.info("Step 2: Stage 1 - Inserting raw reviews...")
        async with pool.acquire() as conn:
            for review in reviews:
                await conn.execute(
                    """
                    INSERT INTO pipeline.reviews_raw (
                        job_id, source, review_id, place_id, raw_payload,
                        review_text, rating, review_time, reviewer_name, reviewer_id
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                    ON CONFLICT (source, review_id, review_version) DO NOTHING
                    """,
                    job_id,
                    "google",
                    review["review_id"],
                    place_id,
                    json.dumps(review.get("raw_payload", {})),
                    review["text"],
                    review["rating"],
                    parse_review_time(review["review_time"]),
                    review["author_name"],
                    review.get("author_id"),
                )

            raw_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
                place_id,
            )
        logger.info(f"  Inserted {raw_count} raw reviews")

        # ========== STAGE 1: Normalize ==========
        logger.info("")
        logger.info("Step 3: Stage 1 - Normalizing reviews...")
        normalizer = Stage1Normalizer(config)
        normalized_reviews = normalizer.normalize_batch(reviews, business_id, place_id)
        logger.info(f"  Normalized {len(normalized_reviews)} reviews")

        lang_dist = Counter(r["text_language"] for r in normalized_reviews)
        logger.info(f"  Languages: {dict(lang_dist)}")

        async with pool.acquire() as conn:
            for norm in normalized_reviews:
                # Link back to the raw row; filter on source as well because
                # the raw table's uniqueness is (source, review_id, version).
                raw_id = await conn.fetchval(
                    "SELECT id FROM pipeline.reviews_raw WHERE source = $1 AND review_id = $2",
                    "google",
                    norm["review_id"],
                )
                await conn.execute(
                    """
                    INSERT INTO pipeline.reviews_enriched (
                        source, review_id, review_version, is_latest, raw_id,
                        business_id, place_id, text, text_normalized, rating,
                        review_time, language, taxonomy_version
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
                    ON CONFLICT (source, review_id, review_version) DO UPDATE SET
                        is_latest = EXCLUDED.is_latest
                    """,
                    norm["source"],
                    norm["review_id"],
                    norm["review_version"],
                    True,
                    raw_id,
                    business_id,
                    place_id,
                    norm["text"],
                    norm["text_normalized"],
                    norm["rating"],
                    parse_review_time(norm["review_time"]),
                    norm["text_language"],
                    TAXONOMY_VERSION,
                )

            enriched_count = await conn.fetchval(
                "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
                business_id,
            )
        logger.info(f"  Inserted {enriched_count} enriched reviews")

        # ========== STAGE 2: Mock Classification ==========
        logger.info("")
        logger.info("Step 4: Stage 2 - Classifying reviews (mocked LLM)...")

        batch_id = f"batch-{test_id}"
        spans_created = 0

        async with pool.acquire() as conn:
            for norm in normalized_reviews:
                cls = mock_classify(norm)

                await conn.execute(
                    """
                    UPDATE pipeline.reviews_enriched SET
                        urt_primary = $1,
                        valence = $2,
                        intensity = $3,
                        trust_score = $4,
                        classification_model = $5,
                        processed_at = NOW()
                    WHERE review_id = $6 AND business_id = $7
                    """,
                    cls["urt_primary"],
                    cls["valence"],
                    cls["intensity"],
                    cls["trust_score"],
                    MOCK_MODEL,
                    norm["review_id"],
                    business_id,
                )

                # One span covering (the first 200 chars of) the whole review.
                span_text = norm["text"][:200]
                await conn.execute(
                    """
                    INSERT INTO pipeline.review_spans (
                        span_id, business_id, place_id, source, review_id, review_version,
                        span_index, span_text, span_start, span_end, profile,
                        urt_primary, valence, intensity, comparative,
                        is_primary, is_active, review_time, confidence, usn,
                        taxonomy_version, model_version, ingest_batch_id
                    ) VALUES (
                        $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
                        $15, $16, $17, $18, $19, $20, $21, $22, $23
                    )
                    ON CONFLICT (span_id) DO NOTHING
                    """,
                    f"SPN-{uuid4().hex[:12]}",
                    business_id,
                    place_id,
                    "google",
                    norm["review_id"],
                    1,  # review_version
                    0,  # span_index
                    span_text,
                    0,  # span_start
                    len(span_text),  # span_end
                    "standard",
                    cls["urt_primary"],
                    cls["valence"],
                    cls["intensity"],
                    "CR-N",
                    True,  # is_primary
                    True,  # is_active
                    # Same value this run wrote to reviews_enriched; avoids an
                    # extra unscoped lookup by review_id per review.
                    parse_review_time(norm["review_time"]),
                    "high",
                    f"google:{norm['review_id']}:1:0",  # usn
                    TAXONOMY_VERSION,
                    MOCK_MODEL,
                    batch_id,
                )
                spans_created += 1

            logger.info(f"  Created {spans_created} spans")

            cls_dist = await conn.fetch(
                """
                SELECT urt_primary, valence, COUNT(*) as cnt
                FROM pipeline.reviews_enriched
                WHERE business_id = $1
                GROUP BY urt_primary, valence
                ORDER BY cnt DESC
                """,
                business_id,
            )
            logger.info("  Classification distribution:")
            for row in cls_dist[:5]:
                logger.info(f"    {row['urt_primary']} ({row['valence']}): {row['cnt']}")

        # ========== STAGE 3: Route Issues ==========
        logger.info("")
        logger.info("Step 5: Stage 3 - Routing negative spans to issues...")

        router = Stage3Router(config)
        routed_count = 0
        issues_created = 0
        # A new issue's priority is seeded from the intensity of its first span.
        intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0}

        async with pool.acquire() as conn:
            # Only negative and mixed spans are routed to issues.
            spans_to_route = await conn.fetch(
                """
                SELECT span_id, business_id, place_id, urt_primary, valence, intensity,
                       entity_normalized, review_time, confidence,
                       source, review_id, review_version
                FROM pipeline.review_spans
                WHERE business_id = $1 AND valence IN ('V-', 'V±')
                """,
                business_id,
            )

            for span_row in spans_to_route:
                span_data = {
                    "span_id": span_row["span_id"],
                    "business_id": span_row["business_id"],
                    "place_id": span_row["place_id"],
                    "urt_primary": span_row["urt_primary"],
                    "valence": span_row["valence"],
                    "intensity": span_row["intensity"],
                    "entity_normalized": span_row["entity_normalized"],
                    "review_time": span_row["review_time"].isoformat(),
                    "confidence": span_row["confidence"],
                    "trust_score": 0.85,
                }
                issue_id = router.route_span_sync(span_data)["issue_id"]

                existing = await conn.fetchval(
                    "SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
                    issue_id,
                )
                if not existing:
                    await conn.execute(
                        """
                        INSERT INTO pipeline.issues (
                            issue_id, business_id, place_id, primary_subcode, domain,
                            state, priority_score, span_count, max_intensity,
                            taxonomy_version
                        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                        """,
                        issue_id,
                        business_id,
                        place_id,
                        span_row["urt_primary"],
                        # Domain = leading letter of the URT code ("A" for "A2.01").
                        span_row["urt_primary"][0],
                        "open",
                        intensity_scores.get(span_row["intensity"], 1.0),
                        1,
                        span_row["intensity"],
                        TAXONOMY_VERSION,
                    )
                    issues_created += 1
                else:
                    await conn.execute(
                        """
                        UPDATE pipeline.issues SET
                            span_count = span_count + 1,
                            updated_at = NOW()
                        WHERE issue_id = $1
                        """,
                        issue_id,
                    )

                await conn.execute(
                    """
                    INSERT INTO pipeline.issue_spans (
                        issue_id, span_id, source, review_id, review_version,
                        is_primary_match, intensity, review_time
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    ON CONFLICT (span_id) DO NOTHING
                    """,
                    issue_id,
                    span_row["span_id"],
                    span_row["source"],
                    span_row["review_id"],
                    span_row["review_version"],
                    True,
                    span_row["intensity"],
                    span_row["review_time"],
                )

                await conn.execute(
                    """
                    INSERT INTO pipeline.issue_events (issue_id, event_type, span_id)
                    VALUES ($1, $2, $3)
                    """,
                    issue_id,
                    "span_linked",
                    span_row["span_id"],
                )
                routed_count += 1

        logger.info(f"  Routed {routed_count} spans to {issues_created} issues")

        # ========== STAGE 4: Aggregation ==========
        logger.info("")
        logger.info("Step 6: Stage 4 - Aggregating facts...")

        async with pool.acquire() as conn:
            span_rows = await conn.fetch(
                """
                SELECT
                    rs.business_id, rs.place_id,
                    DATE(rs.review_time) as review_date,
                    rs.urt_primary, rs.valence, rs.intensity, rs.comparative,
                    re.trust_score, re.rating
                FROM pipeline.review_spans rs
                JOIN pipeline.reviews_enriched re ON (
                    re.source = rs.source
                    AND re.review_id = rs.review_id
                    AND re.review_version = rs.review_version
                )
                WHERE rs.business_id = $1 AND rs.is_active = TRUE
                """,
                business_id,
            )

            # Group spans by day, then by URT code.
            daily_data = defaultdict(lambda: defaultdict(list))
            for row in span_rows:
                daily_data[row["review_date"]][row["urt_primary"]].append(dict(row))

            facts_inserted = 0
            for period_date, urt_groups in daily_data.items():
                for urt_code, code_spans in urt_groups.items():
                    valences = Counter(s["valence"] for s in code_spans)
                    intensities = Counter(s["intensity"] for s in code_spans)
                    negative_count = valences["V-"]
                    positive_count = valences["V+"]
                    span_total = len(code_spans)
                    strength_score = (positive_count - negative_count) / max(span_total, 1)

                    await conn.execute(
                        """
                        INSERT INTO pipeline.fact_timeseries (
                            business_id, place_id, period_date, bucket_type,
                            subject_type, subject_id, taxonomy_version,
                            review_count, span_count, negative_count, positive_count,
                            neutral_count, mixed_count, strength_score,
                            negative_strength, positive_strength,
                            i1_count, i2_count, i3_count,
                            cr_better, cr_worse, cr_same,
                            trust_weighted_strength, trust_weighted_negative
                        ) VALUES (
                            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12,
                            $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24
                        )
                        ON CONFLICT (business_id, place_id, period_date, bucket_type,
                                     subject_type, subject_id, taxonomy_version)
                        DO UPDATE SET
                            span_count = EXCLUDED.span_count,
                            computed_at = NOW()
                        """,
                        business_id,
                        place_id,
                        period_date,
                        "day",
                        "urt_code",
                        urt_code,
                        TAXONOMY_VERSION,
                        span_total,  # review_count: one span per review here
                        span_total,
                        negative_count,
                        positive_count,
                        valences["V0"],
                        valences["V±"],
                        strength_score,
                        float(negative_count),
                        float(positive_count),
                        intensities["I1"],
                        intensities["I2"],
                        intensities["I3"],
                        0,
                        0,
                        0,
                        strength_score,
                        float(negative_count),
                    )
                    facts_inserted += 1

        logger.info(f"  Inserted {facts_inserted} fact records")

        # ========== RESULTS ==========
        logger.info("")
        logger.info("=" * 60)
        logger.info("PIPELINE TEST RESULTS")
        logger.info("=" * 60)

        async with pool.acquire() as conn:
            results = {
                "reviews_raw": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
                    place_id,
                ),
                "reviews_enriched": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
                    business_id,
                ),
                "review_spans": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1",
                    business_id,
                ),
                "issues": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1",
                    business_id,
                ),
                "issue_spans": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN "
                    "(SELECT issue_id FROM pipeline.issues WHERE business_id = $1)",
                    business_id,
                ),
                "fact_timeseries": await conn.fetchval(
                    "SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1",
                    business_id,
                ),
            }

            logger.info(f"  pipeline.reviews_raw:      {results['reviews_raw']}")
            logger.info(f"  pipeline.reviews_enriched: {results['reviews_enriched']}")
            logger.info(f"  pipeline.review_spans:     {results['review_spans']}")
            logger.info(f"  pipeline.issues:           {results['issues']}")
            logger.info(f"  pipeline.issue_spans:      {results['issue_spans']}")
            logger.info(f"  pipeline.fact_timeseries:  {results['fact_timeseries']}")

            logger.info("")
            logger.info("Issues identified:")
            issues = await conn.fetch(
                """
                SELECT issue_id, primary_subcode, span_count, max_intensity
                FROM pipeline.issues
                WHERE business_id = $1
                ORDER BY span_count DESC
                LIMIT 10
                """,
                business_id,
            )
            for issue in issues:
                logger.info(
                    f"  {issue['issue_id'][:20]}... "
                    f"Code={issue['primary_subcode']} "
                    f"Spans={issue['span_count']} "
                    f"MaxInt={issue['max_intensity']}"
                )

            date_range = await conn.fetchrow(
                """
                SELECT MIN(review_time)::date as min_date, MAX(review_time)::date as max_date
                FROM pipeline.reviews_enriched
                WHERE business_id = $1
                """,
                business_id,
            )
            logger.info("")
            logger.info(f"Date range: {date_range['min_date']} to {date_range['max_date']}")
    finally:
        # Clean up even when a stage failed, then always release the pool.
        try:
            if cleanup:
                await cleanup_test_data(pool, business_id, place_id)
        finally:
            await pool.close()

    logger.info("")
    logger.info("=" * 60)
    logger.info("PIPELINE TEST COMPLETED SUCCESSFULLY!")
    logger.info("=" * 60)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test pipeline with real data")
    parser.add_argument(
        "--database-url",
        default=os.environ.get("DATABASE_URL"),
        help="PostgreSQL connection string (default: $DATABASE_URL)",
    )
    parser.add_argument(
        "--sample-file",
        default="data/samples/barbershop_reviews.json",
        help="Path to sample data file (relative to project root)",
    )
    parser.add_argument(
        "--no-cleanup",
        action="store_true",
        help="Don't clean up test data after running",
    )
    args = parser.parse_args()

    if not args.database_url:
        print("Error: --database-url required or set DATABASE_URL", file=sys.stderr)
        sys.exit(1)

    asyncio.run(run_pipeline_test(
        args.database_url,
        args.sample_file,
        cleanup=not args.no_cleanup,
    ))