Files
whyrating-engine-legacy/.artifacts/ReviewIQ-Architecture-v3.md
Alejandro Gutiérrez 544e028c3f Phase 0: Project restructure to ReviewIQ platform architecture
New structure:
- scrapers/google_reviews/v1_0_0.py (was modules/scraper_clean.py)
- scrapers/base.py (BaseScraper interface)
- scrapers/registry.py (ScraperRegistry for version routing)
- core/database.py, models.py, config.py, enums.py
- utils/logger.py, crash_analyzer.py, health_checks.py, helpers.py, date_converter.py
- workers/chrome_pool.py
- services/webhook_service.py
- api/ routes structure (empty, ready for Phase 2)
- tests/ structure mirroring source

All imports updated in:
- api_server_production.py (7 import paths updated)
- utils/health_checks.py (scraper import path)

Legacy modules moved to modules/_legacy/:
- data_storage.py, image_handler.py, s3_handler.py (unused)

Syntax verified, frontend build passing.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 15:22:08 +00:00

53 KiB
Raw Permalink Blame History

ReviewIQ: Review Intelligence Pipeline

Version: 3.0 Status: Architecture Specification Date: 2026-01-24


Executive Summary

ReviewIQ transforms customer reviews into actionable business intelligence through a three-stage pipeline:

  1. Ingest — LLM-powered URT classification with semantic embeddings
  2. Analyze — Issue lifecycle management with sub-pattern discovery
  3. Report — Statistically rigorous insights with trend detection

Design Principles:

  • Accuracy over heuristics: LLM classification at ingest (~$0.0002/review)
  • Taxonomy as structure: URT provides stable, interpretable categories
  • Local ML for depth: Sub-clustering reveals actionable patterns within categories
  • Feedback loop: CR (Comparative Reference) signals verify resolution effectiveness

Part 1: System Architecture

┌─────────────────────────────────────────────────────────────────────────────────┐
│                              REVIEWIQ PIPELINE                                   │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌─────────────┐     ┌──────────────────────────────────────────────────────┐  │
│  │             │     │                    INGEST LAYER                       │  │
│  │   Reviews   │────▶│  ┌─────────┐    ┌─────────┐    ┌─────────────────┐   │  │
│  │   (Input)   │     │  │ Embed   │    │   LLM   │    │     Store       │   │  │
│  │             │     │  │ Review  │───▶│Classify │───▶│  (PostgreSQL)   │   │  │
│  └─────────────┘     │  └─────────┘    └─────────┘    └─────────────────┘   │  │
│                      │      │                              │                 │  │
│                      │      │  ~$0.00                      │  ~$0.0002      │  │
│                      │      │  (local)                     │  per review    │  │
│                      └──────┼──────────────────────────────┼─────────────────┘  │
│                             │                              │                    │
│                             ▼                              ▼                    │
│  ┌──────────────────────────────────────────────────────────────────────────┐  │
│  │                         ISSUE AGGREGATION                                 │  │
│  │                                                                          │  │
│  │   V- classified reviews  ───▶  Match or Create Issue  ───▶  Track State  │  │
│  │                                                                          │  │
│  │   Rules: Same URT code + entity + location + time window = same issue    │  │
│  │   States: DETECTED → ACKNOWLEDGED → IN_PROGRESS → RESOLVED → VERIFIED    │  │
│  │                                                                          │  │
│  └──────────────────────────────────────────────────────────────────────────┘  │
│                                                                                 │
│                             │                              │                    │
│                             ▼                              ▼                    │
│  ┌──────────────────────────────────────────────────────────────────────────┐  │
│  │                        REPORT GENERATION                                  │  │
│  │                                                                          │  │
│  │  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐   ┌──────────┐  │  │
│  │  │  Aggregate   │   │ Sub-Cluster  │   │    Trend     │   │   LLM    │  │  │
│  │  │  by URT Code │──▶│ Within Codes │──▶│   Analysis   │──▶│ Narrate  │  │  │
│  │  │    (SQL)     │   │  (HDBSCAN)   │   │  (CR + Rate) │   │          │  │  │
│  │  └──────────────┘   └──────────────┘   └──────────────┘   └──────────┘  │  │
│  │       $0.00              $0.00              $0.00            ~$0.15      │  │
│  │                                                                          │  │
│  └──────────────────────────────────────────────────────────────────────────┘  │
│                                                                                 │
│                                      │                                          │
│                                      ▼                                          │
│  ┌──────────────────────────────────────────────────────────────────────────┐  │
│  │                            OUTPUT                                         │  │
│  │                                                                          │  │
│  │   • Executive Summary with statistically defensible claims               │  │
│  │   • Issues ranked by priority with sub-pattern breakdown                 │  │
│  │   • Strengths with trend signals                                         │  │
│  │   • Staff performance insights                                           │  │
│  │   • Actionable recommendations                                           │  │
│  │                                                                          │  │
│  └──────────────────────────────────────────────────────────────────────────┘  │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

Part 2: Ingest Layer

2.1 Design Philosophy

The review is the atomic unit. We do not split reviews into fragments — this preserves context and enables accurate classification. A single review may contain multiple topics; URT's multi-coding (primary + up to 2 secondary codes) handles this naturally.

2.2 Dual Processing

Each review undergoes two parallel operations:

async def ingest_review(review: dict) -> dict:
    """Ingest a single review: embed + classify."""

    text = review['text'].strip()

    # Parallel execution
    embedding_task = asyncio.create_task(embed_review(text))
    classification_task = asyncio.create_task(classify_review_llm(text))

    embedding = await embedding_task
    classification = await classification_task

    return {
        'review_id': review['review_id'],
        'business_id': review['business_id'],
        'text': text,
        'embedding': embedding,
        'date': review['date'],
        'rating': review.get('rating'),
        **classification,  # URT codes, valence, intensity, etc.
    }

2.3 Embedding

Local multilingual embeddings for semantic capabilities:

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('intfloat/multilingual-e5-small')

def embed_review(text: str) -> np.ndarray:
    """Generate normalized embedding for semantic search and clustering."""

    # e5 models perform better with instruction prefix
    embedding = model.encode(
        f"passage: {text}",
        normalize_embeddings=True
    )
    return embedding  # 384 dimensions

