#!/usr/bin/env python3
"""
Resolve GBP taxonomy categories for all jobs.

Uses exact match, LLM match, or hierarchical classification.

Usage:
    source .env && python scripts/resolve_job_categories.py
"""

import asyncio
import os
import sys
from dataclasses import dataclass
from typing import Optional

import asyncpg
from openai import OpenAI

# NOTE(review): the fallback URL embeds credentials; it is kept for backward
# compatibility, but DATABASE_URL should be set explicitly outside local dev.
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://scraper:scraper123@localhost:5437/scraper')


@dataclass
class ResolvedCategory:
    """Result of category resolution."""
    category_id: int
    path: str
    name: str
    level: int
    method: str  # 'exact', 'fuzzy', 'llm', 'hierarchical'
    confidence: float


class SimpleLLM:
    """Simple OpenAI wrapper for category resolution."""

    def __init__(self):
        self.client = OpenAI()

    async def complete(self, prompt: str, max_tokens: int = 50, temperature: float = 0) -> str:
        """Get completion from OpenAI.

        The synchronous client call is run in a worker thread so the event
        loop is not blocked. Returns the stripped message text, or an empty
        string when the response carries no text content.
        """
        response = await asyncio.to_thread(
            self.client.chat.completions.create,
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=temperature,
        )
        # `content` can be None; treat that as an empty answer instead of
        # raising AttributeError on `.strip()`.
        content = response.choices[0].message.content
        return (content or "").strip()


