- Add ScraperV1Adapter to transform scraped reviews into pipeline format - Handles relative timestamps (centerDate) - Generates deterministic IDs for DOM-sourced reviews - Filters out empty (rating-only) reviews - Add sample barbershop reviews (79 reviews, 46 with text) - Real data from Las Palmas barbershop - Multi-language: Spanish, English, German, Norwegian, Italian - Add test_pipeline_real_data.py for E2E testing with real data - Uses mock classifier based on keywords and rating - Full pipeline flow: raw -> enriched -> spans -> issues -> facts Test results with real data: - 46 reviews processed - 6 languages detected (es: 35, en: 7, de: 1, no: 1, it: 1, ca: 1) - 3 issues identified from negative reviews - 29 fact records aggregated across date range 2017-2025 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
626 lines
25 KiB
Python
626 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test the pipeline with real scraped review data.
|
|
|
|
Usage:
|
|
python tools/test_pipeline_real_data.py --database-url $DATABASE_URL
|
|
|
|
This test:
|
|
1. Loads sample barbershop reviews from data/samples/
|
|
2. Uses ScraperV1Adapter to transform the data
|
|
3. Runs the full pipeline (Stage 1-4) with mocked LLM
|
|
4. Verifies data in all tables
|
|
5. Shows summary statistics
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from uuid import uuid4
|
|
from pathlib import Path
|
|
|
|
# Add project root and package to path
|
|
PROJECT_ROOT = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(PROJECT_ROOT))
|
|
sys.path.insert(0, str(PROJECT_ROOT / "packages/reviewiq-pipeline/src"))
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def run_pipeline_test(database_url: str, sample_file: str, cleanup: bool = True):
|
|
"""Run the pipeline test with real data."""
|
|
import asyncpg
|
|
from reviewiq_pipeline.adapters.scraper_v1 import ScraperV1Adapter, load_and_transform
|
|
from reviewiq_pipeline.config import Config
|
|
from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer
|
|
from reviewiq_pipeline.stages.stage3_route import Stage3Router
|
|
from collections import defaultdict
|
|
|
|
# Generate unique test identifiers
|
|
test_id = uuid4().hex[:8]
|
|
business_id = f"barbershop-fleitas-{test_id}"
|
|
place_id = f"ChIJxxxxxx-{test_id}"
|
|
job_id = uuid4()
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("PIPELINE TEST WITH REAL DATA")
|
|
logger.info("=" * 60)
|
|
logger.info(f"Sample file: {sample_file}")
|
|
logger.info(f"Business ID: {business_id}")
|
|
logger.info(f"Job ID: {job_id}")
|
|
logger.info("")
|
|
|
|
# Load and transform sample data
|
|
logger.info("Step 1: Loading and transforming sample data...")
|
|
sample_path = PROJECT_ROOT / sample_file
|
|
|
|
if not sample_path.exists():
|
|
logger.error(f"Sample file not found: {sample_path}")
|
|
sys.exit(1)
|
|
|
|
raw_data = json.loads(sample_path.read_text())
|
|
logger.info(f" Loaded {len(raw_data)} raw reviews from file")
|
|
|
|
adapter = ScraperV1Adapter(business_id, place_id)
|
|
scraper_output = adapter.to_scraper_output(raw_data, str(job_id))
|
|
reviews = scraper_output["reviews"]
|
|
|
|
logger.info(f" Transformed {len(reviews)} reviews (skipped empty text)")
|
|
|
|
# Show sample review
|
|
if reviews:
|
|
sample = reviews[0]
|
|
logger.info(f" Sample review:")
|
|
logger.info(f" Author: {sample['author_name']}")
|
|
logger.info(f" Rating: {sample['rating']}")
|
|
logger.info(f" Text: {sample['text'][:80]}...")
|
|
|
|
# Connect to database
|
|
pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5)
|
|
config = Config(database_url=database_url)
|
|
|
|
try:
|
|
# ========== STAGE 1: Insert raw reviews ==========
|
|
logger.info("")
|
|
logger.info("Step 2: Stage 1 - Inserting raw reviews...")
|
|
|
|
async with pool.acquire() as conn:
|
|
for review in reviews:
|
|
review_time = datetime.fromisoformat(
|
|
review["review_time"].replace("Z", "+00:00")
|
|
)
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.reviews_raw (
|
|
job_id, source, review_id, place_id, raw_payload,
|
|
review_text, rating, review_time, reviewer_name, reviewer_id
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
|
ON CONFLICT (source, review_id, review_version) DO NOTHING
|
|
""",
|
|
job_id,
|
|
"google",
|
|
review["review_id"],
|
|
place_id,
|
|
json.dumps(review.get("raw_payload", {})),
|
|
review["text"],
|
|
review["rating"],
|
|
review_time,
|
|
review["author_name"],
|
|
review.get("author_id"),
|
|
)
|
|
|
|
raw_count = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1",
|
|
place_id,
|
|
)
|
|
logger.info(f" Inserted {raw_count} raw reviews")
|
|
|
|
# ========== STAGE 1: Normalize ==========
|
|
logger.info("")
|
|
logger.info("Step 3: Stage 1 - Normalizing reviews...")
|
|
|
|
normalizer = Stage1Normalizer(config)
|
|
normalized_reviews = normalizer.normalize_batch(reviews, business_id, place_id)
|
|
logger.info(f" Normalized {len(normalized_reviews)} reviews")
|
|
|
|
# Language distribution
|
|
lang_dist = defaultdict(int)
|
|
for r in normalized_reviews:
|
|
lang_dist[r["text_language"]] += 1
|
|
logger.info(f" Languages: {dict(lang_dist)}")
|
|
|
|
# Insert into reviews_enriched
|
|
async with pool.acquire() as conn:
|
|
for norm in normalized_reviews:
|
|
raw_id = await conn.fetchval(
|
|
"SELECT id FROM pipeline.reviews_raw WHERE review_id = $1",
|
|
norm["review_id"],
|
|
)
|
|
|
|
review_time = datetime.fromisoformat(
|
|
norm["review_time"].replace("Z", "+00:00")
|
|
)
|
|
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.reviews_enriched (
|
|
source, review_id, review_version, is_latest, raw_id,
|
|
business_id, place_id, text, text_normalized, rating, review_time,
|
|
language, taxonomy_version
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
|
ON CONFLICT (source, review_id, review_version) DO UPDATE SET
|
|
is_latest = EXCLUDED.is_latest
|
|
""",
|
|
norm["source"],
|
|
norm["review_id"],
|
|
norm["review_version"],
|
|
True,
|
|
raw_id,
|
|
business_id,
|
|
place_id,
|
|
norm["text"],
|
|
norm["text_normalized"],
|
|
norm["rating"],
|
|
review_time,
|
|
norm["text_language"],
|
|
"v5.1",
|
|
)
|
|
|
|
enriched_count = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1",
|
|
business_id,
|
|
)
|
|
logger.info(f" Inserted {enriched_count} enriched reviews")
|
|
|
|
# ========== STAGE 2: Mock Classification ==========
|
|
logger.info("")
|
|
logger.info("Step 4: Stage 2 - Classifying reviews (mocked LLM)...")
|
|
|
|
# Simple heuristic classification based on rating and keywords
|
|
def mock_classify(review: dict) -> dict:
|
|
text = review["text_normalized"].lower()
|
|
rating = review["rating"]
|
|
|
|
# Determine valence from rating
|
|
if rating >= 4:
|
|
valence = "V+"
|
|
elif rating <= 2:
|
|
valence = "V-"
|
|
else:
|
|
valence = "V0"
|
|
|
|
# Determine intensity from text length and rating extremity
|
|
if rating in (1, 5) and len(text) > 100:
|
|
intensity = "I3"
|
|
elif rating in (1, 2, 4, 5):
|
|
intensity = "I2"
|
|
else:
|
|
intensity = "I1"
|
|
|
|
# Simple URT classification based on keywords
|
|
if any(w in text for w in ["servicio", "service", "trato", "atención", "attention"]):
|
|
urt_primary = "A2.01" # Helpfulness
|
|
elif any(w in text for w in ["profesional", "professional", "expert"]):
|
|
urt_primary = "A4.01" # Knowledge/Expertise
|
|
elif any(w in text for w in ["precio", "price", "caro", "barato", "expensive", "cheap"]):
|
|
urt_primary = "P1.01" # Value Perception
|
|
elif any(w in text for w in ["espera", "wait", "tiempo", "time", "rápido", "fast"]):
|
|
urt_primary = "J1.01" # Wait Times
|
|
elif any(w in text for w in ["ambiente", "ambiance", "local", "lugar", "place"]):
|
|
urt_primary = "E2.01" # Ambiance
|
|
else:
|
|
urt_primary = "O1.01" # Core Product/Service
|
|
|
|
return {
|
|
"urt_primary": urt_primary,
|
|
"valence": valence,
|
|
"intensity": intensity,
|
|
"trust_score": 0.7 + (len(text) / 1000), # Longer = more trustworthy
|
|
}
|
|
|
|
batch_id = f"batch-{test_id}"
|
|
spans_created = 0
|
|
|
|
async with pool.acquire() as conn:
|
|
for norm in normalized_reviews:
|
|
cls = mock_classify(norm)
|
|
|
|
# Update classification in reviews_enriched
|
|
await conn.execute("""
|
|
UPDATE pipeline.reviews_enriched SET
|
|
urt_primary = $1,
|
|
valence = $2,
|
|
intensity = $3,
|
|
trust_score = $4,
|
|
classification_model = $5,
|
|
processed_at = NOW()
|
|
WHERE review_id = $6 AND business_id = $7
|
|
""",
|
|
cls["urt_primary"],
|
|
cls["valence"],
|
|
cls["intensity"],
|
|
cls["trust_score"],
|
|
"mock-classifier-v1",
|
|
norm["review_id"],
|
|
business_id,
|
|
)
|
|
|
|
# Get review time for span
|
|
review_row = await conn.fetchrow(
|
|
"SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1",
|
|
norm["review_id"],
|
|
)
|
|
|
|
# Create a single span for the whole review
|
|
span_id = f"SPN-{uuid4().hex[:12]}"
|
|
usn = f"google:{norm['review_id']}:1:0"
|
|
|
|
# Truncate span text to first 200 chars
|
|
span_text = norm["text"][:200]
|
|
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.review_spans (
|
|
span_id, business_id, place_id, source, review_id, review_version,
|
|
span_index, span_text, span_start, span_end,
|
|
profile, urt_primary, valence, intensity, comparative,
|
|
is_primary, is_active, review_time,
|
|
confidence, usn, taxonomy_version, model_version, ingest_batch_id
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
|
|
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23
|
|
)
|
|
ON CONFLICT (span_id) DO NOTHING
|
|
""",
|
|
span_id,
|
|
business_id,
|
|
place_id,
|
|
"google",
|
|
norm["review_id"],
|
|
1,
|
|
0, # span_index
|
|
span_text,
|
|
0, # span_start
|
|
len(span_text), # span_end
|
|
"standard",
|
|
cls["urt_primary"],
|
|
cls["valence"],
|
|
cls["intensity"],
|
|
"CR-N",
|
|
True, # is_primary
|
|
True, # is_active
|
|
review_row["review_time"],
|
|
"high",
|
|
usn,
|
|
"v5.1",
|
|
"mock-classifier-v1",
|
|
batch_id,
|
|
)
|
|
spans_created += 1
|
|
|
|
logger.info(f" Created {spans_created} spans")
|
|
|
|
# Show classification distribution
|
|
cls_dist = await conn.fetch("""
|
|
SELECT urt_primary, valence, COUNT(*) as cnt
|
|
FROM pipeline.reviews_enriched
|
|
WHERE business_id = $1
|
|
GROUP BY urt_primary, valence
|
|
ORDER BY cnt DESC
|
|
""", business_id)
|
|
|
|
logger.info(" Classification distribution:")
|
|
for row in cls_dist[:5]:
|
|
logger.info(f" {row['urt_primary']} ({row['valence']}): {row['cnt']}")
|
|
|
|
# ========== STAGE 3: Route Issues ==========
|
|
logger.info("")
|
|
logger.info("Step 5: Stage 3 - Routing negative spans to issues...")
|
|
|
|
router = Stage3Router(config)
|
|
routed_count = 0
|
|
issues_created = 0
|
|
|
|
async with pool.acquire() as conn:
|
|
# Get negative/mixed spans
|
|
spans_to_route = await conn.fetch("""
|
|
SELECT span_id, business_id, place_id, urt_primary, valence, intensity,
|
|
entity_normalized, review_time, confidence, source, review_id, review_version
|
|
FROM pipeline.review_spans
|
|
WHERE business_id = $1 AND valence IN ('V-', 'V±')
|
|
""", business_id)
|
|
|
|
for span_row in spans_to_route:
|
|
span_data = {
|
|
"span_id": span_row["span_id"],
|
|
"business_id": span_row["business_id"],
|
|
"place_id": span_row["place_id"],
|
|
"urt_primary": span_row["urt_primary"],
|
|
"valence": span_row["valence"],
|
|
"intensity": span_row["intensity"],
|
|
"entity_normalized": span_row["entity_normalized"],
|
|
"review_time": span_row["review_time"].isoformat(),
|
|
"confidence": span_row["confidence"],
|
|
"trust_score": 0.85,
|
|
}
|
|
|
|
routed = router.route_span_sync(span_data)
|
|
|
|
# Check if issue exists
|
|
existing = await conn.fetchval(
|
|
"SELECT 1 FROM pipeline.issues WHERE issue_id = $1",
|
|
routed["issue_id"],
|
|
)
|
|
|
|
if not existing:
|
|
domain = span_row["urt_primary"][0]
|
|
intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0}
|
|
priority_score = intensity_scores.get(span_row["intensity"], 1.0)
|
|
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.issues (
|
|
issue_id, business_id, place_id, primary_subcode, domain,
|
|
state, priority_score, span_count, max_intensity, taxonomy_version
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
|
""",
|
|
routed["issue_id"],
|
|
business_id,
|
|
place_id,
|
|
span_row["urt_primary"],
|
|
domain,
|
|
"open",
|
|
priority_score,
|
|
1,
|
|
span_row["intensity"],
|
|
"v5.1",
|
|
)
|
|
issues_created += 1
|
|
else:
|
|
await conn.execute("""
|
|
UPDATE pipeline.issues SET span_count = span_count + 1, updated_at = NOW()
|
|
WHERE issue_id = $1
|
|
""", routed["issue_id"])
|
|
|
|
# Link span to issue
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.issue_spans (
|
|
issue_id, span_id, source, review_id, review_version,
|
|
is_primary_match, intensity, review_time
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
ON CONFLICT (span_id) DO NOTHING
|
|
""",
|
|
routed["issue_id"],
|
|
span_row["span_id"],
|
|
span_row["source"],
|
|
span_row["review_id"],
|
|
span_row["review_version"],
|
|
True,
|
|
span_row["intensity"],
|
|
span_row["review_time"],
|
|
)
|
|
|
|
# Log event
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.issue_events (issue_id, event_type, span_id)
|
|
VALUES ($1, $2, $3)
|
|
""", routed["issue_id"], "span_linked", span_row["span_id"])
|
|
|
|
routed_count += 1
|
|
|
|
logger.info(f" Routed {routed_count} spans to {issues_created} issues")
|
|
|
|
# ========== STAGE 4: Aggregation ==========
|
|
logger.info("")
|
|
logger.info("Step 6: Stage 4 - Aggregating facts...")
|
|
|
|
async with pool.acquire() as conn:
|
|
# Get span data for aggregation
|
|
span_data = await conn.fetch("""
|
|
SELECT
|
|
rs.business_id,
|
|
rs.place_id,
|
|
DATE(rs.review_time) as review_date,
|
|
rs.urt_primary,
|
|
rs.valence,
|
|
rs.intensity,
|
|
rs.comparative,
|
|
re.trust_score,
|
|
re.rating
|
|
FROM pipeline.review_spans rs
|
|
JOIN pipeline.reviews_enriched re ON (
|
|
re.source = rs.source
|
|
AND re.review_id = rs.review_id
|
|
AND re.review_version = rs.review_version
|
|
)
|
|
WHERE rs.business_id = $1 AND rs.is_active = TRUE
|
|
""", business_id)
|
|
|
|
# Group by date and URT code
|
|
daily_data = defaultdict(lambda: defaultdict(list))
|
|
for row in span_data:
|
|
daily_data[row["review_date"]][row["urt_primary"]].append(dict(row))
|
|
|
|
facts_inserted = 0
|
|
for period_date, urt_groups in daily_data.items():
|
|
for urt_code, code_spans in urt_groups.items():
|
|
negative_count = sum(1 for s in code_spans if s["valence"] == "V-")
|
|
positive_count = sum(1 for s in code_spans if s["valence"] == "V+")
|
|
neutral_count = sum(1 for s in code_spans if s["valence"] == "V0")
|
|
mixed_count = sum(1 for s in code_spans if s["valence"] == "V±")
|
|
|
|
i1_count = sum(1 for s in code_spans if s["intensity"] == "I1")
|
|
i2_count = sum(1 for s in code_spans if s["intensity"] == "I2")
|
|
i3_count = sum(1 for s in code_spans if s["intensity"] == "I3")
|
|
|
|
strength_score = (positive_count - negative_count) / max(len(code_spans), 1)
|
|
|
|
await conn.execute("""
|
|
INSERT INTO pipeline.fact_timeseries (
|
|
business_id, place_id, period_date, bucket_type,
|
|
subject_type, subject_id, taxonomy_version,
|
|
review_count, span_count,
|
|
negative_count, positive_count, neutral_count, mixed_count,
|
|
strength_score, negative_strength, positive_strength,
|
|
i1_count, i2_count, i3_count,
|
|
cr_better, cr_worse, cr_same,
|
|
trust_weighted_strength, trust_weighted_negative
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13,
|
|
$14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24
|
|
)
|
|
ON CONFLICT (business_id, place_id, period_date, bucket_type,
|
|
subject_type, subject_id, taxonomy_version)
|
|
DO UPDATE SET
|
|
span_count = EXCLUDED.span_count,
|
|
computed_at = NOW()
|
|
""",
|
|
business_id,
|
|
place_id,
|
|
period_date,
|
|
"day",
|
|
"urt_code",
|
|
urt_code,
|
|
"v5.1",
|
|
len(code_spans),
|
|
len(code_spans),
|
|
negative_count,
|
|
positive_count,
|
|
neutral_count,
|
|
mixed_count,
|
|
strength_score,
|
|
float(negative_count),
|
|
float(positive_count),
|
|
i1_count,
|
|
i2_count,
|
|
i3_count,
|
|
0, 0, 0,
|
|
strength_score,
|
|
float(negative_count),
|
|
)
|
|
facts_inserted += 1
|
|
|
|
logger.info(f" Inserted {facts_inserted} fact records")
|
|
|
|
# ========== RESULTS ==========
|
|
logger.info("")
|
|
logger.info("=" * 60)
|
|
logger.info("PIPELINE TEST RESULTS")
|
|
logger.info("=" * 60)
|
|
|
|
async with pool.acquire() as conn:
|
|
results = {}
|
|
results["reviews_raw"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", place_id
|
|
)
|
|
results["reviews_enriched"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", business_id
|
|
)
|
|
results["review_spans"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", business_id
|
|
)
|
|
results["issues"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id
|
|
)
|
|
results["issue_spans"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id
|
|
)
|
|
results["fact_timeseries"] = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1", business_id
|
|
)
|
|
|
|
logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}")
|
|
logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}")
|
|
logger.info(f" pipeline.review_spans: {results['review_spans']}")
|
|
logger.info(f" pipeline.issues: {results['issues']}")
|
|
logger.info(f" pipeline.issue_spans: {results['issue_spans']}")
|
|
logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}")
|
|
|
|
# Show issues
|
|
logger.info("")
|
|
logger.info("Issues identified:")
|
|
issues = await conn.fetch("""
|
|
SELECT issue_id, primary_subcode, span_count, max_intensity
|
|
FROM pipeline.issues
|
|
WHERE business_id = $1
|
|
ORDER BY span_count DESC
|
|
LIMIT 10
|
|
""", business_id)
|
|
|
|
for issue in issues:
|
|
logger.info(
|
|
f" {issue['issue_id'][:20]}... "
|
|
f"Code={issue['primary_subcode']} "
|
|
f"Spans={issue['span_count']} "
|
|
f"MaxInt={issue['max_intensity']}"
|
|
)
|
|
|
|
# Show date range
|
|
date_range = await conn.fetchrow("""
|
|
SELECT MIN(review_time)::date as min_date, MAX(review_time)::date as max_date
|
|
FROM pipeline.reviews_enriched
|
|
WHERE business_id = $1
|
|
""", business_id)
|
|
|
|
logger.info("")
|
|
logger.info(f"Date range: {date_range['min_date']} to {date_range['max_date']}")
|
|
|
|
# ========== CLEANUP ==========
|
|
if cleanup:
|
|
logger.info("")
|
|
logger.info("Cleaning up test data...")
|
|
|
|
async with pool.acquire() as conn:
|
|
await conn.execute("DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id)
|
|
await conn.execute("DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
|
|
await conn.execute("DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id)
|
|
await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id)
|
|
await conn.execute("DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id)
|
|
await conn.execute("DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id)
|
|
await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id)
|
|
|
|
logger.info("Test data cleaned up")
|
|
|
|
logger.info("")
|
|
logger.info("=" * 60)
|
|
logger.info("PIPELINE TEST COMPLETED SUCCESSFULLY!")
|
|
logger.info("=" * 60)
|
|
|
|
finally:
|
|
await pool.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Test pipeline with real data")
|
|
parser.add_argument(
|
|
"--database-url",
|
|
default=os.environ.get("DATABASE_URL"),
|
|
help="PostgreSQL connection string (default: $DATABASE_URL)",
|
|
)
|
|
parser.add_argument(
|
|
"--sample-file",
|
|
default="data/samples/barbershop_reviews.json",
|
|
help="Path to sample data file (relative to project root)",
|
|
)
|
|
parser.add_argument(
|
|
"--no-cleanup",
|
|
action="store_true",
|
|
help="Don't clean up test data after running",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.database_url:
|
|
print("Error: --database-url required or set DATABASE_URL", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
asyncio.run(run_pipeline_test(
|
|
args.database_url,
|
|
args.sample_file,
|
|
cleanup=not args.no_cleanup,
|
|
))
|