feat: Add reviewiq-pipeline package for LLM-powered review classification

Implement a standalone Python package for processing customer reviews through
a 4-stage pipeline using URT (Universal Review Taxonomy) v5.1:

- Stage 1: Normalization (text cleaning, language detection, deduplication)
- Stage 2: LLM Classification (OpenAI/Anthropic span extraction with URT codes)
- Stage 3: Issue Routing (deterministic issue ID generation, span linking)
- Stage 4: Fact Aggregation (time series metrics for dashboards)

Package includes:
- TypedDict contracts matching Pipeline-Contracts-v1.md
- Async database layer with asyncpg and 5 SQL migrations
- LLM client abstraction supporting both OpenAI and Anthropic
- Sentence-transformers integration for embeddings
- Validation rules V1.x through V4.x
- CLI commands: migrate, run, validate, check
- 55 unit and integration tests (all passing)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-24 18:07:11 +00:00
parent b780a23b66
commit 7d720f5378
34 changed files with 7222 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
"""Services for pipeline operations."""
from reviewiq_pipeline.services.embeddings import EmbeddingService
from reviewiq_pipeline.services.llm_client import LLMClient
from reviewiq_pipeline.services.text_processor import TextProcessor
__all__ = [
"LLMClient",
"EmbeddingService",
"TextProcessor",
]

View File

