#!/usr/bin/env python3 """ Backfill review_facts_v1 from public.jobs.reviews_data. Parses relative timestamps ("17 hours ago", "2 weeks ago") into absolute timestamps anchored to job.created_at. Usage: python backfill_review_facts.py python backfill_review_facts.py --dry-run python backfill_review_facts.py --job-id """ import argparse import asyncio import json import os import re from datetime import datetime, timedelta, timezone from typing import Any import asyncpg # Database URL DB_URL = os.environ.get( "DATABASE_URL", "postgresql://scraper:scraper123@localhost:5437/scraper" ) # ============================================================================= # RELATIVE TIMESTAMP PARSER # ============================================================================= # Regex patterns for relative timestamps RELATIVE_PATTERNS = [ # "17 hours ago", "2 weeks ago", "a month ago" (r"(?:edited\s+)?(\d+|a|an)\s+(second|minute|hour|day|week|month|year)s?\s+ago", "standard"), # "just now" (r"just\s+now", "just_now"), # "yesterday" (r"yesterday", "yesterday"), # "today" (r"today", "today"), ] # Time unit multipliers (in seconds) TIME_UNITS = { "second": 1, "minute": 60, "hour": 3600, "day": 86400, "week": 604800, "month": 2592000, # 30 days "year": 31536000, # 365 days } def parse_relative_timestamp(raw: str, reference_time: datetime) -> datetime | None: """ Parse a relative timestamp string into an absolute datetime. Args: raw: Relative timestamp like "17 hours ago", "Edited 2 weeks ago" reference_time: The reference point (usually job.created_at) Returns: Absolute datetime or None if parsing failed """ if not raw: return None text = raw.lower().strip() # Handle "just now" if "just now" in text: return reference_time # Handle "yesterday" if text == "yesterday": return reference_time - timedelta(days=1) # Handle "today" if text == "today": return reference_time # Handle standard relative format # Remove "edited " prefix if present text = re.sub(r"^edited\s+", "", text) # Match "N unit(s) ago" match = re.match(r"(\d+|a|an)\s+(second|minute|hour|day|week|month|year)s?\s+ago", text) if match: quantity_str = match.group(1) unit = match.group(2) # Convert "a"/"an" to 1 if quantity_str in ("a", "an"): quantity = 1 else: quantity = int(quantity_str) seconds = quantity * TIME_UNITS.get(unit, 0) return reference_time - timedelta(seconds=seconds) # Unknown format return None def parse_relative_timestamp_safe(raw: str, reference_time: datetime) -> tuple[datetime | None, bool]: """ Safe wrapper that returns (parsed_time, success). """ try: result = parse_relative_timestamp(raw, reference_time) return result, result is not None except Exception: return None, False # ============================================================================= # BACKFILL LOGIC # ============================================================================= async def get_jobs_with_reviews(pool: asyncpg.Pool, job_id: str | None = None) -> list[dict]: """Get all jobs with reviews_data.""" if job_id: query = """ SELECT job_id, created_at, reviews_data, COALESCE(metadata->>'business_name', url) as business_id FROM public.jobs WHERE job_id = $1 AND reviews_data IS NOT NULL AND jsonb_typeof(reviews_data) = 'array' """ rows = await pool.fetch(query, job_id) else: query = """ SELECT job_id, created_at, reviews_data, COALESCE(metadata->>'business_name', url) as business_id FROM public.jobs WHERE reviews_data IS NOT NULL AND jsonb_typeof(reviews_data) = 'array' ORDER BY created_at DESC """ rows = await pool.fetch(query) return [dict(r) for r in rows] async def get_run_id_for_job(pool: asyncpg.Pool, job_id: str) -> str | None: """Get the run_id associated with a job from detected_spans_v2.""" row = await pool.fetchrow(""" SELECT DISTINCT run_id FROM pipeline.detected_spans_v2 WHERE job_id = $1 AND run_id IS NOT NULL LIMIT 1 """, job_id) return str(row["run_id"]) if row and row["run_id"] else None async def get_language_for_review(pool: asyncpg.Pool, review_id: str) -> str | None: """Get detected language for a review from spans.""" row = await pool.fetchrow(""" SELECT language FROM pipeline.detected_spans_v2 WHERE review_id = $1 AND language IS NOT NULL LIMIT 1 """, review_id) return row["language"] if row else None async def upsert_review_facts( pool: asyncpg.Pool, facts: list[dict], dry_run: bool = False, ) -> tuple[int, int]: """ Upsert review facts into the database. Returns: (inserted_count, updated_count) """ if dry_run or not facts: return 0, 0 # Use executemany with ON CONFLICT query = """ INSERT INTO pipeline.review_facts_v1 (review_id, business_id, job_id, run_id, rating, review_time_utc, raw_timestamp, author, language) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (review_id) DO UPDATE SET business_id = EXCLUDED.business_id, job_id = EXCLUDED.job_id, run_id = COALESCE(EXCLUDED.run_id, pipeline.review_facts_v1.run_id), rating = EXCLUDED.rating, review_time_utc = EXCLUDED.review_time_utc, raw_timestamp = EXCLUDED.raw_timestamp, author = EXCLUDED.author, language = COALESCE(EXCLUDED.language, pipeline.review_facts_v1.language) """ # Prepare records records = [ ( f["review_id"], f["business_id"], f["job_id"], f.get("run_id"), f.get("rating"), f.get("review_time_utc"), f.get("raw_timestamp"), f.get("author"), f.get("language"), ) for f in facts ] await pool.executemany(query, records) return len(records), 0 async def backfill_job( pool: asyncpg.Pool, job: dict, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """ Backfill review facts for a single job. Returns: Stats dict with counts and errors """ job_id = job["job_id"] job_created = job["created_at"] business_id = job["business_id"] reviews_data = job["reviews_data"] # asyncpg may return JSONB as string if isinstance(reviews_data, str): reviews_data = json.loads(reviews_data) # Make job_created timezone-aware if it isn't if job_created.tzinfo is None: job_created = job_created.replace(tzinfo=timezone.utc) # Get run_id for this job run_id = await get_run_id_for_job(pool, str(job_id)) stats = { "job_id": str(job_id), "total_reviews": 0, "parsed_ok": 0, "parsed_failed": 0, "inserted": 0, "sample_failures": [], } facts = [] for review in reviews_data: stats["total_reviews"] += 1 # Handle both dict and JSON string if isinstance(review, str): try: review = json.loads(review) except json.JSONDecodeError: continue review_id = review.get("review_id") if not review_id: continue raw_timestamp = review.get("timestamp", "") review_time, success = parse_relative_timestamp_safe(raw_timestamp, job_created) if success: stats["parsed_ok"] += 1 else: stats["parsed_failed"] += 1 if len(stats["sample_failures"]) < 5: stats["sample_failures"].append(raw_timestamp) # Get language from spans if available language = await get_language_for_review(pool, review_id) if not dry_run else None facts.append({ "review_id": review_id, "business_id": business_id, "job_id": job_id, "run_id": run_id, "rating": review.get("rating"), "review_time_utc": review_time, "raw_timestamp": raw_timestamp, "author": review.get("author"), "language": language, }) # Upsert inserted, _ = await upsert_review_facts(pool, facts, dry_run=dry_run) stats["inserted"] = inserted if verbose: print(f" Job {job_id}: {stats['total_reviews']} reviews, " f"{stats['parsed_ok']} parsed OK, {stats['parsed_failed']} failed") if stats["sample_failures"]: print(f" Sample failures: {stats['sample_failures'][:3]}") return stats async def backfill_all( pool: asyncpg.Pool, job_id: str | None = None, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """ Backfill review facts for all jobs (or a specific job). Returns: Aggregate stats """ jobs = await get_jobs_with_reviews(pool, job_id) print(f"\n{'[DRY RUN] ' if dry_run else ''}Backfilling review_facts_v1 from {len(jobs)} jobs...") aggregate = { "jobs_processed": 0, "total_reviews": 0, "parsed_ok": 0, "parsed_failed": 0, "inserted": 0, "unique_failure_patterns": set(), } for i, job in enumerate(jobs, 1): if verbose: print(f"\n[{i}/{len(jobs)}] Processing job {job['job_id']}...") stats = await backfill_job(pool, job, dry_run=dry_run, verbose=verbose) aggregate["jobs_processed"] += 1 aggregate["total_reviews"] += stats["total_reviews"] aggregate["parsed_ok"] += stats["parsed_ok"] aggregate["parsed_failed"] += stats["parsed_failed"] aggregate["inserted"] += stats["inserted"] aggregate["unique_failure_patterns"].update(stats["sample_failures"]) # Convert set to list for JSON serialization aggregate["unique_failure_patterns"] = list(aggregate["unique_failure_patterns"])[:20] return aggregate # ============================================================================= # CLI # ============================================================================= async def main_async(args): """Main async entry point.""" pool = await asyncpg.create_pool(DB_URL) try: stats = await backfill_all( pool, job_id=args.job_id, dry_run=args.dry_run, verbose=args.verbose, ) print("\n" + "=" * 60) print("BACKFILL COMPLETE") print("=" * 60) print(f"Jobs processed: {stats['jobs_processed']}") print(f"Total reviews: {stats['total_reviews']}") print(f"Timestamps parsed: {stats['parsed_ok']} ({stats['parsed_ok']/max(stats['total_reviews'],1)*100:.1f}%)") print(f"Timestamps failed: {stats['parsed_failed']} ({stats['parsed_failed']/max(stats['total_reviews'],1)*100:.1f}%)") if not args.dry_run: print(f"Records upserted: {stats['inserted']}") if stats["unique_failure_patterns"]: print(f"\nUnparsed timestamp patterns ({len(stats['unique_failure_patterns'])}):") for p in stats["unique_failure_patterns"][:10]: print(f" - \"{p}\"") # Calculate coverage coverage = stats['parsed_ok'] / max(stats['total_reviews'], 1) * 100 if coverage < 90: print(f"\n⚠️ WARNING: Timestamp coverage is {coverage:.1f}% (target: >90%)") else: print(f"\n✅ Timestamp coverage: {coverage:.1f}%") finally: await pool.close() def main(): parser = argparse.ArgumentParser(description="Backfill review_facts_v1") parser.add_argument("--job-id", help="Process a specific job only") parser.add_argument("--dry-run", action="store_true", help="Don't write to database") parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") args = parser.parse_args() asyncio.run(main_async(args)) if __name__ == "__main__": main()