Files
whyrating-engine-legacy/.artifacts/ReviewIQ-Codebase-Overview.md
Alejandro Gutiérrez acd3b22e88 docs: Add pipeline development artifacts for parallel implementation
New artifacts:
- ReviewIQ-Pipeline-DevGuide.md: Entry point for pipeline work
- ReviewIQ-Pipeline-Contracts-v1.md: Stage I/O specs, validation rules, test fixtures
- ReviewIQ-Pipeline-Checklist.md: Per-stage implementation checklists
- ReviewIQ-Codebase-Overview.md: File structure, integration points
- ReviewIQ-v3.2.1-Taxonomy-Versioning.md: Taxonomy versioning addendum

Updated:
- ReviewIQ-v32-Decisions.md: Added B2 audit findings, taxonomy versioning decisions, pipeline status

These artifacts enable parallel development of pipeline stages 1-4 with:
- Independent validation (35 rules across stages)
- Clear input/output contracts
- Test fixtures for each stage
- Definition of done criteria

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 17:08:40 +00:00

13 KiB

ReviewIQ Codebase Overview

Purpose: Map existing code for agents starting fresh Last Updated: 2026-01-24 Status: ~55% implemented (scraping done, enrichment pipeline missing)


Quick Start for New Agents

  1. Read first: ReviewIQ-v32-Decisions.md (context recovery)
  2. For implementation: ReviewIQ-Pipeline-Contracts-v1.md + ReviewIQ-Pipeline-Checklist.md
  3. For schema: ReviewIQ-Architecture-v3.2.md § Part 2
  4. For LLM prompts: LLM-Classification-Contract-v1.md

Implementation Status Summary

PIPELINE COMPLETION: ~55%

✅ COMPLETE (Working in Production)
├── Google Maps scraping (v1.0.0)
├── Job orchestration & queuing
├── Chrome worker pool
├── Webhook delivery
├── SSE real-time streaming
├── Frontend job management
└── Basic analytics dashboard