Why embed if we have URT codes?

  • Sub-clustering within URT codes (pattern discovery)
  • Semantic quote selection (centroid-closest)
  • Similarity search for emerging patterns
  • Backup for low-confidence classifications

2.4 LLM Classification

Single LLM call extracts complete URT classification:

CLASSIFICATION_PROMPT = """You are a customer feedback classifier using the Universal Review Taxonomy (URT).

Analyze the review and return JSON with:

{
  "urt_primary": "X1.23",           // Main URT subcode
  "urt_secondary": ["Y2.34"],       // 0-2 additional codes (different domains only)
  "valence": "V-",                  // V+, V-, V0, V±
  "intensity": "I2",                // I1 (mild), I2 (moderate), I3 (strong)
  "comparative": "CR-N",            // CR-N (none), CR-B (better), CR-W (worse), CR-S (same)
  "staff_mentions": ["Mike"],       // Employee names mentioned
  "quotes": {                       // Key phrase for each code
    "X1.23": "exact phrase from review",
    "Y2.34": "another phrase"
  }
}

URT DOMAINS (choose primary from most impactful):
- O (Offering): Product/service quality, function, completeness, fit
- P (People): Staff attitude, competence, responsiveness, communication
- J (Journey): Timing, ease, reliability, resolution
- E (Environment): Physical space, digital interface, ambiance, safety
- A (Access): Availability, accessibility, inclusivity, convenience
- V (Value): Price, transparency, effort, worth
- R (Relationship): Trust, dependability, recovery, loyalty

RULES:
1. Primary = what customer is MOST affected by
2. Secondary must be DIFFERENT domains (P1.02 + P3.01 is invalid)
3. V± only when genuinely mixed (positive AND negative on different topics)
4. CR-B/W/S only for explicit self-comparison ("better than last time", "still broken")
5. quotes must be EXACT phrases from the review text

Return valid JSON only."""


