diff --git a/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx index dba2f9e..762069f 100644 --- a/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx +++ b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx @@ -4,10 +4,14 @@ import { notFound } from "next/navigation"; import { getMyMeshResponseSchema } from "@turbostarter/api/schema"; import { handle } from "@turbostarter/api/utils"; import { db } from "@turbostarter/db/server"; -import { meshTopic } from "@turbostarter/db/schema/mesh"; +import { + meshTopic, + meshTopicMember, + meshTopicMessage, +} from "@turbostarter/db/schema/mesh"; import { Badge } from "@turbostarter/ui-web/badge"; import { buttonVariants } from "@turbostarter/ui-web/button"; -import { and, asc, eq, isNull } from "drizzle-orm"; +import { and, asc, count, eq, isNull, ne, or, sql } from "drizzle-orm"; import { pathsConfig } from "~/config/paths"; import { api } from "~/lib/api/server"; @@ -52,6 +56,43 @@ export default async function MeshPage({ .where(and(eq(meshTopic.meshId, id), isNull(meshTopic.archivedAt))) .orderBy(asc(meshTopic.name)); + // Unread counts per topic for the viewing member. Skips messages the + // viewer themselves sent (no point flagging "you" as unread). Topics + // the viewer hasn't joined fall through to last_read_at = epoch via + // the LEFT JOIN, which counts every message — that's the correct + // first-visit behaviour for owner-created topics they haven't opened. + const myMemberId = members.find((m) => m.isMe)?.id; + const unreadByTopic = new Map(); + if (myMemberId && topics.length > 0) { + const counts = await db + .select({ + topicId: meshTopicMessage.topicId, + unread: count(meshTopicMessage.id), + }) + .from(meshTopicMessage) + .leftJoin( + meshTopicMember, + and( + eq(meshTopicMember.topicId, meshTopicMessage.topicId), + eq(meshTopicMember.memberId, myMemberId), + ), + ) + .where( + and( + sql`${meshTopicMessage.topicId} = ANY(${topics.map((t) => t.id)})`, + ne(meshTopicMessage.senderMemberId, myMemberId), + or( + isNull(meshTopicMember.lastReadAt), + sql`${meshTopicMessage.createdAt} > ${meshTopicMember.lastReadAt}`, + ), + ), + ) + .groupBy(meshTopicMessage.topicId); + for (const row of counts) { + unreadByTopic.set(row.topicId, Number(row.unread)); + } + } + return ( <> @@ -166,7 +207,9 @@ export default async function MeshPage({ ) : (
- {topics.map((t) => ( + {topics.map((t) => { + const unread = unreadByTopic.get(t.id) ?? 0; + return ( {t.visibility} + {unread > 0 ? ( + + {unread > 99 ? "99+" : unread} + + ) : null}
{t.description ? ( @@ -195,7 +247,8 @@ export default async function MeshPage({
- ))} + ); + })} )} diff --git a/apps/web/src/modules/mesh/topic-chat-panel.tsx b/apps/web/src/modules/mesh/topic-chat-panel.tsx index 530ce0e..52e66a0 100644 --- a/apps/web/src/modules/mesh/topic-chat-panel.tsx +++ b/apps/web/src/modules/mesh/topic-chat-panel.tsx @@ -131,6 +131,7 @@ export function TopicChatPanel({ const [lastEventAt, setLastEventAt] = useState(null); const scrollRef = useRef(null); const seenIdsRef = useRef>(new Set()); + const lastMarkReadAtRef = useRef(0); const headers = useMemo( () => ({ @@ -140,6 +141,22 @@ export function TopicChatPanel({ [apiKeySecret], ); + // Mark the topic read up to now, but at most once per 5 seconds — + // we'd otherwise hit /read on every inbound SSE message which is + // wasteful (the wall-clock watermark advances either way). + const markRead = useCallback(async () => { + if (Date.now() - lastMarkReadAtRef.current < 5000) return; + lastMarkReadAtRef.current = Date.now(); + try { + await fetch(`/api/v1/topics/${encodeURIComponent(topicName)}/read`, { + method: "PATCH", + headers, + }); + } catch { + // Soft-fail — unread counts are advisory. + } + }, [headers, topicName]); + // One-shot history backfill on mount; the SSE stream is forward-only, // so any messages older than connect-time come from this fetch. const loadHistory = useCallback(async () => { @@ -164,7 +181,8 @@ export function TopicChatPanel({ useEffect(() => { void loadHistory(); - }, [loadHistory]); + void markRead(); + }, [loadHistory, markRead]); // SSE subscription with auto-reconnect. AbortController unwinds the // stream when the component unmounts or the topic/key changes. @@ -212,6 +230,7 @@ export function TopicChatPanel({ if (seenIdsRef.current.has(m.id)) continue; seenIdsRef.current.add(m.id); setMessages((cur) => [...cur, m]); + void markRead(); } catch { // Drop malformed events silently — heartbeat-as-message // happens once per misconfigured proxy. @@ -236,7 +255,7 @@ export function TopicChatPanel({ setStreamState("stopped"); ctl.abort(); }; - }, [apiKeySecret, topicName]); + }, [apiKeySecret, topicName, markRead]); useEffect(() => { scrollRef.current?.scrollTo({ diff --git a/packages/api/src/modules/mesh/api-key-auth.ts b/packages/api/src/modules/mesh/api-key-auth.ts index 44b19c2..e2e2531 100644 --- a/packages/api/src/modules/mesh/api-key-auth.ts +++ b/packages/api/src/modules/mesh/api-key-auth.ts @@ -22,6 +22,13 @@ export type ApiKeyCapability = "send" | "read" | "state_write" | "admin"; export interface AuthedApiKey { id: string; meshId: string; + /** + * The mesh member that minted this key. Dashboard keys carry the + * owner's member id; CLI-minted keys carry the issuing peer. Endpoints + * that attribute a side-effect to a member (e.g. PATCH /read, + * presence ping) read this field instead of the api key id. + */ + issuedByMemberId: string | null; capabilities: ApiKeyCapability[]; topicScopes: string[] | null; } @@ -37,6 +44,7 @@ async function verifyBearer(secret: string): Promise { secretHash: meshApiKey.secretHash, capabilities: meshApiKey.capabilities, topicScopes: meshApiKey.topicScopes, + issuedByMemberId: meshApiKey.issuedByMemberId, revokedAt: meshApiKey.revokedAt, expiresAt: meshApiKey.expiresAt, }) @@ -58,6 +66,7 @@ async function verifyBearer(secret: string): Promise { return { id: c.id, meshId: c.meshId, + issuedByMemberId: c.issuedByMemberId ?? null, capabilities: (c.capabilities ?? []) as ApiKeyCapability[], topicScopes: c.topicScopes ?? null, }; diff --git a/packages/api/src/modules/mesh/v1-router.ts b/packages/api/src/modules/mesh/v1-router.ts index d0de74e..e89db41 100644 --- a/packages/api/src/modules/mesh/v1-router.ts +++ b/packages/api/src/modules/mesh/v1-router.ts @@ -36,7 +36,7 @@ import { messageQueue, presence, } from "@turbostarter/db/schema/mesh"; -import { and, asc, desc, eq, gt, isNull, lt } from "drizzle-orm"; +import { and, asc, count, desc, eq, gt, isNull, lt, sql } from "drizzle-orm"; import { validate } from "../../middleware"; import { @@ -140,6 +140,11 @@ export const v1Router = new Hono() }) // GET /v1/topics — list topics in the key's mesh + // Includes per-topic unread counts when the key has an issuing member + // (i.e. dashboard keys; CLI-minted keys also carry it). Counts are + // computed against topic_member.last_read_at; if no membership row + // exists for this member, the topic counts as 0 unread (member is + // not subscribed — surfacing the topic without nagging them). .get("/topics", async (c) => { const key = c.var.apiKey; requireCapability(key, "read"); @@ -157,6 +162,38 @@ export const v1Router = new Hono() const filtered = key.topicScopes ? rows.filter((r) => key.topicScopes!.includes(r.name)) : rows; + + // Build an unread-count map keyed by topic id. Only meaningful when + // we know whose last_read_at to compare against. + const unreadByTopic = new Map(); + if (key.issuedByMemberId && filtered.length > 0) { + const topicIds = filtered.map((t) => t.id); + const counts = await db + .select({ + topicId: meshTopicMessage.topicId, + unread: count(meshTopicMessage.id), + }) + .from(meshTopicMessage) + .leftJoin( + meshTopicMember, + and( + eq(meshTopicMember.topicId, meshTopicMessage.topicId), + eq(meshTopicMember.memberId, key.issuedByMemberId), + ), + ) + .where( + and( + sql`${meshTopicMessage.topicId} = ANY(${topicIds})`, + sql`${meshTopicMessage.createdAt} > COALESCE(${meshTopicMember.lastReadAt}, '1970-01-01'::timestamp)`, + sql`${meshTopicMessage.senderMemberId} <> ${key.issuedByMemberId}`, + ), + ) + .groupBy(meshTopicMessage.topicId); + for (const row of counts) { + unreadByTopic.set(row.topicId, Number(row.unread)); + } + } + return c.json({ topics: filtered.map((t) => ({ id: t.id, @@ -164,10 +201,60 @@ export const v1Router = new Hono() description: t.description, visibility: t.visibility, createdAt: t.createdAt.toISOString(), + unread: unreadByTopic.get(t.id) ?? 0, })), }); }) + // PATCH /v1/topics/:name/read — mark a topic read up to now for the + // member that issued this api key. Upserts topic_member if no row + // exists yet (e.g. dashboard owner who joined the mesh before #general + // existed and hadn't been auto-subscribed). No-op if the api key has + // no issuing member (legacy keys without issuedByMemberId). + .patch("/topics/:name/read", async (c) => { + const key = c.var.apiKey; + requireCapability(key, "read"); + const name = c.req.param("name"); + requireTopicScope(key, name); + + if (!key.issuedByMemberId) { + return c.json({ error: "api_key_has_no_issuer" }, 400); + } + + const [topic] = await db + .select({ id: meshTopic.id }) + .from(meshTopic) + .where( + and( + eq(meshTopic.meshId, key.meshId), + eq(meshTopic.name, name), + isNull(meshTopic.archivedAt), + ), + ); + if (!topic) { + return c.json({ error: "topic_not_found", topic: name }, 404); + } + + const now = new Date(); + await db + .insert(meshTopicMember) + .values({ + topicId: topic.id, + memberId: key.issuedByMemberId, + lastReadAt: now, + }) + .onConflictDoUpdate({ + target: [meshTopicMember.topicId, meshTopicMember.memberId], + set: { lastReadAt: now }, + }); + + return c.json({ + topic: name, + topicId: topic.id, + readAt: now.toISOString(), + }); + }) + // GET /v1/topics/:name/messages?limit=50&before= .get( "/topics/:name/messages",