feat(api+web): stream topic chat live over server-sent events
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

GET /v1/topics/:name/stream opens an SSE firehose, polled server-side
every 2s and streamed as `message` events. Forward-only — clients
hit /messages once for backfill, then live from connect-time onward.
Heartbeats every 30s keep the connection through proxies.

Web chat panel reads the stream via fetch + ReadableStream so the
bearer token stays in the Authorization header (EventSource can't
set custom headers, which would force token-in-URL leaks). Auto-
reconnect with exponential backoff. setInterval polling removed.

Vercel maxDuration bumped to 300s on the catch-all API route so
streams aren't cut at the 10s default.

drizzle migrations/meta/ deleted — superseded by the filename-
tracked custom runner in apps/broker/src/migrate.ts (c2cd67a).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 19:02:38 +01:00
parent d7cef45640
commit 7e71a61db4
16 changed files with 288 additions and 36155 deletions

View File

@@ -2,6 +2,16 @@ import { handle } from "hono/vercel";
import { appRouter } from "@turbostarter/api";
// Streamable endpoints (e.g. /v1/topics/:name/stream SSE) need to keep
// the connection open for minutes, not the 10s default. 300s is the
// Vercel Pro ceiling; on Hobby the platform clamps to 60s and the
// client auto-reconnects via the SSE retry loop.
export const maxDuration = 300;
// Force dynamic rendering — streaming responses can't be statically
// optimized and we don't want Next caching SSE traffic.
export const dynamic = "force-dynamic";
const handler = handle(appRouter);
export {
handler as GET,

View File

@@ -4,8 +4,6 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { Button } from "@turbostarter/ui-web/button";
const POLL_INTERVAL_MS = 5000;
interface TopicMessage {
id: string;
senderPubkey: string;
@@ -69,16 +67,54 @@ function fmtTime(iso: string): string {
}
}
function fmtRelative(iso: string): string {
const ms = Date.now() - new Date(iso).getTime();
if (ms < 60_000) return `${Math.floor(ms / 1000)}s ago`;
if (ms < 3_600_000) return `${Math.floor(ms / 60_000)}m ago`;
if (ms < 86_400_000) return `${Math.floor(ms / 3_600_000)}h ago`;
return new Date(iso).toLocaleDateString();
}
const monoStyle = { fontFamily: "var(--cm-font-mono)" } as const;
type SseEvent = {
event: string;
id?: string;
data: string;
};
/**
* Minimal text/event-stream parser. Reads from a `fetch` body so we can
* keep the bearer token in the Authorization header — the native
* EventSource API doesn't allow custom headers, which would force us to
* pass the secret via query string and leak it into proxy/referer logs.
*
* Yields each `event:`/`id:`/`data:` block. Anything that doesn't fit
* the format (comments, blank lines, unknown fields) is skipped.
*/
async function* readSseStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
): AsyncGenerator<SseEvent> {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buffer.indexOf("\n\n")) !== -1) {
const block = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
const ev: SseEvent = { event: "message", data: "" };
const dataLines: string[] = [];
for (const line of block.split("\n")) {
if (!line || line.startsWith(":")) continue;
const colon = line.indexOf(":");
if (colon < 0) continue;
const field = line.slice(0, colon);
const val = line.slice(colon + 1).replace(/^ /, "");
if (field === "event") ev.event = val;
else if (field === "id") ev.id = val;
else if (field === "data") dataLines.push(val);
}
ev.data = dataLines.join("\n");
yield ev;
}
}
}
export function TopicChatPanel({
topicName,
meshSlug,
@@ -89,9 +125,12 @@ export function TopicChatPanel({
const [draft, setDraft] = useState("");
const [error, setError] = useState<string | null>(null);
const [sending, setSending] = useState(false);
const [isFetching, setIsFetching] = useState(false);
const [lastPollAt, setLastPollAt] = useState<number | null>(null);
const [streamState, setStreamState] = useState<
"connecting" | "live" | "reconnecting" | "stopped"
>("connecting");
const [lastEventAt, setLastEventAt] = useState<number | null>(null);
const scrollRef = useRef<HTMLDivElement>(null);
const seenIdsRef = useRef<Set<string>>(new Set());
const headers = useMemo(
() => ({
@@ -101,8 +140,9 @@ export function TopicChatPanel({
[apiKeySecret],
);
const refresh = useCallback(async () => {
setIsFetching(true);
// One-shot history backfill on mount; the SSE stream is forward-only,
// so any messages older than connect-time come from this fetch.
const loadHistory = useCallback(async () => {
try {
const res = await fetch(
`/api/v1/topics/${encodeURIComponent(topicName)}/messages?limit=100`,
@@ -113,21 +153,90 @@ export function TopicChatPanel({
return;
}
const json = (await res.json()) as { messages: TopicMessage[] };
setMessages(json.messages.slice().reverse());
const ordered = json.messages.slice().reverse();
for (const m of ordered) seenIdsRef.current.add(m.id);
setMessages(ordered);
setError(null);
setLastPollAt(Date.now());
} catch (e) {
setError((e as Error).message);
} finally {
setIsFetching(false);
}
}, [headers, topicName]);
useEffect(() => {
void refresh();
const t = setInterval(refresh, POLL_INTERVAL_MS);
return () => clearInterval(t);
}, [refresh]);
void loadHistory();
}, [loadHistory]);
// SSE subscription with auto-reconnect. AbortController unwinds the
// stream when the component unmounts or the topic/key changes.
useEffect(() => {
const ctl = new AbortController();
let cancelled = false;
let backoffMs = 1000;
const run = async () => {
while (!cancelled) {
try {
setStreamState((prev) =>
prev === "live" ? "reconnecting" : "connecting",
);
const res = await fetch(
`/api/v1/topics/${encodeURIComponent(topicName)}/stream`,
{
headers: { Authorization: `Bearer ${apiKeySecret}` },
signal: ctl.signal,
cache: "no-store",
},
);
if (!res.ok || !res.body) {
throw new Error(`stream open failed: ${res.status}`);
}
backoffMs = 1000;
setStreamState("live");
const reader = res.body.getReader();
for await (const ev of readSseStream(reader)) {
setLastEventAt(Date.now());
if (ev.event === "ready") continue;
if (ev.event === "heartbeat") continue;
if (ev.event === "error") {
try {
const parsed = JSON.parse(ev.data) as { error?: string };
setError(parsed.error ?? "stream error");
} catch {
setError("stream error");
}
continue;
}
if (ev.event === "message") {
try {
const m = JSON.parse(ev.data) as TopicMessage;
if (seenIdsRef.current.has(m.id)) continue;
seenIdsRef.current.add(m.id);
setMessages((cur) => [...cur, m]);
} catch {
// Drop malformed events silently — heartbeat-as-message
// happens once per misconfigured proxy.
}
}
}
// Reader exhausted (server closed) — loop will reconnect.
} catch (e) {
if (cancelled || ctl.signal.aborted) return;
setError(`stream: ${(e as Error).message}`);
}
if (cancelled) return;
setStreamState("reconnecting");
await new Promise((r) => setTimeout(r, backoffMs));
backoffMs = Math.min(backoffMs * 2, 15_000);
}
};
void run();
return () => {
cancelled = true;
setStreamState("stopped");
ctl.abort();
};
}, [apiKeySecret, topicName]);
useEffect(() => {
scrollRef.current?.scrollTo({
@@ -154,7 +263,7 @@ export function TopicChatPanel({
return;
}
setDraft("");
void refresh();
// SSE stream will deliver the message back; no manual refresh.
} catch (e) {
setError((e as Error).message);
} finally {
@@ -162,10 +271,26 @@ export function TopicChatPanel({
}
};
const secondsSincePoll = lastPollAt
? Math.max(0, Math.floor((Date.now() - lastPollAt) / 1000))
const secondsSinceEvent = lastEventAt
? Math.max(0, Math.floor((Date.now() - lastEventAt) / 1000))
: null;
const dotClass =
streamState === "live"
? "bg-emerald-500"
: streamState === "stopped"
? "bg-[var(--cm-fg-tertiary)]"
: "bg-[var(--cm-clay)] animate-pulse";
const stateLabel =
streamState === "live"
? `live · ${secondsSinceEvent ?? 0}s`
: streamState === "connecting"
? "connecting…"
: streamState === "reconnecting"
? "reconnecting…"
: "stopped";
return (
<div className="flex h-[70vh] flex-col overflow-hidden rounded-[var(--cm-radius-lg)] border border-[var(--cm-border)] bg-[var(--cm-bg)]">
{/* Header — mono strip, clay-pulse dot, metadata right */}
@@ -174,23 +299,13 @@ export function TopicChatPanel({
style={monoStyle}
>
<div className="flex items-center gap-3">
<span
className={
"inline-block h-2 w-2 rounded-full " +
(isFetching
? "bg-[var(--cm-clay)] animate-pulse"
: "bg-emerald-500")
}
/>
<span className={"inline-block h-2 w-2 rounded-full " + dotClass} />
<span className="text-[11px] text-[var(--cm-fg-secondary)]">
#{topicName}
</span>
</div>
<span className="text-[10px] text-[var(--cm-fg-tertiary)]">
{messages.length} msg ·{" "}
{isFetching
? "polling…"
: `${secondsSincePoll ?? "—"}s ago`}
{messages.length} msg · {stateLabel}
</span>
</div>
@@ -275,7 +390,7 @@ export function TopicChatPanel({
<span className="inline-block h-1.5 w-1.5 rounded-full bg-[var(--cm-clay)]" />
mesh · {meshSlug}
</span>
<span>polling every {POLL_INTERVAL_MS / 1000}s</span>
<span>SSE · 2s push</span>
<span>key valid until {fmtTime(apiKeyExpiresAt)}</span>
<span className="ml-auto">
v0.2.0 · plaintext base64 · per-topic crypto in v0.3.0

View File

@@ -9,17 +9,21 @@
* POST /v1/messages — send to a topic
* GET /v1/topics — list topics in the key's mesh
* GET /v1/topics/:name/messages — fetch topic history (paginated)
* GET /v1/topics/:name/stream — SSE: live message firehose for a topic
* PATCH /v1/topics/:name/read — mark a topic read up to now
* GET /v1/peers — list peers in the mesh
*
* Live delivery: writes to mesh.message_queue + mesh.topic_message. The
* broker's existing pendingTimer drains the queue and pushes to live
* peers. Latency = polling interval (~2s today). Real-time push from
* REST writes is a follow-up.
* peers. The /stream endpoint server-side polls topic_message every
* 2s and pushes new rows as SSE events — clients see new messages
* within 2s without burning a poll-per-tab.
*
* Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md
*/
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { z } from "zod";
import { db } from "@turbostarter/db/server";
@@ -32,7 +36,7 @@ import {
messageQueue,
presence,
} from "@turbostarter/db/schema/mesh";
import { and, asc, desc, eq, isNull, lt } from "drizzle-orm";
import { and, asc, desc, eq, gt, isNull, lt } from "drizzle-orm";
import { validate } from "../../middleware";
import {
@@ -239,6 +243,120 @@ export const v1Router = new Hono<Env>()
},
)
// GET /v1/topics/:name/stream — live SSE firehose for a topic.
//
// Server-side polls mesh.topic_message every STREAM_POLL_MS for rows
// newer than the last seen createdAt and pushes each as an SSE
// `message` event. First connection sample establishes the watermark
// (no historical replay — clients fetch /messages for that). The
// stream ends when the client disconnects or the topic is archived.
//
// Heartbeats every 30s as SSE comments (`:keep-alive`) keep the
// connection through proxies that drop idle TCP. Postgres LISTEN/
// NOTIFY is the obvious upgrade path when message volume grows; the
// poll loop here is fine for v0.2.0's low write rate.
.get("/topics/:name/stream", async (c) => {
const key = c.var.apiKey;
requireCapability(key, "read");
const name = c.req.param("name");
requireTopicScope(key, name);
const [topic] = await db
.select({ id: meshTopic.id })
.from(meshTopic)
.where(
and(
eq(meshTopic.meshId, key.meshId),
eq(meshTopic.name, name),
isNull(meshTopic.archivedAt),
),
);
if (!topic) {
return c.json({ error: "topic_not_found", topic: name }, 404);
}
const STREAM_POLL_MS = 2000;
const HEARTBEAT_MS = 30_000;
return streamSSE(c, async (stream) => {
// Watermark: skip messages older than connect time so we don't
// replay history. Clients backfill via GET /messages.
let cursor = new Date();
let lastHeartbeat = Date.now();
let aborted = false;
stream.onAbort(() => {
aborted = true;
});
// Initial hello so clients know the stream is alive.
await stream.writeSSE({
event: "ready",
data: JSON.stringify({
topic: name,
topicId: topic.id,
connectedAt: cursor.toISOString(),
}),
});
while (!aborted) {
try {
const rows = await db
.select({
id: meshTopicMessage.id,
senderPubkey: meshMember.peerPubkey,
senderName: meshMember.displayName,
nonce: meshTopicMessage.nonce,
ciphertext: meshTopicMessage.ciphertext,
createdAt: meshTopicMessage.createdAt,
})
.from(meshTopicMessage)
.innerJoin(
meshMember,
eq(meshTopicMessage.senderMemberId, meshMember.id),
)
.where(
and(
eq(meshTopicMessage.topicId, topic.id),
gt(meshTopicMessage.createdAt, cursor),
),
)
.orderBy(asc(meshTopicMessage.createdAt))
.limit(100);
for (const r of rows) {
await stream.writeSSE({
event: "message",
id: r.id,
data: JSON.stringify({
id: r.id,
senderPubkey: r.senderPubkey,
senderName: r.senderName,
nonce: r.nonce,
ciphertext: r.ciphertext,
createdAt: r.createdAt.toISOString(),
}),
});
if (r.createdAt > cursor) cursor = r.createdAt;
}
if (Date.now() - lastHeartbeat > HEARTBEAT_MS) {
await stream.writeSSE({ event: "heartbeat", data: String(Date.now()) });
lastHeartbeat = Date.now();
}
} catch (e) {
await stream.writeSSE({
event: "error",
data: JSON.stringify({
error: e instanceof Error ? e.message : String(e),
}),
});
}
await stream.sleep(STREAM_POLL_MS);
}
});
})
// GET /v1/peers — connected peers in the key's mesh
// Dedupe by memberId — a member can have multiple active presence
// rows (one per session). Status reflects the most recent presence;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,90 +0,0 @@
{
"version": "7",
"dialect": "postgresql",
"entries": [
{
"idx": 0,
"version": "7",
"when": 1775336269295,
"tag": "0000_living_namora",
"breakpoints": true
},
{
"idx": 1,
"version": "7",
"when": 1775339743477,
"tag": "0001_demonic_karnak",
"breakpoints": true
},
{
"idx": 2,
"version": "7",
"when": 1775340519054,
"tag": "0002_vengeful_enchantress",
"breakpoints": true
},
{
"idx": 3,
"version": "7",
"when": 1775463897329,
"tag": "0003_add-presence-summary",
"breakpoints": true
},
{
"idx": 4,
"version": "7",
"when": 1775468683383,
"tag": "0004_add-presence-display-name",
"breakpoints": true
},
{
"idx": 5,
"version": "7",
"when": 1775470435032,
"tag": "0005_add-presence-session-pubkey",
"breakpoints": true
},
{
"idx": 6,
"version": "7",
"when": 1775470979207,
"tag": "0006_add-sender-session-pubkey",
"breakpoints": true
},
{
"idx": 7,
"version": "7",
"when": 1775476994511,
"tag": "0007_add-presence-groups",
"breakpoints": true
},
{
"idx": 8,
"version": "7",
"when": 1775477883426,
"tag": "0008_add-state-and-memory",
"breakpoints": true
},
{
"idx": 9,
"version": "7",
"when": 1775480008546,
"tag": "0009_add-file-tables",
"breakpoints": true
},
{
"idx": 10,
"version": "7",
"when": 1775480729014,
"tag": "0010_add-context-and-tasks",
"breakpoints": true
},
{
"idx": 11,
"version": "7",
"when": 1775481222701,
"tag": "0011_add-streams",
"breakpoints": true
}
]
}