async def classify_review_llm(text: str) -> dict:
    """Complete URT classification via LLM."""

    response = await llm.chat(
        model="gpt-4o-mini",  # ~$0.0002 per review
        messages=[
            {"role": "system", "content": CLASSIFICATION_PROMPT},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"},
        temperature=0.1  # Low temperature for consistency
    )

    return json.loads(response.content)

2.5 Batch Processing for Efficiency

For bulk ingestion, batch multiple reviews per LLM call:

async def classify_batch(reviews: list[dict], batch_size: int = 10) -> list[dict]:
    """Process reviews in batches for ~40% cost reduction."""

    results = []
    for i in range(0, len(reviews), batch_size):
        batch = reviews[i:i+batch_size]

        prompt = BATCH_CLASSIFICATION_PROMPT + "\n\nREVIEWS:\n"
        for j, review in enumerate(batch):
            prompt += f"\n[{j}] {review['text']}\n---\n"

        response = await llm.chat(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": prompt}],
            response_format={"type": "json_object"}
        )

        batch_results = json.loads(response.content)["classifications"]
        results.extend(batch_results)

    return results

2.6 Data Model

-- Core review storage with URT classification
CREATE TABLE reviews (
    review_id       TEXT PRIMARY KEY,
    business_id     TEXT NOT NULL,
    text            TEXT NOT NULL,
    embedding       VECTOR(384),
    date            TIMESTAMP NOT NULL,
    rating          SMALLINT,

    -- URT Classification (from LLM)
    urt_primary     TEXT NOT NULL,              -- 'J1.01', 'P1.02', etc.
    urt_secondary   TEXT[] DEFAULT '{}',        -- Max 2
    valence         TEXT NOT NULL,              -- 'V+', 'V-', 'V0', 'V±'
    intensity       TEXT NOT NULL,              -- 'I1', 'I2', 'I3'
    comparative     TEXT DEFAULT 'CR-N',        -- 'CR-N', 'CR-B', 'CR-W', 'CR-S'

    -- Extracted entities
    staff_mentions  TEXT[] DEFAULT '{}',
    quotes          JSONB,                      -- {"code": "phrase", ...}

    -- Metadata
    created_at      TIMESTAMP DEFAULT NOW(),
    classification_model TEXT DEFAULT 'gpt-4o-mini'
);

-- Indexes for query patterns
CREATE INDEX idx_reviews_business_date ON reviews(business_id, date DESC);
CREATE INDEX idx_reviews_urt_primary ON reviews(business_id, urt_primary);
CREATE INDEX idx_reviews_valence ON reviews(business_id, valence, date);
CREATE INDEX idx_reviews_comparative ON reviews(comparative) WHERE comparative != 'CR-N';
CREATE INDEX idx_reviews_embedding ON reviews USING hnsw (embedding vector_cosine_ops);

Part 3: Issue Lifecycle Management

Following the URT Issue Lifecycle Framework (C1), negative feedback (V-) generates trackable issues.

3.1 Issue Aggregation

Multiple reviews about the same problem aggregate into a single issue:

def aggregate_to_issue(review: dict) -> str:
    """Match review to existing issue or create new one."""

    if review['valence'] not in ('V-', 'V±'):
        return None  # Only negative feedback creates issues

    # Find matching open issues
    matching = db.query("""
        SELECT issue_id, primary_subcode, entity, location
        FROM issues
        WHERE business_id = %s
          AND primary_subcode = %s
          AND state NOT IN ('VERIFIED', 'DECLINED')
          AND created_at > NOW() - INTERVAL '30 days'
    """, [review['business_id'], review['urt_primary']])

    for issue in matching:
        if is_same_issue(review, issue):
            # Aggregate to existing issue
            add_span_to_issue(issue['issue_id'], review)
            recalculate_priority(issue['issue_id'])
            return issue['issue_id']

    # Check intensity threshold for new issue creation
    if should_create_issue(review):
        return create_issue(review)

    return None  # Stored in buffer for future aggregation


def should_create_issue(review: dict) -> bool:
    """Intensity-based issue creation thresholds."""

    if review['intensity'] == 'I3':
        return True  # Critical = immediate issue

    # Check aggregation buffer for patterns
    similar_count = count_similar_in_buffer(review, window_days=30)

    if review['intensity'] == 'I2' and similar_count >= 2:
        return True  # Moderate + 2 others = issue

    if review['intensity'] == 'I1' and similar_count >= 4:
        return True  # Mild + 4 others = issue

    return False

3.2 Issue State Machine

                     DETECTED
                         │
            ┌────────────┼────────────┐
            ▼            ▼            ▼
       ACKNOWLEDGED   DECLINED    (escalate)
            │                         │
            ▼                         │
       IN_PROGRESS                    │
            │                         │
            ▼                         │
        RESOLVED ◀────────────────────┘
            │
      ┌─────┼─────┐
      ▼           ▼
   VERIFIED    REOPENED
                  │
                  └──▶ (back to IN_PROGRESS)

3.3 Priority Scoring

def calculate_priority(issue: dict) -> float:
    """
    Priority combines intensity, volume, recency, and recurrence.

    P = I_weight × (1 + log(span_count)) × decay(days) × recurrence_boost × trend_modifier
    """

    INTENSITY_WEIGHTS = {'I1': 1.0, 'I2': 2.0, 'I3': 4.0}

    i_weight = INTENSITY_WEIGHTS[issue['max_intensity']]
    volume_factor = 1 + math.log(issue['span_count'])

    days_old = (datetime.now() - issue['created_at']).days
    decay = math.exp(-0.023 * days_old)  # Half-life ~30 days

    recurrence_boost = 1.0 + 0.5 * math.log2(issue['reopen_count'] + 1)

    # Trend modifier from CR signals
    if issue['recent_cr_w_count'] >= 2:
        trend_modifier = 1.3  # Worsening
    elif issue['recent_cr_b_count'] >= 2:
        trend_modifier = 0.7  # Improving
    else:
        trend_modifier = 1.0  # Stable

    return i_weight * volume_factor * decay * recurrence_boost * trend_modifier

3.4 Resolution Verification via CR Signals

The Comparative Reference (CR) dimension enables automatic verification:

def process_cr_signal(review: dict):
    """Handle CR-B/W/S signals for issue lifecycle."""

    if review['comparative'] == 'CR-N':
        return

    # Find resolved issues with matching code
    resolved_issues = db.query("""
        SELECT issue_id, state, resolved_at
        FROM issues
        WHERE business_id = %s
          AND primary_subcode = %s
          AND state IN ('RESOLVED', 'VERIFIED')
          AND resolved_at > NOW() - INTERVAL '60 days'
    """, [review['business_id'], review['urt_primary']])

    for issue in resolved_issues:
        if review['comparative'] == 'CR-B':
            # Improvement signal → verify resolution
            if issue['state'] == 'RESOLVED':
                verify_issue(issue['issue_id'], review['review_id'])

        elif review['comparative'] in ('CR-S', 'CR-W'):
            # Unchanged or worsening → reopen
            reopen_issue(issue['issue_id'], review['review_id'])

            if review['comparative'] == 'CR-W':
                escalate_issue(issue['issue_id'], reason='REGRESSION')

3.5 Issue Data Model

CREATE TABLE issues (
    issue_id            TEXT PRIMARY KEY,
    business_id         TEXT NOT NULL,
    primary_subcode     TEXT NOT NULL,
    domain              TEXT NOT NULL,

    -- State
    state               TEXT NOT NULL DEFAULT 'DETECTED',
    priority_score      FLOAT NOT NULL,
    confidence_score    FLOAT NOT NULL,

    -- Aggregation
    review_ids          TEXT[] NOT NULL,
    span_count          INT NOT NULL DEFAULT 1,
    max_intensity       TEXT NOT NULL,

    -- Ownership
    owner_team          TEXT,
    owner_individual    TEXT,

    -- Timestamps
    created_at          TIMESTAMP DEFAULT NOW(),
    acknowledged_at     TIMESTAMP,
    resolved_at         TIMESTAMP,
    verified_at         TIMESTAMP,

    -- Resolution
    reopen_count        INT DEFAULT 0,
    resolution_code     TEXT,
    resolution_notes    TEXT,
    decline_reason      TEXT,

    -- Context
    entity              TEXT,  -- Product, staff member, feature
    location            TEXT,  -- Physical or logical location
    causal_codes        TEXT[],  -- CD-O, MG-T, etc.

    -- Verification
    verification_window_days INT DEFAULT 60
);

CREATE TABLE issue_events (
    event_id        SERIAL PRIMARY KEY,
    issue_id        TEXT REFERENCES issues(issue_id),
    event_type      TEXT NOT NULL,  -- 'state_change', 'span_added', 'priority_update'
    from_state      TEXT,
    to_state        TEXT,
    actor           TEXT,
    notes           TEXT,
    review_id       TEXT,  -- Triggering review if applicable
    created_at      TIMESTAMP DEFAULT NOW()
);

-- Time-series aggregation for impact charts
CREATE TABLE issue_timeseries (
    id              SERIAL PRIMARY KEY,
    business_id     TEXT NOT NULL,
    code            TEXT NOT NULL,          -- URT code (e.g., 'J1.01')
    period          DATE NOT NULL,          -- Bucket date (day/week/month)
    bucket_type     TEXT NOT NULL,          -- 'day', 'week', 'month'

    -- Counts
    review_count    INT NOT NULL DEFAULT 0,
    negative_count  INT NOT NULL DEFAULT 0,
    positive_count  INT NOT NULL DEFAULT 0,

    -- Strength metrics
    strength_score  FLOAT NOT NULL DEFAULT 0,  -- Weighted by intensity
    avg_intensity   FLOAT,
    max_intensity   TEXT,

    -- CR signals in period
    cr_better       INT DEFAULT 0,
    cr_worse        INT DEFAULT 0,
    cr_same         INT DEFAULT 0,

    UNIQUE(business_id, code, period, bucket_type)
);

CREATE INDEX idx_timeseries_lookup ON issue_timeseries(business_id, code, period);

3.6 Issue Review Drill-Down

Retrieve all reviews belonging to a specific issue for detailed inspection:

def get_issue_reviews(issue_id: str,
                      sort_by: str = 'date',
                      limit: int = 50) -> list[dict]:
    """Fetch all reviews aggregated into an issue."""

    issue = db.query_one("""
        SELECT issue_id, review_ids, primary_subcode, business_id
        FROM issues WHERE issue_id = %s
    """, [issue_id])

    if not issue:
        return []

    order_clause = {
        'date': 'date DESC',
        'intensity': "CASE intensity WHEN 'I3' THEN 1 WHEN 'I2' THEN 2 ELSE 3 END",
        'relevance': 'date DESC'  # Could enhance with embedding similarity
    }.get(sort_by, 'date DESC')

    reviews = db.query(f"""
        SELECT
            review_id,
            text,
            date,
            rating,
            valence,
            intensity,
            comparative,
            quotes->>%s as quote,
            staff_mentions
        FROM reviews
        WHERE review_id = ANY(%s)
        ORDER BY {order_clause}
        LIMIT %s
    """, [issue['primary_subcode'], issue['review_ids'], limit])

    return [{
        **r,
        'intensity_weight': {'I1': 1, 'I2': 2, 'I3': 4}[r['intensity']]
    } for r in reviews]

3.7 Strength Score Aggregation

A unified metric combining volume and intensity for impact measurement:

Strength Score = Σ (intensity_weight × review_count)

Where:
  I1 (mild)     → weight = 1
  I2 (moderate) → weight = 2
  I3 (strong)   → weight = 4
INTENSITY_WEIGHTS = {'I1': 1, 'I2': 2, 'I3': 4}

def compute_strength_score(reviews: list[dict]) -> float:
    """
    Aggregate strength from multiple reviews.

    A single I3 review (weight=4) has same impact as:
    - 4 I1 reviews, or
    - 2 I2 reviews

    This captures that one "furious" customer signals more
    than four "mildly annoyed" customers.
    """
    return sum(INTENSITY_WEIGHTS.get(r['intensity'], 1) for r in reviews)


def compute_strength_by_code(business_id: str, code: str,
                             start: date, end: date) -> dict:
    """Compute strength metrics for a URT code."""

    reviews = db.query("""
        SELECT intensity, valence, date
        FROM reviews
        WHERE business_id = %s
          AND (urt_primary = %s OR %s = ANY(urt_secondary))
          AND date BETWEEN %s AND %s
    """, [business_id, code, code, start, end])

    neg_reviews = [r for r in reviews if r['valence'] in ('V-', 'V±')]
    pos_reviews = [r for r in reviews if r['valence'] == 'V+']

    return {
        'code': code,
        'total_count': len(reviews),
        'negative_count': len(neg_reviews),
        'positive_count': len(pos_reviews),
        'negative_strength': compute_strength_score(neg_reviews),
        'positive_strength': compute_strength_score(pos_reviews),
        'avg_intensity': np.mean([
            INTENSITY_WEIGHTS[r['intensity']] for r in reviews
        ]) if reviews else 0,
        'max_intensity': max(
            (r['intensity'] for r in reviews),
            key=lambda i: INTENSITY_WEIGHTS.get(i, 0),
            default='I1'
        )
    }

3.8 Impact Timeline (Time-Series Aggregation)

Generate data for line charts showing issue/strength evolution over time:

def build_impact_timeline(business_id: str, code: str,
                          start: date, end: date,
                          bucket: str = 'week') -> list[dict]:
    """
    Time-series strength aggregation for charts.

    Returns data points for plotting:
    - X-axis: time periods
    - Y-axis: strength score (or count, rate)
    """

    timeline = db.query("""
        SELECT
            date_trunc(%s, date)::date as period,
            COUNT(*) as review_count,
            COUNT(*) FILTER (WHERE valence IN ('V-', 'V±')) as negative_count,
            COUNT(*) FILTER (WHERE valence = 'V+') as positive_count,
            SUM(CASE intensity
                WHEN 'I3' THEN 4
                WHEN 'I2' THEN 2
                ELSE 1
            END) as strength_score,
            SUM(CASE intensity
                WHEN 'I3' THEN 4
                WHEN 'I2' THEN 2
                ELSE 1
            END) FILTER (WHERE valence IN ('V-', 'V±')) as negative_strength,
            AVG(CASE intensity
                WHEN 'I3' THEN 3
                WHEN 'I2' THEN 2
                ELSE 1
            END) as avg_intensity,
            MAX(CASE intensity
                WHEN 'I3' THEN 3
                WHEN 'I2' THEN 2
                ELSE 1
            END) as max_intensity_num,
            COUNT(*) FILTER (WHERE comparative = 'CR-B') as cr_better,
            COUNT(*) FILTER (WHERE comparative = 'CR-W') as cr_worse,
            COUNT(*) FILTER (WHERE comparative = 'CR-S') as cr_same
        FROM reviews
        WHERE business_id = %s
          AND (urt_primary = %s OR %s = ANY(urt_secondary))
          AND date BETWEEN %s AND %s
        GROUP BY 1
        ORDER BY 1
    """, [bucket, business_id, code, code, start, end])

    # Fill gaps for continuous chart
    return fill_timeline_gaps(timeline, start, end, bucket)


def fill_timeline_gaps(data: list[dict], start: date, end: date,
                       bucket: str) -> list[dict]:
    """Ensure continuous timeline with zero-fill for missing periods."""

    from pandas import date_range

    freq = {'day': 'D', 'week': 'W-MON', 'month': 'MS'}[bucket]
    all_periods = date_range(start, end, freq=freq)

    data_map = {row['period']: row for row in data}

    result = []
    for period in all_periods:
        period_date = period.date()
        if period_date in data_map:
            result.append(data_map[period_date])
        else:
            result.append({
                'period': period_date,
                'review_count': 0,
                'negative_count': 0,
                'positive_count': 0,
                'strength_score': 0,
                'negative_strength': 0,
                'avg_intensity': None,
                'max_intensity_num': None,
                'cr_better': 0,
                'cr_worse': 0,
                'cr_same': 0
            })

    return result


def get_issue_impact_chart_data(issue_id: str,
                                 months_back: int = 6) -> dict:
    """
    Generate chart-ready data for a specific issue.

    Returns structure suitable for Recharts/Chart.js:
    {
        "issue": {...},
        "timeline": [
            {"period": "2026-01-06", "strength": 12, "count": 5, ...},
            {"period": "2026-01-13", "strength": 8, "count": 3, ...},
            ...
        ],
        "summary": {
            "total_strength": 156,
            "peak_period": "2026-01-06",
            "trend": "improving"
        }
    }
    """

    issue = db.query_one("""
        SELECT issue_id, primary_subcode, business_id, created_at
        FROM issues WHERE issue_id = %s
    """, [issue_id])

    end = date.today()
    start = end - timedelta(days=months_back * 30)

    timeline = build_impact_timeline(
        issue['business_id'],
        issue['primary_subcode'],
        start, end,
        bucket='week'
    )

    # Compute summary stats
    total_strength = sum(t['negative_strength'] or 0 for t in timeline)
    peak = max(timeline, key=lambda t: t['negative_strength'] or 0)

    # Trend: compare last 4 weeks vs prior 4 weeks
    recent = timeline[-4:] if len(timeline) >= 4 else timeline
    prior = timeline[-8:-4] if len(timeline) >= 8 else []

    recent_avg = np.mean([t['negative_strength'] or 0 for t in recent])
    prior_avg = np.mean([t['negative_strength'] or 0 for t in prior]) if prior else recent_avg

    if recent_avg < prior_avg * 0.7:
        trend = 'improving'
    elif recent_avg > prior_avg * 1.3:
        trend = 'worsening'
    else:
        trend = 'stable'

    return {
        'issue': {
            'issue_id': issue['issue_id'],
            'code': issue['primary_subcode'],
            'name': URT_CODE_NAMES.get(issue['primary_subcode'], issue['primary_subcode'])
        },
        'timeline': [
            {
                'period': t['period'].isoformat(),
                'strength': t['negative_strength'] or 0,
                'count': t['negative_count'],
                'avg_intensity': round(t['avg_intensity'], 2) if t['avg_intensity'] else None,
                'cr_signals': {
                    'better': t['cr_better'],
                    'worse': t['cr_worse'],
                    'same': t['cr_same']
                }
            }
            for t in timeline
        ],
        'summary': {
            'total_strength': total_strength,
            'peak_period': peak['period'].isoformat(),
            'peak_strength': peak['negative_strength'] or 0,
            'trend': trend
        }
    }

3.9 Timeline Data Model (Chart-Ready)

// TypeScript interface for frontend consumption

interface IssueTimelinePoint {
  period: string;           // ISO date "2026-01-06"
  strength: number;         // Weighted strength score
  count: number;            // Raw review count
  avg_intensity: number | null;
  cr_signals: {
    better: number;
    worse: number;
    same: number;
  };
}

interface IssueImpactChart {
  issue: {
    issue_id: string;
    code: string;           // "J1.01"
    name: string;           // "Wait Time"
  };
  timeline: IssueTimelinePoint[];
  summary: {
    total_strength: number;
    peak_period: string;
    peak_strength: number;
    trend: 'improving' | 'worsening' | 'stable';
  };
}

// Example Recharts usage:
// <LineChart data={chartData.timeline}>
//   <Line dataKey="strength" stroke="#ef4444" name="Impact" />
//   <Line dataKey="count" stroke="#6b7280" name="Reviews" />
// </LineChart>

Part 4: Report Generation

4.1 Report Structure

def generate_report(business_id: str, start: date, end: date) -> dict:
    """Generate comprehensive business intelligence report."""

    # 1. Aggregate statistics by URT code
    code_stats = compute_code_statistics(business_id, start, end)

    # 2. Deep analysis of top issues (sub-clustering)
    top_issues = analyze_top_issues(business_id, code_stats, start, end)

    # 3. Strength analysis
    strengths = analyze_strengths(business_id, code_stats, start, end)

    # 4. Trend analysis (vs prior period)
    trends = compute_trends(business_id, start, end)

    # 5. Staff insights
    staff = analyze_staff_mentions(business_id, start, end)

    # 6. Open issues summary
    open_issues = get_open_issues(business_id)

    # 7. Build payload
    payload = build_report_payload(
        business_id, start, end,
        top_issues, strengths, trends, staff, open_issues
    )

    # 8. LLM narration
    narrative = await generate_narrative(payload)

    return {
        'payload': payload,
        'narrative': narrative,
        'generated_at': datetime.now().isoformat()
    }

4.2 Statistics Computation

Review-level presence with Wilson confidence intervals:

def compute_code_statistics(business_id: str, start: date, end: date) -> list[dict]:
    """Aggregate statistics by URT code with confidence intervals."""

    stats = db.query("""
        WITH review_codes AS (
            SELECT
                review_id,
                urt_primary as code,
                valence,
                intensity
            FROM reviews
            WHERE business_id = %s AND date BETWEEN %s AND %s

            UNION ALL

            SELECT
                review_id,
                unnest(urt_secondary) as code,
                valence,
                intensity
            FROM reviews
            WHERE business_id = %s AND date BETWEEN %s AND %s
              AND array_length(urt_secondary, 1) > 0
        ),
        code_stats AS (
            SELECT
                code,
                COUNT(DISTINCT review_id) as k,
                COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V-') as k_neg,
                COUNT(DISTINCT review_id) FILTER (WHERE valence = 'V+') as k_pos,
                MAX(CASE intensity
                    WHEN 'I3' THEN 3 WHEN 'I2' THEN 2 ELSE 1 END) as max_intensity
            FROM review_codes
            GROUP BY code
        )
        SELECT
            cs.*,
            (SELECT COUNT(DISTINCT review_id) FROM reviews
             WHERE business_id = %s AND date BETWEEN %s AND %s) as n
        FROM code_stats cs
        WHERE k >= 3
        ORDER BY k_neg DESC
    """, [business_id, start, end] * 4)

    results = []
    for row in stats:
        n = row['n']

        # Wilson confidence intervals
        ci_neg = wilson_ci(row['k_neg'], n) if row['k_neg'] > 0 else (0, 0)
        ci_pos = wilson_ci(row['k_pos'], n) if row['k_pos'] > 0 else (0, 0)

        results.append({
            'code': row['code'],
            'domain': row['code'][0],
            'name': URT_CODE_NAMES[row['code']],
            'k': row['k'],
            'k_neg': row['k_neg'],
            'k_pos': row['k_pos'],
            'n': n,
            'rate_neg': row['k_neg'] / n if n > 0 else 0,
            'rate_pos': row['k_pos'] / n if n > 0 else 0,
            'ci_neg': ci_neg,
            'ci_pos': ci_pos,
            'max_intensity': f"I{row['max_intensity']}",
        })

    return results


def wilson_ci(k: int, n: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for binomial proportion."""
    if n == 0:
        return (0.0, 0.0)

    p = k / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2*n)) / denom
    margin = z * math.sqrt((p*(1-p) + z**2/(4*n)) / n) / denom

    return (max(0, center - margin), min(1, center + margin))

4.3 Sub-Pattern Discovery (Local ML)

The key insight: LLM gives categories, local ML reveals patterns within categories.

def analyze_top_issues(business_id: str, code_stats: list,
                       start: date, end: date, top_k: int = 5) -> list[dict]:
    """Deep analysis of top negative codes with sub-clustering."""

    # Filter to significant negative codes
    issues_to_analyze = [
        cs for cs in code_stats
        if cs['k_neg'] >= 8 and cs['ci_neg'][1] - cs['ci_neg'][0] <= 0.30
    ][:top_k]

    results = []
    for code_stat in issues_to_analyze:
        code = code_stat['code']

        # Fetch all negative reviews for this code
        reviews = db.query("""
            SELECT review_id, text, embedding, intensity, quotes, date
            FROM reviews
            WHERE business_id = %s
              AND (urt_primary = %s OR %s = ANY(urt_secondary))
              AND valence IN ('V-', 'V±')
              AND date BETWEEN %s AND %s
        """, [business_id, code, code, start, end])

        # Sub-cluster to find patterns
        sub_patterns = discover_sub_patterns(reviews, code)

        results.append({
            'code': code,
            'name': code_stat['name'],
            'total_reviews': code_stat['k_neg'],
            'rate': code_stat['rate_neg'],
            'ci': code_stat['ci_neg'],
            'max_intensity': code_stat['max_intensity'],
            'sub_patterns': sub_patterns,
        })

    return results


def discover_sub_patterns(reviews: list[dict], code: str,
                          min_cluster_size: int = 3) -> list[dict]:
    """Cluster reviews within a URT code to find actionable sub-patterns."""

    if len(reviews) < min_cluster_size * 2:
        # Too few for meaningful clustering
        return [{
            'label': 'General',
            'count': len(reviews),
            'percentage': 1.0,
            'representative_quote': select_representative(reviews, code),
            'sharpest_quote': select_sharpest(reviews, code),
        }]

    embeddings = np.array([r['embedding'] for r in reviews])

    # HDBSCAN for small datasets, KMeans for larger
    if len(reviews) < 500:
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=2,
            metric='euclidean'
        )
        labels = clusterer.fit_predict(embeddings)
    else:
        k = min(8, max(3, int(np.sqrt(len(reviews) / 5))))
        kmeans = KMeans(n_clusters=k, n_init=3)
        labels = kmeans.fit_predict(embeddings)

    # Group by cluster
    clusters = {}
    for review, label in zip(reviews, labels):
        if label == -1:  # Noise
            continue
        if label not in clusters:
            clusters[label] = []
        clusters[label].append(review)

    # Build sub-pattern descriptions
    patterns = []
    for label, cluster_reviews in clusters.items():
        if len(cluster_reviews) < min_cluster_size:
            continue

        cluster_embeddings = np.array([r['embedding'] for r in cluster_reviews])
        centroid = cluster_embeddings.mean(axis=0)
        centroid /= np.linalg.norm(centroid)

        patterns.append({
            'label': extract_cluster_label(cluster_reviews, code),
            'count': len(cluster_reviews),
            'percentage': len(cluster_reviews) / len(reviews),
            'representative_quote': select_representative(cluster_reviews, code, centroid),
            'sharpest_quote': select_sharpest(cluster_reviews, code),
            'avg_intensity': np.mean([
                {'I1': 1, 'I2': 2, 'I3': 3}[r['intensity']]
                for r in cluster_reviews
            ]),
            'centroid': centroid,  # For trend matching
        })

    patterns.sort(key=lambda x: x['count'], reverse=True)
    return patterns[:4]  # Top 4 sub-patterns


def select_representative(reviews: list, code: str,
                          centroid: np.ndarray = None) -> str:
    """Select quote closest to centroid (most representative)."""

    if centroid is None:
        embeddings = np.array([r['embedding'] for r in reviews])
        centroid = embeddings.mean(axis=0)
        centroid /= np.linalg.norm(centroid)

    best_review = max(reviews, key=lambda r: r['embedding'] @ centroid)

    # Return the extracted quote for this code, or truncated text
    if best_review.get('quotes') and code in best_review['quotes']:
        return best_review['quotes'][code]
    return best_review['text'][:150]


def select_sharpest(reviews: list, code: str) -> str:
    """Select highest intensity quote (sharpest criticism)."""

    intensity_order = {'I3': 3, 'I2': 2, 'I1': 1}
    best_review = max(reviews, key=lambda r: intensity_order.get(r['intensity'], 0))

    if best_review.get('quotes') and code in best_review['quotes']:
        return best_review['quotes'][code]
    return best_review['text'][:150]


def extract_cluster_label(reviews: list, code: str) -> str:
    """Generate a concise label for the cluster."""

    # Extract common phrases from quotes
    texts = []
    for r in reviews:
        if r.get('quotes') and code in r['quotes']:
            texts.append(r['quotes'][code].lower())
        else:
            texts.append(r['text'][:100].lower())

    # Find distinctive 2-3 word phrases
    from collections import Counter

    all_text = ' '.join(texts)
    words = re.findall(r'\b[a-z]{3,}\b', all_text)

    # Bigrams, skip stopwords
    stopwords = {'the', 'and', 'was', 'were', 'for', 'that', 'this', 'with', 'but', 'have', 'had'}
    bigrams = [
        f"{words[i]} {words[i+1]}"
        for i in range(len(words)-1)
        if words[i] not in stopwords and words[i+1] not in stopwords
    ]

    counts = Counter(bigrams)
    if counts:
        label = counts.most_common(1)[0][0]
        return label.title()

    return URT_CODE_NAMES.get(code, "General")

4.4 Trend Analysis

Combine rate comparison with CR signals:

def compute_trends(business_id: str, current_start: date,
                   current_end: date) -> dict:
    """Compute trends vs prior period using rates and CR signals."""

    period_length = (current_end - current_start).days
    prior_start = current_start - timedelta(days=period_length)
    prior_end = current_start

    # Current period stats
    current_stats = compute_code_statistics(business_id, current_start, current_end)
    current_map = {cs['code']: cs for cs in current_stats}

    # Prior period stats
    prior_stats = compute_code_statistics(business_id, prior_start, prior_end)
    prior_map = {cs['code']: cs for cs in prior_stats}

    # CR signals in current period
    cr_signals = db.query("""
        SELECT urt_primary as code, comparative, COUNT(*) as count
        FROM reviews
        WHERE business_id = %s
          AND date BETWEEN %s AND %s
          AND comparative != 'CR-N'
        GROUP BY urt_primary, comparative
    """, [business_id, current_start, current_end])

    cr_map = {}
    for row in cr_signals:
        if row['code'] not in cr_map:
            cr_map[row['code']] = {'CR-B': 0, 'CR-W': 0, 'CR-S': 0}
        cr_map[row['code']][row['comparative']] = row['count']

    # Compute trends
    trends = {}
    for code, current in current_map.items():
        prior = prior_map.get(code, {'rate_neg': 0, 'rate_pos': 0, 'k_neg': 0, 'k_pos': 0})
        cr = cr_map.get(code, {'CR-B': 0, 'CR-W': 0, 'CR-S': 0})

        # Rate-based trend (issues)
        rate_trend_neg = current['rate_neg'] - prior['rate_neg']

        # CR-enhanced trend signal
        if cr['CR-W'] >= 2:
            trend_signal = 'worsening'
        elif cr['CR-B'] >= 2:
            trend_signal = 'improving'
        elif cr['CR-S'] >= 2:
            trend_signal = 'persistent'
        elif rate_trend_neg > 0.05:
            trend_signal = 'worsening'
        elif rate_trend_neg < -0.05:
            trend_signal = 'improving'
        else:
            trend_signal = 'stable'

        trends[code] = {
            'rate_change_neg': rate_trend_neg,
            'rate_change_pos': current['rate_pos'] - prior['rate_pos'],
            'signal': trend_signal,
            'cr_better': cr['CR-B'],
            'cr_worse': cr['CR-W'],
            'cr_same': cr['CR-S'],
        }

    return trends

4.5 Staff Analysis

def analyze_staff_mentions(business_id: str, start: date, end: date) -> dict:
    """Aggregate staff performance from mentions."""

    staff_data = db.query("""
        SELECT
            unnest(staff_mentions) as staff_name,
            valence,
            intensity,
            urt_primary,
            quotes
        FROM reviews
        WHERE business_id = %s
          AND date BETWEEN %s AND %s
          AND array_length(staff_mentions, 1) > 0
    """, [business_id, start, end])

    staff_map = {}
    for row in staff_data:
        name = row['staff_name']
        if name not in staff_map:
            staff_map[name] = {
                'positive': [],
                'negative': [],
                'codes': Counter()
            }

        if row['valence'] in ('V+',):
            staff_map[name]['positive'].append(row)
        elif row['valence'] in ('V-',):
            staff_map[name]['negative'].append(row)

        staff_map[name]['codes'][row['urt_primary']] += 1

    # Build summary
    staff_summary = []
    for name, data in staff_map.items():
        total = len(data['positive']) + len(data['negative'])
        if total < 2:
            continue  # Need multiple mentions

        staff_summary.append({
            'name': name,
            'total_mentions': total,
            'positive': len(data['positive']),
            'negative': len(data['negative']),
            'sentiment_ratio': len(data['positive']) / total,
            'top_codes': data['codes'].most_common(3),
            'sample_praise': data['positive'][0]['quotes'] if data['positive'] else None,
            'sample_criticism': data['negative'][0]['quotes'] if data['negative'] else None,
        })

    staff_summary.sort(key=lambda x: x['total_mentions'], reverse=True)

    return {
        'staff': staff_summary,
        'top_performer': max(staff_summary, key=lambda x: x['sentiment_ratio']) if staff_summary else None,
        'needs_attention': [s for s in staff_summary if s['sentiment_ratio'] < 0.5],
    }

4.6 LLM Narrative Generation

NARRATIVE_PROMPT = """You are a business intelligence analyst writing an executive summary of customer feedback.

You MUST follow these rules:
1. Only state claims supported by the provided data
2. Include specific numbers (percentages, counts) for every claim
3. Do not invent or hallucinate any statistics
4. Be direct and actionable, not vague
5. Highlight the most impactful findings first

The report payload follows. Write a concise executive summary (~300 words) covering:
- Top issues with their sub-patterns and severity
- Notable strengths
- Trend signals (improving/worsening/persistent)
- Staff highlights
- 2-3 prioritized recommendations

REPORT DATA:
{payload}"""


async def generate_narrative(payload: dict) -> str:
    """Generate executive narrative from structured payload."""

    response = await llm.chat(
        model="gpt-4o",  # Use stronger model for narrative
        messages=[
            {"role": "system", "content": NARRATIVE_PROMPT.format(
                payload=json.dumps(payload, indent=2)
            )}
        ],
        temperature=0.3
    )

    return response.content

Part 5: Report Output Example

5.1 Structured Payload

{
  "business_id": "rest_12345",
  "period": "2026-01-01 to 2026-01-31",
  "total_reviews": 234,

  "issues": [
    {
      "code": "J1.01",
      "name": "Wait Time",
      "total_reviews": 47,
      "rate": 0.201,
      "ci": [0.153, 0.258],
      "max_intensity": "I3",
      "trend": {
        "signal": "worsening",
        "cr_worse": 3,
        "rate_change": 0.042
      },
      "sub_patterns": [
        {
          "label": "Table Seating",
          "count": 20,
          "percentage": 0.426,
          "representative_quote": "Waited 45 minutes for a table even with reservation",
          "sharpest_quote": "HOUR wait on a Tuesday. Unacceptable."
        },
        {
          "label": "Food After Ordering",
          "count": 15,
          "percentage": 0.319,
          "representative_quote": "Food took 40 minutes after we ordered",
          "sharpest_quote": "Over an hour for cold pasta"
        },
        {
          "label": "Check Payment",
          "count": 12,
          "percentage": 0.255,
          "representative_quote": "Had to flag someone down just to pay",
          "sharpest_quote": "20 minutes for the check, ridiculous"
        }
      ]
    }
  ],

  "strengths": [
    {
      "code": "O2.02",
      "name": "Craftsmanship",
      "total_reviews": 89,
      "rate": 0.380,
      "ci": [0.318, 0.446],
      "trend": {"signal": "stable"},
      "representative_quote": "The pasta is clearly made fresh, incredible quality"
    }
  ],

  "staff": {
    "top_performer": {
      "name": "Maria",
      "mentions": 12,
      "sentiment_ratio": 0.917
    },
    "needs_attention": [
      {
        "name": "Tom",
        "mentions": 8,
        "sentiment_ratio": 0.375,
        "top_issues": ["P1.02", "P3.01"]
      }
    ]
  },

  "open_issues": [
    {
      "issue_id": "ISSUE-2026-0142",
      "code": "J1.01",
      "state": "IN_PROGRESS",
      "priority": 7.45,
      "days_open": 12
    }
  ]
}

5.2 Generated Narrative

Executive Summary: January 2026

Analysis of 234 reviews reveals wait times as the critical issue, affecting 20.1% of customers (95% CI: 15.3%-25.8%) — a worsening trend with 3 explicit "worse than before" signals this month.

Wait Time Breakdown:

  • Seating delays (43%): Customers report 30-60 minute waits despite reservations. "Waited 45 minutes for a table even with reservation."
  • Kitchen delays (32%): Food taking 40+ minutes after ordering. "Over an hour for cold pasta."
  • Checkout friction (26%): Difficulty getting the check. "20 minutes for the check, ridiculous."

Strengths remain strong: Food craftsmanship praised in 38% of reviews, stable month-over-month. "The pasta is clearly made fresh, incredible quality."

Staff Notes: Maria received 12 mentions with 92% positive sentiment. Tom (8 mentions, 38% positive) shows patterns in P1.02 (Respect) and P3.01 (Attentiveness) — recommend coaching session.

Prioritized Recommendations:

  1. Immediate: Audit reservation system — seating bottleneck is primary wait issue
  2. This Week: Review kitchen workflow for food delivery timing
  3. This Month: Implement checkout process training (e.g., table check-in rotation)

One high-priority issue (ISSUE-2026-0142) is in progress with 12 days elapsed.


Part 6: Cost Model

Stage When Cost Notes
Embedding Per review ingested $0.00 Local model, ~50ms/review
LLM Classification Per review ingested ~$0.0002 GPT-4o-mini, batched
Issue Aggregation Per V- review $0.00 SQL queries
Sub-Clustering Per report $0.00 HDBSCAN/KMeans, <1s
Trend Analysis Per report $0.00 SQL + computation
LLM Narrative Per report ~$0.15 GPT-4o, single call

Total Costs:

Volume Monthly Ingest Reports (10/month) Total
1K reviews $0.20 $1.50 $1.70
10K reviews $2.00 $1.50 $3.50
100K reviews $20.00 $1.50 $21.50

Part 7: Implementation Checklist

Phase 1: Core Pipeline

  • Set up PostgreSQL with pgvector extension
  • Implement embedding generation (multilingual-e5-small)
  • Build LLM classification module with batching
  • Create review ingestion pipeline
  • Implement URT code reference data

Phase 2: Issue Lifecycle

  • Implement issue aggregation logic
  • Build state machine with transitions
  • Create priority scoring function
  • Add CR signal processing for verification
  • Set up issue event logging
  • Implement strength score aggregation
  • Build issue review drill-down query
  • Create impact timeline aggregation
  • Set up issue_timeseries table and population

Phase 3: Report Generation

  • Build statistics aggregation queries
  • Implement sub-pattern clustering
  • Add trend analysis with CR integration
  • Create staff analysis module
  • Build narrative generation prompt

Phase 4: Integration

  • API endpoints for ingestion
  • Report generation endpoint
  • Issue management endpoints
  • Issue timeline chart endpoint
  • Issue review table endpoint
  • Dashboard queries
  • Alert/notification hooks

Part 8: Key Innovations

Innovation Benefit
LLM at ingest, not report Accurate classification amortized across all reports
URT as structure Stable, interpretable categories; no clustering drift
Multi-coding Handle complex reviews without fragmentation
Sub-clustering within codes Actionable patterns beyond category level
CR for verification Automatic resolution validation from customer feedback
Review as unit Preserve context; avoid embedding quality loss
Issue lifecycle Operational tracking with statistical rigor
Strength score Unified impact metric: volume × intensity
Impact timeline Time-series visualization for trend analysis
Issue drill-down Full review table for any aggregated issue

Document Control

Field Value
Document ReviewIQ Architecture v3.0
Status Specification Complete
Date 2026-01-24
Dependencies URT Specification v5.1, Issue Lifecycle Framework C1
Cost Target <$25/month at 100K reviews
Accuracy Target >90% URT classification, >85% sub-pattern relevance

Changelog v3.0

Addition Description
3.6 Issue Review Drill-Down Query to fetch all reviews for a specific issue
3.7 Strength Score Aggregation Unified metric: count × intensity weight
3.8 Impact Timeline Time-series aggregation for line charts
3.9 Timeline Data Model TypeScript interface for frontend charts
issue_timeseries table Persistent time-bucketed aggregation

End of ReviewIQ Architecture v3.0