❌ NOT IMPLEMENTED (Spec'd Only)
├── Stage 1: Normalization
├── Stage 2: LLM Classification
├── Stage 3: Issue Routing
├── Stage 4: Fact Aggregation
├── Enrichment database schema
└── Advanced analytics UI

Directory Structure

google-reviews-scraper-pro/
│
├── .artifacts/                    # Design documents (YOU ARE HERE)
│   ├── ReviewIQ-v32-Decisions.md         # START HERE - context recovery
│   ├── ReviewIQ-Architecture-v3.2.md     # Full v3.2 spec
│   ├── ReviewIQ-Pipeline-Contracts-v1.md # Stage I/O contracts
│   ├── ReviewIQ-Pipeline-Checklist.md    # Implementation checklist
│   ├── LLM-Classification-Contract-v1.md # LLM prompt spec
│   ├── URT-v5.1-Reference.md             # URT dimension codes
│   └── ReviewIQ-Codebase-Overview.md     # THIS FILE
│
├── api/                           # ✅ API routes (FastAPI)
│   ├── routes/
│   │   ├── admin.py              # Scraper management endpoints
│   │   ├── dashboard.py          # Analytics endpoints
│   │   └── batches.py            # Batch job endpoints
│   └── __init__.py
│
├── core/                          # ✅ Core services
│   ├── database.py               # AsyncPG database layer (~1200 lines)
│   ├── config.py                 # Configuration management
│   └── logging.py                # Structured logging
│
├── services/                      # ✅ Background services
│   ├── webhook_service.py        # Async webhook delivery
│   └── job_callback_service.py   # Callback handling
│
├── workers/                       # ✅ Worker pool
│   └── chrome_pool.py            # Chrome instance pooling
│
├── scrapers/                      # ✅ Scraper implementations
│   └── google_reviews/
│       └── v1_0_0.py             # Main scraper (~2000 lines)
│
├── pipeline/                      # ❌ TO BE CREATED
│   ├── stage1_normalize.py       # TODO
│   ├── stage2_classify.py        # TODO
│   ├── stage3_route.py           # TODO
│   ├── stage4_aggregate.py       # TODO
│   └── tests/                    # TODO
│
├── migrations/                    # Database migrations
│   ├── 001_add_job_platform_fields.sql    # ✅ Deployed
│   ├── 002_create_batches_table.sql       # ✅ Deployed
│   ├── 003_create_scraper_registry.sql    # ✅ Deployed
│   ├── 004_create_api_keys.sql            # ✅ Deployed
│   ├── 005_create_reviews_tables.sql      # ❌ TODO
│   ├── 006_create_spans_table.sql         # ❌ TODO
│   ├── 007_create_urt_enums.sql           # ❌ TODO
│   ├── 008_create_issues_tables.sql       # ❌ TODO
│   └── 009_create_facts_table.sql         # ❌ TODO
│
├── frontend/                      # ✅ Next.js frontend
│   ├── app/
│   │   ├── dashboard/            # System overview
│   │   ├── jobs/                 # Job list & detail
│   │   ├── analytics/            # Basic charts
│   │   └── new/                  # Job submission forms
│   └── components/
│
├── api_server_production.py      # ✅ Main API server (~1920 lines)
├── Dockerfile                    # ✅ Production container
├── docker-compose.production.yml # ✅ Docker orchestration
├── requirements-production.txt   # ✅ Python dependencies
└── package.json                  # ✅ Node.js dependencies

Key Files to Understand

1. API Server Entry Point

File: api_server_production.py Lines: ~1920 What it does:

  • FastAPI application setup
  • All endpoint definitions
  • Job submission and management
  • SSE streaming
  • Health checks

Key endpoints:

POST /api/scrape/google-reviews  # Submit scrape job
GET  /jobs/{job_id}              # Get job status
GET  /jobs/{job_id}/reviews      # Get scraped reviews
GET  /jobs/{job_id}/stream       # SSE real-time updates

2. Database Layer

File: core/database.py Lines: ~1200 What it does:

  • AsyncPG connection pooling
  • Job CRUD operations
  • Review storage (currently JSONB blob)
  • Webhook tracking

Key functions:

async def create_job(job_data: dict) -> str
async def update_job_status(job_id: str, status: str, ...)
async def get_job(job_id: str) -> dict
async def store_reviews(job_id: str, reviews: list) -> int

Note: Currently stores reviews as JSONB in jobs.reviews_data. The enrichment pipeline will need to:

  1. Read from jobs.reviews_data
  2. Write to reviews_raw and reviews_enriched tables

3. Google Scraper

File: scrapers/google_reviews/v1_0_0.py Lines: ~2000 What it does:

  • SeleniumBase Chrome automation
  • DOM scraping + API interception
  • Review extraction (text, rating, author, date)
  • Business metadata extraction
  • Pagination handling

Output format (stored in jobs.reviews_data):

{
  "business_info": {...},
  "reviews": [
    {
      "review_id": "...",
      "author_name": "...",
      "rating": 4,
      "text": "...",
      "review_time": "2026-01-20T14:30:00Z"
    }
  ]
}

4. Chrome Worker Pool

File: workers/chrome_pool.py What it does:

  • Pre-warms Chrome instances
  • Manages concurrent scraping jobs
  • Handles resource cleanup

Database Schema (Current State)

Deployed Tables

-- Core job tracking
CREATE TABLE jobs (
    job_id UUID PRIMARY KEY,
    status VARCHAR(50),
    url TEXT,
    reviews_data JSONB,          -- Raw scraped reviews live here
    reviews_count INTEGER,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    -- ... 20+ more columns
);

-- Batch processing
CREATE TABLE batches (...);

-- Webhook tracking
CREATE TABLE webhook_attempts (...);

-- Scraper versioning
CREATE TABLE scraper_registry (...);

-- API authentication (not enforced)
CREATE TABLE api_keys (...);

NOT Deployed (Defined in v3.2 Spec)

-- These tables need to be created via migrations 005-009

CREATE TABLE locations (...);           -- Multi-tenant locations
CREATE TABLE reviews_raw (...);         -- Immutable raw storage
CREATE TABLE reviews_enriched (...);    -- Classified reviews
CREATE TABLE review_spans (...);        -- Span-level classification
CREATE TABLE urt_codes (...);           -- URT reference data
CREATE TABLE issues (...);              -- Aggregated issues
CREATE TABLE issue_spans (...);         -- Issue-span links
CREATE TABLE issue_events (...);        -- Audit log
CREATE TABLE fact_timeseries (...);     -- Pre-aggregated analytics

Integration Points

Where New Pipeline Code Connects

                    EXISTING CODE                    NEW CODE
                    ════════════                    ════════

api_server_production.py
        │
        ▼
    jobs table
    (reviews_data JSONB) ──────────────────▶ pipeline/stage1_normalize.py
                                                    │
                                                    ▼
                                            reviews_raw table
                                            reviews_enriched table
                                                    │
                                                    ▼
                                            pipeline/stage2_classify.py
                                                    │
                                                    ▼
                                            review_spans table
                                                    │
                                                    ▼
                                            pipeline/stage3_route.py
                                                    │
                                                    ▼
                                            issues table
                                            issue_spans table
                                                    │
                                                    ▼
                                            pipeline/stage4_aggregate.py
                                                    │
                                                    ▼
                                            fact_timeseries table

How to Trigger Pipeline

Option A: Post-scrape hook (recommended)

# In api_server_production.py, after job completes:
async def on_job_complete(job_id: str):
    # Existing: send webhook
    await webhook_service.dispatch(job_id)

    # NEW: trigger enrichment pipeline
    await pipeline.stage1.process_job(job_id)

Option B: Background worker

# New file: workers/enrichment_worker.py
async def enrichment_loop():
    while True:
        jobs = await db.query("""
            SELECT job_id FROM jobs
            WHERE status = 'completed'
              AND enrichment_status IS NULL
            LIMIT 10
        """)
        for job in jobs:
            await pipeline.process(job['job_id'])
        await asyncio.sleep(60)

Option C: Manual trigger via API

# New endpoint in api_server_production.py
@app.post("/api/jobs/{job_id}/enrich")
async def trigger_enrichment(job_id: str):
    await pipeline.process(job_id)
    return {"status": "processing"}

Environment Setup

Required Environment Variables

# Database (required)
DATABASE_URL=postgresql://user:pass@localhost:5432/reviewiq

# LLM Provider (for Stage 2)
OPENAI_API_KEY=sk-...
# OR
ANTHROPIC_API_KEY=sk-ant-...

# Embedding model (for Stage 2)
EMBEDDING_MODEL=all-MiniLM-L6-v2

# Taxonomy version
DEFAULT_TAXONOMY_VERSION=v5.1

Local Development

# Start database
docker-compose -f docker-compose.production.yml up -d postgres

# Run migrations
psql $DATABASE_URL -f migrations/001_add_job_platform_fields.sql
# ... repeat for all migrations

# Start API server
python api_server_production.py

# Start frontend (separate terminal)
cd frontend && npm run dev

Running Tests

# Unit tests
pytest pipeline/tests/ -v

# Integration tests
pytest pipeline/tests/integration/ -v

# Full E2E validation
python -m pipeline.validate --job-id <JOB_ID>

Tech Stack

Component Technology Version
API FastAPI 0.100+
Database PostgreSQL 15+
DB Driver asyncpg 0.28+
Scraping SeleniumBase 4.20+
Browser Chrome (headless) 120+
Frontend Next.js 16.1.3
UI React 19.2.3
Charts Recharts 2.x
LLM OpenAI / Anthropic Latest
Embeddings sentence-transformers 2.x
Vectors pgvector 0.5+

Common Patterns

Database Queries (asyncpg)

# In core/database.py pattern:
async def get_job(job_id: str) -> dict:
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM jobs WHERE job_id = $1",
            job_id
        )
        return dict(row) if row else None

