Files
2026-02-02 18:19:00 +00:00

410 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Backfill review_facts_v1 from public.jobs.reviews_data.
Parses relative timestamps ("17 hours ago", "2 weeks ago") into absolute
timestamps anchored to job.created_at.
Usage:
python backfill_review_facts.py
python backfill_review_facts.py --dry-run
python backfill_review_facts.py --job-id <uuid>
"""
import argparse
import asyncio
import json
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Any
import asyncpg
# Connection string for the database. A DATABASE_URL value in the
# environment takes precedence; the literal below is only the fallback.
# NOTE(review): the fallback embeds a username and password - presumably a
# local development default; confirm it is never relied on in production.
DB_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://scraper:scraper123@localhost:5437/scraper",
)
# =============================================================================
# RELATIVE TIMESTAMP PARSER
# =============================================================================
# Table of (regex, label) pairs describing the relative-timestamp spellings
# this script knows about.
# NOTE(review): nothing in the visible code iterates this table;
# parse_relative_timestamp performs its own inline checks. Confirm whether
# another consumer exists before relying on or removing it.
RELATIVE_PATTERNS = [
    (
        # "17 hours ago", "2 weeks ago", "a month ago" (optionally "edited ...")
        r"(?:edited\s+)?(\d+|a|an)\s+(second|minute|hour|day|week|month|year)s?\s+ago",
        "standard",
    ),
    (
        # "just now"
        r"just\s+now",
        "just_now",
    ),
    (
        # "yesterday"
        r"yesterday",
        "yesterday",
    ),
    (
        # "today"
        r"today",
        "today",
    ),
]
# Time unit multipliers (in seconds)
TIME_UNITS = {
"second": 1,
"minute": 60,
"hour": 3600,
"day": 86400,
"week": 604800,
"month": 2592000, # 30 days
"year": 31536000, # 365 days
}
def parse_relative_timestamp(raw: str, reference_time: datetime) -> datetime | None:
"""
Parse a relative timestamp string into an absolute datetime.
Args:
raw: Relative timestamp like "17 hours ago", "Edited 2 weeks ago"
reference_time: The reference point (usually job.created_at)
Returns:
Absolute datetime or None if parsing failed
"""
if not raw:
return None
text = raw.lower().strip()
# Handle "just now"
if "just now" in text:
return reference_time
# Handle "yesterday"
if text == "yesterday":
return reference_time - timedelta(days=1)
# Handle "today"
if text == "today":
return reference_time
# Handle standard relative format
# Remove "edited " prefix if present
text = re.sub(r"^edited\s+", "", text)
# Match "N unit(s) ago"
match = re.match(r"(\d+|a|an)\s+(second|minute|hour|day|week|month|year)s?\s+ago", text)
if match:
quantity_str = match.group(1)
unit = match.group(2)
# Convert "a"/"an" to 1
if quantity_str in ("a", "an"):
quantity = 1
else:
quantity = int(quantity_str)
seconds = quantity * TIME_UNITS.get(unit, 0)
return reference_time - timedelta(seconds=seconds)
# Unknown format
return None
def parse_relative_timestamp_safe(raw: str, reference_time: datetime) -> tuple[datetime | None, bool]:
    """
    Non-raising wrapper around parse_relative_timestamp.

    Returns:
        (parsed_time, success). success is True only when a datetime was
        produced; any exception raised while parsing yields (None, False).
    """
    try:
        parsed = parse_relative_timestamp(raw, reference_time)
    except Exception:
        # Deliberate catch-all: a bad timestamp must never abort the backfill.
        return None, False
    return parsed, parsed is not None
# =============================================================================
# BACKFILL LOGIC
# =============================================================================
async def get_jobs_with_reviews(pool: asyncpg.Pool, job_id: str | None = None) -> list[dict]:
    """
    Fetch jobs from public.jobs whose reviews_data is a JSON array.

    Args:
        pool: Connection pool used to run the query.
        job_id: When truthy, restrict the result to that single job;
            otherwise return every matching job, newest created_at first.

    Returns:
        One dict per row with the keys job_id, created_at, reviews_data and
        business_id (metadata->>'business_name', falling back to url).
    """
    # NOTE: the SQL text is kept flush-left so the statement sent to the
    # server stays exactly as it was.
    if not job_id:
        all_jobs_sql = """
