feat(api+broker+web): write-time mention fan-out via notification table
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Phase 1 of v0.3.0 — replaces the regex-on-decoded-ciphertext scan
in /v1/notifications and the dashboard MentionsSection with reads
from a new mesh.notification table populated at write time.

Schema 0025: mesh.notification (id, mesh_id, topic_id, message_id,
recipient_member_id, sender_member_id, kind, created_at, read_at)
with a unique (message_id, recipient) so a re-fanned message yields
one row per recipient. Backfills existing v0.2.0 messages by
regex-matching the (still-base64-plaintext) bodies — guarded with
a base64 + length check so binary ciphertext doesn't crash the
migration.

Writers (POST /v1/messages + broker appendTopicMessage) now
extract @-mentions from either an explicit `mentions: string[]`
on the request OR a regex over the base64 plaintext (transitional
fallback). Targets are intersected with the mesh roster + capped
at 32 per message. Web chat panel sends the explicit array now so
it keeps working after phase 2 lands.

Readers switch to JOIN-on-notification:
  /v1/notifications      — table-backed, supports ?unread=1
  POST /v1/notifications/read  — new, mark by ids or all-up-to
  MentionsSection (RSC) — same JOIN, returns readAt for each row

GET /v1/notifications also gains a read_at field per row so a
future bell UI can show unread vs read.

Once per-topic encryption (phase 2) lands, the regex fallback
becomes a no-op for v2 messages — clients MUST send `mentions`,
which they already do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 20:23:50 +01:00
parent 81f8066f99
commit 1a238d4178
6 changed files with 474 additions and 48 deletions

View File