class CategoryResolver:
    """Resolves business categories to GBP taxonomy nodes."""

    # Synonym expansion for common variations
    _SYNONYMS = {
        'shop': ['store', 'shop', 'outlet'],
        'store': ['store', 'shop', 'outlet'],
        'house': ['house', 'home'],
        'home': ['house', 'home'],
        'office': ['office', 'clinic', 'center'],
        'clinic': ['clinic', 'office', 'center'],
        'center': ['center', 'centre'],
        'centre': ['center', 'centre'],
        'repair': ['repair', 'service', 'maintenance'],
    }

    def __init__(self, pool: asyncpg.Pool, llm: SimpleLLM):
        self.pool = pool
        self.llm = llm
        # Taxonomy caches filled lazily by _get_categories: level 1 as a flat
        # list, levels 2 and 3 keyed by the parent path they were fetched for.
        self._level1_cache: list[dict] = []
        self._level2_cache: dict[str, list[dict]] = {}
        self._level3_cache: dict[str, list[dict]] = {}

    @staticmethod
    def _escape_like(text: str) -> str:
        """Escape LIKE metacharacters so *text* only matches itself literally."""
        return text.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    @staticmethod
    def _to_resolved(row, method: str, confidence: float) -> ResolvedCategory:
        """Build a ResolvedCategory from a row/dict with id, path, name, level."""
        return ResolvedCategory(
            category_id=row['id'],
            path=row['path'],
            name=row['name'],
            level=row['level'],
            method=method,
            confidence=confidence,
        )

    async def resolve(
        self,
        google_category: Optional[str] = None,
        business_name: Optional[str] = None,
        business_address: Optional[str] = None
    ) -> Optional[ResolvedCategory]:
        """Resolve to the deepest taxonomy node.

        Tries, in order: exact/trigram match on the Google category, an LLM
        match on the Google category, then hierarchical classification from
        the business name. Returns None when no phase produces a result.
        """
        # Whitespace-only inputs carry no information; treat them as missing
        # so they cannot reach the matching phases.
        google_category = (google_category or "").strip() or None
        business_name = (business_name or "").strip() or None

        if google_category:
            # Phase 1: Exact match
            result = await self._exact_match(google_category)
            if result:
                return result

            # Phase 2: LLM match
            result = await self._llm_match(google_category)
            if result:
                return result

        # Phase 3: Hierarchical classification
        if business_name:
            result = await self._hierarchical_classify(
                business_name=business_name,
                business_address=business_address,
                google_category=google_category,
            )
            if result:
                return result

        return None

    async def _exact_match(self, google_category: str) -> Optional[ResolvedCategory]:
        """Try exact match against taxonomy.

        Returns a level-3 node with method 'exact' (confidence 1.0) on a
        case-insensitive name match, else the best trigram match above 0.7
        with method 'fuzzy' and the similarity as confidence, else None.
        """
        async with self.pool.acquire() as conn:
            # Exact match (case-insensitive)
            row = await conn.fetchrow("""
                SELECT id, name, path::text as path, level
                FROM gbp_categories
                WHERE LOWER(name) = LOWER($1)
                  AND level = 3
            """, google_category)
            if row:
                return self._to_resolved(row, method='exact', confidence=1.0)

            # Trigram similarity match (handles typos, slight variations)
            # Threshold 0.7 = high confidence only, else fall through to LLM
            row = await conn.fetchrow("""
                SELECT id, name, path::text as path, level,
                       similarity(LOWER(name), LOWER($1)) as sim
                FROM gbp_categories
                WHERE level = 3
                  AND similarity(LOWER(name), LOWER($1)) > 0.7
                ORDER BY sim DESC
                LIMIT 1
            """, google_category)
            if row:
                return self._to_resolved(row, method='fuzzy', confidence=float(row['sim']))

        return None

    async def _llm_match(self, google_category: str) -> Optional[ResolvedCategory]:
        """Use LLM to match Google category to taxonomy.

        Fetches up to 20 level-3 candidates (word/synonym LIKE matches or
        trigram similarity above 0.3) and asks the LLM to pick one. Returns
        confidence 0.85 when the reply equals a candidate name, 0.75 when it
        only overlaps one as a substring, and None otherwise.
        """
        words = google_category.lower().split()
        if not words:
            # Nothing to search on (empty or whitespace-only category).
            return None

        # Get candidates using multiple strategies:
        # 1. Word matches with synonym expansion
        # 2. Trigram similarity
        expanded_words: set[str] = set()
        for w in words:
            if len(w) > 2:
                expanded_words.add(w)
                expanded_words.update(self._SYNONYMS.get(w, ()))

        # The category text comes from scraped data, so it is passed only as
        # bound parameters (never interpolated into the SQL) and LIKE
        # metacharacters are escaped. An empty array makes LIKE ANY false.
        word_patterns = [f"%{self._escape_like(w)}%" for w in sorted(expanded_words)]
        # First word is usually most important
        primary_pattern = f"{self._escape_like(words[0])}%"

        async with self.pool.acquire() as conn:
            # Order by: starts with primary word, then by similarity
            candidates = await conn.fetch("""
                SELECT DISTINCT id, name, path::text as path, level,
                       CASE WHEN LOWER(name) LIKE $2 THEN 1 ELSE 0 END as starts_with,
                       similarity(LOWER(name), LOWER($1)) as sim
                FROM gbp_categories
                WHERE level = 3
                  AND (
                      LOWER(name) LIKE ANY($3::text[])
                      OR similarity(LOWER(name), LOWER($1)) > 0.3
                  )
                ORDER BY starts_with DESC, sim DESC
                LIMIT 20
            """, google_category, primary_pattern, word_patterns)

        if not candidates:
            return None

        candidate_list = "\n".join([f"- {c['name']}" for c in candidates])
        prompt = (
            f'Match business category "{google_category}" to the closest option.\n'
            '\n'
            'Synonyms: shop=store, house=cafe/home, office=clinic/center\n'
            '\n'
            'Options:\n'
            f'{candidate_list}\n'
            '\n'
            'Reply with ONLY the exact category name from the list.'
        )

        response = await self.llm.complete(prompt, max_tokens=30)
        selected = response.strip().strip('"').strip("'").lower()

        # An empty reply would be a substring of every candidate name below
        # and silently select the first one, so reject it here.
        # NOTE(review): the prompt never offers "NONE" as an answer, so that
        # branch only triggers if the model volunteers it.
        if not selected or selected == "none":
            return None

        for c in candidates:
            if c['name'].lower() == selected:
                return self._to_resolved(c, method='llm', confidence=0.85)

        # Fuzzy match selected name to candidates
        for c in candidates:
            candidate_name = c['name'].lower()
            if selected in candidate_name or candidate_name in selected:
                return self._to_resolved(c, method='llm', confidence=0.75)

        return None

    async def _hierarchical_classify(
        self,
        business_name: str,
        business_address: Optional[str] = None,
        google_category: Optional[str] = None
    ) -> Optional[ResolvedCategory]:
        """Walk down taxonomy tree using LLM.

        Selects a level-1 sector, then a level-2 business type under it, then
        a level-3 category under that. Returns None as soon as a level yields
        no selection; otherwise the level-3 node with confidence 0.7.
        """
        context = f"Business: {business_name}"
        if business_address:
            context += f"\nAddress: {business_address}"
        if google_category:
            context += f"\nHint: {google_category}"

        # Level 1
        level1 = await self._get_categories(1)
        sector = await self._llm_select(context, level1, "sector")
        if not sector:
            return None

        # Level 2
        level2 = await self._get_categories(2, sector['path'])
        biz_type = await self._llm_select(context, level2, "business type", sector['name'])
        if not biz_type:
            return None

        # Level 3
        level3 = await self._get_categories(3, biz_type['path'])
        specific = await self._llm_select(context, level3, "specific category", biz_type['name'])
        if not specific:
            return None

        return self._to_resolved(specific, method='hierarchical', confidence=0.7)

    async def _get_categories(self, level: int, parent_path: Optional[str] = None) -> list[dict]:
        """Get categories at level, optionally under parent.

        Results for level 1 (no parent) and for levels 2/3 under a given
        parent path are cached on the instance, so each taxonomy slice is
        queried once per resolver. Callers must not mutate the returned list.
        """
        is_root_lookup = level == 1 and not parent_path
        if is_root_lookup and self._level1_cache:
            return self._level1_cache

        child_cache = None
        if parent_path:
            child_cache = {2: self._level2_cache, 3: self._level3_cache}.get(level)
        if child_cache is not None and parent_path in child_cache:
            return child_cache[parent_path]

        async with self.pool.acquire() as conn:
            if parent_path:
                rows = await conn.fetch("""
                    SELECT id, name, path::text as path, level
                    FROM gbp_categories
                    WHERE level = $1
                      AND path <@ $2::ltree
                    ORDER BY name
                """, level, parent_path)
            else:
                rows = await conn.fetch("""
                    SELECT id, name, path::text as path, level
                    FROM gbp_categories
                    WHERE level = $1
                    ORDER BY name
                """, level)

        categories = [dict(r) for r in rows]
        if is_root_lookup:
            self._level1_cache = categories
        elif child_cache is not None:
            child_cache[parent_path] = categories
        return categories

    async def _llm_select(
        self,
        context: str,
        categories: list[dict],
        level_name: str,
        parent: Optional[str] = None
    ) -> Optional[dict]:
        """Ask LLM to select best category.

        Returns None for an empty list and the sole entry for a one-element
        list without calling the LLM. Otherwise returns the category whose
        name equals the reply (case-insensitive), else the first whose name
        contains the reply, else the first category in the list.
        """
        if not categories:
            return None
        if len(categories) == 1:
            return categories[0]

        cat_list = "\n".join([f"- {c['name']}" for c in categories])
        parent_ctx = f" within {parent}" if parent else ""
        prompt = (
            f'{context}\n'
            '\n'
            f'Select the most appropriate {level_name}{parent_ctx}.\n'
            '\n'
            'Options:\n'
            f'{cat_list}\n'
            '\n'
            'Respond with ONLY the exact name from the list.'
        )

        response = await self.llm.complete(prompt)
        selected = response.strip().strip('"').strip("'").lower()

        for c in categories:
            if c['name'].lower() == selected:
                return c

        # Fuzzy fallback (skipped for an empty reply, which would be a
        # substring of every name and land on the first entry anyway)
        if selected:
            for c in categories:
                if selected in c['name'].lower():
                    return c

        # Last resort: the list is non-empty here, so take its first entry.
        return categories[0]


