feat(api+web): unread counts per topic + PATCH /read mark-as-read
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

PATCH /v1/topics/:name/read upserts topic_member.last_read_at for the
api key's issuing member. The chat panel calls it on mount and on
every inbound SSE message (5s debounce so we don't hammer it).

GET /v1/topics now returns unread per topic — counts messages newer
than last_read_at and not authored by the viewer. Mesh detail page
shows a clay-rounded badge next to each topic name with the count
(99+ ceiling).

AuthedApiKey gains issuedByMemberId so endpoints can attribute
side-effects to the minting member. Required because external api
keys aren't tied to a specific peer member; only dashboard- and
CLI-minted keys carry one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 19:06:01 +01:00
parent 7e71a61db4
commit a80eb6fcca
4 changed files with 175 additions and 7 deletions

View File

@@ -4,10 +4,14 @@ import { notFound } from "next/navigation";
import { getMyMeshResponseSchema } from "@turbostarter/api/schema"; import { getMyMeshResponseSchema } from "@turbostarter/api/schema";
import { handle } from "@turbostarter/api/utils"; import { handle } from "@turbostarter/api/utils";
import { db } from "@turbostarter/db/server"; import { db } from "@turbostarter/db/server";
import { meshTopic } from "@turbostarter/db/schema/mesh"; import {
meshTopic,
meshTopicMember,
meshTopicMessage,
} from "@turbostarter/db/schema/mesh";
import { Badge } from "@turbostarter/ui-web/badge"; import { Badge } from "@turbostarter/ui-web/badge";
import { buttonVariants } from "@turbostarter/ui-web/button"; import { buttonVariants } from "@turbostarter/ui-web/button";
import { and, asc, eq, isNull } from "drizzle-orm"; import { and, asc, count, eq, isNull, ne, or, sql } from "drizzle-orm";
import { pathsConfig } from "~/config/paths"; import { pathsConfig } from "~/config/paths";
import { api } from "~/lib/api/server"; import { api } from "~/lib/api/server";
@@ -52,6 +56,43 @@ export default async function MeshPage({
.where(and(eq(meshTopic.meshId, id), isNull(meshTopic.archivedAt))) .where(and(eq(meshTopic.meshId, id), isNull(meshTopic.archivedAt)))
.orderBy(asc(meshTopic.name)); .orderBy(asc(meshTopic.name));
// Unread counts per topic for the viewing member. Skips messages the
// viewer themselves sent (no point flagging "you" as unread). Topics
// the viewer hasn't joined fall through to last_read_at = epoch via
// the LEFT JOIN, which counts every message — that's the correct
// first-visit behaviour for owner-created topics they haven't opened.
const myMemberId = members.find((m) => m.isMe)?.id;
const unreadByTopic = new Map<string, number>();
if (myMemberId && topics.length > 0) {
const counts = await db
.select({
topicId: meshTopicMessage.topicId,
unread: count(meshTopicMessage.id),
})
.from(meshTopicMessage)
.leftJoin(
meshTopicMember,
and(
eq(meshTopicMember.topicId, meshTopicMessage.topicId),
eq(meshTopicMember.memberId, myMemberId),
),
)
.where(
and(
sql`${meshTopicMessage.topicId} = ANY(${topics.map((t) => t.id)})`,
ne(meshTopicMessage.senderMemberId, myMemberId),
or(
isNull(meshTopicMember.lastReadAt),
sql`${meshTopicMessage.createdAt} > ${meshTopicMember.lastReadAt}`,
),
),
)
.groupBy(meshTopicMessage.topicId);
for (const row of counts) {
unreadByTopic.set(row.topicId, Number(row.unread));
}
}
return ( return (
<> <>
<DashboardHeader> <DashboardHeader>
@@ -166,7 +207,9 @@ export default async function MeshPage({
</div> </div>
) : ( ) : (
<div className="divide-y"> <div className="divide-y">
{topics.map((t) => ( {topics.map((t) => {
const unread = unreadByTopic.get(t.id) ?? 0;
return (
<Link <Link
key={t.id} key={t.id}
href={pathsConfig.dashboard.user.meshes.topic(mesh.id, t.name)} href={pathsConfig.dashboard.user.meshes.topic(mesh.id, t.name)}
@@ -180,6 +223,15 @@ export default async function MeshPage({
<Badge variant="outline" className="text-xs"> <Badge variant="outline" className="text-xs">
{t.visibility} {t.visibility}
</Badge> </Badge>
{unread > 0 ? (
<span
className="inline-flex h-5 min-w-5 items-center justify-center rounded-full bg-[var(--cm-clay)] px-1.5 text-[10px] font-medium text-white"
style={{ fontFamily: "var(--cm-font-mono)" }}
aria-label={`${unread} unread`}
>
{unread > 99 ? "99+" : unread}
</span>
) : null}
</div> </div>
<div className="flex items-center gap-3"> <div className="flex items-center gap-3">
{t.description ? ( {t.description ? (
@@ -195,7 +247,8 @@ export default async function MeshPage({
</span> </span>
</div> </div>
</Link> </Link>
))} );
})}
</div> </div>
)} )}
</section> </section>

View File

@@ -131,6 +131,7 @@ export function TopicChatPanel({
const [lastEventAt, setLastEventAt] = useState<number | null>(null); const [lastEventAt, setLastEventAt] = useState<number | null>(null);
const scrollRef = useRef<HTMLDivElement>(null); const scrollRef = useRef<HTMLDivElement>(null);
const seenIdsRef = useRef<Set<string>>(new Set()); const seenIdsRef = useRef<Set<string>>(new Set());
const lastMarkReadAtRef = useRef<number>(0);
const headers = useMemo( const headers = useMemo(
() => ({ () => ({
@@ -140,6 +141,22 @@ export function TopicChatPanel({
[apiKeySecret], [apiKeySecret],
); );
// Mark the topic read up to now, but at most once per 5 seconds —
// we'd otherwise hit /read on every inbound SSE message which is
// wasteful (the wall-clock watermark advances either way).
const markRead = useCallback(async () => {
if (Date.now() - lastMarkReadAtRef.current < 5000) return;
lastMarkReadAtRef.current = Date.now();
try {
await fetch(`/api/v1/topics/${encodeURIComponent(topicName)}/read`, {
method: "PATCH",
headers,
});
} catch {
// Soft-fail — unread counts are advisory.
}
}, [headers, topicName]);
// One-shot history backfill on mount; the SSE stream is forward-only, // One-shot history backfill on mount; the SSE stream is forward-only,
// so any messages older than connect-time come from this fetch. // so any messages older than connect-time come from this fetch.
const loadHistory = useCallback(async () => { const loadHistory = useCallback(async () => {
@@ -164,7 +181,8 @@ export function TopicChatPanel({
useEffect(() => { useEffect(() => {
void loadHistory(); void loadHistory();
}, [loadHistory]); void markRead();
}, [loadHistory, markRead]);
// SSE subscription with auto-reconnect. AbortController unwinds the // SSE subscription with auto-reconnect. AbortController unwinds the
// stream when the component unmounts or the topic/key changes. // stream when the component unmounts or the topic/key changes.
@@ -212,6 +230,7 @@ export function TopicChatPanel({
if (seenIdsRef.current.has(m.id)) continue; if (seenIdsRef.current.has(m.id)) continue;
seenIdsRef.current.add(m.id); seenIdsRef.current.add(m.id);
setMessages((cur) => [...cur, m]); setMessages((cur) => [...cur, m]);
void markRead();
} catch { } catch {
// Drop malformed events silently — heartbeat-as-message // Drop malformed events silently — heartbeat-as-message
// happens once per misconfigured proxy. // happens once per misconfigured proxy.
@@ -236,7 +255,7 @@ export function TopicChatPanel({
setStreamState("stopped"); setStreamState("stopped");
ctl.abort(); ctl.abort();
}; };
}, [apiKeySecret, topicName]); }, [apiKeySecret, topicName, markRead]);
useEffect(() => { useEffect(() => {
scrollRef.current?.scrollTo({ scrollRef.current?.scrollTo({

View File

@@ -22,6 +22,13 @@ export type ApiKeyCapability = "send" | "read" | "state_write" | "admin";
export interface AuthedApiKey { export interface AuthedApiKey {
id: string; id: string;
meshId: string; meshId: string;
/**
* The mesh member that minted this key. Dashboard keys carry the
* owner's member id; CLI-minted keys carry the issuing peer. Endpoints
* that attribute a side-effect to a member (e.g. PATCH /read,
* presence ping) read this field instead of the api key id.
*/
issuedByMemberId: string | null;
capabilities: ApiKeyCapability[]; capabilities: ApiKeyCapability[];
topicScopes: string[] | null; topicScopes: string[] | null;
} }
@@ -37,6 +44,7 @@ async function verifyBearer(secret: string): Promise<AuthedApiKey | null> {
secretHash: meshApiKey.secretHash, secretHash: meshApiKey.secretHash,
capabilities: meshApiKey.capabilities, capabilities: meshApiKey.capabilities,
topicScopes: meshApiKey.topicScopes, topicScopes: meshApiKey.topicScopes,
issuedByMemberId: meshApiKey.issuedByMemberId,
revokedAt: meshApiKey.revokedAt, revokedAt: meshApiKey.revokedAt,
expiresAt: meshApiKey.expiresAt, expiresAt: meshApiKey.expiresAt,
}) })
@@ -58,6 +66,7 @@ async function verifyBearer(secret: string): Promise<AuthedApiKey | null> {
return { return {
id: c.id, id: c.id,
meshId: c.meshId, meshId: c.meshId,
issuedByMemberId: c.issuedByMemberId ?? null,
capabilities: (c.capabilities ?? []) as ApiKeyCapability[], capabilities: (c.capabilities ?? []) as ApiKeyCapability[],
topicScopes: c.topicScopes ?? null, topicScopes: c.topicScopes ?? null,
}; };

View File

@@ -36,7 +36,7 @@ import {
messageQueue, messageQueue,
presence, presence,
} from "@turbostarter/db/schema/mesh"; } from "@turbostarter/db/schema/mesh";
import { and, asc, desc, eq, gt, isNull, lt } from "drizzle-orm"; import { and, asc, count, desc, eq, gt, isNull, lt, sql } from "drizzle-orm";
import { validate } from "../../middleware"; import { validate } from "../../middleware";
import { import {
@@ -140,6 +140,11 @@ export const v1Router = new Hono<Env>()
}) })
// GET /v1/topics — list topics in the key's mesh // GET /v1/topics — list topics in the key's mesh
// Includes per-topic unread counts when the key has an issuing member
// (i.e. dashboard keys; CLI-minted keys also carry it). Counts are
// computed against topic_member.last_read_at; if no membership row
// exists for this member, the topic counts as 0 unread (member is
// not subscribed — surfacing the topic without nagging them).
.get("/topics", async (c) => { .get("/topics", async (c) => {
const key = c.var.apiKey; const key = c.var.apiKey;
requireCapability(key, "read"); requireCapability(key, "read");
@@ -157,6 +162,38 @@ export const v1Router = new Hono<Env>()
const filtered = key.topicScopes const filtered = key.topicScopes
? rows.filter((r) => key.topicScopes!.includes(r.name)) ? rows.filter((r) => key.topicScopes!.includes(r.name))
: rows; : rows;
// Build an unread-count map keyed by topic id. Only meaningful when
// we know whose last_read_at to compare against.
const unreadByTopic = new Map<string, number>();
if (key.issuedByMemberId && filtered.length > 0) {
const topicIds = filtered.map((t) => t.id);
const counts = await db
.select({
topicId: meshTopicMessage.topicId,
unread: count(meshTopicMessage.id),
})
.from(meshTopicMessage)
.leftJoin(
meshTopicMember,
and(
eq(meshTopicMember.topicId, meshTopicMessage.topicId),
eq(meshTopicMember.memberId, key.issuedByMemberId),
),
)
.where(
and(
sql`${meshTopicMessage.topicId} = ANY(${topicIds})`,
sql`${meshTopicMessage.createdAt} > COALESCE(${meshTopicMember.lastReadAt}, '1970-01-01'::timestamp)`,
sql`${meshTopicMessage.senderMemberId} <> ${key.issuedByMemberId}`,
),
)
.groupBy(meshTopicMessage.topicId);
for (const row of counts) {
unreadByTopic.set(row.topicId, Number(row.unread));
}
}
return c.json({ return c.json({
topics: filtered.map((t) => ({ topics: filtered.map((t) => ({
id: t.id, id: t.id,
@@ -164,10 +201,60 @@ export const v1Router = new Hono<Env>()
description: t.description, description: t.description,
visibility: t.visibility, visibility: t.visibility,
createdAt: t.createdAt.toISOString(), createdAt: t.createdAt.toISOString(),
unread: unreadByTopic.get(t.id) ?? 0,
})), })),
}); });
}) })
// PATCH /v1/topics/:name/read — mark a topic read up to now for the
// member that issued this api key. Upserts topic_member if no row
// exists yet (e.g. dashboard owner who joined the mesh before #general
// existed and hadn't been auto-subscribed). No-op if the api key has
// no issuing member (legacy keys without issuedByMemberId).
.patch("/topics/:name/read", async (c) => {
const key = c.var.apiKey;
requireCapability(key, "read");
const name = c.req.param("name");
requireTopicScope(key, name);
if (!key.issuedByMemberId) {
return c.json({ error: "api_key_has_no_issuer" }, 400);
}
const [topic] = await db
.select({ id: meshTopic.id })
.from(meshTopic)
.where(
and(
eq(meshTopic.meshId, key.meshId),
eq(meshTopic.name, name),
isNull(meshTopic.archivedAt),
),
);
if (!topic) {
return c.json({ error: "topic_not_found", topic: name }, 404);
}
const now = new Date();
await db
.insert(meshTopicMember)
.values({
topicId: topic.id,
memberId: key.issuedByMemberId,
lastReadAt: now,
})
.onConflictDoUpdate({
target: [meshTopicMember.topicId, meshTopicMember.memberId],
set: { lastReadAt: now },
});
return c.json({
topic: name,
topicId: topic.id,
readAt: now.toISOString(),
});
})
// GET /v1/topics/:name/messages?limit=50&before=<id> // GET /v1/topics/:name/messages?limit=50&before=<id>
.get( .get(
"/topics/:name/messages", "/topics/:name/messages",