SELECT job_id, created_at, reviews_data,
COALESCE(metadata->>'business_name', url) as business_id
FROM public.jobs
WHERE reviews_data IS NOT NULL
AND jsonb_typeof(reviews_data) = 'array'
ORDER BY created_at DESC
"""
        records = await pool.fetch(all_jobs_sql)
    else:
        single_job_sql = """
SELECT job_id, created_at, reviews_data,
COALESCE(metadata->>'business_name', url) as business_id
FROM public.jobs
WHERE job_id = $1
AND reviews_data IS NOT NULL
AND jsonb_typeof(reviews_data) = 'array'
"""
        records = await pool.fetch(single_job_sql, job_id)
    return list(map(dict, records))
async def get_run_id_for_job(pool: asyncpg.Pool, job_id: str) -> str | None:
    """
    Look up a run_id for a job in pipeline.detected_spans_v2.

    Returns:
        The run_id of one matching row converted to str, or None when no
        row with a non-NULL run_id exists for the job.
    """
    lookup_sql = """
SELECT DISTINCT run_id FROM pipeline.detected_spans_v2
WHERE job_id = $1 AND run_id IS NOT NULL
LIMIT 1
"""
    record = await pool.fetchrow(lookup_sql, job_id)
    if not record:
        return None
    found = record["run_id"]
    return str(found) if found else None
async def get_language_for_review(pool: asyncpg.Pool, review_id: str) -> str | None:
    """
    Look up the language recorded for a review in pipeline.detected_spans_v2.

    Returns:
        The language value of one matching row, or None when no row with a
        non-NULL language exists for the review.
    """
    lookup_sql = """
SELECT language FROM pipeline.detected_spans_v2
WHERE review_id = $1 AND language IS NOT NULL
LIMIT 1
"""
    record = await pool.fetchrow(lookup_sql, review_id)
    if record:
        return record["language"]
    return None
async def upsert_review_facts(
    pool: asyncpg.Pool,
    facts: list[dict],
    dry_run: bool = False,
) -> tuple[int, int]:
    """
    Write review facts to pipeline.review_facts_v1, updating rows whose
    review_id already exists.

    On conflict every column is overwritten with the incoming value, except
    run_id and language, which keep the stored value when the incoming one
    is NULL.

    Args:
        pool: Connection pool used to run the statement.
        facts: Dicts that must contain review_id, business_id and job_id;
            the remaining columns are optional and default to None.
        dry_run: When True nothing is written.

    Returns:
        (inserted_count, updated_count). Inserts and updates are not
        distinguished: the first element is the number of records sent and
        the second is always 0. A dry run or an empty list returns (0, 0).
    """
    if not facts or dry_run:
        return 0, 0

    # One parameterised statement, executed once per record via executemany.
    upsert_sql = """