@@ -31,6 +31,7 @@ import {
mesh,
meshApiKey,
meshMember,
meshNotification,
meshTopic,
meshTopicMember,
meshTopicMessage,
@@ -56,8 +57,43 @@ const sendMessageSchema = z.object({
/** base64 nonce. */
nonce: z.string().min(1),
priority: z.enum(["now", "next", "low"]).optional().default("next"),
/**
* Optional list of `@<displayName>` mentions extracted client-side
* from the plaintext. Capped at 16 to bound notification fan-out
* (anti-spam). Server intersects with the mesh roster — anything
* that doesn't resolve to a member is silently dropped.
*
* Falls back to a server-side regex on the base64 plaintext when
* absent (v0.2.0 messages still ship plaintext). After per-topic
* encryption lands the regex path stops working and the client
* MUST send this array.
*/
mentions: z.array(z.string().min(1).max(64)).max(16).optional(),
});
/**
* Extract `@<token>` mentions from base64-encoded plaintext. Returns
* the lowercased display names found in the body, deduped and capped
* at 16. Used as the legacy fallback when the client doesn't send a
* `mentions` array on POST /messages.
*/
function extractMentionsFromBase64(b64: string): string[] {
let text: string;
try {
text = Buffer.from(b64, "base64").toString("utf-8");
} catch {
return [];
}
const found = new Set<string>();
const re = /(^|[^A-Za-z0-9_-])@([A-Za-z0-9_-]{1,64})(?=$|[^A-Za-z0-9_-])/g;
let m: RegExpExecArray | null;
while ((m = re.exec(text)) !== null) {
found.add(m[2]!.toLowerCase());
if (found.size >= 16) break;
}
return [...found];
}
const historyQuerySchema = z.object({
limit: z.coerce.number().int().min(1).max(200).optional().default(50),
before: z.string().optional(),
@@ -108,13 +144,19 @@ export const v1Router = new Hono<Env>()
.limit(1);
if (!ownerMember) return c.json({ error: "no_mesh_member" }, 500);
// Sender attribution: prefer the apikey's issuing member (so the
// dashboard chat user shows up correctly in /v1/peers and as the
// notification sender). Fall back to the oldest mesh member for
// legacy keys with no issuer.
const senderMemberId = key.issuedByMemberId ?? ownerMember.id;
// Persist to history (topic_message) + ephemeral queue (message_queue).
// Broker's drain loop picks up the queue entry and pushes to live peers.
const [historyRow] = await db
.insert(meshTopicMessage)
.values({
topicId: topic.id,
senderMemberId: ownerMember.id,
senderMemberId,
nonce: body.nonce,
ciphertext: body.ciphertext,
})
@@ -124,7 +166,7 @@ export const v1Router = new Hono<Env>()
.insert(messageQueue)
.values({
meshId: key.meshId,
senderMemberId: ownerMember.id,
senderMemberId,
targetSpec: "#" + topic.id,
priority: body.priority,
nonce: body.nonce,
@@ -132,11 +174,56 @@ export const v1Router = new Hono<Env>()
})
.returning({ id: messageQueue.id });
// Mention fan-out → notification rows. Client-extracted mentions
// win when present (post-encryption clients MUST extract and send);
// otherwise we regex the base64 plaintext as a transitional fallback.
let mentionTokens = body.mentions?.map((s) => s.toLowerCase().replace(/^@/, ""));
if (!mentionTokens || mentionTokens.length === 0) {
mentionTokens = extractMentionsFromBase64(body.ciphertext);
}
let notifications = 0;
if (historyRow && mentionTokens.length > 0) {
const recipients = await db
.select({
id: meshMember.id,
displayName: meshMember.displayName,
})
.from(meshMember)
.where(
and(eq(meshMember.meshId, key.meshId), isNull(meshMember.revokedAt)),
);
const lowerTokens = new Set(mentionTokens);
const targets = recipients
.filter(
(r) =>
lowerTokens.has(r.displayName.toLowerCase()) &&
r.id !== senderMemberId,
)
.slice(0, 32); // hard cap on per-message fan-out
if (targets.length > 0) {
await db
.insert(meshNotification)
.values(
targets.map((t) => ({
meshId: key.meshId,
topicId: topic.id,
messageId: historyRow.id,
recipientMemberId: t.id,
senderMemberId,
kind: "mention",
})),
)
.onConflictDoNothing();
notifications = targets.length;
}
}
return c.json({
messageId: queueRow?.id ?? null,
historyId: historyRow?.id ?? null,
topic: body.topic,
topicId: topic.id,
notifications,
});
})
@@ -508,13 +595,14 @@ export const v1Router = new Hono<Env>()
});
})
// GET /v1/notifications — recent @-mentions of the viewer across all
// topics in the key's mesh. v0.2.0 plaintext-base64 ciphertext lets
// us regex match server-side; in v0.3.0 (per-topic encryption) this
// moves to a notification table populated at write time.
// GET /v1/notifications — recent @-mentions of the viewer across
// all topics in the key's mesh. Reads from mesh.notification, which
// is populated at write time by POST /v1/messages and the broker's
// topic-send handler. Survives the v0.3.0 per-topic encryption cut
// (the regex-on-decoded-ciphertext approach won't).
//
// Query: ?since=<ISO> to incrementally fetch only newer mentions
// (e.g. for a polling notification bell). Default: last 24h.
// Query: ?since=<ISO> for incremental fetch (polling bells), and
// ?unread=1 to filter to read_at IS NULL only.
.get("/notifications", async (c) => {
const key = c.var.apiKey;
requireCapability(key, "read");
@@ -535,50 +623,54 @@ export const v1Router = new Hono<Env>()
if (Number.isNaN(since.getTime())) {
return c.json({ error: "invalid_since" }, 400);
}
const unreadOnly = c.req.query("unread") === "1";
// Postgres regex with case-insensitive match + word boundary on
// both sides. Decode the base64 ciphertext (plaintext envelope in
// v0.2.0) so we're matching readable text, not the base64 alphabet.
const escaped = me.displayName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const pattern = `(^|\\s|[^A-Za-z0-9_-])@${escaped}($|[^A-Za-z0-9_-])`;
const conditions = [
eq(meshNotification.recipientMemberId, key.issuedByMemberId),
eq(meshNotification.meshId, key.meshId),
gt(meshNotification.createdAt, since),
];
if (unreadOnly) conditions.push(isNull(meshNotification.readAt));
const rows = await db
.select({
id: meshTopicMessage.id,
notificationId: meshNotification.id,
topicId: meshTopicMessage.topicId,
topicName: meshTopic.name,
senderMemberId: meshTopicMessage.senderMemberId,
senderName: meshMember.displayName,
senderPubkey: meshMember.peerPubkey,
ciphertext: meshTopicMessage.ciphertext,
kind: meshNotification.kind,
readAt: meshNotification.readAt,
createdAt: meshTopicMessage.createdAt,
})
.from(meshTopicMessage)
.innerJoin(meshTopic, eq(meshTopic.id, meshTopicMessage.topicId))
.from(meshNotification)
.innerJoin(
meshTopicMessage,
eq(meshTopicMessage.id, meshNotification.messageId),
)
.innerJoin(meshTopic, eq(meshTopic.id, meshNotification.topicId))
.innerJoin(
meshMember,
eq(meshMember.id, meshTopicMessage.senderMemberId),
)
.where(
and(
eq(meshTopic.meshId, key.meshId),
isNull(meshTopic.archivedAt),
gt(meshTopicMessage.createdAt, since),
sql`${meshTopicMessage.senderMemberId} <> ${key.issuedByMemberId}`,
sql`convert_from(decode(${meshTopicMessage.ciphertext}, 'base64'), 'UTF8') ~* ${pattern}`,
),
eq(meshMember.id, meshNotification.senderMemberId),
)
.where(and(...conditions))
.orderBy(desc(meshTopicMessage.createdAt))
.limit(50);
return c.json({
notifications: rows.map((r) => ({
id: r.id,
notificationId: r.notificationId,
topicId: r.topicId,
topicName: r.topicName,
senderName: r.senderName,
senderPubkey: r.senderPubkey,
ciphertext: r.ciphertext,
kind: r.kind,
readAt: r.readAt?.toISOString() ?? null,
createdAt: r.createdAt.toISOString(),
})),
since: since.toISOString(),
@@ -586,6 +678,64 @@ export const v1Router = new Hono<Env>()
});
})
// POST /v1/notifications/read — mark notifications read. Body shape:
// { ids: string[] } — mark these notification ids
// { all: true, before?: ISO } — mark every unread for this
// member up to `before` (or now)
// Idempotent. Always 200, even if 0 rows updated.
.post(
"/notifications/read",
validate(
"json",
z.union([
z.object({ ids: z.array(z.string().min(1)).min(1).max(200) }),
z.object({ all: z.literal(true), before: z.string().optional() }),
]),
),
async (c) => {
const key = c.var.apiKey;
requireCapability(key, "read");
if (!key.issuedByMemberId) {
return c.json({ error: "api_key_has_no_issuer" }, 400);
}
const body = c.req.valid("json");
const now = new Date();
if ("ids" in body) {
await db
.update(meshNotification)
.set({ readAt: now })
.where(
and(
eq(meshNotification.recipientMemberId, key.issuedByMemberId),
eq(meshNotification.meshId, key.meshId),
isNull(meshNotification.readAt),
sql`${meshNotification.id} = ANY(${body.ids})`,
),
);
return c.json({ marked: body.ids.length, readAt: now.toISOString() });
}
const beforeAt = body.before ? new Date(body.before) : now;
if (Number.isNaN(beforeAt.getTime())) {
return c.json({ error: "invalid_before" }, 400);
}
await db
.update(meshNotification)
.set({ readAt: now })
.where(
and(
eq(meshNotification.recipientMemberId, key.issuedByMemberId),
eq(meshNotification.meshId, key.meshId),
isNull(meshNotification.readAt),
sql`${meshNotification.createdAt} <= ${beforeAt}`,
),
);
return c.json({ marked: "all", before: beforeAt.toISOString() });
},
)
// GET /v1/peers — connected peers in the key's mesh
//
// Sources, deduped by memberId:

