refactor(broker): merge HTTP+WS to single port, populate senderPubkey on push

Single-port refactor:
- Drop the BROKER_PORT+1 HTTP side-port. Use `ws` with noServer:true
  and attach to a single node:http server via the 'upgrade' event.
- Clients connect to ws://host:PORT/ws
- Hook POSTs go to http://host:PORT/hook/set-status
- Health probe at http://host:PORT/health
- One port = one Traefik label, one cert, one deploy route. Matches
  the Coolify/VPS operational constraints.

senderPubkey on push:
- drainForMember now joins mesh.message_queue → mesh.member to return
  the sender's peerPubkey alongside each envelope. No extra round-trip,
  no cache invalidation needed (option A from review).
- index.ts populates WSPushMessage.senderPubkey from the join result
  instead of the empty-string placeholder.
- Receivers can now identify who sent a message directly from the push.

README updated with a routes table for the single-port layout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-04 21:35:05 +01:00
parent 3c0154ae70
commit 0a97a0c369
3 changed files with 106 additions and 76 deletions

View File

@@ -23,11 +23,19 @@ pnpm --filter=@claudemesh/broker start # production
| Var | Default | Purpose | | Var | Default | Purpose |
| ---------------------------- | ------- | --------------------------------------------------- | | ---------------------------- | ------- | --------------------------------------------------- |
| `BROKER_PORT` | `7899` | Port the WS server listens on | | `BROKER_PORT` | `7899` | Single port for HTTP routes + WebSocket upgrade |
| `DATABASE_URL` | — | Postgres connection string (shared with apps/web) | | `DATABASE_URL` | — | Postgres connection string (shared with apps/web) |
| `STATUS_TTL_SECONDS` | `60` | Flip stuck-"working" peers to idle after this TTL | | `STATUS_TTL_SECONDS` | `60` | Flip stuck-"working" peers to idle after this TTL |
| `HOOK_FRESH_WINDOW_SECONDS` | `30` | How long a hook signal beats JSONL inference | | `HOOK_FRESH_WINDOW_SECONDS` | `30` | How long a hook signal beats JSONL inference |
## Routes (single port)
| Path | Protocol | Purpose |
| -------------------- | --------- | ----------------------------------------- |
| `/ws` | WebSocket | Authenticated peer connections |
| `/hook/set-status` | HTTP POST | Claude Code hook scripts report status |
| `/health` | HTTP GET | Liveness probe |
## Depends on ## Depends on
- `@turbostarter/db` — Drizzle/Postgres schema (uses the `mesh` pgSchema) - `@turbostarter/db` — Drizzle/Postgres schema (uses the `mesh` pgSchema)

View File

@@ -369,13 +369,17 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
/** /**
* Drain deliverable messages addressed to a specific member in a mesh. * Drain deliverable messages addressed to a specific member in a mesh.
* Marks them delivered and returns the envelopes for the caller to * Joins mesh.member so each envelope carries the sender's pubkey, which
* push over WebSocket. Does NOT handle targetSpec routing — that's the * the receiving client needs to identify who sent it. Marks drained
* responsibility of the ingress fanout (see queueForTargets). * rows as delivered and returns the envelopes for WS push.
*
* targetSpec routing: matches either the member's pubkey directly or
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
* config that lives outside this function.
*/ */
export async function drainForMember( export async function drainForMember(
meshId: string, meshId: string,
memberId: string, _memberId: string,
memberPubkey: string, memberPubkey: string,
status: PeerStatus, status: PeerStatus,
): Promise< ): Promise<
@@ -386,13 +390,10 @@ export async function drainForMember(
ciphertext: string; ciphertext: string;
createdAt: Date; createdAt: Date;
senderMemberId: string; senderMemberId: string;
senderPubkey: string;
}> }>
> { > {
const priorities = deliverablePriorities(status); const priorities = deliverablePriorities(status);
// A message is deliverable to this member if its targetSpec
// addresses them directly (pubkey match) or is a broadcast.
// Channel/tag resolution is a per-mesh concern layered on top.
const targetFilter = or( const targetFilter = or(
eq(messageQueue.targetSpec, memberPubkey), eq(messageQueue.targetSpec, memberPubkey),
eq(messageQueue.targetSpec, "*"), eq(messageQueue.targetSpec, "*"),
@@ -406,8 +407,10 @@ export async function drainForMember(
ciphertext: messageQueue.ciphertext, ciphertext: messageQueue.ciphertext,
createdAt: messageQueue.createdAt, createdAt: messageQueue.createdAt,
senderMemberId: messageQueue.senderMemberId, senderMemberId: messageQueue.senderMemberId,
senderPubkey: memberTable.peerPubkey,
}) })
.from(messageQueue) .from(messageQueue)
.innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId))
.where( .where(
and( and(
eq(messageQueue.meshId, meshId), eq(messageQueue.meshId, meshId),
@@ -432,6 +435,7 @@ export async function drainForMember(
ciphertext: r.ciphertext, ciphertext: r.ciphertext,
createdAt: r.createdAt, createdAt: r.createdAt,
senderMemberId: r.senderMemberId, senderMemberId: r.senderMemberId,
senderPubkey: r.senderPubkey,
})); }));
} }

