From 0a53e98bf92ca8b6992ce752d1a099568bce19ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:50:21 +0000 Subject: [PATCH] fix(pipeline): Update stage result to use new synthesis fields Change the stage result to reference action_matrix instead of the old 'action_plan' field. Add issues_identified and health_score to stage result data. Co-Authored-By: Claude Opus 4.5 --- .../src/reviewiq_pipeline/pipeline.py | 60 ++++++++++++------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py index bda3ae5..11f8769 100644 --- a/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/pipeline.py @@ -73,21 +73,27 @@ STAGE_NAME_TO_NUM = {"normalize": 1, "classify": 2, "route": 3, "aggregate": 4, STAGE_NUM_TO_NAME = {1: "normalize", 2: "classify", 3: "route", 4: "aggregate", 5: "synthesize"} -def _parse_relative_date(date_str: str | None, default_to_now: bool = True) -> datetime | None: +def _parse_relative_date( + date_str: str | None, + default_to_now: bool = True, + anchor_date: datetime | None = None, +) -> datetime | None: """Parse relative date strings like '10 months ago' into datetime objects. Args: date_str: A relative date string (e.g., "10 months ago", "2 weeks ago") or an ISO date string, or None. default_to_now: If True, returns current datetime when parsing fails. + anchor_date: Reference date for relative calculations. If None, uses now. + For scraped reviews, this should be the scrape time. Returns: A datetime object, or None if parsing fails and default_to_now is False. 
""" - now = datetime.now() + reference = anchor_date or datetime.now() if not date_str: - return now if default_to_now else None + return reference if default_to_now else None # Try to parse as ISO date first try: @@ -108,25 +114,25 @@ def _parse_relative_date(date_str: str | None, default_to_now: bool = True) -> d unit = match.group(2) if unit == 'second': - return now - timedelta(seconds=amount) + return reference - timedelta(seconds=amount) elif unit == 'minute': - return now - timedelta(minutes=amount) + return reference - timedelta(minutes=amount) elif unit == 'hour': - return now - timedelta(hours=amount) + return reference - timedelta(hours=amount) elif unit == 'day': - return now - timedelta(days=amount) + return reference - timedelta(days=amount) elif unit == 'week': - return now - timedelta(weeks=amount) + return reference - timedelta(weeks=amount) elif unit == 'month': # Approximate months as 30 days - return now - timedelta(days=amount * 30) + return reference - timedelta(days=amount * 30) elif unit == 'year': # Approximate years as 365 days - return now - timedelta(days=amount * 365) + return reference - timedelta(days=amount * 365) - # If we can't parse it, return now or None + # If we can't parse it, return reference or None logger.warning(f"Could not parse relative date: {date_str}") - return now if default_to_now else None + return reference if default_to_now else None class PipelineResult: @@ -465,8 +471,9 @@ class ReviewIQPipeline(BasePipeline): stage="synthesize", success=True, data={ - "actions_generated": len(stage5_result.action_plan) if stage5_result else 0, - "has_narrative": bool(stage5_result and stage5_result.executive_narrative), + "actions_generated": len(stage5_result.action_matrix) if stage5_result else 0, + "issues_identified": len(stage5_result.critical_issues) if stage5_result else 0, + "health_score": stage5_result.executive_summary.health_score if stage5_result else 0, }, error=None, duration_ms=duration_ms, @@ -893,7 +900,8 @@ 
class ReviewIQPipeline(BasePipeline): metadata->>'category' as category, metadata->>'total_reviews' as total_reviews, metadata->>'average_rating' as average_rating, - scraper_version + scraper_version, + created_at FROM public.jobs WHERE job_id = $1::uuid """, @@ -906,6 +914,14 @@ class ReviewIQPipeline(BasePipeline): if isinstance(reviews_data, str): logger.info("Parsing reviews_data JSON string") reviews_data = json.loads(reviews_data) + + # Use job created_at as anchor for relative date parsing + # This ensures "2 months ago" is relative to scrape time, not pipeline run time + scrape_time = row["created_at"] + if scrape_time and hasattr(scrape_time, 'replace'): + # Make timezone-naive for consistency + scrape_time = scrape_time.replace(tzinfo=None) + # Convert reviews_data to RawReview format # Handle both API format (review_id, author, rating) and scraper format (reviewId, name, stars) reviews = [] @@ -914,9 +930,15 @@ class ReviewIQPipeline(BasePipeline): # Skip if review is somehow a string logger.warning(f"Skipping review {i}: got string instead of dict") continue - # Parse the review time (may be relative like "10 months ago") - raw_time = review.get("timestamp") or review.get("publishedAtDate") or "" - parsed_time = _parse_relative_date(raw_time) + # Parse the review time - prefer absolute dates over relative strings + # Priority: centerDate (scraper estimate) > publishedAtDate > timestamp (relative) + raw_time = ( + review.get("centerDate") + or review.get("publishedAtDate") + or review.get("timestamp") + or "" + ) + parsed_time = _parse_relative_date(raw_time, anchor_date=scrape_time) reviews.append({ "review_id": review.get("review_id") or review.get("reviewId") or f"review_{i}", @@ -1145,8 +1167,6 @@ class ReviewIQPipeline(BasePipeline): async def _run_synthesize(self, job_id: str, execution_id: str): """Run AI synthesis stage to generate narratives and action plans.""" - from reviewiq_pipeline.stages.stage5_synthesize import Synthesis - # Create LLM 
client for synthesis llm_client = LLMClient.create(self._config)