@@ -0,0 +1,225 @@
"""
Embedding service for generating text embeddings.
Uses sentence-transformers with the all-MiniLM-L6-v2 model (384 dimensions).
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from reviewiq_pipeline.config import Config
logger = logging.getLogger(__name__)
class EmbeddingService:
    """
    Service for generating text embeddings using sentence-transformers.

    The model name and the expected vector length are read from the pipeline
    configuration (``embedding_model`` / ``embedding_dimension``). The model is
    loaded lazily, on the first call that actually has text to encode, so
    constructing the service or embedding empty input never imports
    sentence-transformers.
    """

    def __init__(self, config: Config):
        """
        Initialize the embedding service.

        Args:
            config: Pipeline configuration with embedding model settings
        """
        self.config = config
        self.model_name = config.embedding_model
        self.dimension = config.embedding_dimension
        self._model = None
        self._initialized = False

    def _ensure_initialized(self) -> None:
        """
        Load the sentence-transformers model on first use.

        Raises:
            ImportError: If sentence-transformers is not installed.
        """
        if self._initialized:
            return

        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError(
                "sentence-transformers is required for embeddings. "
                "Install with: pip install sentence-transformers"
            ) from exc

        logger.info("Loading embedding model: %s", self.model_name)
        self._model = SentenceTransformer(self.model_name)
        self._initialized = True

        actual_dimension = self._model.get_sentence_embedding_dimension()
        logger.info("Embedding model loaded. Dimension: %s", actual_dimension)
        if actual_dimension != self.dimension:
            # Zero vectors for empty text are built from the configured
            # dimension, so a mismatch yields vectors of inconsistent length.
            logger.warning(
                "Configured embedding dimension %s does not match model dimension %s",
                self.dimension,
                actual_dimension,
            )

    def embed(self, text: str) -> list[float]:
        """
        Generate embedding for a single text.

        Args:
            text: Text to embed

        Returns:
            List of floats representing the embedding vector. Empty or
            whitespace-only text yields a zero vector of the configured
            dimension without loading the model.
        """
        if not text or not text.strip():
            return [0.0] * self.dimension

        self._ensure_initialized()
        embedding = self._model.encode(text, convert_to_numpy=True)
        return embedding.tolist()

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """
        Generate embeddings for multiple texts.

        Encodes all non-empty texts in one model call, which is more efficient
        than calling embed() repeatedly.

        Args:
            texts: List of texts to embed

        Returns:
            List of embedding vectors, in the same order as ``texts``. Empty or
            whitespace-only entries map to zero vectors.
        """
        if not texts:
            return []

        # Start from zero vectors; only non-empty positions are overwritten.
        result = [[0.0] * self.dimension for _ in texts]
        non_empty_indices = [i for i, t in enumerate(texts) if t and t.strip()]
        if not non_empty_indices:
            return result

        self._ensure_initialized()
        non_empty_texts = [texts[i] for i in non_empty_indices]
        embeddings = self._model.encode(non_empty_texts, convert_to_numpy=True)
        for idx, emb in zip(non_empty_indices, embeddings):
            result[idx] = emb.tolist()
        return result

    def similarity(self, embedding1: list[float], embedding2: list[float]) -> float:
        """
        Calculate cosine similarity between two embeddings.

        Args:
            embedding1: First embedding vector
            embedding2: Second embedding vector

        Returns:
            Cosine similarity score between -1 and 1; 0.0 if either vector
            has zero length.
        """
        vec1 = np.array(embedding1)
        vec2 = np.array(embedding2)

        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            # Cosine similarity is undefined for zero vectors; report "unrelated".
            return 0.0

        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def find_similar(
        self,
        query_embedding: list[float],
        candidate_embeddings: list[list[float]],
        top_k: int = 5,
        threshold: float = 0.0,
    ) -> list[tuple[int, float]]:
        """
        Find most similar embeddings to a query.

        Args:
            query_embedding: Query embedding vector
            candidate_embeddings: List of candidate embeddings
            top_k: Number of top results to return; values <= 0 return []
            threshold: Minimum similarity threshold

        Returns:
            List of (index, similarity) tuples, sorted by similarity descending.
            Zero-length candidates score 0.0; a zero-length query returns [].
        """
        if not candidate_embeddings or top_k <= 0:
            # A negative top_k would otherwise slice from the wrong end.
            return []

        query = np.array(query_embedding)
        query_norm = np.linalg.norm(query)
        if query_norm == 0:
            return []

        candidates = np.array(candidate_embeddings)
        candidate_norms = np.linalg.norm(candidates, axis=1)

        # Only divide where the candidate norm is non-zero.
        valid_mask = candidate_norms > 0
        similarities = np.zeros(len(candidates))
        similarities[valid_mask] = (
            np.dot(candidates[valid_mask], query)
            / (candidate_norms[valid_mask] * query_norm)
        )

        results = [
            (i, float(sim))
            for i, sim in enumerate(similarities)
            if sim >= threshold
        ]
        # Stable sort: equal scores keep their candidate order.
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

    @property
    def model(self):
        """Get the underlying sentence-transformers model, loading it if needed."""
        self._ensure_initialized()
        return self._model
def normalize_embedding(embedding: list[float]) -> list[float]:
    """
    Scale an embedding so that its Euclidean length is 1.

    Args:
        embedding: Embedding vector

    Returns:
        The rescaled vector as a new list. A zero-length vector cannot be
        scaled, so the input list itself is handed back unchanged.
    """
    as_array = np.array(embedding)
    length = np.linalg.norm(as_array)
    if length != 0:
        scaled = as_array / length
        return scaled.tolist()
    return embedding
def average_embeddings(embeddings: list[list[float]]) -> list[float]:
    """
    Return the element-wise mean of a collection of embeddings.

    Handy for building a centroid vector when clustering.

    Args:
        embeddings: List of embedding vectors

    Returns:
        Averaged embedding vector

    Raises:
        ValueError: If ``embeddings`` is empty.
    """
    if embeddings:
        stacked = np.array(embeddings)
        centroid = stacked.mean(axis=0)
        return centroid.tolist()
    raise ValueError("Cannot average empty embedding list")

View File

@@ -0,0 +1,432 @@
"""
LLM client abstraction supporting OpenAI and Anthropic.
Provides a unified interface for classification requests with:
- Provider abstraction (OpenAI/Anthropic)
- Structured output (JSON mode)
- Retry handling
- Cost tracking
"""
from __future__ import annotations
import json
import logging
import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from reviewiq_pipeline.config import Config
from reviewiq_pipeline.contracts import LLMClassificationResponse
logger = logging.getLogger(__name__)
# System prompt for URT classification.
# Sent verbatim as the system message by both provider clients in this module.
# It tells the model how to split a review into spans, which URT domain and
# dimension codes to assign, how to pick the primary span, how to build the USN
# string, and the JSON shape ("spans" + "review_summary") it must return.
# This text is runtime behavior: editing it changes classification output.
SYSTEM_PROMPT = """You are a review classification system using URT (Universal Review Taxonomy) v5.1.
Your task is to extract semantic spans from customer reviews and classify each span independently.
## SPAN EXTRACTION RULES
1. **Split on contrasting conjunctions**: but, however, although, despite, yet, though
2. **Split on topic/target change**: food → service → bathroom = 3 spans
3. **Split on valence change**: positive → negative = split
4. **Split on domain change**: O (Offering) → J (Journey) → E (Environment) = split
5. **Keep together**: cause→effect within same feedback unit ("X because Y" = 1 span)
**Guardrails**:
- Max 3 spans per sentence (if 4+, re-check for over-splitting)
- Min 1 span per review (even single-word reviews)
- Spans must be non-overlapping and cover meaningful content
## URT DOMAINS (Tier-3 codes: X#.##)
| Domain | Code | Description |
|--------|------|-------------|
| Offering | O1-O4 | Product/service quality, features, variety |
| Price | P1-P4 | Value, pricing, promotions, payment |
| Journey | J1-J4 | Timing, process, convenience, accessibility |
| Environment | E1-E4 | Physical space, ambiance, cleanliness, digital UX |
| Attitude | A1-A4 | Staff behavior, helpfulness, professionalism |
| Voice | V1-V4 | Brand, communication, marketing, transparency |
| Relationship | R1-R4 | Loyalty, trust, consistency, personalization |
## DIMENSION CODES
### Valence
- V+ : Positive sentiment
- V- : Negative sentiment
- V0 : Neutral/factual
- V± : Mixed within the span
### Intensity
- I1 : Low ("okay", "fine", "decent")
- I2 : Moderate ("good", "bad", "slow")
- I3 : High ("amazing", "terrible", "unacceptable")
### Specificity
- S1 : Vague ("it was bad")
- S2 : Some detail ("the food was cold")
- S3 : Precise ("waited 45 minutes for appetizers")
### Actionability
- A1 : No clear action possible
- A2 : Possible actions, unclear which
- A3 : Clear, specific action ("train staff on X", "fix Y")
### Temporal
- TC : Current visit (default when no markers)
- TR : Recent pattern ("lately", "recently", "again")
- TH : Historical ("for years", "always", "used to")
- TF : Future ("won't return", "next time", "I expect")
### Evidence
- ES : Stated explicitly in text (default)
- EI : Inferred logically (not stated, but entailed)
- EC : Contextual (depends on surrounding text)
### Comparative
- CR-N : No comparison (default)
- CR-B : Better than alternatives
- CR-W : Worse than alternatives
- CR-S : Same as alternatives
## PRIMARY SPAN SELECTION
Mark exactly ONE span as is_primary=true using this order:
1. Highest intensity (I3 > I2 > I1)
2. Tie-break: negative over positive (V- > V± > V0 > V+)
3. Tie-break: earliest span_index
## USN (URT String Notation)
Generate a USN string for each span:
```
URT:S:{primary}[+{sec1}][+{sec2}]:{valence_sign}{intensity_num}:{S#}{A#}{temporal}.{evidence}.{CR_suffix}
```
Examples:
- `URT:S:J1.03:-2:22TC.ES.N` (J1.03, V-, I2, S2, A2, TC, ES, CR-N)
- `URT:S:P1.01+O2.03:+3:33TR.ES.B` (P1.01 primary, O2.03 secondary, V+, I3, S3, A3, TR, ES, CR-B)
Valence encoding: + for V+, - for V-, 0 for V0, ± for V±
CR suffix: N=CR-N, B=CR-B, W=CR-W, S=CR-S
## OUTPUT FORMAT
Return valid JSON matching this schema. No markdown, no explanations.
{
"spans": [
{
"span_index": 0,
"span_text": "exact text from review",
"span_start": 0,
"span_end": 25,
"urt_primary": "O1.01",
"urt_secondary": [],
"valence": "V+",
"intensity": "I2",
"specificity": "S2",
"actionability": "A1",
"temporal": "TC",
"evidence": "ES",
"comparative": "CR-N",
"is_primary": true,
"confidence": "high",
"entity": null,
"entity_type": null,
"relation_type": null,
"related_span_index": null,
"usn": "URT:S:O1.01:+2:21TC.ES.N"
}
],
"review_summary": {
"dominant_valence": "V+",
"dominant_domain": "O",
"span_count": 1,
"has_comparative": false,
"has_entity": false
}
}"""
class LLMClientBase(ABC):
    """Provider-independent interface plus running usage totals for LLM clients."""

    def __init__(self, config: Config):
        # Running totals start at zero; concrete clients add to them per request.
        self.total_cost_usd = 0.0
        self.total_tokens_used = 0
        self.config = config

    @abstractmethod
    async def classify(
        self,
        review_text: str,
        profile: str = "standard",
    ) -> tuple[LLMClassificationResponse, dict[str, Any]]:
        """
        Run span extraction and classification on one review.

        Args:
            review_text: The review text to classify
            profile: Classification profile (lite/core/standard/full)

        Returns:
            Tuple of (classification response, metadata dict with tokens/cost)
        """

    @abstractmethod
    async def close(self) -> None:
        """Release whatever resources the client holds."""
class OpenAIClient(LLMClientBase):
    """OpenAI LLM client implementation."""

    # Pricing per 1M tokens (as of 2024)
    PRICING = {
        "gpt-4o": {"input": 5.0, "output": 15.0},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4-turbo": {"input": 10.0, "output": 30.0},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
    }
    # Rates used when the configured model has no entry in PRICING.
    DEFAULT_PRICING = {"input": 0.15, "output": 0.60}

    def __init__(self, config: Config):
        """
        Create the async OpenAI client from the pipeline configuration.

        Args:
            config: Pipeline configuration; supplies the API key and model name
        """
        super().__init__(config)
        # Imported here so the openai package is only required when this
        # provider is actually instantiated.
        from openai import AsyncOpenAI

        self.client = AsyncOpenAI(api_key=config.get_llm_api_key())
        self.model = config.llm_model
        self._warned_unpriced_model = False

    def _resolve_pricing(self) -> dict[str, float]:
        """Return the per-1M-token rates for the current model, warning once on fallback."""
        pricing = self.PRICING.get(self.model)
        if pricing is not None:
            return pricing
        if not getattr(self, "_warned_unpriced_model", False):
            # Make it visible that reported costs are an estimate, not real rates.
            logger.warning(
                "No pricing entry for model %r; cost estimates use default rates",
                self.model,
            )
            self._warned_unpriced_model = True
        return self.DEFAULT_PRICING

    async def classify(
        self,
        review_text: str,
        profile: str = "standard",
    ) -> tuple[LLMClassificationResponse, dict[str, Any]]:
        """
        Classify a review using OpenAI chat completions in JSON mode.

        Args:
            review_text: The review text to classify
            profile: Classification profile; accepted for interface
                compatibility but not used by this implementation

        Returns:
            Tuple of (parsed JSON response, metadata dict with model, token
            counts, cost_usd and latency_ms)

        Raises:
            ValueError: If the response has no choices, has empty content, or
                the content is not valid JSON (json.JSONDecodeError).
        """
        # Monotonic clock: wall-clock adjustments must not distort latency.
        started = time.perf_counter()

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": f'Classify this review:\n\n"{review_text}"',
            },
        ]
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.config.llm_temperature,
            response_format={"type": "json_object"},
            timeout=self.config.llm_timeout_seconds,
        )

        # Parse response
        if not response.choices:
            raise ValueError("Empty response from OpenAI")
        content = response.choices[0].message.content
        if not content:
            raise ValueError("Empty response from OpenAI")
        result = json.loads(content)

        # Calculate costs; usage may be absent, in which case counts are zero.
        usage = response.usage
        input_tokens = usage.prompt_tokens if usage else 0
        output_tokens = usage.completion_tokens if usage else 0
        total_tokens = input_tokens + output_tokens
        pricing = self._resolve_pricing()
        cost = (
            input_tokens * pricing["input"] + output_tokens * pricing["output"]
        ) / 1_000_000

        self.total_tokens_used += total_tokens
        self.total_cost_usd += cost

        metadata = {
            "model": self.model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": total_tokens,
            "cost_usd": cost,
            "latency_ms": int((time.perf_counter() - started) * 1000),
        }
        return result, metadata

    async def close(self) -> None:
        """Close the OpenAI client."""
        await self.client.close()
class AnthropicClient(LLMClientBase):
    """Anthropic LLM client implementation."""

    # Pricing per 1M tokens (as of 2024)
    PRICING = {
        "claude-3-opus-20240229": {"input": 15.0, "output": 75.0},
        "claude-3-sonnet-20240229": {"input": 3.0, "output": 15.0},
        "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
        "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0},
    }
    # Rates used when the configured model has no entry in PRICING.
    DEFAULT_PRICING = {"input": 3.0, "output": 15.0}

    def __init__(self, config: Config):
        """
        Create the async Anthropic client from the pipeline configuration.

        Args:
            config: Pipeline configuration; supplies the API key and model name
        """
        super().__init__(config)
        # Imported here so the anthropic package is only required when this
        # provider is actually instantiated.
        from anthropic import AsyncAnthropic

        self.client = AsyncAnthropic(api_key=config.get_llm_api_key())
        self.model = config.llm_model
        self._warned_unpriced_model = False

    def _resolve_pricing(self) -> dict[str, float]:
        """Return the per-1M-token rates for the current model, warning once on fallback."""
        pricing = self.PRICING.get(self.model)
        if pricing is not None:
            return pricing
        if not getattr(self, "_warned_unpriced_model", False):
            # Make it visible that reported costs are an estimate, not real rates.
            logger.warning(
                "No pricing entry for model %r; cost estimates use default rates",
                self.model,
            )
            self._warned_unpriced_model = True
        return self.DEFAULT_PRICING

    async def classify(
        self,
        review_text: str,
        profile: str = "standard",
    ) -> tuple[LLMClassificationResponse, dict[str, Any]]:
        """
        Classify a review using the Anthropic messages API.

        Args:
            review_text: The review text to classify
            profile: Classification profile; accepted for interface
                compatibility but not used by this implementation

        Returns:
            Tuple of (parsed JSON response, metadata dict with model, token
            counts, cost_usd and latency_ms)

        Raises:
            ValueError: If the response contains no text or no JSON object can
                be extracted from it.
        """
        # Monotonic clock: wall-clock adjustments must not distort latency.
        started = time.perf_counter()

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            system=SYSTEM_PROMPT,
            messages=[
                {
                    "role": "user",
                    "content": f'Classify this review and return JSON only:\n\n"{review_text}"',
                },
            ],
            temperature=self.config.llm_temperature,
            # Same configured request timeout the OpenAI client applies.
            timeout=self.config.llm_timeout_seconds,
        )

        # Use the first content block that carries text; other block kinds
        # have no usable ``text`` attribute.
        content = next(
            (
                block.text
                for block in (response.content or [])
                if getattr(block, "text", None)
            ),
            "",
        )
        if not content:
            raise ValueError("Empty response from Anthropic")

        # The model may wrap the JSON in prose or a code fence.
        result = self._extract_json(content)

        # Calculate costs
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        total_tokens = input_tokens + output_tokens
        pricing = self._resolve_pricing()
        cost = (
            input_tokens * pricing["input"] + output_tokens * pricing["output"]
        ) / 1_000_000

        self.total_tokens_used += total_tokens
        self.total_cost_usd += cost

        metadata = {
            "model": self.model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_tokens": total_tokens,
            "cost_usd": cost,
            "latency_ms": int((time.perf_counter() - started) * 1000),
        }
        return result, metadata

    def _extract_json(self, content: str) -> dict[str, Any]:
        """
        Extract a JSON object from a model response.

        Candidates are tried in order: the whole text, the body of the first
        markdown code fence, then the span from the first "{" to the last "}".
        A candidate that fails to parse, or parses to something other than an
        object, is skipped so that later candidates still get a chance.

        Args:
            content: Raw response text

        Returns:
            The first candidate that parses to a JSON object

        Raises:
            ValueError: If no candidate yields a JSON object.
        """
        import re

        content = content.strip()
        candidates = [content]

        fenced = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", content)
        if fenced:
            candidates.append(fenced.group(1))

        braced = re.search(r"\{[\s\S]*\}", content)
        if braced:
            candidates.append(braced.group(0))

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed

        raise ValueError(f"Could not extract JSON from response: {content[:200]}")

    async def close(self) -> None:
        """Close the Anthropic client."""
        await self.client.close()
class LLMClient:
    """
    Factory that picks the concrete LLM client for a configuration.

    Usage:
        client = LLMClient.create(config)
        result, metadata = await client.classify(review_text)
        await client.close()
    """

    @staticmethod
    def create(config: Config) -> LLMClientBase:
        """
        Build the client selected by ``config.llm_provider``.

        Args:
            config: Pipeline configuration

        Returns:
            LLM client instance (OpenAI or Anthropic)

        Raises:
            ValueError: If the provider is neither "openai" nor "anthropic".
        """
        provider = config.llm_provider
        if provider == "openai":
            return OpenAIClient(config)
        if provider == "anthropic":
            return AnthropicClient(config)
        raise ValueError(f"Unsupported LLM provider: {config.llm_provider}")
def create_fallback_response(review_text: str) -> LLMClassificationResponse:
    """
    Build a placeholder classification for use when the LLM call fails.

    The whole review becomes a single low-confidence, neutral span coded
    O1.01, and the summary reflects that one span.

    Args:
        review_text: Original review text

    Returns:
        Minimal valid classification response
    """
    whole_review_span = {
        "span_index": 0,
        "span_text": review_text,
        "span_start": 0,
        "span_end": len(review_text),
        "urt_primary": "O1.01",
        "urt_secondary": [],
        "valence": "V0",
        "intensity": "I1",
        "specificity": "S1",
        "actionability": "A1",
        "temporal": "TC",
        "evidence": "ES",
        "comparative": "CR-N",
        "is_primary": True,
        "confidence": "low",
        "entity": None,
        "entity_type": None,
        "relation_type": None,
        "related_span_index": None,
        "usn": "URT:S:O1.01:01:11TC.ES.N",
    }
    summary = {
        "dominant_valence": "V0",
        "dominant_domain": "O",
        "span_count": 1,
        "has_comparative": False,
        "has_entity": False,
    }
    return {"spans": [whole_review_span], "review_summary": summary}

View File

@@ -0,0 +1,262 @@
"""Text processing utilities for normalization."""
from __future__ import annotations
import hashlib
import logging
import re
import unicodedata
from typing import NamedTuple
logger = logging.getLogger(__name__)
class NormalizationResult(NamedTuple):
    """Normalized review text together with the metadata computed for it."""

    # Text after normalization.
    normalized: str
    # Language code reported for the text.
    language: str
    # Number of whitespace-separated words in ``normalized``.
    word_count: int
    # Length of ``normalized`` in characters.
    char_count: int
class TextProcessor:
    """Service for text normalization and processing."""

    # Emoji code-point ranges. Each range is limited to blocks that hold emoji
    # or pictographic symbols; a single wide range from U+24C2 to U+1F251 would
    # also cover CJK ideographs, kana and Hangul, making ordinary non-Latin
    # text count as emoji.
    EMOJI_PATTERN = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags
        "\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
        "\U0001FA70-\U0001FAFF"  # symbols & pictographs extended-A
        "\U00002600-\U000026FF"  # miscellaneous symbols
        "\U00002702-\U000027B0"  # dingbats
        "\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
        "\U000024C2"  # circled M
        "\U0001F000-\U0001F251"  # game tiles, cards, enclosed characters
        "]+",
        flags=re.UNICODE,
    )
    # Control characters (except newlines and tabs we want to normalize)
    CONTROL_CHAR_PATTERN = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")
    # Multiple whitespace
    MULTI_WHITESPACE_PATTERN = re.compile(r"\s+")
    # URL pattern
    URL_PATTERN = re.compile(
        r"https?://[^\s<>\"{}|\\^`\[\]]+|www\.[^\s<>\"{}|\\^`\[\]]+"
    )

    def __init__(self):
        """Probe for langdetect and, if present, make its results deterministic."""
        self._langdetect_available = False
        try:
            from langdetect import DetectorFactory
        except ImportError:
            logger.warning("langdetect not available, defaulting to 'en' for all text")
        else:
            # Make detection deterministic (process-wide langdetect setting).
            DetectorFactory.seed = 0
            self._langdetect_available = True

    def normalize(self, text: str) -> NormalizationResult:
        """
        Normalize text for classification.

        Steps:
        1. Remove control characters
        2. Normalize Unicode (NFC)
        3. Lowercase
        4. Normalize whitespace (collapse runs to one space, trim)
        5. Detect language (on the original, un-normalized text)

        Emoji are left in place; no emoji-specific rewriting is performed.

        Args:
            text: Original review text

        Returns:
            NormalizationResult with normalized text and metadata. Empty input
            yields an empty result with language "en".
        """
        if not text:
            return NormalizationResult(
                normalized="",
                language="en",
                word_count=0,
                char_count=0,
            )

        # Step 1: Remove control characters
        normalized = self.CONTROL_CHAR_PATTERN.sub("", text)
        # Step 2: Unicode normalization (NFC - composed form)
        normalized = unicodedata.normalize("NFC", normalized)
        # Step 3: Lowercase
        normalized = normalized.lower()
        # Step 4: Normalize whitespace
        normalized = self.MULTI_WHITESPACE_PATTERN.sub(" ", normalized).strip()

        # Step 5: detection runs on the original text, before lowercasing.
        language = self.detect_language(text)

        return NormalizationResult(
            normalized=normalized,
            language=language,
            word_count=len(normalized.split()) if normalized else 0,
            char_count=len(normalized),
        )

    def detect_language(self, text: str) -> str:
        """
        Detect the language of the text.

        Args:
            text: Text to analyze

        Returns:
            Lowercase language code without any region suffix (e.g., 'en',
            'es', 'fr'); 'en' when the text is empty, langdetect is
            unavailable, or detection fails.
        """
        if not text or not self._langdetect_available:
            return "en"

        try:
            from langdetect import detect

            # Only the first 1000 characters are used as the detection sample.
            detected = detect(text[:1000])
        except Exception as e:
            # Deliberate best-effort: any detection failure falls back to 'en'.
            logger.debug(f"Language detection failed: {e}")
            return "en"

        # Drop a region suffix (e.g. "zh-cn" -> "zh") so the result is a bare
        # two-letter code as promised above.
        return detected.split("-", 1)[0].lower()

    def generate_content_hash(self, text_normalized: str) -> str:
        """
        Generate a SHA256 hash of normalized text for deduplication.

        Args:
            text_normalized: Normalized text

        Returns:
            64-character hex string
        """
        return hashlib.sha256(text_normalized.encode("utf-8")).hexdigest()

    def has_control_characters(self, text: str) -> bool:
        """Check if text contains control characters."""
        return bool(self.CONTROL_CHAR_PATTERN.search(text))

    def extract_urls(self, text: str) -> list[str]:
        """Extract URLs (http://, https:// or www. prefixed) from text."""
        return self.URL_PATTERN.findall(text)

    def count_emoji(self, text: str) -> int:
        """
        Count emoji code points in text.

        Every matching code point is counted, so a run of adjacent emoji
        contributes its full length rather than 1. Multi-code-point emoji
        (for example flags built from two regional indicators) count once per
        code point.
        """
        return sum(len(run) for run in self.EMOJI_PATTERN.findall(text))

    def is_empty_or_trivial(self, text: str | None, min_chars: int = 3) -> bool:
        """
        Check if text is empty or trivially short.

        Args:
            text: Text to check
            min_chars: Minimum meaningful character count

        Returns:
            True if text should be skipped
        """
        if not text:
            return True
        return len(text.strip()) < min_chars

    def clean_for_llm(self, text: str) -> str:
        """
        Clean text for LLM input.

        Similar to normalize but preserves case and paragraph breaks for
        better LLM understanding.

        Args:
            text: Original text

        Returns:
            Cleaned text suitable for LLM input
        """
        if not text:
            return ""

        # Remove control characters
        cleaned = self.CONTROL_CHAR_PATTERN.sub("", text)
        # Unicode normalization
        cleaned = unicodedata.normalize("NFC", cleaned)
        # Collapse horizontal whitespace but keep newlines for paragraphs
        cleaned = re.sub(r"[^\S\n]+", " ", cleaned)
        # Max 2 consecutive newlines
        cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
        return cleaned.strip()
# Two-letter ISO 639-1 codes accepted by is_valid_iso639. Built once at import
# time instead of on every call.
_ISO_639_1_CODES = frozenset({
    "aa", "ab", "ae", "af", "ak", "am", "an", "ar", "as", "av",
    "ay", "az", "ba", "be", "bg", "bh", "bi", "bm", "bn", "bo",
    "br", "bs", "ca", "ce", "ch", "co", "cr", "cs", "cu", "cv",
    "cy", "da", "de", "dv", "dz", "ee", "el", "en", "eo", "es",
    "et", "eu", "fa", "ff", "fi", "fj", "fo", "fr", "fy", "ga",
    "gd", "gl", "gn", "gu", "gv", "ha", "he", "hi", "ho", "hr",
    "ht", "hu", "hy", "hz", "ia", "id", "ie", "ig", "ii", "ik",
    "io", "is", "it", "iu", "ja", "jv", "ka", "kg", "ki", "kj",
    "kk", "kl", "km", "kn", "ko", "kr", "ks", "ku", "kv", "kw",
    "ky", "la", "lb", "lg", "li", "ln", "lo", "lt", "lu", "lv",
    "mg", "mh", "mi", "mk", "ml", "mn", "mr", "ms", "mt", "my",
    "na", "nb", "nd", "ne", "ng", "nl", "nn", "no", "nr", "nv",
    "ny", "oc", "oj", "om", "or", "os", "pa", "pi", "pl", "ps",
    "pt", "qu", "rm", "rn", "ro", "ru", "rw", "sa", "sc", "sd",
    "se", "sg", "si", "sk", "sl", "sm", "sn", "so", "sq", "sr",
    "ss", "st", "su", "sv", "sw", "ta", "te", "tg", "th", "ti",
    "tk", "tl", "tn", "to", "tr", "ts", "tt", "tw", "ty", "ug",
    "uk", "ur", "uz", "ve", "vi", "vo", "wa", "wo", "xh", "yi",
    "yo", "za", "zh", "zu",
})


def is_valid_iso639(code: str) -> bool:
    """
    Check if a language code is a known ISO 639-1 code.

    The comparison is case-insensitive. Region-tagged codes such as "zh-cn"
    are not ISO 639-1 codes and are rejected.

    Args:
        code: Language code to validate

    Returns:
        True if ``code`` is one of the listed two-letter codes; False
        otherwise, including for non-string input.
    """
    if not isinstance(code, str):
        # Validators report invalid input instead of raising on it.
        return False
    return code.lower() in _ISO_639_1_CODES
def is_valid_sha256(hash_str: str) -> bool:
    """
    Check if a string is a valid SHA256 hex hash.

    Args:
        hash_str: Hash string to validate

    Returns:
        True only if ``hash_str`` is exactly 64 characters, each one of
        0-9, a-f or A-F; False otherwise, including for non-string input.
    """
    if not isinstance(hash_str, str):
        return False
    # Match the characters explicitly: int(s, 16) would also accept a sign,
    # a "0x" prefix, underscores and surrounding whitespace.
    return re.fullmatch(r"[0-9a-fA-F]{64}", hash_str) is not None