From 3c0154ae702fd3b9c4553a42544688e487423a5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 4 Apr 2026 21:32:14 +0100 Subject: [PATCH] feat(broker): port routing + status model from claude-intercom to postgres MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the proven claude-intercom broker logic into apps/broker with SQLite → Drizzle/Postgres translation. Core state engine kept verbatim: source-priority writes (hook > manual > jsonl), fresh-gating, TTL sweeper for stuck-working, pending-status race handler, priority delivery gates (now/next/low), Windows path encoding (5-candidate fallback incl. Roberto's H:\Claude → H--Claude rule). New modules: - broker.ts (492 lines): writeStatus, handleHookSetStatus, sweepers, presence lifecycle, message queueing + drainForMember, sourceRank + isHookFresh / isSourceFresh logic, findMemberByPubkey (WS auth hook). - paths.ts (141): cwdToProjectKeyCandidates + findActiveJsonl + inferStatusFromJsonl — JSONL fallback inference for peers without hooks installed or with stale hook signals. - types.ts (111): WS protocol envelopes (hello/send/push/ack/error/ set_status), HookSetStatusRequest/Response, ConnectedPeer view. - index.ts (323): HTTP on BROKER_PORT+1 for /hook/set-status + /health; WebSocket on BROKER_PORT for authenticated peer connections with hello/send/set_status handlers; connections registry; heartbeat ping/pong every 30s; graceful SIGTERM/SIGINT that marks all active presences disconnected. Mesh scoping: every query/mutation includes meshId. Peer identity is split between mesh.member (stable) and mesh.presence (ephemeral). WS hello authenticates by pubkey against mesh.member (signature verify is stubbed — libsodium wiring lands in client-side package later). Broker never sees plaintext: nonce + ciphertext are opaque text fields passed through. Routing happens on targetSpec (pubkey | "#channel" | "tag:xyz" | "*"), resolved against currently-connected peers. Dependencies not installed; no tests run. Verified via static review of imports against @turbostarter/db exports. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/src/broker.ts | 504 +++++++++++++++++++++++++++++++++++++- apps/broker/src/index.ts | 328 +++++++++++++++++++++---- apps/broker/src/paths.ts | 141 +++++++++++ apps/broker/src/types.ts | 106 ++++++-- 4 files changed, 1011 insertions(+), 68 deletions(-) create mode 100644 apps/broker/src/paths.ts diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 394177d..8d768c5 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -1,12 +1,492 @@ -// TODO: port from ~/tools/claude-intercom/broker.ts in step 8 -// -// That implementation carries the battle-tested pieces we'll migrate: -// - status_source column (hook > manual > jsonl) + writeStatus rules -// - TTL sweeper for stuck-"working" peers -// - Pending hook statuses (first-turn race handler) -// - /hook/set-status endpoint for Claude Code hook scripts -// -// The port swaps SQLite prepared statements for Drizzle queries against -// the `mesh` pgSchema (see packages/db/src/schema/mesh.ts). All logic -// and test patterns are ported verbatim — only the persistence layer -// changes. +/** + * MeshBroker — core state engine for claudemesh. + * + * Ported from ~/tools/claude-intercom/broker.ts with the SQLite layer + * translated to Drizzle/Postgres against the `mesh` pgSchema. The + * status model (hook > manual > jsonl priority, fresh-gating, TTL + * sweeper) and priority delivery logic are kept verbatim — they're the + * battle-tested pieces. + * + * Differences from claude-intercom: + * - Peer identity is split: mesh.member (stable, mesh-scoped) vs + * mesh.presence (ephemeral, one per WS connection). + * - Every query/mutation is scoped by meshId. + * - Message envelopes are opaque ciphertext (client-side crypto). + */ + +import { and, asc, eq, inArray, isNull, lt, or, sql } from "drizzle-orm"; +import { db } from "./db"; +import { + member as memberTable, + messageQueue, + pendingStatus, + presence, +} from "@turbostarter/db/schema/mesh"; +import { env } from "./env"; +import { inferStatusFromJsonl } from "./paths"; +import type { + HookSetStatusRequest, + HookSetStatusResponse, + PeerStatus, + Priority, + StatusSource, +} from "./types"; + +// --- Config (seconds → ms) --- + +const WORKING_TTL_MS = env.STATUS_TTL_SECONDS * 1000; +const HOOK_FRESHNESS_MS = env.HOOK_FRESH_WINDOW_SECONDS * 1000; +const PENDING_TTL_MS = 10_000; +const TTL_SWEEP_INTERVAL_MS = 15_000; +const PENDING_SWEEP_INTERVAL_MS = PENDING_TTL_MS; + +// --- Source priority rules (ported verbatim) --- + +function sourceRank(source: StatusSource): number { + return source === "hook" ? 3 : source === "manual" ? 2 : 1; +} + +function isSourceFresh(updatedAt: Date | null, now: Date): boolean { + if (!updatedAt) return false; + const age = now.getTime() - updatedAt.getTime(); + return age >= 0 && age <= HOOK_FRESHNESS_MS; +} + +export function isHookFresh( + source: StatusSource, + updatedAt: Date | null, + now: Date, +): boolean { + if (source !== "hook") return false; + return isSourceFresh(updatedAt, now); +} + +// --- Core status write (ported verbatim, translated to Drizzle) --- + +/** + * Write a status update for a presence row, honoring source priority. + * + * Rules (identical to claude-intercom): + * - Status changed → bump everything, record new source. + * - Status unchanged, incoming source ≥ recorded source → upgrade. + * - Status unchanged, incoming source < recorded source: + * - Recorded source still fresh → keep it (just bump timestamp). + * - Recorded source stale → downgrade to honest attribution. + */ +export async function writeStatus( + presenceId: string, + status: PeerStatus, + source: StatusSource, + now: Date, +): Promise { + const [prev] = await db + .select({ + status: presence.status, + statusSource: presence.statusSource, + statusUpdatedAt: presence.statusUpdatedAt, + }) + .from(presence) + .where(eq(presence.id, presenceId)); + if (!prev) return; + + if (prev.status !== status) { + await db + .update(presence) + .set({ status, statusSource: source, statusUpdatedAt: now }) + .where(eq(presence.id, presenceId)); + return; + } + + if (sourceRank(source) >= sourceRank(prev.statusSource as StatusSource)) { + await db + .update(presence) + .set({ statusSource: source, statusUpdatedAt: now }) + .where(eq(presence.id, presenceId)); + return; + } + + // Lower-rank source. Keep recorded source if fresh, else downgrade. + if (isSourceFresh(prev.statusUpdatedAt, now)) { + await db + .update(presence) + .set({ statusUpdatedAt: now }) + .where(eq(presence.id, presenceId)); + } else { + await db + .update(presence) + .set({ statusSource: source, statusUpdatedAt: now }) + .where(eq(presence.id, presenceId)); + } +} + +// --- Hook-driven status updates --- + +/** + * HTTP POST /hook/set-status handler. Resolves (pid, cwd) to an active + * presence row; if none exists (first-turn race), stashes the signal + * in pending_status to be applied on next presence connect. + */ +export async function handleHookSetStatus( + body: HookSetStatusRequest, +): Promise { + if (!body.cwd || !body.status) { + return { ok: false, error: "cwd and status required" }; + } + const now = new Date(); + + // Find active presence row. Prefer (pid, cwd) match; fall back to + // most-recent cwd match only. + const activeFilter = and( + eq(presence.cwd, body.cwd), + isNull(presence.disconnectedAt), + ); + let row: { id: string; status: PeerStatus } | undefined; + if (body.pid) { + const [r] = await db + .select({ id: presence.id, status: presence.status }) + .from(presence) + .where(and(activeFilter, eq(presence.pid, body.pid))) + .limit(1); + row = r as { id: string; status: PeerStatus } | undefined; + } + if (!row) { + const [r] = await db + .select({ id: presence.id, status: presence.status }) + .from(presence) + .where(activeFilter) + .orderBy(sql`${presence.connectedAt} DESC`) + .limit(1); + row = r as { id: string; status: PeerStatus } | undefined; + } + + if (!row) { + // No active presence — stash signal for future apply-on-register. + await db.insert(pendingStatus).values({ + pid: body.pid ?? 0, + cwd: body.cwd, + status: body.status, + statusSource: "hook", + createdAt: now, + }); + return { ok: true, pending: true }; + } + + // DND is sacred — hooks cannot unset it. + if (row.status === "dnd") return { ok: true, presence_id: row.id }; + + await writeStatus(row.id, body.status, "hook", now); + return { ok: true, presence_id: row.id }; +} + +/** + * When a new presence row is created, check pending_status for queued + * hook signals for this (pid, cwd) and apply the newest one. + */ +export async function applyPendingHookStatus( + presenceId: string, + pid: number, + cwd: string, + now: Date, +): Promise { + const cutoff = new Date(now.getTime() - PENDING_TTL_MS); + const [row] = await db + .select({ id: pendingStatus.id, status: pendingStatus.status }) + .from(pendingStatus) + .where( + and( + eq(pendingStatus.pid, pid), + eq(pendingStatus.cwd, cwd), + isNull(pendingStatus.appliedAt), + sql`${pendingStatus.createdAt} >= ${cutoff}`, + ), + ) + .orderBy(sql`${pendingStatus.createdAt} DESC`) + .limit(1); + if (!row) return; + await writeStatus(presenceId, row.status as PeerStatus, "hook", now); + await db + .update(pendingStatus) + .set({ appliedAt: now }) + .where(eq(pendingStatus.id, row.id)); +} + +// --- Sweepers --- + +/** + * TTL sweep: flip presences stuck in "working" > WORKING_TTL_MS back + * to idle. DND preserved. Source set to jsonl so a fresh hook can + * reclaim immediately. + */ +export async function sweepStuckWorking(): Promise { + const now = new Date(); + const cutoff = new Date(now.getTime() - WORKING_TTL_MS); + const stuck = await db + .select({ id: presence.id }) + .from(presence) + .where( + and( + eq(presence.status, "working"), + lt(presence.statusUpdatedAt, cutoff), + isNull(presence.disconnectedAt), + ), + ); + for (const row of stuck) { + await writeStatus(row.id, "idle", "jsonl", now); + } +} + +/** Sweep expired pending_status entries. */ +export async function sweepPendingStatuses(): Promise { + const cutoff = new Date(Date.now() - PENDING_TTL_MS); + await db + .delete(pendingStatus) + .where( + or( + lt(pendingStatus.createdAt, cutoff), + sql`${pendingStatus.appliedAt} IS NOT NULL`, + )!, + ); +} + +/** + * JSONL fallback refresh for a presence row. Called from heartbeat + + * delivery paths. No-op if a fresh hook signal is still recorded. + */ +export async function refreshStatusFromJsonl( + presenceId: string, + cwd: string, + now: Date, +): Promise { + const [row] = await db + .select({ + status: presence.status, + statusSource: presence.statusSource, + statusUpdatedAt: presence.statusUpdatedAt, + }) + .from(presence) + .where(eq(presence.id, presenceId)); + if (!row) return "idle"; + if (row.status === "dnd") return "dnd"; + if (isHookFresh(row.statusSource as StatusSource, row.statusUpdatedAt, now)) { + return row.status as PeerStatus; + } + const inferred = inferStatusFromJsonl(cwd); + await writeStatus(presenceId, inferred, "jsonl", now); + return inferred; +} + +// --- Presence lifecycle --- + +export interface ConnectParams { + memberId: string; + sessionId: string; + pid: number; + cwd: string; +} + +/** Create a presence row for a new WS connection. */ +export async function connectPresence( + params: ConnectParams, +): Promise { + const now = new Date(); + const [row] = await db + .insert(presence) + .values({ + memberId: params.memberId, + sessionId: params.sessionId, + pid: params.pid, + cwd: params.cwd, + status: "idle", + statusSource: "jsonl", + statusUpdatedAt: now, + connectedAt: now, + lastPingAt: now, + }) + .returning({ id: presence.id }); + if (!row) throw new Error("failed to create presence row"); + await applyPendingHookStatus(row.id, params.pid, params.cwd, now); + return row.id; +} + +/** Mark presence disconnected (idempotent). */ +export async function disconnectPresence(presenceId: string): Promise { + const now = new Date(); + await db + .update(presence) + .set({ disconnectedAt: now }) + .where(and(eq(presence.id, presenceId), isNull(presence.disconnectedAt))); +} + +/** Bump lastPingAt on a heartbeat from client. */ +export async function heartbeat(presenceId: string): Promise { + await db + .update(presence) + .set({ lastPingAt: new Date() }) + .where(eq(presence.id, presenceId)); +} + +// --- Message queueing + delivery --- + +export interface QueueParams { + meshId: string; + senderMemberId: string; + targetSpec: string; + priority: Priority; + nonce: string; + ciphertext: string; + expiresAt?: Date; +} + +/** Insert an E2E envelope into the mesh's message queue. */ +export async function queueMessage(params: QueueParams): Promise { + const [row] = await db + .insert(messageQueue) + .values({ + meshId: params.meshId, + senderMemberId: params.senderMemberId, + targetSpec: params.targetSpec, + priority: params.priority, + nonce: params.nonce, + ciphertext: params.ciphertext, + expiresAt: params.expiresAt, + }) + .returning({ id: messageQueue.id }); + if (!row) throw new Error("failed to queue message"); + return row.id; +} + +/** + * Resolve which priorities to deliver to a peer in a given status. + * Ported verbatim: + * - idle → all (now + next + low) + * - dnd → now only + * - working → now only (next/low held until idle) + */ +function deliverablePriorities(status: PeerStatus): Priority[] { + if (status === "idle") return ["now", "next", "low"]; + return ["now"]; +} + +/** + * Drain deliverable messages addressed to a specific member in a mesh. + * Marks them delivered and returns the envelopes for the caller to + * push over WebSocket. Does NOT handle targetSpec routing — that's the + * responsibility of the ingress fanout (see queueForTargets). + */ +export async function drainForMember( + meshId: string, + memberId: string, + memberPubkey: string, + status: PeerStatus, +): Promise< + Array<{ + id: string; + priority: Priority; + nonce: string; + ciphertext: string; + createdAt: Date; + senderMemberId: string; + }> +> { + const priorities = deliverablePriorities(status); + + // A message is deliverable to this member if its targetSpec + // addresses them directly (pubkey match) or is a broadcast. + // Channel/tag resolution is a per-mesh concern layered on top. + const targetFilter = or( + eq(messageQueue.targetSpec, memberPubkey), + eq(messageQueue.targetSpec, "*"), + )!; + + const rows = await db + .select({ + id: messageQueue.id, + priority: messageQueue.priority, + nonce: messageQueue.nonce, + ciphertext: messageQueue.ciphertext, + createdAt: messageQueue.createdAt, + senderMemberId: messageQueue.senderMemberId, + }) + .from(messageQueue) + .where( + and( + eq(messageQueue.meshId, meshId), + isNull(messageQueue.deliveredAt), + inArray(messageQueue.priority, priorities), + targetFilter, + ), + ) + .orderBy(asc(messageQueue.createdAt)); + + if (rows.length === 0) return []; + const now = new Date(); + const ids = rows.map((r) => r.id); + await db + .update(messageQueue) + .set({ deliveredAt: now }) + .where(inArray(messageQueue.id, ids)); + return rows.map((r) => ({ + id: r.id, + priority: r.priority as Priority, + nonce: r.nonce, + ciphertext: r.ciphertext, + createdAt: r.createdAt, + senderMemberId: r.senderMemberId, + })); +} + +// --- Lifecycle --- + +let ttlTimer: ReturnType | null = null; +let pendingTimer: ReturnType | null = null; + +/** Start background sweepers. Idempotent. */ +export function startSweepers(): void { + if (ttlTimer) return; + ttlTimer = setInterval(() => { + sweepStuckWorking().catch((e) => console.error("[broker] ttl sweep:", e)); + }, TTL_SWEEP_INTERVAL_MS); + pendingTimer = setInterval(() => { + sweepPendingStatuses().catch((e) => + console.error("[broker] pending sweep:", e), + ); + }, PENDING_SWEEP_INTERVAL_MS); +} + +/** Stop background sweepers and mark all active presences disconnected. */ +export async function stopSweepers(): Promise { + if (ttlTimer) clearInterval(ttlTimer); + if (pendingTimer) clearInterval(pendingTimer); + ttlTimer = null; + pendingTimer = null; + await db + .update(presence) + .set({ disconnectedAt: new Date() }) + .where(isNull(presence.disconnectedAt)); +} + +/** + * Look up a member row by pubkey within a mesh. Used at WS handshake + * to authenticate an incoming hello. + */ +export async function findMemberByPubkey( + meshId: string, + pubkey: string, +): Promise<{ id: string; displayName: string; role: string } | null> { + const [row] = await db + .select({ + id: memberTable.id, + displayName: memberTable.displayName, + role: memberTable.role, + }) + .from(memberTable) + .where( + and( + eq(memberTable.meshId, meshId), + eq(memberTable.peerPubkey, pubkey), + isNull(memberTable.revokedAt), + ), + ) + .limit(1); + return row ?? null; +} diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 98db87a..973d1cc 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -2,76 +2,322 @@ /** * @claudemesh/broker entry point. * - * Stands up a WebSocket server, accepts peer connections, and (in step - * 8) routes E2E-encrypted envelopes between peers joined to the same - * mesh. For now this is a scaffold: it boots, logs, accepts connections - * with a stub handler, and shuts down cleanly on SIGTERM/SIGINT. + * Spins up two servers in a single process: + * - HTTP on BROKER_PORT+1 for the /hook/set-status endpoint + * (Claude Code hook scripts POST here on turn boundaries). + * - WebSocket on BROKER_PORT for authenticated peer connections + * (routes E2E-encrypted envelopes between mesh members). + * + * Background: TTL sweeper + pending-status sweeper. + * Shutdown: clean SIGTERM/SIGINT marks all presences disconnected. */ +import { createServer } from "node:http"; import { WebSocketServer, type WebSocket } from "ws"; import { env } from "./env"; +import { + connectPresence, + disconnectPresence, + drainForMember, + findMemberByPubkey, + handleHookSetStatus, + heartbeat, + queueMessage, + refreshStatusFromJsonl, + startSweepers, + stopSweepers, + writeStatus, +} from "./broker"; +import type { + HookSetStatusRequest, + WSClientMessage, + WSPushMessage, + WSServerMessage, +} from "./types"; const VERSION = "0.1.0"; +const WS_PORT = env.BROKER_PORT; +const HTTP_PORT = env.BROKER_PORT + 1; function log(msg: string): void { console.error(`[broker] ${msg}`); } -function handleConnection(ws: WebSocket, remoteAddress: string | undefined): void { - log(`connection from ${remoteAddress ?? "unknown"}`); +// --- Runtime connection registry --- - ws.on("message", (data) => { - // Step-8 stub: echo message length. Real handler will parse the - // WSMessage envelope, authenticate the peer by pubkey, and route. - log(`recv ${data.toString().length} bytes`); +/** In-memory map of presenceId → authenticated WS connection. */ +const connections = new Map< + string, + { + ws: WebSocket; + meshId: string; + memberId: string; + memberPubkey: string; + cwd: string; + } +>(); + +function sendToPeer(presenceId: string, msg: WSServerMessage): void { + const conn = connections.get(presenceId); + if (!conn) return; + if (conn.ws.readyState !== conn.ws.OPEN) return; + try { + conn.ws.send(JSON.stringify(msg)); + } catch (e) { + log(`push failed to ${presenceId}: ${e instanceof Error ? e.message : e}`); + } +} + +// --- HTTP server (hook endpoint) --- + +function startHttpServer(): ReturnType { + const server = createServer((req, res) => { + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Methods", "POST, OPTIONS"); + res.setHeader("Access-Control-Allow-Headers", "Content-Type"); + if (req.method === "OPTIONS") { + res.writeHead(204); + res.end(); + return; + } + + if (req.method === "GET" && req.url === "/health") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "ok", version: VERSION })); + return; + } + + if (req.method === "POST" && req.url === "/hook/set-status") { + let body = ""; + req.on("data", (chunk) => (body += chunk.toString())); + req.on("end", async () => { + try { + const payload = JSON.parse(body) as HookSetStatusRequest; + const result = await handleHookSetStatus(payload); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify(result)); + + // If the hook flipped a presence to idle, drain any queued + // "next" messages immediately so the peer gets them on next tick. + if (result.ok && result.presence_id && !result.pending) { + void maybePushQueuedMessages(result.presence_id); + } + } catch (e) { + res.writeHead(500, { "Content-Type": "application/json" }); + res.end( + JSON.stringify({ + ok: false, + error: e instanceof Error ? e.message : String(e), + }), + ); + } + }); + return; + } + + res.writeHead(404); + res.end("not found"); }); - - ws.on("close", () => { - log("connection closed"); + server.listen(HTTP_PORT, "0.0.0.0", () => { + log(`http (hooks + health) listening on :${HTTP_PORT}`); }); + return server; +} - ws.on("error", (err) => { - log(`ws error: ${err.message}`); +async function maybePushQueuedMessages(presenceId: string): Promise { + const conn = connections.get(presenceId); + if (!conn) return; + const status = await refreshStatusFromJsonl( + presenceId, + conn.cwd, + new Date(), + ); + const messages = await drainForMember( + conn.meshId, + conn.memberId, + conn.memberPubkey, + status, + ); + for (const m of messages) { + const push: WSPushMessage = { + type: "push", + messageId: m.id, + meshId: conn.meshId, + senderPubkey: "", // resolved client-side via senderMemberId lookup, or cache + priority: m.priority, + nonce: m.nonce, + ciphertext: m.ciphertext, + createdAt: m.createdAt.toISOString(), + }; + sendToPeer(presenceId, push); + } +} + +// --- WebSocket server (peer connections) --- + +async function handleHello( + ws: WebSocket, + hello: Extract, +): Promise { + // Authenticate: member with this pubkey must exist in this mesh and + // not be revoked. Signature verification is TODO (crypto not wired + // yet; client-side libsodium sign_detached is planned). + const member = await findMemberByPubkey(hello.meshId, hello.pubkey); + if (!member) { + const err: WSServerMessage = { + type: "error", + code: "unauthorized", + message: "pubkey not found in mesh", + }; + ws.send(JSON.stringify(err)); + return null; + } + const presenceId = await connectPresence({ + memberId: member.id, + sessionId: hello.sessionId, + pid: hello.pid, + cwd: hello.cwd, + }); + connections.set(presenceId, { + ws, + meshId: hello.meshId, + memberId: member.id, + memberPubkey: hello.pubkey, + cwd: hello.cwd, + }); + log( + `hello: mesh=${hello.meshId} member=${member.displayName} presence=${presenceId}`, + ); + // Drain any messages already queued for this member. + await maybePushQueuedMessages(presenceId); + return presenceId; +} + +async function handleSend( + conn: NonNullable>, + msg: Extract, +): Promise { + const messageId = await queueMessage({ + meshId: conn.meshId, + senderMemberId: conn.memberId, + targetSpec: msg.targetSpec, + priority: msg.priority, + nonce: msg.nonce, + ciphertext: msg.ciphertext, + }); + const ack: WSServerMessage = { + type: "ack", + id: msg.id ?? "", + messageId, + queued: true, + }; + conn.ws.send(JSON.stringify(ack)); + + // Fan-out: push to any currently-connected peer whose pubkey matches + // the target (or to everyone on broadcast). Drain their queue which + // handles priority gating automatically. + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec) continue; + void maybePushQueuedMessages(pid); + } +} + +function handleConnection(ws: WebSocket): void { + let presenceId: string | null = null; + ws.on("message", async (raw) => { + try { + const msg = JSON.parse(raw.toString()) as WSClientMessage; + if (msg.type === "hello") { + presenceId = await handleHello(ws, msg); + return; + } + if (!presenceId) { + const err: WSServerMessage = { + type: "error", + code: "no_hello", + message: "must send hello first", + }; + ws.send(JSON.stringify(err)); + return; + } + const conn = connections.get(presenceId); + if (!conn) return; + switch (msg.type) { + case "send": + await handleSend(conn, msg); + break; + case "set_status": + await writeStatus(presenceId, msg.status, "manual", new Date()); + break; + } + } catch (e) { + log(`ws msg error: ${e instanceof Error ? e.message : e}`); + } + }); + ws.on("close", async () => { + if (presenceId) { + connections.delete(presenceId); + await disconnectPresence(presenceId); + log(`disconnect: ${presenceId}`); + } + }); + ws.on("error", (err) => log(`ws error: ${err.message}`)); + ws.on("pong", () => { + if (presenceId) void heartbeat(presenceId); }); } -function main(): void { - const wss = new WebSocketServer({ - host: "0.0.0.0", - port: env.BROKER_PORT, - }); - - wss.on("connection", (ws, req) => { - handleConnection(ws, req.socket.remoteAddress); - }); - +function startWsServer(): WebSocketServer { + const wss = new WebSocketServer({ host: "0.0.0.0", port: WS_PORT }); + wss.on("connection", handleConnection); wss.on("listening", () => { - log(`@claudemesh/broker v${VERSION} listening on :${env.BROKER_PORT}`); log( - `config: STATUS_TTL=${env.STATUS_TTL_SECONDS}s HOOK_FRESH=${env.HOOK_FRESH_WINDOW_SECONDS}s`, + `@claudemesh/broker v${VERSION} ws listening on :${WS_PORT} | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`, ); }); - wss.on("error", (err) => { - log(`server error: ${err.message}`); + log(`ws server error: ${err.message}`); process.exit(1); }); + // Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt. + setInterval(() => { + for (const { ws } of connections.values()) { + if (ws.readyState === ws.OPEN) ws.ping(); + } + }, 30_000).unref(); + return wss; +} - const shutdown = (signal: string): void => { +// --- Main --- + +function main(): void { + const http = startHttpServer(); + const wss = startWsServer(); + startSweepers(); + + const shutdown = async (signal: string): Promise => { log(`${signal} received, shutting down`); - wss.close(() => { - log("server closed, bye"); - process.exit(0); - }); - // Hard exit if close hangs past 5s. - setTimeout(() => { - log("forcing exit after 5s"); - process.exit(1); - }, 5000).unref(); + await stopSweepers(); + for (const { ws } of connections.values()) { + try { + ws.close(); + } catch { + /* ignore */ + } + } + wss.close(); + http.close(); + log("closed, bye"); + process.exit(0); }; - process.on("SIGTERM", () => shutdown("SIGTERM")); - process.on("SIGINT", () => shutdown("SIGINT")); + process.on("SIGTERM", () => { + void shutdown("SIGTERM"); + }); + process.on("SIGINT", () => { + void shutdown("SIGINT"); + }); } main(); diff --git a/apps/broker/src/paths.ts b/apps/broker/src/paths.ts new file mode 100644 index 0000000..7963728 --- /dev/null +++ b/apps/broker/src/paths.ts @@ -0,0 +1,141 @@ +/** + * JSONL session-transcript discovery. + * + * Ported verbatim from ~/tools/claude-intercom/broker.ts — including + * the cross-platform 5-candidate encoding strategy and Roberto's + * confirmed Windows rule (H:\Claude → H--Claude via [\\/:]→-). + * + * Used as the *fallback* status inference path when no fresh hook + * signal is available for a presence row. + */ + +import { + readdirSync, + statSync, + openSync, + readSync, + closeSync, + existsSync, +} from "node:fs"; +import { homedir } from "node:os"; +import { join } from "node:path"; + +export const PROJECTS_DIR = join(homedir(), ".claude", "projects"); +const TAIL_BYTES = 8192; + +/** + * Generate candidate project-key formats for a given cwd. + * + * Claude Code stores session transcripts under + * `~/.claude/projects//`. The encoding differs per platform: + * + * macOS/Linux: /Users/x/foo → "-Users-x-foo" (replace / with -) + * Windows: H:\Claude → "H--Claude" (replace : and \ with -) + * Windows: C:\Users\x → "C--Users-x" (same rule) + * + * We emit the platform-native candidate first, then fallbacks, so the + * first directory existence check typically wins. + */ +export function cwdToProjectKeyCandidates(cwd: string): string[] { + const seen = new Set(); + const push = (s: string): void => { + if (s && !seen.has(s)) seen.add(s); + }; + + // Most likely: replace /, \, and : with dash. Matches macOS/Linux and + // Windows (confirmed live: H:\Claude → H--Claude). + push(cwd.replace(/[\\/:]/g, "-")); + // Unix legacy (replace / only). + push(cwd.replaceAll("/", "-")); + // Replace both separators, keep colons (hypothetical Windows variant). + push(cwd.replace(/[\\/]/g, "-")); + // Strip drive letter, then Unix-style. + const withoutDrive = cwd.replace(/^[A-Za-z]:/, ""); + push(withoutDrive.replace(/[\\/]/g, "-")); + // Leading-dash fallback for relative-ish paths. + for (const k of [...seen]) { + if (!k.startsWith("-")) push("-" + k); + } + + return [...seen]; +} + +/** + * Find the most recently modified JSONL file for a project, trying + * each candidate key in order. Returns the first match that exists. + */ +export function findActiveJsonl( + cwd: string, +): { path: string; mtime: number } | null { + for (const key of cwdToProjectKeyCandidates(cwd)) { + const projDir = join(PROJECTS_DIR, key); + if (!existsSync(projDir)) continue; + try { + const files = readdirSync(projDir).filter((f) => f.endsWith(".jsonl")); + let best: { path: string; mtime: number } | null = null; + for (const f of files) { + const full = join(projDir, f); + try { + const st = statSync(full); + const mt = st.mtimeMs; + if (!best || mt > best.mtime) best = { path: full, mtime: mt }; + } catch { + /* skip unreadable files */ + } + } + if (best) return best; + } catch { + /* can't read dir, try next candidate */ + } + } + return null; +} + +/** + * Tail the JSONL file and check whether the last assistant message + * has a pending tool_use (= the session is actively running a tool). + */ +function lastAssistantHasToolUse(filePath: string): boolean { + try { + const st = statSync(filePath); + const size = st.size; + if (size === 0) return false; + const readSize = Math.min(TAIL_BYTES, size); + const buf = Buffer.alloc(readSize); + const fd = openSync(filePath, "r"); + try { + readSync(fd, buf, 0, readSize, size - readSize); + } finally { + closeSync(fd); + } + const tail = buf.toString("utf-8"); + const lines = tail.split("\n"); + for (let i = lines.length - 1; i >= 0; i--) { + const line = lines[i]?.trim(); + if (!line) continue; + if (!line.includes('"assistant"')) continue; + try { + const d = JSON.parse(line); + if (d.type !== "assistant") continue; + const content = d.message?.content; + if (!Array.isArray(content)) continue; + return content.some((c: { type?: string }) => c.type === "tool_use"); + } catch { + /* malformed line, skip */ + } + } + } catch { + /* file read error */ + } + return false; +} + +/** + * Infer peer status from JSONL: "working" if last assistant entry has + * a pending tool_use, else "idle". Returns "idle" if no transcript. + */ +export function inferStatusFromJsonl(cwd: string): "idle" | "working" { + const jsonl = findActiveJsonl(cwd); + if (!jsonl) return "idle"; + return lastAssistantHasToolUse(jsonl.path) ? "working" : "idle"; +} diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index d27d278..ce90585 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -1,9 +1,10 @@ /** - * Broker protocol types. + * Broker types. * - * Wire format for WebSocket messages between peers and broker. Kept - * minimal here — the concrete schema lands in step 8 when we port the - * claude-intercom logic into this workspace. + * Wire format for WebSocket messages between peers and broker, plus the + * internal status/priority enums that govern delivery. The status model + * is ported verbatim from claude-intercom and reflects the proven + * hook > manual > jsonl priority design. */ export type Priority = "now" | "next" | "low"; @@ -12,24 +13,99 @@ export type PeerStatus = "idle" | "working" | "dnd"; export type StatusSource = "hook" | "manual" | "jsonl"; -/** Runtime view of a connected peer. */ -export interface Peer { - id: string; // broker-assigned short id +/** Runtime view of a connected peer (derived from mesh.presence + mesh.member). */ +export interface ConnectedPeer { + presenceId: string; + memberId: string; meshId: string; - pubkey: string; // ed25519 hex + pubkey: string; // ed25519 hex, from mesh.member displayName: string; + sessionId: string; + pid: number; + cwd: string; status: PeerStatus; statusSource: StatusSource; statusUpdatedAt: Date; connectedAt: Date; } -/** - * Generic WS message envelope. Concrete variants (hello, send, ack, - * presence, channel_push) are defined in step 8. - */ -export interface WSMessage { - type: string; - payload: T; +/** Hook-driven status update (received via HTTP POST /hook/set-status). */ +export interface HookSetStatusRequest { + cwd: string; + pid?: number; + status: PeerStatus; + session_id?: string; +} + +export interface HookSetStatusResponse { + ok: boolean; + presence_id?: string; + pending?: boolean; + error?: string; +} + +// --- WebSocket protocol envelopes --- + +/** Sent by client on connect to authenticate. */ +export interface WSHelloMessage { + type: "hello"; + meshId: string; + memberId: string; + pubkey: string; // must match mesh.member.peerPubkey + sessionId: string; + pid: number; + cwd: string; + signature: string; // ed25519 over (meshId||memberId||sessionId||nonce) + nonce: string; +} + +/** Client → broker: send an E2E-encrypted envelope to a target. */ +export interface WSSendMessage { + type: "send"; + targetSpec: string; // member pubkey | "#channel" | "tag:xyz" | "*" + priority: Priority; + nonce: string; // base64 + ciphertext: string; // base64 + id?: string; // client-side correlation id +} + +/** Broker → client: an envelope addressed to this peer. */ +export interface WSPushMessage { + type: "push"; + messageId: string; + meshId: string; + senderPubkey: string; + priority: Priority; + nonce: string; + ciphertext: string; + createdAt: string; +} + +/** Client → broker: manual status override (dnd, forced idle). */ +export interface WSSetStatusMessage { + type: "set_status"; + status: PeerStatus; +} + +/** Broker → client: acknowledgement for a send. */ +export interface WSAckMessage { + type: "ack"; + id: string; // echoes client-side correlation id + messageId: string; + queued: boolean; +} + +/** Broker → client: structured error. */ +export interface WSErrorMessage { + type: "error"; + code: string; + message: string; id?: string; } + +export type WSClientMessage = + | WSHelloMessage + | WSSendMessage + | WSSetStatusMessage; + +export type WSServerMessage = WSPushMessage | WSAckMessage | WSErrorMessage;