API Endpoints (FastAPI)

# In api_server_production.py pattern:
@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
    job = await db.get_job(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    return job

Background Tasks

# Pattern for async processing:
async def process_in_background(job_id: str):
    asyncio.create_task(do_heavy_work(job_id))
    return {"status": "processing"}

Gotchas & Notes

  1. Reviews are JSONB blobs - Currently in jobs.reviews_data, not normalized tables
  2. No auth enforcement - api_keys table exists but not used
  3. CORS is wide open - Set to * in production (fix before launch)
  4. Scraper is single-threaded per job - Chrome pool handles concurrency
  5. Webhooks have retry logic - 3 attempts with exponential backoff
  6. SSE streaming works - Real-time job updates via /jobs/{job_id}/stream

Next Steps for Implementation

  1. Create migrations 005-009 - Deploy enrichment schema
  2. Create pipeline/ directory - New code goes here
  3. Implement Stage 1 - Read from jobs.reviews_data, write to reviews_raw/enriched
  4. Implement Stage 2 - LLM classification with span extraction
  5. Implement Stage 3 - Issue routing
  6. Implement Stage 4 - Fact aggregation
  7. Add pipeline trigger - Hook into job completion or create worker
  8. Update frontend - Add enrichment views

This document should be updated when significant code changes occur.