View File

@@ -12,7 +12,8 @@
* Shutdown: clean SIGTERM/SIGINT marks all presences disconnected. * Shutdown: clean SIGTERM/SIGINT marks all presences disconnected.
*/ */
import { createServer } from "node:http"; import { createServer, type IncomingMessage } from "node:http";
import type { Duplex } from "node:stream";
import { WebSocketServer, type WebSocket } from "ws"; import { WebSocketServer, type WebSocket } from "ws";
import { env } from "./env"; import { env } from "./env";
import { import {
@@ -36,8 +37,8 @@ import type {
} from "./types"; } from "./types";
const VERSION = "0.1.0"; const VERSION = "0.1.0";
const WS_PORT = env.BROKER_PORT; const PORT = env.BROKER_PORT;
const HTTP_PORT = env.BROKER_PORT + 1; const WS_PATH = "/ws";
function log(msg: string): void { function log(msg: string): void {
console.error(`[broker] ${msg}`); console.error(`[broker] ${msg}`);
@@ -68,60 +69,76 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
} }
} }
// --- HTTP server (hook endpoint) --- // --- Combined HTTP + WS server on a single port ---
//
// `ws` is run with noServer:true and attached to the HTTP server's
// 'upgrade' event. Clients connect to ws://host:PORT/ws; everything
// else is routed by the HTTP handler.
function startHttpServer(): ReturnType<typeof createServer> { function handleHttpRequest(
const server = createServer((req, res) => { req: IncomingMessage,
res.setHeader("Access-Control-Allow-Origin", "*"); res: import("node:http").ServerResponse,
res.setHeader("Access-Control-Allow-Methods", "POST, OPTIONS"); ): void {
res.setHeader("Access-Control-Allow-Headers", "Content-Type"); res.setHeader("Access-Control-Allow-Origin", "*");
if (req.method === "OPTIONS") { res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS");
res.writeHead(204); res.setHeader("Access-Control-Allow-Headers", "Content-Type");
res.end(); if (req.method === "OPTIONS") {
return; res.writeHead(204);
} res.end();
return;
}
if (req.method === "GET" && req.url === "/health") { if (req.method === "GET" && req.url === "/health") {
res.writeHead(200, { "Content-Type": "application/json" }); res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ status: "ok", version: VERSION })); res.end(JSON.stringify({ status: "ok", version: VERSION }));
return; return;
} }
if (req.method === "POST" && req.url === "/hook/set-status") { if (req.method === "POST" && req.url === "/hook/set-status") {
let body = ""; let body = "";
req.on("data", (chunk) => (body += chunk.toString())); req.on("data", (chunk) => (body += chunk.toString()));
req.on("end", async () => { req.on("end", async () => {
try { try {
const payload = JSON.parse(body) as HookSetStatusRequest; const payload = JSON.parse(body) as HookSetStatusRequest;
const result = await handleHookSetStatus(payload); const result = await handleHookSetStatus(payload);
res.writeHead(200, { "Content-Type": "application/json" }); res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(result)); res.end(JSON.stringify(result));
// If the hook flipped a presence to idle, drain any queued // If the hook flipped a presence to idle, drain queued
// "next" messages immediately so the peer gets them on next tick. // "next" messages immediately for low-latency delivery.
if (result.ok && result.presence_id && !result.pending) { if (result.ok && result.presence_id && !result.pending) {
void maybePushQueuedMessages(result.presence_id); void maybePushQueuedMessages(result.presence_id);
}
} catch (e) {
res.writeHead(500, { "Content-Type": "application/json" });
res.end(
JSON.stringify({
ok: false,
error: e instanceof Error ? e.message : String(e),
}),
);
} }
}); } catch (e) {
return; res.writeHead(500, { "Content-Type": "application/json" });
} res.end(
JSON.stringify({
ok: false,
error: e instanceof Error ? e.message : String(e),
}),
);
}
});
return;
}
res.writeHead(404); res.writeHead(404);
res.end("not found"); res.end("not found");
}
function handleUpgrade(
wss: WebSocketServer,
req: IncomingMessage,
socket: Duplex,
head: Buffer,
): void {
if (req.url !== WS_PATH) {
socket.destroy();
return;
}
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
}); });
server.listen(HTTP_PORT, "0.0.0.0", () => {
log(`http (hooks + health) listening on :${HTTP_PORT}`);
});
return server;
} }
async function maybePushQueuedMessages(presenceId: string): Promise<void> { async function maybePushQueuedMessages(presenceId: string): Promise<void> {
@@ -143,7 +160,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
type: "push", type: "push",
messageId: m.id, messageId: m.id,
meshId: conn.meshId, meshId: conn.meshId,
senderPubkey: "", // resolved client-side via senderMemberId lookup, or cache senderPubkey: m.senderPubkey,
priority: m.priority, priority: m.priority,
nonce: m.nonce, nonce: m.nonce,
ciphertext: m.ciphertext, ciphertext: m.ciphertext,
@@ -268,32 +285,33 @@ function handleConnection(ws: WebSocket): void {
}); });
} }
function startWsServer(): WebSocketServer { // --- Main ---
const wss = new WebSocketServer({ host: "0.0.0.0", port: WS_PORT });
function main(): void {
const wss = new WebSocketServer({ noServer: true });
wss.on("connection", handleConnection); wss.on("connection", handleConnection);
wss.on("listening", () => {
log( const http = createServer(handleHttpRequest);
`@claudemesh/broker v${VERSION} ws listening on :${WS_PORT} | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`, http.on("upgrade", (req, socket, head) =>
); handleUpgrade(wss, req, socket, head),
}); );
wss.on("error", (err) => { http.on("error", (err) => {
log(`ws server error: ${err.message}`); log(`http server error: ${err.message}`);
process.exit(1); process.exit(1);
}); });
http.listen(PORT, "0.0.0.0", () => {
log(
`@claudemesh/broker v${VERSION} listening on :${PORT} (ws:${WS_PATH}, http:/hook/set-status, http:/health) | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`,
);
});
// Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt. // Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt.
setInterval(() => { setInterval(() => {
for (const { ws } of connections.values()) { for (const { ws } of connections.values()) {
if (ws.readyState === ws.OPEN) ws.ping(); if (ws.readyState === ws.OPEN) ws.ping();
} }
}, 30_000).unref(); }, 30_000).unref();
return wss;
}
// --- Main ---
function main(): void {
const http = startHttpServer();
const wss = startWsServer();
startSweepers(); startSweepers();
const shutdown = async (signal: string): Promise<void> => { const shutdown = async (signal: string): Promise<void> => {