fix(pipeline): Update stage result to use new synthesis fields

Change action_matrix reference from old 'actions' field.
Add issues_identified and health_score to stage result data.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-01-30 14:50:21 +00:00
parent d5ef13b58e
commit 0a53e98bf9

View File

@@ -73,21 +73,27 @@ STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4,
STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate", 5: "synthesize"}
def _parse_relative_date(date_str: str | None, default_to_now: bool = True) -> datetime | None:
def _parse_relative_date(
date_str: str | None,
default_to_now: bool = True,
anchor_date: datetime | None = None,
) -> datetime | None:
"""Parse relative date strings like '10 months ago' into datetime objects.
Args:
date_str: A relative date string (e.g., "10 months ago", "2 weeks ago")
or an ISO date string, or None.
default_to_now: If True, returns current datetime when parsing fails.
anchor_date: Reference date for relative calculations. If None, uses now.
For scraped reviews, this should be the scrape time.
Returns:
A datetime object, or None if parsing fails and default_to_now is False.
"""
now = datetime.now()
reference = anchor_date or datetime.now()
if not date_str:
return now if default_to_now else None
return reference if default_to_now else None
# Try to parse as ISO date first
try:
@@ -108,25 +114,25 @@ def _parse_relative_date(date_str: str | None, default_to_now: bool = True) -> d
unit = match.group(2)
if unit == 'second':
return now - timedelta(seconds=amount)
return reference - timedelta(seconds=amount)
elif unit == 'minute':
return now - timedelta(minutes=amount)
return reference - timedelta(minutes=amount)
elif unit == 'hour':
return now - timedelta(hours=amount)
return reference - timedelta(hours=amount)
elif unit == 'day':
return now - timedelta(days=amount)
return reference - timedelta(days=amount)
elif unit == 'week':
return now - timedelta(weeks=amount)
return reference - timedelta(weeks=amount)
elif unit == 'month':
# Approximate months as 30 days
return now - timedelta(days=amount * 30)
return reference - timedelta(days=amount * 30)
elif unit == 'year':
# Approximate years as 365 days
return now - timedelta(days=amount * 365)
return reference - timedelta(days=amount * 365)
# If we can't parse it, return now or None
# If we can't parse it, return reference or None
logger.warning(f"Could not parse relative date: {date_str}")
return now if default_to_now else None
return reference if default_to_now else None
class PipelineResult:
@@ -465,8 +471,9 @@ class ReviewIQPipeline(BasePipeline):
stage="synthesize",
success=True,
data={
"actions_generated": len(stage5_result.action_plan) if stage5_result else 0,
"has_narrative": bool(stage5_result and stage5_result.executive_narrative),
"actions_generated": len(stage5_result.action_matrix) if stage5_result else 0,
"issues_identified": len(stage5_result.critical_issues) if stage5_result else 0,
"health_score": stage5_result.executive_summary.health_score if stage5_result else 0,
},
error=None,
duration_ms=duration_ms,
@@ -893,7 +900,8 @@ class ReviewIQPipeline(BasePipeline):
metadata->>'category' as category,
metadata->>'total_reviews' as total_reviews,
metadata->>'average_rating' as average_rating,
scraper_version
scraper_version,
created_at
FROM public.jobs
WHERE job_id = $1::uuid
""",
@@ -906,6 +914,14 @@ class ReviewIQPipeline(BasePipeline):
if isinstance(reviews_data, str):
logger.info("Parsing reviews_data JSON string")
reviews_data = json.loads(reviews_data)
# Use job created_at as anchor for relative date parsing
# This ensures "2 months ago" is relative to scrape time, not pipeline run time
scrape_time = row["created_at"]
if scrape_time and hasattr(scrape_time, 'replace'):
# Make timezone-naive for consistency
scrape_time = scrape_time.replace(tzinfo=None)
# Convert reviews_data to RawReview format
# Handle both API format (review_id, author, rating) and scraper format (reviewId, name, stars)
reviews = []
@@ -914,9 +930,15 @@ class ReviewIQPipeline(BasePipeline):
# Skip if review is somehow a string
logger.warning(f"Skipping review {i}: got string instead of dict")
continue
# Parse the review time (may be relative like "10 months ago")
raw_time = review.get("timestamp") or review.get("publishedAtDate") or ""
parsed_time = _parse_relative_date(raw_time)
# Parse the review time - prefer absolute dates over relative strings
# Priority: centerDate (scraper estimate) > publishedAtDate > timestamp (relative)
raw_time = (
review.get("centerDate")
or review.get("publishedAtDate")
or review.get("timestamp")
or ""
)
parsed_time = _parse_relative_date(raw_time, anchor_date=scrape_time)
reviews.append({
"review_id": review.get("review_id") or review.get("reviewId") or f"review_{i}",
@@ -1145,8 +1167,6 @@ class ReviewIQPipeline(BasePipeline):
async def _run_synthesize(self, job_id: str, execution_id: str):
"""Run AI synthesis stage to generate narratives and action plans."""
from reviewiq_pipeline.stages.stage5_synthesize import Synthesis
# Create LLM client for synthesis
llm_client = LLMClient.create(self._config)