From 1a238d41789ea150afe4ea08e4397030a96b0fc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 2 May 2026 20:23:50 +0100 Subject: [PATCH] feat(api+broker+web): write-time mention fan-out via notification table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of v0.3.0 — replaces the regex-on-decoded-ciphertext scan in /v1/notifications and the dashboard MentionsSection with reads from a new mesh.notification table populated at write time. Schema 0025: mesh.notification (id, mesh_id, topic_id, message_id, recipient_member_id, sender_member_id, kind, created_at, read_at) with a unique (message_id, recipient) so a re-fanned message yields one row per recipient. Backfills existing v0.2.0 messages by regex-matching the (still-base64-plaintext) bodies — guarded with a base64 + length check so binary ciphertext doesn't crash the migration. Writers (POST /v1/messages + broker appendTopicMessage) now extract @-mentions from either an explicit `mentions: string[]` on the request OR a regex over the base64 plaintext (transitional fallback). Targets are intersected with the mesh roster + capped at 32 per message. Web chat panel sends the explicit array now so it keeps working after phase 2 lands. Readers switch to JOIN-on-notification: /v1/notifications — table-backed, supports ?unread=1 POST /v1/notifications/read — new, mark by ids or all-up-to MentionsSection (RSC) — same JOIN, returns readAt for each row GET /v1/notifications also gains a read_at field per row so a future bell UI can show unread vs read. Once per-topic encryption (phase 2) lands, the regex fallback becomes a no-op for v2 messages — clients MUST send `mentions`, which they already do. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/broker/src/broker.ts | 94 ++++++++ .../app/[locale]/dashboard/(user)/page.tsx | 42 ++-- .../web/src/modules/mesh/topic-chat-panel.tsx | 23 +- packages/api/src/modules/mesh/v1-router.ts | 200 +++++++++++++++--- packages/db/migrations/0025_notifications.sql | 80 +++++++ packages/db/src/schema/mesh.ts | 83 ++++++++ 6 files changed, 474 insertions(+), 48 deletions(-) create mode 100644 packages/db/migrations/0025_notifications.sql diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 13a8809..c35797c 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -38,7 +38,9 @@ import { meshFileKey, meshContext, meshMember as memberTable, + meshMember, meshMemory, + meshNotification, meshState, meshService, meshSkill, @@ -694,6 +696,13 @@ export async function appendTopicMessage(args: { senderSessionPubkey?: string; nonce: string; ciphertext: string; + /** + * Optional client-extracted mention list (lowercased display names + * without the leading @). Required once per-topic encryption lands — + * the server can't read v0.3.0 ciphertext. Falls back to a regex on + * the v0.2.0 base64 plaintext when omitted. + */ + mentions?: string[]; }): Promise { const [row] = await db .insert(meshTopicMessage) @@ -706,9 +715,94 @@ export async function appendTopicMessage(args: { }) .returning({ id: meshTopicMessage.id }); if (!row) throw new Error("failed to append topic message"); + + void fanOutMentions({ + messageId: row.id, + topicId: args.topicId, + senderMemberId: args.senderMemberId, + ciphertext: args.ciphertext, + explicit: args.mentions, + }).catch(() => { + // Notifications are advisory; don't fail the message write. + }); + return row.id; } +/** + * Extract `@` tokens from a base64-of-UTF8 plaintext body. + * Capped at 16 tokens. Returns lowercased names without the @ prefix. + */ +function extractMentionTokens(b64: string): string[] { + let text: string; + try { + text = Buffer.from(b64, "base64").toString("utf-8"); + } catch { + return []; + } + const found = new Set(); + const re = /(^|[^A-Za-z0-9_-])@([A-Za-z0-9_-]{1,64})(?=$|[^A-Za-z0-9_-])/g; + let m: RegExpExecArray | null; + while ((m = re.exec(text)) !== null) { + found.add(m[2]!.toLowerCase()); + if (found.size >= 16) break; + } + return [...found]; +} + +async function fanOutMentions(args: { + messageId: string; + topicId: string; + senderMemberId: string; + ciphertext: string; + explicit?: string[]; +}): Promise { + let tokens = args.explicit?.map((s) => s.toLowerCase().replace(/^@/, "")); + if (!tokens || tokens.length === 0) { + tokens = extractMentionTokens(args.ciphertext); + } + if (tokens.length === 0) return; + + const [topic] = await db + .select({ meshId: meshTopic.meshId }) + .from(meshTopic) + .where(eq(meshTopic.id, args.topicId)); + if (!topic) return; + + const recipients = await db + .select({ + id: meshMember.id, + displayName: meshMember.displayName, + }) + .from(meshMember) + .where( + and(eq(meshMember.meshId, topic.meshId), isNull(meshMember.revokedAt)), + ); + const tokenSet = new Set(tokens); + const targets = recipients + .filter( + (r) => + tokenSet.has(r.displayName.toLowerCase()) && + r.id !== args.senderMemberId, + ) + .slice(0, 32); + if (targets.length === 0) return; + + await db + .insert(meshNotification) + .values( + targets.map((t) => ({ + meshId: topic.meshId, + topicId: args.topicId, + messageId: args.messageId, + recipientMemberId: t.id, + senderMemberId: args.senderMemberId, + kind: "mention", + })), + ) + .onConflictDoNothing(); +} + /** * Fetch topic history for a member. Pagination via `before` cursor (id of * an earlier message); pass null for the latest page. diff --git a/apps/web/src/app/[locale]/dashboard/(user)/page.tsx b/apps/web/src/app/[locale]/dashboard/(user)/page.tsx index bc5755d..8cd028b 100644 --- a/apps/web/src/app/[locale]/dashboard/(user)/page.tsx +++ b/apps/web/src/app/[locale]/dashboard/(user)/page.tsx @@ -9,11 +9,12 @@ import { db } from "@turbostarter/db/server"; import { mesh, meshMember, + meshNotification, meshTopic, meshTopicMember, meshTopicMessage, } from "@turbostarter/db/schema/mesh"; -import { and, count, desc, eq, gt, inArray, isNull, or, sql } from "drizzle-orm"; +import { aliasedTable, and, count, desc, eq, gt, inArray, isNull, or, sql } from "drizzle-orm"; import { appConfig } from "~/config/app"; import { pathsConfig } from "~/config/paths"; @@ -127,45 +128,42 @@ export default async function UniversePage() { })); // Recent @-mentions of the viewer across every mesh they belong to. - // Build a (memberId, regex) pair per mesh and OR them together so we - // catch users with different display names in different meshes. The - // ciphertext is base64 plaintext in v0.2.0; per-topic encryption in - // v0.3.0 will move this scan to a notification table populated at - // write time. 7-day window keeps the query bounded. + // Reads from mesh.notification, populated at write time by the + // POST /v1/messages handler + broker topic-send. Survives v0.3.0 + // per-topic encryption (the previous regex-on-decoded-ciphertext + // approach would not). 7-day window keeps the result bounded. const mentionWindow = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); - const mentionConditions = myMembers.map((m) => { - const escaped = m.displayName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); - const pattern = `(^|\\s|[^A-Za-z0-9_-])@${escaped}($|[^A-Za-z0-9_-])`; - return and( - eq(meshTopic.meshId, m.meshId), - sql`${meshTopicMessage.senderMemberId} <> ${m.id}`, - sql`convert_from(decode(${meshTopicMessage.ciphertext}, 'base64'), 'UTF8') ~* ${pattern}`, - ); - }); - const mentionRows = mentionConditions.length + const senderMember = aliasedTable(meshMember, "sender_member"); + const mentionRows = myMemberIds.length ? await db .select({ id: meshTopicMessage.id, + notificationId: meshNotification.id, topicId: meshTopicMessage.topicId, topicName: meshTopic.name, meshId: meshTopic.meshId, meshName: mesh.name, - senderName: meshMember.displayName, + senderName: senderMember.displayName, ciphertext: meshTopicMessage.ciphertext, + readAt: meshNotification.readAt, createdAt: meshTopicMessage.createdAt, }) - .from(meshTopicMessage) - .innerJoin(meshTopic, eq(meshTopic.id, meshTopicMessage.topicId)) + .from(meshNotification) + .innerJoin( + meshTopicMessage, + eq(meshTopicMessage.id, meshNotification.messageId), + ) + .innerJoin(meshTopic, eq(meshTopic.id, meshNotification.topicId)) .innerJoin(mesh, eq(mesh.id, meshTopic.meshId)) .innerJoin( - meshMember, - eq(meshMember.id, meshTopicMessage.senderMemberId), + senderMember, + eq(senderMember.id, meshNotification.senderMemberId), ) .where( and( + inArray(meshNotification.recipientMemberId, myMemberIds), isNull(meshTopic.archivedAt), gt(meshTopicMessage.createdAt, mentionWindow), - or(...mentionConditions), ), ) .orderBy(desc(meshTopicMessage.createdAt)) diff --git a/apps/web/src/modules/mesh/topic-chat-panel.tsx b/apps/web/src/modules/mesh/topic-chat-panel.tsx index 9a86b82..ba9b3b6 100644 --- a/apps/web/src/modules/mesh/topic-chat-panel.tsx +++ b/apps/web/src/modules/mesh/topic-chat-panel.tsx @@ -414,6 +414,21 @@ export function TopicChatPanel({ [draft, mentionState], ); + // Extract @-mention tokens from the draft body so the server can + // populate mesh.notification rows without having to read the + // ciphertext (forward-compat with v0.3.0 per-topic encryption). + // Capped at 16 to bound notification fan-out. + const extractMentions = (text: string): string[] => { + const found = new Set(); + const re = /(^|[^A-Za-z0-9_-])@([A-Za-z0-9_-]{1,64})(?=$|[^A-Za-z0-9_-])/g; + let m: RegExpExecArray | null; + while ((m = re.exec(text)) !== null) { + found.add(m[2]!.toLowerCase()); + if (found.size >= 16) break; + } + return [...found]; + }; + const send = async () => { const text = draft.trim(); if (!text) return; @@ -421,10 +436,16 @@ export function TopicChatPanel({ setError(null); try { const { ciphertext, nonce } = encodeOutgoing(text); + const mentions = extractMentions(text); const res = await fetch("/api/v1/messages", { method: "POST", headers, - body: JSON.stringify({ topic: topicName, ciphertext, nonce }), + body: JSON.stringify({ + topic: topicName, + ciphertext, + nonce, + ...(mentions.length > 0 ? { mentions } : {}), + }), }); if (!res.ok) { const body = await res.text().catch(() => ""); diff --git a/packages/api/src/modules/mesh/v1-router.ts b/packages/api/src/modules/mesh/v1-router.ts index acc8fb9..12f3b12 100644 --- a/packages/api/src/modules/mesh/v1-router.ts +++ b/packages/api/src/modules/mesh/v1-router.ts @@ -31,6 +31,7 @@ import { mesh, meshApiKey, meshMember, + meshNotification, meshTopic, meshTopicMember, meshTopicMessage, @@ -56,8 +57,43 @@ const sendMessageSchema = z.object({ /** base64 nonce. */ nonce: z.string().min(1), priority: z.enum(["now", "next", "low"]).optional().default("next"), + /** + * Optional list of `@` mentions extracted client-side + * from the plaintext. Capped at 16 to bound notification fan-out + * (anti-spam). Server intersects with the mesh roster — anything + * that doesn't resolve to a member is silently dropped. + * + * Falls back to a server-side regex on the base64 plaintext when + * absent (v0.2.0 messages still ship plaintext). After per-topic + * encryption lands the regex path stops working and the client + * MUST send this array. + */ + mentions: z.array(z.string().min(1).max(64)).max(16).optional(), }); +/** + * Extract `@` mentions from base64-encoded plaintext. Returns + * the lowercased display names found in the body, deduped and capped + * at 16. Used as the legacy fallback when the client doesn't send a + * `mentions` array on POST /messages. + */ +function extractMentionsFromBase64(b64: string): string[] { + let text: string; + try { + text = Buffer.from(b64, "base64").toString("utf-8"); + } catch { + return []; + } + const found = new Set(); + const re = /(^|[^A-Za-z0-9_-])@([A-Za-z0-9_-]{1,64})(?=$|[^A-Za-z0-9_-])/g; + let m: RegExpExecArray | null; + while ((m = re.exec(text)) !== null) { + found.add(m[2]!.toLowerCase()); + if (found.size >= 16) break; + } + return [...found]; +} + const historyQuerySchema = z.object({ limit: z.coerce.number().int().min(1).max(200).optional().default(50), before: z.string().optional(), @@ -108,13 +144,19 @@ export const v1Router = new Hono() .limit(1); if (!ownerMember) return c.json({ error: "no_mesh_member" }, 500); + // Sender attribution: prefer the apikey's issuing member (so the + // dashboard chat user shows up correctly in /v1/peers and as the + // notification sender). Fall back to the oldest mesh member for + // legacy keys with no issuer. + const senderMemberId = key.issuedByMemberId ?? ownerMember.id; + // Persist to history (topic_message) + ephemeral queue (message_queue). // Broker's drain loop picks up the queue entry and pushes to live peers. const [historyRow] = await db .insert(meshTopicMessage) .values({ topicId: topic.id, - senderMemberId: ownerMember.id, + senderMemberId, nonce: body.nonce, ciphertext: body.ciphertext, }) @@ -124,7 +166,7 @@ export const v1Router = new Hono() .insert(messageQueue) .values({ meshId: key.meshId, - senderMemberId: ownerMember.id, + senderMemberId, targetSpec: "#" + topic.id, priority: body.priority, nonce: body.nonce, @@ -132,11 +174,56 @@ export const v1Router = new Hono() }) .returning({ id: messageQueue.id }); + // Mention fan-out → notification rows. Client-extracted mentions + // win when present (post-encryption clients MUST extract and send); + // otherwise we regex the base64 plaintext as a transitional fallback. + let mentionTokens = body.mentions?.map((s) => s.toLowerCase().replace(/^@/, "")); + if (!mentionTokens || mentionTokens.length === 0) { + mentionTokens = extractMentionsFromBase64(body.ciphertext); + } + let notifications = 0; + if (historyRow && mentionTokens.length > 0) { + const recipients = await db + .select({ + id: meshMember.id, + displayName: meshMember.displayName, + }) + .from(meshMember) + .where( + and(eq(meshMember.meshId, key.meshId), isNull(meshMember.revokedAt)), + ); + const lowerTokens = new Set(mentionTokens); + const targets = recipients + .filter( + (r) => + lowerTokens.has(r.displayName.toLowerCase()) && + r.id !== senderMemberId, + ) + .slice(0, 32); // hard cap on per-message fan-out + if (targets.length > 0) { + await db + .insert(meshNotification) + .values( + targets.map((t) => ({ + meshId: key.meshId, + topicId: topic.id, + messageId: historyRow.id, + recipientMemberId: t.id, + senderMemberId, + kind: "mention", + })), + ) + .onConflictDoNothing(); + notifications = targets.length; + } + } + return c.json({ messageId: queueRow?.id ?? null, historyId: historyRow?.id ?? null, topic: body.topic, topicId: topic.id, + notifications, }); }) @@ -508,13 +595,14 @@ export const v1Router = new Hono() }); }) - // GET /v1/notifications — recent @-mentions of the viewer across all - // topics in the key's mesh. v0.2.0 plaintext-base64 ciphertext lets - // us regex match server-side; in v0.3.0 (per-topic encryption) this - // moves to a notification table populated at write time. + // GET /v1/notifications — recent @-mentions of the viewer across + // all topics in the key's mesh. Reads from mesh.notification, which + // is populated at write time by POST /v1/messages and the broker's + // topic-send handler. Survives the v0.3.0 per-topic encryption cut + // (the regex-on-decoded-ciphertext approach won't). // - // Query: ?since= to incrementally fetch only newer mentions - // (e.g. for a polling notification bell). Default: last 24h. + // Query: ?since= for incremental fetch (polling bells), and + // ?unread=1 to filter to read_at IS NULL only. .get("/notifications", async (c) => { const key = c.var.apiKey; requireCapability(key, "read"); @@ -535,50 +623,54 @@ export const v1Router = new Hono() if (Number.isNaN(since.getTime())) { return c.json({ error: "invalid_since" }, 400); } + const unreadOnly = c.req.query("unread") === "1"; - // Postgres regex with case-insensitive match + word boundary on - // both sides. Decode the base64 ciphertext (plaintext envelope in - // v0.2.0) so we're matching readable text, not the base64 alphabet. - const escaped = me.displayName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); - const pattern = `(^|\\s|[^A-Za-z0-9_-])@${escaped}($|[^A-Za-z0-9_-])`; + const conditions = [ + eq(meshNotification.recipientMemberId, key.issuedByMemberId), + eq(meshNotification.meshId, key.meshId), + gt(meshNotification.createdAt, since), + ]; + if (unreadOnly) conditions.push(isNull(meshNotification.readAt)); const rows = await db .select({ id: meshTopicMessage.id, + notificationId: meshNotification.id, topicId: meshTopicMessage.topicId, topicName: meshTopic.name, senderMemberId: meshTopicMessage.senderMemberId, senderName: meshMember.displayName, senderPubkey: meshMember.peerPubkey, ciphertext: meshTopicMessage.ciphertext, + kind: meshNotification.kind, + readAt: meshNotification.readAt, createdAt: meshTopicMessage.createdAt, }) - .from(meshTopicMessage) - .innerJoin(meshTopic, eq(meshTopic.id, meshTopicMessage.topicId)) + .from(meshNotification) + .innerJoin( + meshTopicMessage, + eq(meshTopicMessage.id, meshNotification.messageId), + ) + .innerJoin(meshTopic, eq(meshTopic.id, meshNotification.topicId)) .innerJoin( meshMember, - eq(meshMember.id, meshTopicMessage.senderMemberId), - ) - .where( - and( - eq(meshTopic.meshId, key.meshId), - isNull(meshTopic.archivedAt), - gt(meshTopicMessage.createdAt, since), - sql`${meshTopicMessage.senderMemberId} <> ${key.issuedByMemberId}`, - sql`convert_from(decode(${meshTopicMessage.ciphertext}, 'base64'), 'UTF8') ~* ${pattern}`, - ), + eq(meshMember.id, meshNotification.senderMemberId), ) + .where(and(...conditions)) .orderBy(desc(meshTopicMessage.createdAt)) .limit(50); return c.json({ notifications: rows.map((r) => ({ id: r.id, + notificationId: r.notificationId, topicId: r.topicId, topicName: r.topicName, senderName: r.senderName, senderPubkey: r.senderPubkey, ciphertext: r.ciphertext, + kind: r.kind, + readAt: r.readAt?.toISOString() ?? null, createdAt: r.createdAt.toISOString(), })), since: since.toISOString(), @@ -586,6 +678,64 @@ export const v1Router = new Hono() }); }) + // POST /v1/notifications/read — mark notifications read. Body shape: + // { ids: string[] } — mark these notification ids + // { all: true, before?: ISO } — mark every unread for this + // member up to `before` (or now) + // Idempotent. Always 200, even if 0 rows updated. + .post( + "/notifications/read", + validate( + "json", + z.union([ + z.object({ ids: z.array(z.string().min(1)).min(1).max(200) }), + z.object({ all: z.literal(true), before: z.string().optional() }), + ]), + ), + async (c) => { + const key = c.var.apiKey; + requireCapability(key, "read"); + if (!key.issuedByMemberId) { + return c.json({ error: "api_key_has_no_issuer" }, 400); + } + + const body = c.req.valid("json"); + const now = new Date(); + + if ("ids" in body) { + await db + .update(meshNotification) + .set({ readAt: now }) + .where( + and( + eq(meshNotification.recipientMemberId, key.issuedByMemberId), + eq(meshNotification.meshId, key.meshId), + isNull(meshNotification.readAt), + sql`${meshNotification.id} = ANY(${body.ids})`, + ), + ); + return c.json({ marked: body.ids.length, readAt: now.toISOString() }); + } + + const beforeAt = body.before ? new Date(body.before) : now; + if (Number.isNaN(beforeAt.getTime())) { + return c.json({ error: "invalid_before" }, 400); + } + await db + .update(meshNotification) + .set({ readAt: now }) + .where( + and( + eq(meshNotification.recipientMemberId, key.issuedByMemberId), + eq(meshNotification.meshId, key.meshId), + isNull(meshNotification.readAt), + sql`${meshNotification.createdAt} <= ${beforeAt}`, + ), + ); + return c.json({ marked: "all", before: beforeAt.toISOString() }); + }, + ) + // GET /v1/peers — connected peers in the key's mesh // // Sources, deduped by memberId: diff --git a/packages/db/migrations/0025_notifications.sql b/packages/db/migrations/0025_notifications.sql new file mode 100644 index 0000000..6d10c21 --- /dev/null +++ b/packages/db/migrations/0025_notifications.sql @@ -0,0 +1,80 @@ +-- Notifications — write-time mention fan-out (v0.3.0 phase 1). +-- +-- Replaces the regex-on-decoded-ciphertext scan in /v1/notifications and +-- the dashboard MentionsSection. Lets us drop the +-- `convert_from(decode(ciphertext, 'base64'), 'UTF8') ~* @name` query that +-- breaks the moment ciphertext stops being base64-of-UTF8 (i.e. the +-- moment per-topic encryption lands in v0.3.0 phase 2). +-- +-- One row per (recipient_member, topic_message). Idempotent ON CONFLICT +-- on the unique pair; if the broker re-fans a message after a crash the +-- recipient sees one notification, not two. +-- +-- Server-side mention extraction happens in POST /v1/messages and the +-- broker's WS message handler. Both extract @-tokens from the body +-- BEFORE encryption (the only point at which the server can read it), +-- match against the topic's member roster, and insert a row per match. + +CREATE TABLE IF NOT EXISTS "mesh"."notification" ( + "id" text PRIMARY KEY NOT NULL, + "mesh_id" text NOT NULL REFERENCES "mesh"."mesh"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "topic_id" text NOT NULL REFERENCES "mesh"."topic"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "message_id" text NOT NULL REFERENCES "mesh"."topic_message"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "recipient_member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "sender_member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "kind" text NOT NULL DEFAULT 'mention', + "created_at" timestamp DEFAULT now() NOT NULL, + "read_at" timestamp +); + +CREATE UNIQUE INDEX IF NOT EXISTS "notification_unique" + ON "mesh"."notification" ("message_id", "recipient_member_id"); + +CREATE INDEX IF NOT EXISTS "notification_by_recipient_unread" + ON "mesh"."notification" ("recipient_member_id", "created_at" DESC) + WHERE "read_at" IS NULL; + +CREATE INDEX IF NOT EXISTS "notification_by_recipient" + ON "mesh"."notification" ("recipient_member_id", "created_at" DESC); + +CREATE INDEX IF NOT EXISTS "notification_by_mesh" + ON "mesh"."notification" ("mesh_id", "created_at" DESC); + +-- Backfill existing v0.2.0 messages so the new table has history. Safe +-- to run multiple times (ON CONFLICT DO NOTHING). The regex matches the +-- same shape as the in-app autocomplete + render: @-prefixed token with +-- a non-word boundary on both sides (or string edges). +-- +-- We skip messages that fail to decode — defensive against any non-base64 +-- ciphertext that may have slipped in via future writers. +INSERT INTO "mesh"."notification" + ("id", "mesh_id", "topic_id", "message_id", "recipient_member_id", + "sender_member_id", "kind", "created_at") +SELECT + replace(gen_random_uuid()::text, '-', ''), + t."mesh_id", + m."topic_id", + m."id", + recipient."id", + m."sender_member_id", + 'mention', + m."created_at" +FROM "mesh"."topic_message" m +INNER JOIN "mesh"."topic" t ON t."id" = m."topic_id" +INNER JOIN "mesh"."member" recipient + ON recipient."mesh_id" = t."mesh_id" + AND recipient."revoked_at" IS NULL + AND recipient."id" <> m."sender_member_id" +WHERE + -- Only scan messages that look like base64-of-UTF8. Defensive guard + -- against a future writer storing binary ciphertext — convert_from + -- would otherwise raise and abort the whole migration. + m."ciphertext" ~ '^[A-Za-z0-9+/=]+$' + AND length(m."ciphertext") > 0 + AND length(m."ciphertext") % 4 = 0 + AND convert_from(decode(m."ciphertext", 'base64'), 'UTF8') ~* ( + '(^|\s|[^A-Za-z0-9_-])@' + || regexp_replace(recipient."display_name", '([.*+?^${}()|\[\]\\])', '\\\1', 'g') + || '($|[^A-Za-z0-9_-])' + ) +ON CONFLICT ("message_id", "recipient_member_id") DO NOTHING; diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index abfcd1f..85f90e0 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -1484,6 +1484,89 @@ export const insertMeshTopicMessageSchema = export type SelectMeshTopicMessage = typeof meshTopicMessage.$inferSelect; export type InsertMeshTopicMessage = typeof meshTopicMessage.$inferInsert; +/** + * Per-recipient notifications. v0.3.0 phase 1: server-side mention + * extraction at write time replaces the regex-on-decoded-ciphertext + * scan in /v1/notifications. Fanned out at POST /v1/messages and the + * broker's WS topic_send handler — one row per (recipient, message). + * + * `kind` is open-ended ("mention" today; future kinds: "reply", + * "task_assigned", etc.) so we can extend without a migration. + */ +export const meshNotification = meshSchema.table( + "notification", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + topicId: text() + .references(() => meshTopic.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + messageId: text() + .references(() => meshTopicMessage.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + recipientMemberId: text() + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + senderMemberId: text() + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + kind: text().notNull().default("mention"), + createdAt: timestamp().defaultNow().notNull(), + readAt: timestamp(), + }, + (t) => [ + uniqueIndex("notification_unique").on(t.messageId, t.recipientMemberId), + index("notification_by_recipient").on(t.recipientMemberId, t.createdAt), + index("notification_by_mesh").on(t.meshId, t.createdAt), + ], +); + +export const meshNotificationRelations = relations( + meshNotification, + ({ one }) => ({ + topic: one(meshTopic, { + fields: [meshNotification.topicId], + references: [meshTopic.id], + }), + message: one(meshTopicMessage, { + fields: [meshNotification.messageId], + references: [meshTopicMessage.id], + }), + recipient: one(meshMember, { + fields: [meshNotification.recipientMemberId], + references: [meshMember.id], + }), + sender: one(meshMember, { + fields: [meshNotification.senderMemberId], + references: [meshMember.id], + }), + }), +); + +export const selectMeshNotificationSchema = + createSelectSchema(meshNotification); +export const insertMeshNotificationSchema = + createInsertSchema(meshNotification); +export type SelectMeshNotification = typeof meshNotification.$inferSelect; +export type InsertMeshNotification = typeof meshNotification.$inferInsert; + /* ──────────────────────────────────────────────────────────────────────── * API keys (v0.2.0) — REST + external WS auth. *