async def main():
    """Resolve categories for completed jobs lacking one and print a summary."""
    # Connect to database
    pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=5)

    try:
        # Initialize LLM client (inside the try so the pool is closed even if
        # client construction fails)
        llm = SimpleLLM()

        # Get jobs needing category resolution
        async with pool.acquire() as conn:
            jobs = await conn.fetch("""
                SELECT job_id, business_name, business_category, business_address
                FROM jobs
                WHERE status = 'completed'
                  AND gbp_category_path IS NULL
                ORDER BY created_at DESC
            """)

        print(f"Found {len(jobs)} jobs needing category resolution\n")

        resolver = CategoryResolver(pool, llm)
        resolved = 0
        failed = 0

        for job in jobs:
            job_id = str(job['job_id'])
            name = job['business_name']
            display_name = name or 'Unknown'
            google_cat = job['business_category']
            address = job['business_address']

            print(f"Processing: {display_name[:50]}...")
            if google_cat:
                print(f"  Google category: {google_cat}")

            try:
                # Pass the real name (possibly None), not the display
                # placeholder, so the LLM is never asked to classify the
                # literal string "Unknown".
                result = await resolver.resolve(
                    google_category=google_cat,
                    business_name=name,
                    business_address=address,
                )

                if result:
                    # Determine source: google if they had a category, inferred if we used business name
                    category_source = 'google' if google_cat else 'inferred'

                    # Save to database
                    async with pool.acquire() as conn:
                        await conn.execute("""
                            UPDATE jobs
                            SET gbp_category_id = $2,
                                gbp_category_path = $3::ltree,
                                category_resolution_method = $4,
                                business_category_source = $5,
                                updated_at = NOW()
                            WHERE job_id = $1::uuid
                        """, job_id, result.category_id, result.path, result.method, category_source)

                    print(f"  ✓ Resolved: {result.path} ({result.method}, source={category_source})")
                    resolved += 1
                else:
                    print("  ✗ Could not resolve")
                    failed += 1

            except Exception as e:
                # Per-job boundary: report and continue so one bad job does
                # not abort the whole batch.
                print(f"  ✗ Error: {e}")
                failed += 1

        print(f"\n{'='*50}")
        print(f"Done! Resolved: {resolved}, Failed: {failed}")

        # Show results
        async with pool.acquire() as conn:
            results = await conn.fetch("""
                SELECT business_name, business_category,
                       gbp_category_path::text as resolved_path,
                       category_resolution_method,
                       business_category_source
                FROM jobs
                WHERE status = 'completed'
                  AND gbp_category_path IS NOT NULL
                ORDER BY created_at DESC
                LIMIT 10
            """)

        print(f"\n{'='*50}")
        print("Recent resolved categories:")
        for r in results:
            # Any of these columns may be NULL; substitute placeholders so
            # slicing and formatting cannot raise.
            shown_name = (r['business_name'] or 'Unknown')[:30]
            shown_category = r['business_category'] or '-'
            source = r['business_category_source'] or '-'
            method = r['category_resolution_method'] or '-'
            print(f"  {shown_name:30} | {shown_category:20} | {source:8} -> {r['resolved_path']} ({method})")

    finally:
        await pool.close()


if __name__ == '__main__':
    asyncio.run(main())