INSERT INTO pipeline.review_facts_v1
(review_id, business_id, job_id, run_id, rating, review_time_utc, raw_timestamp, author, language)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (review_id) DO UPDATE SET
business_id = EXCLUDED.business_id,
job_id = EXCLUDED.job_id,
run_id = COALESCE(EXCLUDED.run_id, pipeline.review_facts_v1.run_id),
rating = EXCLUDED.rating,
review_time_utc = EXCLUDED.review_time_utc,
raw_timestamp = EXCLUDED.raw_timestamp,
author = EXCLUDED.author,
language = COALESCE(EXCLUDED.language, pipeline.review_facts_v1.language)
"""

    # Column order here must match the $1..$9 placeholders above.
    required_keys = ("review_id", "business_id", "job_id")
    optional_keys = ("run_id", "rating", "review_time_utc", "raw_timestamp", "author", "language")
    rows = [
        tuple(fact[key] for key in required_keys) + tuple(fact.get(key) for key in optional_keys)
        for fact in facts
    ]

    await pool.executemany(upsert_sql, rows)
    return len(rows), 0
async def backfill_job(
    pool: asyncpg.Pool,
    job: dict,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """
    Backfill review facts for a single job.

    Each entry of the job's reviews_data is turned into one fact record
    whose review_time_utc is the review's relative timestamp resolved
    against the job's created_at. Entries that are not JSON objects, or
    that have no review_id, are skipped.

    Args:
        pool: Connection pool used for lookups and the upsert.
        job: Dict with the keys job_id, created_at, business_id and
            reviews_data (a list, or a JSON string encoding one).
        dry_run: When True, skip the per-review language lookup and the
            database write.
        verbose: When True, print a per-job summary line.

    Returns:
        Stats dict with the keys job_id, total_reviews (every entry seen,
        including skipped ones), parsed_ok, parsed_failed, inserted and
        sample_failures (up to 5 raw timestamps that failed to parse).
    """
    job_id = job["job_id"]
    job_created = job["created_at"]
    business_id = job["business_id"]
    reviews_data = job["reviews_data"]

    # asyncpg may return JSONB as string
    if isinstance(reviews_data, str):
        reviews_data = json.loads(reviews_data)
    # Anything that is not a list cannot hold review entries; treat it as empty
    # instead of iterating over dict keys / string characters or crashing.
    if not isinstance(reviews_data, list):
        reviews_data = []

    # Make job_created timezone-aware if it isn't
    if job_created.tzinfo is None:
        job_created = job_created.replace(tzinfo=timezone.utc)

    # Get run_id for this job
    run_id = await get_run_id_for_job(pool, str(job_id))

    stats = {
        "job_id": str(job_id),
        "total_reviews": 0,
        "parsed_ok": 0,
        "parsed_failed": 0,
        "inserted": 0,
        "sample_failures": [],
    }
    facts = []

    for review in reviews_data:
        stats["total_reviews"] += 1

        # Handle both dict and JSON string
        if isinstance(review, str):
            try:
                review = json.loads(review)
            except json.JSONDecodeError:
                continue
        # A null, number, list or other non-object entry has no fields to
        # read; skip it like any other malformed entry rather than letting
        # .get() raise and abort the whole backfill.
        if not isinstance(review, dict):
            continue

        review_id = review.get("review_id")
        if not review_id:
            continue

        raw_timestamp = review.get("timestamp", "")
        review_time, success = parse_relative_timestamp_safe(raw_timestamp, job_created)
        if success:
            stats["parsed_ok"] += 1
        else:
            stats["parsed_failed"] += 1
            if len(stats["sample_failures"]) < 5:
                stats["sample_failures"].append(raw_timestamp)

        # Get language from spans if available
        language = await get_language_for_review(pool, review_id) if not dry_run else None

        facts.append({
            "review_id": review_id,
            "business_id": business_id,
            "job_id": job_id,
            "run_id": run_id,
            "rating": review.get("rating"),
            "review_time_utc": review_time,
            "raw_timestamp": raw_timestamp,
            "author": review.get("author"),
            "language": language,
        })

    # Upsert
    inserted, _ = await upsert_review_facts(pool, facts, dry_run=dry_run)
    stats["inserted"] = inserted

    if verbose:
        print(f" Job {job_id}: {stats['total_reviews']} reviews, "
              f"{stats['parsed_ok']} parsed OK, {stats['parsed_failed']} failed")
        if stats["sample_failures"]:
            print(f" Sample failures: {stats['sample_failures'][:3]}")

    return stats
async def backfill_all(
    pool: asyncpg.Pool,
    job_id: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """
    Run backfill_job over every job with reviews (or only the given job).

    Args:
        pool: Connection pool handed to every helper.
        job_id: Optional job to restrict the run to.
        dry_run: Forwarded to backfill_job.
        verbose: When True, print a progress line per job.

    Returns:
        Aggregate stats with the keys jobs_processed, total_reviews,
        parsed_ok, parsed_failed, inserted and unique_failure_patterns
        (a list of at most 20 distinct raw timestamps that failed to parse).
    """
    jobs = await get_jobs_with_reviews(pool, job_id)
    job_count = len(jobs)
    print(f"\n{'[DRY RUN] ' if dry_run else ''}Backfilling review_facts_v1 from {len(jobs)} jobs...")

    # Counters that are plain sums of the per-job stats.
    summed_keys = ("total_reviews", "parsed_ok", "parsed_failed", "inserted")
    totals: dict[str, Any] = {"jobs_processed": 0}
    for key in summed_keys:
        totals[key] = 0
    failure_patterns = set()

    for position, job in enumerate(jobs, 1):
        if verbose:
            print(f"\n[{position}/{job_count}] Processing job {job['job_id']}...")
        job_stats = await backfill_job(pool, job, dry_run=dry_run, verbose=verbose)
        totals["jobs_processed"] += 1
        for key in summed_keys:
            totals[key] += job_stats[key]
        failure_patterns.update(job_stats["sample_failures"])

    # Expose the distinct failures as a list (JSON-serialisable), capped at 20.
    totals["unique_failure_patterns"] = list(failure_patterns)[:20]
    return totals
# =============================================================================
# CLI
# =============================================================================
async def main_async(args):
    """
    Open the connection pool, run the backfill and print a summary report.

    Args:
        args: Parsed CLI namespace providing job_id, dry_run and verbose.
    """
    pool = await asyncpg.create_pool(DB_URL)
    try:
        stats = await backfill_all(
            pool,
            job_id=args.job_id,
            dry_run=args.dry_run,
            verbose=args.verbose,
        )

        # Guard the denominator so an empty run reports 0.0% instead of
        # dividing by zero.
        denominator = max(stats['total_reviews'], 1)
        ok_pct = stats['parsed_ok'] / denominator * 100
        failed_pct = stats['parsed_failed'] / denominator * 100
        rule = "=" * 60

        print("\n" + rule)
        print("BACKFILL COMPLETE")
        print(rule)
        print(f"Jobs processed: {stats['jobs_processed']}")
        print(f"Total reviews: {stats['total_reviews']}")
        print(f"Timestamps parsed: {stats['parsed_ok']} ({ok_pct:.1f}%)")
        print(f"Timestamps failed: {stats['parsed_failed']} ({failed_pct:.1f}%)")
        if not args.dry_run:
            print(f"Records upserted: {stats['inserted']}")

        unparsed = stats["unique_failure_patterns"]
        if unparsed:
            print(f"\nUnparsed timestamp patterns ({len(unparsed)}):")
            for pattern in unparsed[:10]:
                print(f" - \"{pattern}\"")

        # Coverage is the share of all reviews whose timestamp parsed.
        coverage = ok_pct
        if coverage >= 90:
            print(f"\n✅ Timestamp coverage: {coverage:.1f}%")
        else:
            print(f"\n⚠️ WARNING: Timestamp coverage is {coverage:.1f}% (target: >90%)")
    finally:
        # Always release the pool, even when the backfill raised.
        await pool.close()
def main():
    """Parse the command-line flags and run the async backfill to completion."""
    cli = argparse.ArgumentParser(description="Backfill review_facts_v1")
    cli.add_argument("--job-id", help="Process a specific job only")
    cli.add_argument("--dry-run", action="store_true", help="Don't write to database")
    cli.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    asyncio.run(main_async(cli.parse_args()))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()