feat: Add ScraperV1Adapter and real data pipeline test

- Add ScraperV1Adapter to transform scraped reviews into pipeline format
  - Handles relative timestamps (centerDate)
  - Generates deterministic IDs for DOM-sourced reviews
  - Filters out empty (rating-only) reviews

- Add sample barbershop reviews (79 reviews, 46 with text)
  - Real data from Las Palmas barbershop
  - Multi-language: Spanish, English, German, Norwegian, Italian

- Add test_pipeline_real_data.py for E2E testing with real data
  - Uses mock classifier based on keywords and rating
  - Full pipeline flow: raw -> enriched -> spans -> issues -> facts

Test results with real data:
- 46 reviews processed
- 6 languages detected (es: 35, en: 7, de: 1, no: 1, it: 1, ca: 1)
- 3 issues identified from negative reviews
- 29 fact records aggregated across date range 2017-2025

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 18:35:09 +00:00
parent 3e57c887e9
commit e2d7f6f118
4 changed files with 1733 additions and 0 deletions

View File

@@ -0,0 +1,625 @@
#!/usr/bin/env python3
"""
Test the pipeline with real scraped review data.
Usage:
python tools/test_pipeline_real_data.py --database-url $DATABASE_URL
This test:
1. Loads sample barbershop reviews from data/samples/
2. Uses ScraperV1Adapter to transform the data
3. Runs the full pipeline (Stage 1-4) with mocked LLM
4. Verifies data in all tables
5. Shows summary statistics
"""
import asyncio
import os
import sys
import argparse
import json
import logging
from datetime import datetime, timezone
from uuid import uuid4
from pathlib import Path
# Add project root and package to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / "packages/reviewiq-pipeline/src"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
async def run_pipeline_test(database_url: str, sample_file: str, cleanup: bool = True):
"""Run the pipeline test with real data."""
import asyncpg
from reviewiq_pipeline.adapters.scraper_v1 import ScraperV1Adapter, load_and_transform
from reviewiq_pipeline.config import Config
from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer
from reviewiq_pipeline.stages.stage3_route import Stage3Router
from collections import defaultdict
# Generate unique test identifiers
test_id = uuid4().hex[:8]
business_id = f"barbershop-fleitas-{test_id}"
place_id = f"ChIJxxxxxx-{test_id}"
job_id = uuid4()
logger.info("=" * 60)
logger.info("PIPELINE TEST WITH REAL DATA")
logger.info("=" * 60)
logger.info(f"Sample file: {sample_file}")
logger.info(f"Business ID: {business_id}")
logger.info(f"Job ID: {job_id}")
logger.info("")
# Load and transform sample data
logger.info("Step 1: Loading and transforming sample data...")
sample_path = PROJECT_ROOT / sample_file
if not sample_path.exists():
logger.error(f"Sample file not found: {sample_path}")
sys.exit(1)
raw_data = json.loads(sample_path.read_text())
logger.info(f" Loaded {len(raw_data)} raw reviews from file")
adapter = ScraperV1Adapter(business_id, place_id)
scraper_output = adapter.to_scraper_output(raw_data, str(job_id))
reviews = scraper_output["reviews"]
logger.info(f" Transformed {len(reviews)} reviews (skipped empty text)")
# Show sample review
if reviews:
sample = reviews[0]
logger.info(f" Sample review:")
logger.info(f" Author: {sample['author_name']}")
logger.info(f" Rating: {sample['rating']}")
logger.info(f" Text: {sample['text'][:80]}...")
# Connect to database
pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5)
config = Config(database_url=database_url)
try:
# ========== STAGE 1: Insert raw reviews ==========
logger.info("")
logger.info("Step 2: Stage 1 - Inserting raw reviews...")
async with pool.acquire() as conn:
for review in reviews:
review_time = datetime.fromisoformat(
review["review_time"].replace("Z", "+00:00")
)
await conn.execute("""
INSERT INTO pipeline.reviews_raw (
job_id, source, review_id, place_id, raw_payload,
review_text, rating, review_time, reviewer_name, reviewer_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (source, review_id, review_version) DO NOTHING
""",
job_id,
"google",
review["review_id"],
place_id,
json.dumps(review.get("raw_payload", {})),
review["text"],
review["rating"],
review_time,
review["author_name"],
review.get("author_id"),
)
raw_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
place_id,
)
logger.info(f" Inserted {raw_count} raw reviews")
# ========== STAGE 1: Normalize ==========
logger.info("")
logger.info("Step 3: Stage 1 - Normalizing reviews...")
normalizer = Stage1Normalizer(config)
normalized_reviews = normalizer.normalize_batch(reviews, business_id, place_id)
logger.info(f" Normalized {len(normalized_reviews)} reviews")
# Language distribution
lang_dist = defaultdict(int)
for r in normalized_reviews:
lang_dist[r["text_language"]] += 1
logger.info(f" Languages: {dict(lang_dist)}")
# Insert into reviews_enriched
async with pool.acquire() as conn:
for norm in normalized_reviews:
raw_id = await conn.fetchval(
"SELECT id FROM pipeline.reviews_raw WHERE review_id = $1",
norm["review_id"],
)
review_time = datetime.fromisoformat(
norm["review_time"].replace("Z", "+00:00")
)
await conn.execute("""
INSERT INTO pipeline.reviews_enriched (
source, review_id, review_version, is_latest, raw_id,
business_id, place_id, text, text_normalized, rating, review_time,
language, taxonomy_version
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (source, review_id, review_version) DO UPDATE SET
is_latest = EXCLUDED.is_latest
""",
norm["source"],
norm["review_id"],
norm["review_version"],
True,
raw_id,
business_id,
place_id,
norm["text"],
norm["text_normalized"],
norm["rating"],
review_time,
norm["text_language"],
"v5.1",
)
enriched_count = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
business_id,
)
logger.info(f" Inserted {enriched_count} enriched reviews")
# ========== STAGE 2: Mock Classification ==========
logger.info("")
logger.info("Step 4: Stage 2 - Classifying reviews (mocked LLM)...")
# Simple heuristic classification based on rating and keywords
def mock_classify(review: dict) -> dict:
text = review["text_normalized"].lower()
rating = review["rating"]
# Determine valence from rating
if rating >= 4:
valence = "V+"
elif rating <= 2:
valence = "V-"
else:
valence = "V0"
# Determine intensity from text length and rating extremity
if rating in (1, 5) and len(text) > 100:
intensity = "I3"
elif rating in (1, 2, 4, 5):
intensity = "I2"
else:
intensity = "I1"
# Simple URT classification based on keywords
if any(w in text for w in ["servicio", "service", "trato", "atención", "attention"]):
urt_primary = "A2.01" # Helpfulness
elif any(w in text for w in ["profesional", "professional", "expert"]):
urt_primary = "A4.01" # Knowledge/Expertise
elif any(w in text for w in ["precio", "price", "caro", "barato", "expensive", "cheap"]):
urt_primary = "P1.01" # Value Perception
elif any(w in text for w in ["espera", "wait", "tiempo", "time", "rápido", "fast"]):
urt_primary = "J1.01" # Wait Times
elif any(w in text for w in ["ambiente", "ambiance", "local", "lugar", "place"]):
urt_primary = "E2.01" # Ambiance
else:
urt_primary = "O1.01" # Core Product/Service
return {
"urt_primary": urt_primary,
"valence": valence,
"intensity": intensity,
"trust_score": 0.7 + (len(text) / 1000), # Longer = more trustworthy
}
batch_id = f"batch-{test_id}"
spans_created = 0
async with pool.acquire() as conn:
for norm in normalized_reviews:
cls = mock_classify(norm)
# Update classification in reviews_enriched
await conn.execute("""
UPDATE pipeline.reviews_enriched SET
urt_primary = $1,
valence = $2,
intensity = $3,
trust_score = $4,
classification_model = $5,
processed_at = NOW()
WHERE review_id = $6 AND business_id = $7
""",
cls["urt_primary"],
cls["valence"],
cls["intensity"],
cls["trust_score"],
"mock-classifier-v1",
norm["review_id"],
business_id,
)
# Get review time for span
review_row = await conn.fetchrow(
"SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1",
norm["review_id"],
)
# Create a single span for the whole review
span_id = f"SPN-{uuid4().hex[:12]}"
usn = f"google:{norm['review_id']}:1:0"
# Truncate span text to first 200 chars
span_text = norm["text"][:200]
await conn.execute("""
INSERT INTO pipeline.review_spans (
span_id, business_id, place_id, source, review_id, review_version,
span_index, span_text, span_start, span_end,
profile, urt_primary, valence, intensity, comparative,
is_primary, is_active, review_time,
confidence, usn, taxonomy_version, model_version, ingest_batch_id
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23
)
ON CONFLICT (span_id) DO NOTHING
""",
span_id,
business_id,
place_id,
"google",
norm["review_id"],
1,
0, # span_index
span_text,
0, # span_start
len(span_text), # span_end
"standard",
cls["urt_primary"],
cls["valence"],
cls["intensity"],
"CR-N",
True, # is_primary
True, # is_active
review_row["review_time"],
"high",
usn,
"v5.1",
"mock-classifier-v1",
batch_id,
)
spans_created += 1
logger.info(f" Created {spans_created} spans")
# Show classification distribution
cls_dist = await conn.fetch("""
SELECT urt_primary, valence, COUNT(*) as cnt
FROM pipeline.reviews_enriched
WHERE business_id = $1
GROUP BY urt_primary, valence
ORDER BY cnt DESC
""", business_id)
logger.info(" Classification distribution:")
for row in cls_dist[:5]:
logger.info(f" {row['urt_primary']} ({row['valence']}): {row['cnt']}")
# ========== STAGE 3: Route Issues ==========
logger.info("")
logger.info("Step 5: Stage 3 - Routing negative spans to issues...")
router = Stage3Router(config)
routed_count = 0
issues_created = 0
async with pool.acquire() as conn:
# Get negative/mixed spans
spans_to_route = await conn.fetch("""
SELECT span_id, business_id, place_id, urt_primary, valence, intensity,
entity_normalized, review_time, confidence, source, review_id, review_version
FROM pipeline.review_spans
WHERE business_id = $1 AND valence IN ('V-', '')
""", business_id)
for span_row in spans_to_route:
span_data = {
"span_id": span_row["span_id"],
"business_id": span_row["business_id"],
"place_id": span_row["place_id"],
"urt_primary": span_row["urt_primary"],
"valence": span_row["valence"],
"intensity": span_row["intensity"],
"entity_normalized": span_row["entity_normalized"],
"review_time": span_row["review_time"].isoformat(),
"confidence": span_row["confidence"],
"trust_score": 0.85,
}
routed = router.route_span_sync(span_data)
# Check if issue exists
existing = await conn.fetchval(
"SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
routed["issue_id"],
)
if not existing:
domain = span_row["urt_primary"][0]
intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0}
priority_score = intensity_scores.get(span_row["intensity"], 1.0)
await conn.execute("""
INSERT INTO pipeline.issues (
issue_id, business_id, place_id, primary_subcode, domain,
state, priority_score, span_count, max_intensity, taxonomy_version
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
""",
routed["issue_id"],
business_id,
place_id,
span_row["urt_primary"],
domain,
"open",
priority_score,
1,
span_row["intensity"],
"v5.1",
)
issues_created += 1
else:
await conn.execute("""
UPDATE pipeline.issues SET span_count = span_count + 1, updated_at = NOW()
WHERE issue_id = $1
""", routed["issue_id"])
# Link span to issue
await conn.execute("""
INSERT INTO pipeline.issue_spans (
issue_id, span_id, source, review_id, review_version,
is_primary_match, intensity, review_time
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (span_id) DO NOTHING
""",
routed["issue_id"],
span_row["span_id"],
span_row["source"],
span_row["review_id"],
span_row["review_version"],
True,
span_row["intensity"],
span_row["review_time"],
)
# Log event
await conn.execute("""
INSERT INTO pipeline.issue_events (issue_id, event_type, span_id)
VALUES ($1, $2, $3)
""", routed["issue_id"], "span_linked", span_row["span_id"])
routed_count += 1
logger.info(f" Routed {routed_count} spans to {issues_created} issues")
# ========== STAGE 4: Aggregation ==========
logger.info("")
logger.info("Step 6: Stage 4 - Aggregating facts...")
async with pool.acquire() as conn:
# Get span data for aggregation
span_data = await conn.fetch("""
SELECT
rs.business_id,
rs.place_id,
DATE(rs.review_time) as review_date,
rs.urt_primary,
rs.valence,
rs.intensity,
rs.comparative,
re.trust_score,
re.rating
FROM pipeline.review_spans rs
JOIN pipeline.reviews_enriched re ON (
re.source = rs.source
AND re.review_id = rs.review_id
AND re.review_version = rs.review_version
)
WHERE rs.business_id = $1 AND rs.is_active = TRUE
""", business_id)
# Group by date and URT code
daily_data = defaultdict(lambda: defaultdict(list))
for row in span_data:
daily_data[row["review_date"]][row["urt_primary"]].append(dict(row))
facts_inserted = 0
for period_date, urt_groups in daily_data.items():
for urt_code, code_spans in urt_groups.items():
negative_count = sum(1 for s in code_spans if s["valence"] == "V-")
positive_count = sum(1 for s in code_spans if s["valence"] == "V+")
neutral_count = sum(1 for s in code_spans if s["valence"] == "V0")
mixed_count = sum(1 for s in code_spans if s["valence"] == "")
i1_count = sum(1 for s in code_spans if s["intensity"] == "I1")
i2_count = sum(1 for s in code_spans if s["intensity"] == "I2")
i3_count = sum(1 for s in code_spans if s["intensity"] == "I3")
strength_score = (positive_count - negative_count) / max(len(code_spans), 1)
await conn.execute("""
INSERT INTO pipeline.fact_timeseries (
business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version,
review_count, span_count,
negative_count, positive_count, neutral_count, mixed_count,
strength_score, negative_strength, positive_strength,
i1_count, i2_count, i3_count,
cr_better, cr_worse, cr_same,
trust_weighted_strength, trust_weighted_negative
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
$14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24
)
ON CONFLICT (business_id, place_id, period_date, bucket_type,
subject_type, subject_id, taxonomy_version)
DO UPDATE SET
span_count = EXCLUDED.span_count,
computed_at = NOW()
""",
business_id,
place_id,
period_date,
"day",
"urt_code",
urt_code,
"v5.1",
len(code_spans),
len(code_spans),
negative_count,
positive_count,
neutral_count,
mixed_count,
strength_score,
float(negative_count),
float(positive_count),
i1_count,
i2_count,
i3_count,
0, 0, 0,
strength_score,
float(negative_count),
)
facts_inserted += 1
logger.info(f" Inserted {facts_inserted} fact records")
# ========== RESULTS ==========
logger.info("")
logger.info("=" * 60)
logger.info("PIPELINE TEST RESULTS")
logger.info("=" * 60)
async with pool.acquire() as conn:
results = {}
results["reviews_raw"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", place_id
)
results["reviews_enriched"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", business_id
)
results["review_spans"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", business_id
)
results["issues"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id
)
results["issue_spans"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id
)
results["fact_timeseries"] = await conn.fetchval(
"SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1", business_id
)
logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}")
logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}")
logger.info(f" pipeline.review_spans: {results['review_spans']}")
logger.info(f" pipeline.issues: {results['issues']}")
logger.info(f" pipeline.issue_spans: {results['issue_spans']}")
logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}")
# Show issues
logger.info("")
logger.info("Issues identified:")
issues = await conn.fetch("""
SELECT issue_id, primary_subcode, span_count, max_intensity
FROM pipeline.issues
WHERE business_id = $1
ORDER BY span_count DESC
LIMIT 10
""", business_id)
for issue in issues:
logger.info(
f" {issue['issue_id'][:20]}... "
f"Code={issue['primary_subcode']} "
f"Spans={issue['span_count']} "
f"MaxInt={issue['max_intensity']}"
)
# Show date range
date_range = await conn.fetchrow("""
SELECT MIN(review_time)::date as min_date, MAX(review_time)::date as max_date
FROM pipeline.reviews_enriched
WHERE business_id = $1
""", business_id)
logger.info("")
logger.info(f"Date range: {date_range['min_date']} to {date_range['max_date']}")
# ========== CLEANUP ==========
if cleanup:
logger.info("")
logger.info("Cleaning up test data...")
async with pool.acquire() as conn:
await conn.execute("DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
await conn.execute("DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id)
await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id)
logger.info("Test data cleaned up")
logger.info("")
logger.info("=" * 60)
logger.info("PIPELINE TEST COMPLETED SUCCESSFULLY!")
logger.info("=" * 60)
finally:
await pool.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test pipeline with real data")
parser.add_argument(
"--database-url",
default=os.environ.get("DATABASE_URL"),
help="PostgreSQL connection string (default: $DATABASE_URL)",
)
parser.add_argument(
"--sample-file",
default="data/samples/barbershop_reviews.json",
help="Path to sample data file (relative to project root)",
)
parser.add_argument(
"--no-cleanup",
action="store_true",
help="Don't clean up test data after running",
)
args = parser.parse_args()
if not args.database_url:
print("Error: --database-url required or set DATABASE_URL", file=sys.stderr)
sys.exit(1)
asyncio.run(run_pipeline_test(
args.database_url,
args.sample_file,
cleanup=not args.no_cleanup,
))