From e2d7f6f1186dbdf46c0071f5c06897ce90cba6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 24 Jan 2026 18:35:09 +0000 Subject: [PATCH] feat: Add ScraperV1Adapter and real data pipeline test - Add ScraperV1Adapter to transform scraped reviews into pipeline format - Handles relative timestamps (centerDate) - Generates deterministic IDs for DOM-sourced reviews - Filters out empty (rating-only) reviews - Add sample barbershop reviews (79 reviews, 46 with text) - Real data from Las Palmas barbershop - Multi-language: Spanish, English, German, Norwegian, Italian - Add test_pipeline_real_data.py for E2E testing with real data - Uses mock classifier based on keywords and rating - Full pipeline flow: raw -> enriched -> spans -> issues -> facts Test results with real data: - 46 reviews processed - 6 languages detected (es: 35, en: 7, de: 1, no: 1, it: 1, ca: 1) - 3 issues identified from negative reviews - 29 fact records aggregated across date range 2017-2025 Co-Authored-By: Claude Opus 4.5 --- data/samples/barbershop_reviews.json | 861 ++++++++++++++++++ .../reviewiq_pipeline/adapters/__init__.py | 5 + .../reviewiq_pipeline/adapters/scraper_v1.py | 242 +++++ tools/test_pipeline_real_data.py | 625 +++++++++++++ 4 files changed, 1733 insertions(+) create mode 100644 data/samples/barbershop_reviews.json create mode 100644 packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/__init__.py create mode 100644 packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/scraper_v1.py create mode 100644 tools/test_pipeline_real_data.py diff --git a/data/samples/barbershop_reviews.json b/data/samples/barbershop_reviews.json new file mode 100644 index 0000000..ad16611 --- /dev/null +++ b/data/samples/barbershop_reviews.json @@ -0,0 +1,861 @@ +[ + { + "text": "", + "author": "Andy Cabrera Ramírez", + "rating": 5, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT2toMWNrZDFZbGcxZVZKNGVtMWFaR2t0TTIxVE5FRRAB", + "timestamp": "a month ago", + "minDate": "2025-11-26T18:31:09.843Z", + "maxDate": "2025-12-25T18:31:09.843Z", + "centerDate": "2025-12-11T06:31:09.843Z" + }, + { + "text": "El mejor servicio, el mejor trato y profesionalidad.", + "author": "Patrick Scherschinski Luca de Tena", + "rating": 5, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT2pCRU5GcGFZVXN6Y0VWWlozUTJRMnN5V0MxZlIyYxAB", + "timestamp": "2 months ago", + "minDate": "2025-10-27T18:31:09.843Z", + "maxDate": "2025-11-25T18:31:09.843Z", + "centerDate": "2025-11-11T06:31:09.843Z" + }, + { + "text": "De profesionales nada.a mi me dejaron fatal.no es solo pedir disculpas,por lo menos me hubiesen ofrecido arreglarme el desastre que me hicieron.no vuelvo mas a esta peluqueria.", + "author": "Francisco javier Rodriguez limones", + "rating": 1, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT25WMVVHRjVhVWQ1Um04M1VHWklYMjU1Ym5wMFpGRRAB", + "timestamp": "2 months ago", + "minDate": "2025-10-27T18:31:09.843Z", + "maxDate": "2025-11-25T18:31:09.843Z", + "centerDate": "2025-11-11T06:31:09.843Z" + }, + { + "text": "La primera vez que boy a esta peluqueria porque me la recomendaron y me dejaron fatal.", + "author": "ECEM HOGAR", + "rating": 1, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT25OTVFpMWxMV3BYYVVWZmVIWjBNbFZhWkdwaFJtYxAB", + "timestamp": "Edited 2 months ago", + "minDate": "2025-10-27T18:31:09.843Z", + "maxDate": "2025-11-25T18:31:09.843Z", + "centerDate": "2025-11-11T06:31:09.843Z" + }, + { + "text": "", + "author": "Adrián Santana García", + "rating": 5, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT21GR2NIUjFRbUk0WWs1Nk1Yb3lXWGRpYkVSVGNrRRAB", + "timestamp": "3 months ago", + "minDate": "2025-09-27T18:31:09.843Z", + "maxDate": "2025-10-26T18:31:09.843Z", + "centerDate": "2025-10-12T06:31:09.843Z" + }, + { + "text": "Rafa es mi nuevo referente en el sector. Atención profesional y dedicada por alguien con muchísima experiencia en este campo. Cortes modernos o clásicos, barba perfecta. 10/10", + "author": "Alex Q.", + "rating": 5, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT214TGMwRnhlR3BMZWxGUFJqTldkbEY2V0ZocmJGRRAB", + "timestamp": "3 months ago", + "minDate": "2025-09-27T18:31:09.843Z", + "maxDate": "2025-10-26T18:31:09.843Z", + "centerDate": "2025-10-12T06:31:09.843Z" + }, + { + "text": "Super mala la atención! No tienen buenos modales y mal educados en su totalidad jamas volveria! Quieres pasarla mal este es el sitio indicado", + "author": "SoyWilliams", + "rating": 1, + "source": "api", + "review_id": "Ci9DQUlRQUNvZENodHljRjlvT2xOTGVEaEZjV2R4Y2toMGF5MVVXUzFSYkdRMk1XYxAB", + "timestamp": "5 months ago", + "minDate": "2025-07-29T18:31:09.843Z", + "maxDate": "2025-08-27T18:31:09.843Z", + "centerDate": "2025-08-13T06:31:09.843Z" + }, + { + "text": "Se trata de una peluquería excepcional; profesionalidad y buen hacer en cada corte de pelo, no importa si te gusta más clásico o más moderno, siempre se da buen resultado.", + "author": "Quique Con Q", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VMTE1oZG51MktiX3dnRRAB", + "timestamp": "7 months ago", + "minDate": "2025-05-30T18:31:09.843Z", + "maxDate": "2025-06-28T18:31:09.843Z", + "centerDate": "2025-06-14T06:31:09.843Z" + }, + { + "text": "Una Peluquería Profesional. Los Hermanos Fleitas Nunca Fallan.\nSiempre Obran El Milagro.\nBuenas Tertulias De Lo Divino y De Lo Humano.", + "author": "JOSE MANUEL MORENO CASTAÑO", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURKb05ITWtnRRAB", + "timestamp": "Edited 8 months ago", + "minDate": "2025-04-30T18:31:09.843Z", + "maxDate": "2025-05-29T18:31:09.843Z", + "centerDate": "2025-05-15T06:31:09.843Z" + }, + { + "text": "Increíble trato, excelente servicio y muy rapido", + "author": "Jaime Jesús García Mendoza", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnTURJOUpyY3J3RRAB", + "timestamp": "9 months ago", + "minDate": "2025-03-31T18:31:09.843Z", + "maxDate": "2025-04-29T18:31:09.843Z", + "centerDate": "2025-04-15T06:31:09.843Z" + }, + { + "text": "Fui por las reseñas de otros usuarios y estoy completamente de acuerdo, muy buen hacer… corte de pelo a tijera, nada de máquinas y rapados, he quedado muy satisfecho con mi corte de pelo, por supuesto volveré y lo recomiendo a todos ☺️", + "author": "Jose Gerardo", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNyM29MSU9nEAE", + "timestamp": "Edited 10 months ago", + "minDate": "2025-03-01T18:31:09.843Z", + "maxDate": "2025-03-30T18:31:09.843Z", + "centerDate": "2025-03-16T06:31:09.843Z" + }, + { + "text": "Super Friseur, schönes Ambiente. Ich habe mir Haare und Bart schneiden lassen – beides TOP. Kann ich zu 100% Empfehlen! Ich komme definitiv wieder.", + "author": "B F", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnTUNRdDd5QVBnEAE", + "timestamp": "10 months ago", + "minDate": "2025-03-01T18:31:09.843Z", + "maxDate": "2025-03-30T18:31:09.843Z", + "centerDate": "2025-03-16T06:31:09.843Z" + }, + { + "text": "", + "author": "cristopher munizaga", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnTURBbkphVjdBRRAB", + "timestamp": "11 months ago", + "minDate": "2025-01-30T18:31:09.843Z", + "maxDate": "2025-02-28T18:31:09.843Z", + "centerDate": "2025-02-14T06:31:09.843Z" + }, + { + "text": "Profesjonell, bra pris, hyggelig! Anbefales på det sterkeste ✂️", + "author": "Stian Øvstebø", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURfbm8yLVR3EAE", + "timestamp": "a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "El trato de Rafael es inmejorable siempre con una sonrisa y buena predisposición. Muy buen barbero, siempre dispuesto a recomendarte o mejorar la idea que tengas en mente. He ido a muchas barberías pero como esta ninguna. Gracias Rafael", + "author": "David Rodríguez", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURleG9ENnJRRRAB", + "timestamp": "Edited a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "", + "author": "Derians Jose Diaz", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNyb2FhcUxnEAE", + "timestamp": "a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "Profesionales de trato muy amable y cercano.\nSin duda alguna la mejor barbería de la zona.", + "author": "Daniel PM", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUR4MU0zd3h3RRAB", + "timestamp": "Edited a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "El mejor sitio de Gran Canaria para pelarse sin duda, grandes profesionales.", + "author": "Juan Mora", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUN6akxieFFBEAE", + "timestamp": "a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "professionale! precisione! brava! molto consigliato!\n\nprofessional! accurate! well done! highly recommended!\n\n¡profesional! ¡preciso! ¡bien hecho! ¡muy recomendable!", + "author": "Alessandro “L” Sandri", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURONy1QeEJBEAE", + "timestamp": "a year ago", + "minDate": "2024-01-26T18:31:09.843Z", + "maxDate": "2025-01-24T18:31:09.843Z", + "centerDate": "2024-07-26T18:31:09.843Z" + }, + { + "text": "", + "author": "Adrian Nuez Sosa", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUMxeHRQRHZ3RRAB", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Ancor Santana Martín", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Excellent service from a professional who loves his job.", + "author": "Pepe", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Spectacular, art is what it has", + "author": "Cristobal Ceballos vega", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Professional, fast. Good price! Recommended.", + "author": "Flavio", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "I came here after getting a haircut elsewhere that I wasn't happy with. They did an amazing job fixing it and refused to take my money when I tried to pay. Complete class act, would highly recommend and will be back when I stay in Las Palmas again", + "author": "Ian Hannon", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Monica Campos Guerra", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Alfredo Rodriguez", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Jesus Tanausu Gonzalez Gonzalez", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "I will definitely be back again. Raul is amazing.", + "author": "Paul Bechtold", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Pablo Yanez Ortega", + "rating": 5, + "source": "dom", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Muy buena peluquería, trato de 10 te hacen sentir como en casa desde que entras por la puerta", + "author": "Alejandro Tavio", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUR4enJPbjR3RRAB", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Judit Nolasco", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR4bHB1S0ZnEAE", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Manuel Moranchel", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR4LU1hZEl3EAE", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Mejor barber en Las Palmas. No veo la hora de volver de Croacia para cortarme el pelo.\nRecomiendo👍", + "author": "Dražen Zavoreo", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR4Z05YMUVREAE", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Rui Rego Soares", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUN4azllVHJnRRAB", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Marc Sherwood", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUN4eU16b0p3EAE", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Malísimo", + "author": "Iker Gomez", + "rating": 1, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNSeXVHeFhBEAE", + "timestamp": "2 years ago", + "minDate": "2023-01-26T18:31:09.843Z", + "maxDate": "2024-01-25T18:31:09.843Z", + "centerDate": "2023-07-27T18:31:09.843Z" + }, + { + "text": "Peluquería de 10: me corté el pelo con Raúl y tuve todo el rato la sensación de estar con un gran profesional: minucioso, ocupado en conseguir lo que le pedí y un trato excelente. Y el precio desde luego es bajo por tan buen servicio. TOP", + "author": "Salvador Bosch", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNCb2VXaXRnRRAB", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Buen servicio", + "author": "Noelia Lopez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNCZ2NDWmJREAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "La primera vez que vengo, soy de Barcelona. Me han tratado muy bien, el corte muy fino muy bien 👌🏽 ambiente muy amistoso y agradable, precio recomiendo!", + "author": "Bohdan Savchenko", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUQtcWRlbnV3RRAB", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "ANGEL RODRIGUEZ", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUMtXzlIakpBEAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "La verdad es que estoy muy contento con el equipo de hermanos de esta peluquería. Están cualificados, responden cualquier duda y te tratan de forma muy profesional. Además no te intentan vender nada, simplemente te recomiendan productos para la mejora de tu salud capilar. Volveré encantado.", + "author": "Adrián Rodríguez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUMtNDU2UWVBEAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "La mejor barbería de la zona de guanarteme", + "author": "C Cdrs", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUMteVotMmpRRRAB", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Marco Santana Alonso", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNldktXVUlBEAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Profesional por Excelencia !!!\ny ciertamente gran conversador ,recomendable !!", + "author": "JOSÈ RODRIGUEZ", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR1bm96QmVREAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Fui de casualidad, por que no resido en las Palmas, y fue un acierto.\nMuy buen trato, y muy profesionales.\nSi viviera aquí, sin duda ya tendría peluquería donde ir.", + "author": "Daniel P. G.", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUR1eEtTSDR3RRAB", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Increíbles, atencion, trabajo perfecto, unos crack, Rafitaaaaa Un artista con esas manos te deja perfecto", + "author": "Eduardo Rodriguez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUN1X1B2MFpBEAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Sara Elizondo Santana", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUQyaE9fSkJnEAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Idafen Santana Pérez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURXMWZQeGNREAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Desde la primera vez que vine a R. Feitas peluquería no me he arrepentido de estar ausente ahí cuando quiero tener un corte preciso, bonito y elegante, de la mano de Rafa. Me ha entregado un servicio y atención excepcional, digno de volver. Desde que entras hasta que sales puedes hablar y disfrutar de él humor, carisma o sentido del peluquero que te esté tratando. He aprendido mucho sentado en una silla junto a Rafa y he disfrutado de muchos momentos alegres. Y le sumas el resultado del arte que tiene Rafa ñara dejarte estilisticamente el pelo, como dije antes, \"preciso y bonito\". Recordarles. ⭐️⭐️⭐️⭐️⭐️", + "author": "Eli Lopez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNXOHNTTFp3EAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Leonardo Romeo", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURtM3NqSGNREAE", + "timestamp": "3 years ago", + "minDate": "2022-01-26T18:31:09.843Z", + "maxDate": "2023-01-25T18:31:09.843Z", + "centerDate": "2022-07-27T18:31:09.843Z" + }, + { + "text": "Good masters working there! David is simple a rockstar 🙌", + "author": "Ievgen Chernenko", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNtelp6NUFREAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "Great service", + "author": "Markus Kramer", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNtaExqRld3EAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Rubén Cabrera", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUQ2N19ha1JREAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Jesus Rb", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNhNW82ejRRRRAB", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "jose manuel caamaño pais", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNxaTV1RzlnRRAB", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "Buen trato", + "author": "Jordi Antonell Bladé", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNLemZET2xBRRAB", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "Te hacen sentir muy cómodo y buenos profesionales", + "author": "Ignacio Santana Rodríguez", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURTdXRyNTZnRRAB", + "timestamp": "Edited 4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Ayoze Lopez ROMERO", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNLMElfbTBnRRAB", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Abel Redondo Santana", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR5aHUzQ1VnEAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Jonay Fuentes", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR5dXRpYkxnEAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "BLAS MANUEL GARCIA PEREZ", + "rating": 4, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUR5X01YTGRREAE", + "timestamp": "4 years ago", + "minDate": "2021-01-26T18:31:09.843Z", + "maxDate": "2022-01-25T18:31:09.843Z", + "centerDate": "2021-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Santiago Ruiz", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNTa05hTUZREAE", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "Rafael. Muy buen barbero. Profesional y buen talante.", + "author": "The Garden Lanzarote. Profesionales de jardinería.", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURpM09UVWhBRRAB", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "La mejor peluquería de la ciudad 👌🏽", + "author": "Adrián Pérez Roger", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNpbXZlbl9RRRAB", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "No te puedes morir sin ir antes 💈✨", + "author": "Camilo", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURDdktLalVnEAE", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "Pequeño ,pero acojedor, buen profecional y muy cercano al cliente , Rafa eres un encanto como profecional y como persona, gracias nos volveremos ha ver,", + "author": "Paco Hornero", + "rating": 4, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUM4OHFTQ05nEAE", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "Muy amables, profesional, cliente para toda la vida.", + "author": "Itaca", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURjbF9pWnJnRRAB", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "Esta peluquería es la mejor en la que he estado,simplemente tienen un servicio impecable,sus trabajadores tienen una profesionalidad del 100 %, magnífico.", + "author": "Guillermo Cedrés", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURzcnBmZTlRRRAB", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "", + "author": "Gabriel R. Castillo", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUNzcS1iazhRRRAB", + "timestamp": "5 years ago", + "minDate": "2020-01-27T18:31:09.843Z", + "maxDate": "2021-01-25T18:31:09.843Z", + "centerDate": "2020-07-27T18:31:09.843Z" + }, + { + "text": "Poco que decir de este gran nuevo negocio llevado por una gente excepcional. Un trato muy a la altura de los gustos más exigentes. Y una calidad de trabajo que en pocos lugares de Las Palmas se podrá encontrar", + "author": "Nestor Lobato", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSUQwOU9HQzJnRRAB", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "", + "author": "alberto hernandez rodriguez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUMwaVBYWENREAE", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "", + "author": "Airam Sánchez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURVcTVhSUpBEAE", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "Sin duda la mejor barbería de Las Palmas. Llevo arreglandome la barba unos 3 años y Rafael tiene unas manos de oro. Muy profesional y un trato exquisito. Comentar que usa unos productos de primera categoría. Un acierto haber ido hace tiempo a probar y quedarme como cliente para siempre. Un saludo Rafa!!!", + "author": "Ero Martinez Garcia", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURVbnJTazd3RRAB", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "", + "author": "Alfonzo Arteaga", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNZb3R6aFZnEAE", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "Peluqueros muy trabajadore", + "author": "FernanGamer HDModzz", + "rating": 5, + "source": "api", + "review_id": "ChdDSUhNMG9nS0VJQ0FnSURvbnEtTnF3RRAB", + "timestamp": "6 years ago", + "minDate": "2019-01-27T18:31:09.843Z", + "maxDate": "2020-01-26T18:31:09.843Z", + "centerDate": "2019-07-28T18:31:09.843Z" + }, + { + "text": "", + "author": "Goretti Cabrera Suárez", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUNJcklYV0p3EAE", + "timestamp": "7 years ago", + "minDate": "2018-01-27T18:31:09.843Z", + "maxDate": "2019-01-26T18:31:09.843Z", + "centerDate": "2018-07-28T18:31:09.843Z" + }, + { + "text": "", + "author": "Jose Ignacio Zaballos", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSURBeXR2b0tnEAE", + "timestamp": "7 years ago", + "minDate": "2018-01-27T18:31:09.843Z", + "maxDate": "2019-01-26T18:31:09.843Z", + "centerDate": "2018-07-28T18:31:09.843Z" + }, + { + "text": "Probablemente el mejor barbero / peluquero de la ciudad. Exquisito en el trato con los clientes, minucioso y perfeccionista en sus trabajos. Recomendable 100% pedir cita previa dada la demanda.", + "author": "Daniel Medina Claesson", + "rating": 5, + "source": "api", + "review_id": "ChZDSUhNMG9nS0VJQ0FnSUN3MXE2dVFBEAE", + "timestamp": "8 years ago", + "minDate": "2017-01-27T18:31:09.843Z", + "maxDate": "2018-01-26T18:31:09.843Z", + "centerDate": "2017-07-28T18:31:09.843Z" + } +] \ No newline at end of file diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/__init__.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/__init__.py new file mode 100644 index 0000000..eea571f --- /dev/null +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/__init__.py @@ -0,0 +1,5 @@ +"""Ingestion adapters for various review data formats.""" + +from reviewiq_pipeline.adapters.scraper_v1 import ScraperV1Adapter + +__all__ = ["ScraperV1Adapter"] diff --git a/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/scraper_v1.py b/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/scraper_v1.py new file mode 100644 index 0000000..5bcd66d --- /dev/null +++ b/packages/reviewiq-pipeline/src/reviewiq_pipeline/adapters/scraper_v1.py @@ -0,0 +1,242 @@ +""" +Adapter for Scraper V1 output format. + +This adapter transforms the raw scraped review format into the pipeline's +expected RawReview format for Stage 1 processing. + +Input format (from scraper): +{ + "text": "Review text...", + "author": "Author Name", + "rating": 5, + "source": "api" | "dom", + "review_id": "ABC123...", # Optional for DOM-sourced + "timestamp": "2 months ago", + "minDate": "2025-10-27T18:31:09.843Z", + "maxDate": "2025-11-25T18:31:09.843Z", + "centerDate": "2025-11-11T06:31:09.843Z" +} + +Output format (for pipeline): +{ + "review_id": "ABC123...", + "text": "Review text...", + "rating": 5, + "author_name": "Author Name", + "author_id": None, + "review_time": "2025-11-11T06:31:09.843Z", + "relative_time": "2 months ago", + "raw_payload": {...} # Original data +} +""" + +from __future__ import annotations + +import hashlib +import logging +from datetime import datetime, timezone +from typing import Any +from uuid import uuid4 + +logger = logging.getLogger(__name__) + + +class ScraperV1Adapter: + """ + Adapter to transform Scraper V1 output into pipeline-compatible format. + + The scraper produces reviews with relative timestamps ("2 months ago") and + estimated date ranges. This adapter normalizes them into absolute timestamps + using the centerDate field. + """ + + def __init__( + self, + business_id: str, + place_id: str, + source: str = "google", + ): + """ + Initialize the adapter. + + Args: + business_id: Business identifier for the reviews + place_id: Google Maps place ID + source: Review source platform (default: "google") + """ + self.business_id = business_id + self.place_id = place_id + self.source = source + self._seen_ids: set[str] = set() + + def transform(self, scraped_reviews: list[dict[str, Any]]) -> list[dict[str, Any]]: + """ + Transform a list of scraped reviews into pipeline format. + + Args: + scraped_reviews: List of reviews from scraper output + + Returns: + List of reviews in pipeline RawReview format + """ + transformed = [] + skipped_empty = 0 + skipped_duplicate = 0 + + for review in scraped_reviews: + result = self.transform_single(review) + + if result is None: + skipped_empty += 1 + continue + + if result["review_id"] in self._seen_ids: + skipped_duplicate += 1 + continue + + self._seen_ids.add(result["review_id"]) + transformed.append(result) + + logger.info( + f"Transformed {len(transformed)} reviews " + f"(skipped {skipped_empty} empty, {skipped_duplicate} duplicates)" + ) + return transformed + + def transform_single(self, review: dict[str, Any]) -> dict[str, Any] | None: + """ + Transform a single scraped review into pipeline format. + + Args: + review: Single review from scraper output + + Returns: + Review in pipeline RawReview format, or None if should be skipped + """ + text = (review.get("text") or "").strip() + + # Skip empty reviews (rating-only) + if not text: + return None + + # Generate review_id if missing (DOM-sourced reviews) + review_id = review.get("review_id") + if not review_id: + # Generate deterministic ID from content + content_hash = hashlib.sha256( + f"{review.get('author', '')}:{text}:{review.get('rating', 0)}".encode() + ).hexdigest()[:16] + review_id = f"DOM-{content_hash}" + + # Parse review time from centerDate (best estimate) + review_time = self._parse_review_time(review) + + return { + "review_id": review_id, + "text": text, + "rating": review.get("rating", 5), + "author_name": review.get("author", "Anonymous"), + "author_id": None, # Not available in scraper output + "review_time": review_time, + "relative_time": review.get("timestamp", ""), + "raw_payload": review, # Preserve original data + } + + def _parse_review_time(self, review: dict[str, Any]) -> str: + """ + Parse the review time from available date fields. + + Priority: + 1. centerDate (best estimate from relative time) + 2. minDate (earliest possible date) + 3. Current time (fallback) + + Args: + review: Review data with date fields + + Returns: + ISO 8601 timestamp string + """ + # Try centerDate first (most accurate estimate) + center_date = review.get("centerDate") + if center_date: + try: + # Parse and re-format to ensure consistency + dt = datetime.fromisoformat(center_date.replace("Z", "+00:00")) + return dt.isoformat() + except (ValueError, TypeError): + pass + + # Try minDate as fallback + min_date = review.get("minDate") + if min_date: + try: + dt = datetime.fromisoformat(min_date.replace("Z", "+00:00")) + return dt.isoformat() + except (ValueError, TypeError): + pass + + # Final fallback: current time + return datetime.now(timezone.utc).isoformat() + + def to_scraper_output( + self, + scraped_reviews: list[dict[str, Any]], + job_id: str | None = None, + ) -> dict[str, Any]: + """ + Create a full scraper output envelope for the pipeline. + + This creates the complete structure expected by Pipeline.process(). + + Args: + scraped_reviews: List of reviews from scraper + job_id: Optional job ID (generates UUID if not provided) + + Returns: + Complete scraper output dict for pipeline ingestion + """ + if job_id is None: + job_id = str(uuid4()) + + transformed = self.transform(scraped_reviews) + + return { + "job_id": job_id, + "status": "completed", + "business_id": self.business_id, + "place_id": self.place_id, + "business_info": { + "name": self.business_id, + "place_id": self.place_id, + }, + "reviews": transformed, + "scrape_time_ms": 0, + "reviews_scraped": len(transformed), + "scraper_version": "v1.0.0", + } + + +def load_and_transform( + file_path: str, + business_id: str, + place_id: str, +) -> dict[str, Any]: + """ + Convenience function to load a JSON file and transform it. + + Args: + file_path: Path to JSON file with scraped reviews + business_id: Business identifier + place_id: Google Maps place ID + + Returns: + Complete scraper output dict for pipeline ingestion + """ + import json + from pathlib import Path + + data = json.loads(Path(file_path).read_text()) + + adapter = ScraperV1Adapter(business_id, place_id) + return adapter.to_scraper_output(data) diff --git a/tools/test_pipeline_real_data.py b/tools/test_pipeline_real_data.py new file mode 100644 index 0000000..53bc323 --- /dev/null +++ b/tools/test_pipeline_real_data.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 +""" +Test the pipeline with real scraped review data. + +Usage: + python tools/test_pipeline_real_data.py --database-url $DATABASE_URL + +This test: +1. Loads sample barbershop reviews from data/samples/ +2. Uses ScraperV1Adapter to transform the data +3. Runs the full pipeline (Stage 1-4) with mocked LLM +4. Verifies data in all tables +5. Shows summary statistics +""" + +import asyncio +import os +import sys +import argparse +import json +import logging +from datetime import datetime, timezone +from uuid import uuid4 +from pathlib import Path + +# Add project root and package to path +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT)) +sys.path.insert(0, str(PROJECT_ROOT / "packages/reviewiq-pipeline/src")) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +async def run_pipeline_test(database_url: str, sample_file: str, cleanup: bool = True): + """Run the pipeline test with real data.""" + import asyncpg + from reviewiq_pipeline.adapters.scraper_v1 import ScraperV1Adapter, load_and_transform + from reviewiq_pipeline.config import Config + from reviewiq_pipeline.stages.stage1_normalize import Stage1Normalizer + from reviewiq_pipeline.stages.stage3_route import Stage3Router + from collections import defaultdict + + # Generate unique test identifiers + test_id = uuid4().hex[:8] + business_id = f"barbershop-fleitas-{test_id}" + place_id = f"ChIJxxxxxx-{test_id}" + job_id = uuid4() + + logger.info("=" * 60) + logger.info("PIPELINE TEST WITH REAL DATA") + logger.info("=" * 60) + logger.info(f"Sample file: {sample_file}") + logger.info(f"Business ID: {business_id}") + logger.info(f"Job ID: {job_id}") + logger.info("") + + # Load and transform sample data + logger.info("Step 1: Loading and transforming sample data...") + sample_path = PROJECT_ROOT / sample_file + + if not sample_path.exists(): + logger.error(f"Sample file not found: {sample_path}") + sys.exit(1) + + raw_data = json.loads(sample_path.read_text()) + logger.info(f" Loaded {len(raw_data)} raw reviews from file") + + adapter = ScraperV1Adapter(business_id, place_id) + scraper_output = adapter.to_scraper_output(raw_data, str(job_id)) + reviews = scraper_output["reviews"] + + logger.info(f" Transformed {len(reviews)} reviews (skipped empty text)") + + # Show sample review + if reviews: + sample = reviews[0] + logger.info(f" Sample review:") + logger.info(f" Author: {sample['author_name']}") + logger.info(f" Rating: {sample['rating']}") + logger.info(f" Text: {sample['text'][:80]}...") + + # Connect to database + pool = await asyncpg.create_pool(database_url, min_size=2, max_size=5) + config = Config(database_url=database_url) + + try: + # ========== STAGE 1: Insert raw reviews ========== + logger.info("") + logger.info("Step 2: Stage 1 - Inserting raw reviews...") + + async with pool.acquire() as conn: + for review in reviews: + review_time = datetime.fromisoformat( + review["review_time"].replace("Z", "+00:00") + ) + await conn.execute(""" + INSERT INTO pipeline.reviews_raw ( + job_id, source, review_id, place_id, raw_payload, + review_text, rating, review_time, reviewer_name, reviewer_id + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (source, review_id, review_version) DO NOTHING + """, + job_id, + "google", + review["review_id"], + place_id, + json.dumps(review.get("raw_payload", {})), + review["text"], + review["rating"], + review_time, + review["author_name"], + review.get("author_id"), + ) + + raw_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", + place_id, + ) + logger.info(f" Inserted {raw_count} raw reviews") + + # ========== STAGE 1: Normalize ========== + logger.info("") + logger.info("Step 3: Stage 1 - Normalizing reviews...") + + normalizer = Stage1Normalizer(config) + normalized_reviews = normalizer.normalize_batch(reviews, business_id, place_id) + logger.info(f" Normalized {len(normalized_reviews)} reviews") + + # Language distribution + lang_dist = defaultdict(int) + for r in normalized_reviews: + lang_dist[r["text_language"]] += 1 + logger.info(f" Languages: {dict(lang_dist)}") + + # Insert into reviews_enriched + async with pool.acquire() as conn: + for norm in normalized_reviews: + raw_id = await conn.fetchval( + "SELECT id FROM pipeline.reviews_raw WHERE review_id = $1", + norm["review_id"], + ) + + review_time = datetime.fromisoformat( + norm["review_time"].replace("Z", "+00:00") + ) + + await conn.execute(""" + INSERT INTO pipeline.reviews_enriched ( + source, review_id, review_version, is_latest, raw_id, + business_id, place_id, text, text_normalized, rating, review_time, + language, taxonomy_version + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (source, review_id, review_version) DO UPDATE SET + is_latest = EXCLUDED.is_latest + """, + norm["source"], + norm["review_id"], + norm["review_version"], + True, + raw_id, + business_id, + place_id, + norm["text"], + norm["text_normalized"], + norm["rating"], + review_time, + norm["text_language"], + "v5.1", + ) + + enriched_count = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", + business_id, + ) + logger.info(f" Inserted {enriched_count} enriched reviews") + + # ========== STAGE 2: Mock Classification ========== + logger.info("") + logger.info("Step 4: Stage 2 - Classifying reviews (mocked LLM)...") + + # Simple heuristic classification based on rating and keywords + def mock_classify(review: dict) -> dict: + text = review["text_normalized"].lower() + rating = review["rating"] + + # Determine valence from rating + if rating >= 4: + valence = "V+" + elif rating <= 2: + valence = "V-" + else: + valence = "V0" + + # Determine intensity from text length and rating extremity + if rating in (1, 5) and len(text) > 100: + intensity = "I3" + elif rating in (1, 2, 4, 5): + intensity = "I2" + else: + intensity = "I1" + + # Simple URT classification based on keywords + if any(w in text for w in ["servicio", "service", "trato", "atención", "attention"]): + urt_primary = "A2.01" # Helpfulness + elif any(w in text for w in ["profesional", "professional", "expert"]): + urt_primary = "A4.01" # Knowledge/Expertise + elif any(w in text for w in ["precio", "price", "caro", "barato", "expensive", "cheap"]): + urt_primary = "P1.01" # Value Perception + elif any(w in text for w in ["espera", "wait", "tiempo", "time", "rápido", "fast"]): + urt_primary = "J1.01" # Wait Times + elif any(w in text for w in ["ambiente", "ambiance", "local", "lugar", "place"]): + urt_primary = "E2.01" # Ambiance + else: + urt_primary = "O1.01" # Core Product/Service + + return { + "urt_primary": urt_primary, + "valence": valence, + "intensity": intensity, + "trust_score": 0.7 + (len(text) / 1000), # Longer = more trustworthy + } + + batch_id = f"batch-{test_id}" + spans_created = 0 + + async with pool.acquire() as conn: + for norm in normalized_reviews: + cls = mock_classify(norm) + + # Update classification in reviews_enriched + await conn.execute(""" + UPDATE pipeline.reviews_enriched SET + urt_primary = $1, + valence = $2, + intensity = $3, + trust_score = $4, + classification_model = $5, + processed_at = NOW() + WHERE review_id = $6 AND business_id = $7 + """, + cls["urt_primary"], + cls["valence"], + cls["intensity"], + cls["trust_score"], + "mock-classifier-v1", + norm["review_id"], + business_id, + ) + + # Get review time for span + review_row = await conn.fetchrow( + "SELECT review_time FROM pipeline.reviews_enriched WHERE review_id = $1", + norm["review_id"], + ) + + # Create a single span for the whole review + span_id = f"SPN-{uuid4().hex[:12]}" + usn = f"google:{norm['review_id']}:1:0" + + # Truncate span text to first 200 chars + span_text = norm["text"][:200] + + await conn.execute(""" + INSERT INTO pipeline.review_spans ( + span_id, business_id, place_id, source, review_id, review_version, + span_index, span_text, span_start, span_end, + profile, urt_primary, valence, intensity, comparative, + is_primary, is_active, review_time, + confidence, usn, taxonomy_version, model_version, ingest_batch_id + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, + $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23 + ) + ON CONFLICT (span_id) DO NOTHING + """, + span_id, + business_id, + place_id, + "google", + norm["review_id"], + 1, + 0, # span_index + span_text, + 0, # span_start + len(span_text), # span_end + "standard", + cls["urt_primary"], + cls["valence"], + cls["intensity"], + "CR-N", + True, # is_primary + True, # is_active + review_row["review_time"], + "high", + usn, + "v5.1", + "mock-classifier-v1", + batch_id, + ) + spans_created += 1 + + logger.info(f" Created {spans_created} spans") + + # Show classification distribution + cls_dist = await conn.fetch(""" + SELECT urt_primary, valence, COUNT(*) as cnt + FROM pipeline.reviews_enriched + WHERE business_id = $1 + GROUP BY urt_primary, valence + ORDER BY cnt DESC + """, business_id) + + logger.info(" Classification distribution:") + for row in cls_dist[:5]: + logger.info(f" {row['urt_primary']} ({row['valence']}): {row['cnt']}") + + # ========== STAGE 3: Route Issues ========== + logger.info("") + logger.info("Step 5: Stage 3 - Routing negative spans to issues...") + + router = Stage3Router(config) + routed_count = 0 + issues_created = 0 + + async with pool.acquire() as conn: + # Get negative/mixed spans + spans_to_route = await conn.fetch(""" + SELECT span_id, business_id, place_id, urt_primary, valence, intensity, + entity_normalized, review_time, confidence, source, review_id, review_version + FROM pipeline.review_spans + WHERE business_id = $1 AND valence IN ('V-', 'V±') + """, business_id) + + for span_row in spans_to_route: + span_data = { + "span_id": span_row["span_id"], + "business_id": span_row["business_id"], + "place_id": span_row["place_id"], + "urt_primary": span_row["urt_primary"], + "valence": span_row["valence"], + "intensity": span_row["intensity"], + "entity_normalized": span_row["entity_normalized"], + "review_time": span_row["review_time"].isoformat(), + "confidence": span_row["confidence"], + "trust_score": 0.85, + } + + routed = router.route_span_sync(span_data) + + # Check if issue exists + existing = await conn.fetchval( + "SELECT 1 FROM pipeline.issues WHERE issue_id = $1", + routed["issue_id"], + ) + + if not existing: + domain = span_row["urt_primary"][0] + intensity_scores = {"I1": 1.0, "I2": 2.0, "I3": 3.0} + priority_score = intensity_scores.get(span_row["intensity"], 1.0) + + await conn.execute(""" + INSERT INTO pipeline.issues ( + issue_id, business_id, place_id, primary_subcode, domain, + state, priority_score, span_count, max_intensity, taxonomy_version + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + """, + routed["issue_id"], + business_id, + place_id, + span_row["urt_primary"], + domain, + "open", + priority_score, + 1, + span_row["intensity"], + "v5.1", + ) + issues_created += 1 + else: + await conn.execute(""" + UPDATE pipeline.issues SET span_count = span_count + 1, updated_at = NOW() + WHERE issue_id = $1 + """, routed["issue_id"]) + + # Link span to issue + await conn.execute(""" + INSERT INTO pipeline.issue_spans ( + issue_id, span_id, source, review_id, review_version, + is_primary_match, intensity, review_time + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (span_id) DO NOTHING + """, + routed["issue_id"], + span_row["span_id"], + span_row["source"], + span_row["review_id"], + span_row["review_version"], + True, + span_row["intensity"], + span_row["review_time"], + ) + + # Log event + await conn.execute(""" + INSERT INTO pipeline.issue_events (issue_id, event_type, span_id) + VALUES ($1, $2, $3) + """, routed["issue_id"], "span_linked", span_row["span_id"]) + + routed_count += 1 + + logger.info(f" Routed {routed_count} spans to {issues_created} issues") + + # ========== STAGE 4: Aggregation ========== + logger.info("") + logger.info("Step 6: Stage 4 - Aggregating facts...") + + async with pool.acquire() as conn: + # Get span data for aggregation + span_data = await conn.fetch(""" + SELECT + rs.business_id, + rs.place_id, + DATE(rs.review_time) as review_date, + rs.urt_primary, + rs.valence, + rs.intensity, + rs.comparative, + re.trust_score, + re.rating + FROM pipeline.review_spans rs + JOIN pipeline.reviews_enriched re ON ( + re.source = rs.source + AND re.review_id = rs.review_id + AND re.review_version = rs.review_version + ) + WHERE rs.business_id = $1 AND rs.is_active = TRUE + """, business_id) + + # Group by date and URT code + daily_data = defaultdict(lambda: defaultdict(list)) + for row in span_data: + daily_data[row["review_date"]][row["urt_primary"]].append(dict(row)) + + facts_inserted = 0 + for period_date, urt_groups in daily_data.items(): + for urt_code, code_spans in urt_groups.items(): + negative_count = sum(1 for s in code_spans if s["valence"] == "V-") + positive_count = sum(1 for s in code_spans if s["valence"] == "V+") + neutral_count = sum(1 for s in code_spans if s["valence"] == "V0") + mixed_count = sum(1 for s in code_spans if s["valence"] == "V±") + + i1_count = sum(1 for s in code_spans if s["intensity"] == "I1") + i2_count = sum(1 for s in code_spans if s["intensity"] == "I2") + i3_count = sum(1 for s in code_spans if s["intensity"] == "I3") + + strength_score = (positive_count - negative_count) / max(len(code_spans), 1) + + await conn.execute(""" + INSERT INTO pipeline.fact_timeseries ( + business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version, + review_count, span_count, + negative_count, positive_count, neutral_count, mixed_count, + strength_score, negative_strength, positive_strength, + i1_count, i2_count, i3_count, + cr_better, cr_worse, cr_same, + trust_weighted_strength, trust_weighted_negative + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, + $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24 + ) + ON CONFLICT (business_id, place_id, period_date, bucket_type, + subject_type, subject_id, taxonomy_version) + DO UPDATE SET + span_count = EXCLUDED.span_count, + computed_at = NOW() + """, + business_id, + place_id, + period_date, + "day", + "urt_code", + urt_code, + "v5.1", + len(code_spans), + len(code_spans), + negative_count, + positive_count, + neutral_count, + mixed_count, + strength_score, + float(negative_count), + float(positive_count), + i1_count, + i2_count, + i3_count, + 0, 0, 0, + strength_score, + float(negative_count), + ) + facts_inserted += 1 + + logger.info(f" Inserted {facts_inserted} fact records") + + # ========== RESULTS ========== + logger.info("") + logger.info("=" * 60) + logger.info("PIPELINE TEST RESULTS") + logger.info("=" * 60) + + async with pool.acquire() as conn: + results = {} + results["reviews_raw"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_raw WHERE place_id = $1", place_id + ) + results["reviews_enriched"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.reviews_enriched WHERE business_id = $1", business_id + ) + results["review_spans"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.review_spans WHERE business_id = $1", business_id + ) + results["issues"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issues WHERE business_id = $1", business_id + ) + results["issue_spans"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id + ) + results["fact_timeseries"] = await conn.fetchval( + "SELECT COUNT(*) FROM pipeline.fact_timeseries WHERE business_id = $1", business_id + ) + + logger.info(f" pipeline.reviews_raw: {results['reviews_raw']}") + logger.info(f" pipeline.reviews_enriched: {results['reviews_enriched']}") + logger.info(f" pipeline.review_spans: {results['review_spans']}") + logger.info(f" pipeline.issues: {results['issues']}") + logger.info(f" pipeline.issue_spans: {results['issue_spans']}") + logger.info(f" pipeline.fact_timeseries: {results['fact_timeseries']}") + + # Show issues + logger.info("") + logger.info("Issues identified:") + issues = await conn.fetch(""" + SELECT issue_id, primary_subcode, span_count, max_intensity + FROM pipeline.issues + WHERE business_id = $1 + ORDER BY span_count DESC + LIMIT 10 + """, business_id) + + for issue in issues: + logger.info( + f" {issue['issue_id'][:20]}... " + f"Code={issue['primary_subcode']} " + f"Spans={issue['span_count']} " + f"MaxInt={issue['max_intensity']}" + ) + + # Show date range + date_range = await conn.fetchrow(""" + SELECT MIN(review_time)::date as min_date, MAX(review_time)::date as max_date + FROM pipeline.reviews_enriched + WHERE business_id = $1 + """, business_id) + + logger.info("") + logger.info(f"Date range: {date_range['min_date']} to {date_range['max_date']}") + + # ========== CLEANUP ========== + if cleanup: + logger.info("") + logger.info("Cleaning up test data...") + + async with pool.acquire() as conn: + await conn.execute("DELETE FROM pipeline.fact_timeseries WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.issue_events WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id) + await conn.execute("DELETE FROM pipeline.issue_spans WHERE issue_id IN (SELECT issue_id FROM pipeline.issues WHERE business_id = $1)", business_id) + await conn.execute("DELETE FROM pipeline.issues WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.review_spans WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.reviews_enriched WHERE business_id = $1", business_id) + await conn.execute("DELETE FROM pipeline.reviews_raw WHERE place_id = $1", place_id) + + logger.info("Test data cleaned up") + + logger.info("") + logger.info("=" * 60) + logger.info("PIPELINE TEST COMPLETED SUCCESSFULLY!") + logger.info("=" * 60) + + finally: + await pool.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Test pipeline with real data") + parser.add_argument( + "--database-url", + default=os.environ.get("DATABASE_URL"), + help="PostgreSQL connection string (default: $DATABASE_URL)", + ) + parser.add_argument( + "--sample-file", + default="data/samples/barbershop_reviews.json", + help="Path to sample data file (relative to project root)", + ) + parser.add_argument( + "--no-cleanup", + action="store_true", + help="Don't clean up test data after running", + ) + + args = parser.parse_args() + + if not args.database_url: + print("Error: --database-url required or set DATABASE_URL", file=sys.stderr) + sys.exit(1) + + asyncio.run(run_pipeline_test( + args.database_url, + args.sample_file, + cleanup=not args.no_cleanup, + ))