View File

@@ -0,0 +1,80 @@
-- Notifications — write-time mention fan-out (v0.3.0 phase 1).
--
-- Replaces the regex-on-decoded-ciphertext scan in /v1/notifications and
-- the dashboard MentionsSection. Lets us drop the
-- `convert_from(decode(ciphertext, 'base64'), 'UTF8') ~* @name` query that
-- breaks the moment ciphertext stops being base64-of-UTF8 (i.e. the
-- moment per-topic encryption lands in v0.3.0 phase 2).
--
-- One row per (recipient_member, topic_message). Idempotent ON CONFLICT
-- on the unique pair; if the broker re-fans a message after a crash the
-- recipient sees one notification, not two.
--
-- Server-side mention extraction happens in POST /v1/messages and the
-- broker's WS message handler. Both extract @-tokens from the body
-- BEFORE encryption (the only point at which the server can read it),
-- match against the topic's member roster, and insert a row per match.
CREATE TABLE IF NOT EXISTS "mesh"."notification" (
"id" text PRIMARY KEY NOT NULL,
"mesh_id" text NOT NULL REFERENCES "mesh"."mesh"("id") ON DELETE CASCADE ON UPDATE CASCADE,
"topic_id" text NOT NULL REFERENCES "mesh"."topic"("id") ON DELETE CASCADE ON UPDATE CASCADE,
"message_id" text NOT NULL REFERENCES "mesh"."topic_message"("id") ON DELETE CASCADE ON UPDATE CASCADE,
"recipient_member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE,
"sender_member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE,
"kind" text NOT NULL DEFAULT 'mention',
"created_at" timestamp DEFAULT now() NOT NULL,
"read_at" timestamp
);
CREATE UNIQUE INDEX IF NOT EXISTS "notification_unique"
ON "mesh"."notification" ("message_id", "recipient_member_id");
CREATE INDEX IF NOT EXISTS "notification_by_recipient_unread"
ON "mesh"."notification" ("recipient_member_id", "created_at" DESC)
WHERE "read_at" IS NULL;
CREATE INDEX IF NOT EXISTS "notification_by_recipient"
ON "mesh"."notification" ("recipient_member_id", "created_at" DESC);
CREATE INDEX IF NOT EXISTS "notification_by_mesh"
ON "mesh"."notification" ("mesh_id", "created_at" DESC);
-- Backfill existing v0.2.0 messages so the new table has history. Safe
-- to run multiple times (ON CONFLICT DO NOTHING). The regex matches the
-- same shape as the in-app autocomplete + render: @-prefixed token with
-- a non-word boundary on both sides (or string edges).
--
-- We skip messages that fail to decode — defensive against any non-base64
-- ciphertext that may have slipped in via future writers.
INSERT INTO "mesh"."notification"
("id", "mesh_id", "topic_id", "message_id", "recipient_member_id",
"sender_member_id", "kind", "created_at")
SELECT
replace(gen_random_uuid()::text, '-', ''),
t."mesh_id",
m."topic_id",
m."id",
recipient."id",
m."sender_member_id",
'mention',
m."created_at"
FROM "mesh"."topic_message" m
INNER JOIN "mesh"."topic" t ON t."id" = m."topic_id"
INNER JOIN "mesh"."member" recipient
ON recipient."mesh_id" = t."mesh_id"
AND recipient."revoked_at" IS NULL
AND recipient."id" <> m."sender_member_id"
WHERE
-- Only scan messages that look like base64-of-UTF8. Defensive guard
-- against a future writer storing binary ciphertext — convert_from
-- would otherwise raise and abort the whole migration.
m."ciphertext" ~ '^[A-Za-z0-9+/=]+$'
AND length(m."ciphertext") > 0
AND length(m."ciphertext") % 4 = 0
AND convert_from(decode(m."ciphertext", 'base64'), 'UTF8') ~* (
'(^|\s|[^A-Za-z0-9_-])@'
|| regexp_replace(recipient."display_name", '([.*+?^${}()|\[\]\\])', '\\\1', 'g')
|| '($|[^A-Za-z0-9_-])'
)
ON CONFLICT ("message_id", "recipient_member_id") DO NOTHING;

