diff --git a/apps/broker/README.md b/apps/broker/README.md index 44e8681..c68a124 100644 --- a/apps/broker/README.md +++ b/apps/broker/README.md @@ -23,11 +23,19 @@ pnpm --filter=@claudemesh/broker start # production | Var | Default | Purpose | | ---------------------------- | ------- | --------------------------------------------------- | -| `BROKER_PORT` | `7899` | Port the WS server listens on | +| `BROKER_PORT` | `7899` | Single port for HTTP routes + WebSocket upgrade | | `DATABASE_URL` | — | Postgres connection string (shared with apps/web) | | `STATUS_TTL_SECONDS` | `60` | Flip stuck-"working" peers to idle after this TTL | | `HOOK_FRESH_WINDOW_SECONDS` | `30` | How long a hook signal beats JSONL inference | +## Routes (single port) + +| Path | Protocol | Purpose | +| -------------------- | --------- | ----------------------------------------- | +| `/ws` | WebSocket | Authenticated peer connections | +| `/hook/set-status` | HTTP POST | Claude Code hook scripts report status | +| `/health` | HTTP GET | Liveness probe | + ## Depends on - `@turbostarter/db` — Drizzle/Postgres schema (uses the `mesh` pgSchema) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 8d768c5..e60dd7d 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -369,13 +369,17 @@ function deliverablePriorities(status: PeerStatus): Priority[] { /** * Drain deliverable messages addressed to a specific member in a mesh. - * Marks them delivered and returns the envelopes for the caller to - * push over WebSocket. Does NOT handle targetSpec routing — that's the - * responsibility of the ingress fanout (see queueForTargets). + * Joins mesh.member so each envelope carries the sender's pubkey, which + * the receiving client needs to identify who sent it. Marks drained + * rows as delivered and returns the envelopes for WS push. + * + * targetSpec routing: matches either the member's pubkey directly or + * the broadcast wildcard ("*"). Channel/tag resolution is per-mesh + * config that lives outside this function. */ export async function drainForMember( meshId: string, - memberId: string, + _memberId: string, memberPubkey: string, status: PeerStatus, ): Promise< @@ -386,13 +390,10 @@ export async function drainForMember( ciphertext: string; createdAt: Date; senderMemberId: string; + senderPubkey: string; }> > { const priorities = deliverablePriorities(status); - - // A message is deliverable to this member if its targetSpec - // addresses them directly (pubkey match) or is a broadcast. - // Channel/tag resolution is a per-mesh concern layered on top. const targetFilter = or( eq(messageQueue.targetSpec, memberPubkey), eq(messageQueue.targetSpec, "*"), @@ -406,8 +407,10 @@ export async function drainForMember( ciphertext: messageQueue.ciphertext, createdAt: messageQueue.createdAt, senderMemberId: messageQueue.senderMemberId, + senderPubkey: memberTable.peerPubkey, }) .from(messageQueue) + .innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId)) .where( and( eq(messageQueue.meshId, meshId), @@ -432,6 +435,7 @@ export async function drainForMember( ciphertext: r.ciphertext, createdAt: r.createdAt, senderMemberId: r.senderMemberId, + senderPubkey: r.senderPubkey, })); } diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 973d1cc..850020d 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -12,7 +12,8 @@ * Shutdown: clean SIGTERM/SIGINT marks all presences disconnected. */ -import { createServer } from "node:http"; +import { createServer, type IncomingMessage } from "node:http"; +import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; import { env } from "./env"; import { @@ -36,8 +37,8 @@ import type { } from "./types"; const VERSION = "0.1.0"; -const WS_PORT = env.BROKER_PORT; -const HTTP_PORT = env.BROKER_PORT + 1; +const PORT = env.BROKER_PORT; +const WS_PATH = "/ws"; function log(msg: string): void { console.error(`[broker] ${msg}`); @@ -68,60 +69,76 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void { } } -// --- HTTP server (hook endpoint) --- +// --- Combined HTTP + WS server on a single port --- +// +// `ws` is run with noServer:true and attached to the HTTP server's +// 'upgrade' event. Clients connect to ws://host:PORT/ws; everything +// else is routed by the HTTP handler. -function startHttpServer(): ReturnType { - const server = createServer((req, res) => { - res.setHeader("Access-Control-Allow-Origin", "*"); - res.setHeader("Access-Control-Allow-Methods", "POST, OPTIONS"); - res.setHeader("Access-Control-Allow-Headers", "Content-Type"); - if (req.method === "OPTIONS") { - res.writeHead(204); - res.end(); - return; - } +function handleHttpRequest( + req: IncomingMessage, + res: import("node:http").ServerResponse, +): void { + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS"); + res.setHeader("Access-Control-Allow-Headers", "Content-Type"); + if (req.method === "OPTIONS") { + res.writeHead(204); + res.end(); + return; + } - if (req.method === "GET" && req.url === "/health") { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ status: "ok", version: VERSION })); - return; - } + if (req.method === "GET" && req.url === "/health") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "ok", version: VERSION })); + return; + } - if (req.method === "POST" && req.url === "/hook/set-status") { - let body = ""; - req.on("data", (chunk) => (body += chunk.toString())); - req.on("end", async () => { - try { - const payload = JSON.parse(body) as HookSetStatusRequest; - const result = await handleHookSetStatus(payload); - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(result)); + if (req.method === "POST" && req.url === "/hook/set-status") { + let body = ""; + req.on("data", (chunk) => (body += chunk.toString())); + req.on("end", async () => { + try { + const payload = JSON.parse(body) as HookSetStatusRequest; + const result = await handleHookSetStatus(payload); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify(result)); - // If the hook flipped a presence to idle, drain any queued - // "next" messages immediately so the peer gets them on next tick. - if (result.ok && result.presence_id && !result.pending) { - void maybePushQueuedMessages(result.presence_id); - } - } catch (e) { - res.writeHead(500, { "Content-Type": "application/json" }); - res.end( - JSON.stringify({ - ok: false, - error: e instanceof Error ? e.message : String(e), - }), - ); + // If the hook flipped a presence to idle, drain queued + // "next" messages immediately for low-latency delivery. + if (result.ok && result.presence_id && !result.pending) { + void maybePushQueuedMessages(result.presence_id); } - }); - return; - } + } catch (e) { + res.writeHead(500, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + ok: false, + error: e instanceof Error ? e.message : String(e), + }), + ); + } + }); + return; + } - res.writeHead(404); - res.end("not found"); + res.writeHead(404); + res.end("not found"); +} + +function handleUpgrade( + wss: WebSocketServer, + req: IncomingMessage, + socket: Duplex, + head: Buffer, +): void { + if (req.url !== WS_PATH) { + socket.destroy(); + return; + } + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit("connection", ws, req); }); - server.listen(HTTP_PORT, "0.0.0.0", () => { - log(`http (hooks + health) listening on :${HTTP_PORT}`); - }); - return server; } async function maybePushQueuedMessages(presenceId: string): Promise { @@ -143,7 +160,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise { type: "push", messageId: m.id, meshId: conn.meshId, - senderPubkey: "", // resolved client-side via senderMemberId lookup, or cache + senderPubkey: m.senderPubkey, priority: m.priority, nonce: m.nonce, ciphertext: m.ciphertext, @@ -268,32 +285,33 @@ function handleConnection(ws: WebSocket): void { }); } -function startWsServer(): WebSocketServer { - const wss = new WebSocketServer({ host: "0.0.0.0", port: WS_PORT }); +// --- Main --- + +function main(): void { + const wss = new WebSocketServer({ noServer: true }); wss.on("connection", handleConnection); - wss.on("listening", () => { - log( - `@claudemesh/broker v${VERSION} ws listening on :${WS_PORT} | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`, - ); - }); - wss.on("error", (err) => { - log(`ws server error: ${err.message}`); + + const http = createServer(handleHttpRequest); + http.on("upgrade", (req, socket, head) => + handleUpgrade(wss, req, socket, head), + ); + http.on("error", (err) => { + log(`http server error: ${err.message}`); process.exit(1); }); + http.listen(PORT, "0.0.0.0", () => { + log( + `@claudemesh/broker v${VERSION} listening on :${PORT} (ws:${WS_PATH}, http:/hook/set-status, http:/health) | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`, + ); + }); + // Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt. setInterval(() => { for (const { ws } of connections.values()) { if (ws.readyState === ws.OPEN) ws.ping(); } }, 30_000).unref(); - return wss; -} -// --- Main --- - -function main(): void { - const http = startHttpServer(); - const wss = startWsServer(); startSweepers(); const shutdown = async (signal: string): Promise => {