View File

@@ -1484,6 +1484,89 @@ export const insertMeshTopicMessageSchema =
export type SelectMeshTopicMessage = typeof meshTopicMessage.$inferSelect;
export type InsertMeshTopicMessage = typeof meshTopicMessage.$inferInsert;
/**
* Per-recipient notifications. v0.3.0 phase 1: server-side mention
* extraction at write time replaces the regex-on-decoded-ciphertext
* scan in /v1/notifications. Fanned out at POST /v1/messages and the
* broker's WS topic_send handler — one row per (recipient, message).
*
* `kind` is open-ended ("mention" today; future kinds: "reply",
* "task_assigned", etc.) so we can extend without a migration.
*/
export const meshNotification = meshSchema.table(
"notification",
{
id: text().primaryKey().notNull().$defaultFn(generateId),
meshId: text()
.references(() => mesh.id, {
onDelete: "cascade",
onUpdate: "cascade",
})
.notNull(),
topicId: text()
.references(() => meshTopic.id, {
onDelete: "cascade",
onUpdate: "cascade",
})
.notNull(),
messageId: text()
.references(() => meshTopicMessage.id, {
onDelete: "cascade",
onUpdate: "cascade",
})
.notNull(),
recipientMemberId: text()
.references(() => meshMember.id, {
onDelete: "cascade",
onUpdate: "cascade",
})
.notNull(),
senderMemberId: text()
.references(() => meshMember.id, {
onDelete: "cascade",
onUpdate: "cascade",
})
.notNull(),
kind: text().notNull().default("mention"),
createdAt: timestamp().defaultNow().notNull(),
readAt: timestamp(),
},
(t) => [
uniqueIndex("notification_unique").on(t.messageId, t.recipientMemberId),
index("notification_by_recipient").on(t.recipientMemberId, t.createdAt),
index("notification_by_mesh").on(t.meshId, t.createdAt),
],
);
export const meshNotificationRelations = relations(
meshNotification,
({ one }) => ({
topic: one(meshTopic, {
fields: [meshNotification.topicId],
references: [meshTopic.id],
}),
message: one(meshTopicMessage, {
fields: [meshNotification.messageId],
references: [meshTopicMessage.id],
}),
recipient: one(meshMember, {
fields: [meshNotification.recipientMemberId],
references: [meshMember.id],
}),
sender: one(meshMember, {
fields: [meshNotification.senderMemberId],
references: [meshMember.id],
}),
}),
);
export const selectMeshNotificationSchema =
createSelectSchema(meshNotification);
export const insertMeshNotificationSchema =
createInsertSchema(meshNotification);
export type SelectMeshNotification = typeof meshNotification.$inferSelect;
export type InsertMeshNotification = typeof meshNotification.$inferInsert;
/* ────────────────────────────────────────────────────────────────────────
* API keys (v0.2.0) — REST + external WS auth.
*