Completes the v0.1.0 security model. Every /join is now gated by a
signed invite that the broker re-verifies against the mesh owner's
ed25519 pubkey, plus an atomic single-use counter.
schema (migrations/0001_demonic_karnak.sql):
- mesh.mesh.owner_pubkey: ed25519 hex of the invite signer
- mesh.invite.token_bytes: canonical signed bytes (for re-verification)
Both nullable; required for new meshes going forward.
canonical invite format (signed bytes):
`${v}|${mesh_id}|${mesh_slug}|${broker_url}|${expires_at}|
${mesh_root_key}|${role}|${owner_pubkey}`
wire format — invite payload in ic://join/<base64url(JSON)> now has:
owner_pubkey: "<64 hex>"
signature: "<128 hex>"
broker joinMesh() (apps/broker/src/broker.ts):
1. verify ed25519 signature over canonical bytes using payload's
owner_pubkey → else invite_bad_signature
2. load mesh, ensure mesh.owner_pubkey matches payload's owner_pubkey
→ else invite_owner_mismatch (prevents a malicious admin from
substituting their own owner key)
3. load invite row by token, verify mesh_id matches → else
invite_mesh_mismatch
4. expiry check → else invite_expired
5. revoked check → else invite_revoked
6. idempotency: if pubkey is already a member, return existing id
WITHOUT burning an invite use
7. atomic CAS: UPDATE used_count = used_count + 1 WHERE used_count <
max_uses → if 0 rows affected, return invite_exhausted
8. insert member with role from payload
cli side:
- apps/cli/src/invite/parse.ts: zod-validated owner_pubkey + signature
fields; client verifies signature immediately and rejects tampered
links (fail-fast before even touching the broker)
- buildSignedInvite() helper: owners sign invites client-side
- enrollWithBroker sends {invite_token, invite_payload, peer_pubkey,
display_name} (was: {mesh_id, peer_pubkey, display_name, role})
- parseInviteLink is now async (libsodium ready + verify)
seed-test-mesh.ts generates an owner keypair, sets mesh.owner_pubkey,
builds + signs an invite, stores the invite row, emits ownerPubkey +
ownerSecretKey + inviteToken + inviteLink in the output JSON.
tests — invite-signature.test.ts (9 new):
- valid signed invite → join succeeds
- tampered payload → invite_bad_signature
- signer not the mesh owner → invite_owner_mismatch
- expired invite → invite_expired
- revoked invite → invite_revoked
- exhausted (maxUses=2, 3rd join) → invite_exhausted
- idempotent re-join doesn't burn a use
- atomic single-use: 5 concurrent joins → exactly 1 success, 4 exhausted
- mesh_id payload vs DB row mismatch → invite_mesh_mismatch
verified live: tampered link blocked client-side with a clear error.
Unmodified link joins cleanly end-to-end (roundtrip.ts + join-roundtrip.ts
both pass). 64/64 tests green.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
609 lines
17 KiB
TypeScript
609 lines
17 KiB
TypeScript
#!/usr/bin/env bun
|
|
/**
 * @claudemesh/broker entry point.
 *
 * Single-port HTTP + WebSocket server. Routes:
 *   GET  /health          → liveness + build info (503 if DB down)
 *   GET  /metrics         → Prometheus plaintext
 *   POST /hook/set-status → Claude Code hook scripts report status
 *   POST /join            → enroll a peer with a signed invite
 *   WS   /ws              → authenticated peer connections
 *
 * Graceful shutdown on SIGTERM/SIGINT: stops sweepers, marks all
 * active presences disconnected in the DB, closes servers.
 */
|
|
|
|
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
|
import type { Duplex } from "node:stream";
|
|
import { WebSocketServer, type WebSocket } from "ws";
|
|
import { env } from "./env";
|
|
import {
|
|
connectPresence,
|
|
disconnectPresence,
|
|
drainForMember,
|
|
findMemberByPubkey,
|
|
handleHookSetStatus,
|
|
heartbeat,
|
|
joinMesh,
|
|
queueMessage,
|
|
refreshQueueDepth,
|
|
refreshStatusFromJsonl,
|
|
startSweepers,
|
|
stopSweepers,
|
|
writeStatus,
|
|
} from "./broker";
|
|
import type {
|
|
HookSetStatusRequest,
|
|
WSClientMessage,
|
|
WSPushMessage,
|
|
WSServerMessage,
|
|
} from "./types";
|
|
import { log } from "./logger";
|
|
import { metrics, metricsToText } from "./metrics";
|
|
import { TokenBucket } from "./rate-limit";
|
|
import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
|
|
import { buildInfo } from "./build-info";
|
|
import { verifyHelloSignature } from "./crypto";
|
|
|
|
/** TCP port the single HTTP + WebSocket server listens on. */
const { BROKER_PORT: PORT } = env;

/** The only request path for which WebSocket upgrades are accepted. */
const WS_PATH = "/ws";
|
|
|
|
// --- Runtime connection registry ---
|
|
|
|
/**
 * In-memory record for one peer that completed the hello handshake.
 * Stored in the connection registry under its presence id.
 */
interface PeerConn {
  /** Socket used for pushes, acks and heartbeat pings. */
  ws: WebSocket;
  /** Mesh id the peer named in its hello. */
  meshId: string;
  /** Id of the member row resolved from the peer's pubkey during hello. */
  memberId: string;
  /** Public key the peer presented in its hello. */
  memberPubkey: string;
  /** Working directory reported in the hello; passed along when refreshing status. */
  cwd: string;
}
|
|
|
|
/** Live peers, keyed by presence id. */
const connections: Map<string, PeerConn> = new Map();

/** Count of live peers per mesh id; checked against the per-mesh cap during hello. */
const connectionsPerMesh: Map<string, number> = new Map();

/**
 * Limiter applied to POST /hook/set-status. Both constructor arguments are
 * HOOK_RATE_LIMIT_PER_MIN (presumably bucket capacity and refill amount —
 * confirm against ./rate-limit).
 */
const hookRateLimit = new TokenBucket(
  env.HOOK_RATE_LIMIT_PER_MIN,
  env.HOOK_RATE_LIMIT_PER_MIN,
);
|
|
|
|
/**
 * Best-effort delivery of one server message to a connected peer.
 *
 * Does nothing when the presence id is not registered or its socket is not
 * OPEN. A failure while serializing or sending is logged, never thrown.
 *
 * @param presenceId Registry key of the target connection.
 * @param msg        Message to JSON-encode and send.
 */
function sendToPeer(presenceId: string, msg: WSServerMessage): void {
  const socket = connections.get(presenceId)?.ws;
  if (socket === undefined || socket.readyState !== socket.OPEN) return;

  try {
    const frame = JSON.stringify(msg);
    socket.send(frame);
  } catch (cause) {
    const error = cause instanceof Error ? cause.message : String(cause);
    log.warn("push failed", { presence_id: presenceId, error });
  }
}
|
|
|
|
/**
 * Refresh a peer's status, drain the messages queued for it, and push each
 * one over its socket.
 *
 * Every caller in this file fires this in the background (`void ...`) without
 * attaching a rejection handler, so this function must never reject: a
 * failure while refreshing status or draining the queue is logged here
 * instead of surfacing as an unhandled promise rejection.
 *
 * NOTE(review): messages are drained before `sendToPeer` runs, and
 * `sendToPeer` silently skips sockets that are no longer OPEN — confirm that
 * `drainForMember` tolerates a peer disappearing between drain and send.
 *
 * @param presenceId Registry key of the peer to deliver to; unknown ids are a no-op.
 */
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
  const conn = connections.get(presenceId);
  if (!conn) return;

  try {
    const status = await refreshStatusFromJsonl(
      presenceId,
      conn.cwd,
      new Date(),
    );
    const messages = await drainForMember(
      conn.meshId,
      conn.memberId,
      conn.memberPubkey,
      status,
    );
    for (const m of messages) {
      const push: WSPushMessage = {
        type: "push",
        messageId: m.id,
        meshId: conn.meshId,
        senderPubkey: m.senderPubkey,
        priority: m.priority,
        nonce: m.nonce,
        ciphertext: m.ciphertext,
        createdAt: m.createdAt.toISOString(),
      };
      sendToPeer(presenceId, push);
      metrics.messagesRoutedTotal.inc({ priority: m.priority });
    }
  } catch (e) {
    log.error("queued message push failed", {
      presence_id: presenceId,
      error: e instanceof Error ? e.message : String(e),
    });
  }
}
|
|
|
|
// --- HTTP request routing ---
|
|
|
|
/**
 * Send `body` as a complete JSON response.
 *
 * @param res    Response to write to.
 * @param status HTTP status code for the response.
 * @param body   Value to serialize with `JSON.stringify`.
 */
function writeJson(res: ServerResponse, status: number, body: unknown): void {
  const headers = { "Content-Type": "application/json" };
  res.writeHead(status, headers);
  const text = JSON.stringify(body);
  res.end(text);
}
|
|
|
|
/**
 * Top-level HTTP router for the broker.
 *
 * Sets permissive CORS headers on every response, answers OPTIONS with 204,
 * then dispatches on the exact "<METHOD> <url>" pair. Anything unmatched gets
 * a plain-text 404.
 *
 * @param req Incoming HTTP request.
 * @param res Response to write to.
 */
function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
  const started = Date.now();

  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS");
  res.setHeader("Access-Control-Allow-Headers", "Content-Type");

  if (req.method === "OPTIONS") {
    res.writeHead(204);
    res.end();
    return;
  }

  const route = `${req.method} ${req.url}`;
  const logRequest = (status: number): void => {
    log.debug("http", { route, status, latency_ms: Date.now() - started });
  };

  switch (route) {
    case "GET /health": {
      const dbUp = isDbHealthy();
      const status = dbUp ? 200 : 503;
      writeJson(res, status, {
        status: dbUp ? "ok" : "degraded",
        db: dbUp ? "up" : "down",
        ...buildInfo(),
      });
      logRequest(status);
      return;
    }

    case "GET /metrics":
      res.writeHead(200, { "Content-Type": "text/plain; version=0.0.4" });
      res.end(metricsToText());
      return;

    case "POST /hook/set-status":
      handleHookPost(req, res, started);
      return;

    case "POST /join":
      handleJoinPost(req, res, started);
      return;

    default:
      res.writeHead(404);
      res.end("not found");
      logRequest(404);
  }
}
|
|
|
|
/**
 * Handle `POST /hook/set-status`.
 *
 * Buffers the request body (answering 413 once it exceeds
 * `env.MAX_MESSAGE_BYTES`), rate-limits by `pid:cwd`, hands the payload to
 * `handleHookSetStatus`, and starts a background push of queued messages when
 * the result names a non-pending presence.
 *
 * Responses: 200 with the handler result, 400 when the body is not a JSON
 * object, 413 when oversized, 429 when rate limited, 500 on an unexpected
 * failure.
 *
 * @param req     Incoming request carrying the hook payload.
 * @param res     Response the JSON reply is written to.
 * @param started `Date.now()` value captured when the request arrived (latency logging).
 */
function handleHookPost(
  req: IncomingMessage,
  res: ServerResponse,
  started: number,
): void {
  metrics.hookRequestsTotal.inc();
  const chunks: Buffer[] = [];
  let total = 0;
  let aborted = false;

  req.on("data", (chunk: Buffer) => {
    if (aborted) return;
    total += chunk.length;
    if (total > env.MAX_MESSAGE_BYTES) {
      aborted = true;
      writeJson(res, 413, { ok: false, error: "payload too large" });
      req.destroy();
      return;
    }
    chunks.push(chunk);
  });

  req.on("end", async () => {
    if (aborted) return;
    try {
      // A body the client got wrong is a 400, not a server error.
      let parsed: unknown;
      try {
        parsed = JSON.parse(Buffer.concat(chunks).toString());
      } catch {
        writeJson(res, 400, { ok: false, error: "invalid JSON body" });
        return;
      }
      // `null`, arrays and primitives are valid JSON but not a hook payload;
      // reject them before any property access.
      if (
        typeof parsed !== "object" ||
        parsed === null ||
        Array.isArray(parsed)
      ) {
        writeJson(res, 400, { ok: false, error: "JSON object body required" });
        return;
      }
      const payload = parsed as HookSetStatusRequest;

      // Rate-limit key combines pid and cwd; missing values fall back to 0 / "".
      const rlKey = `${payload.pid ?? 0}:${payload.cwd ?? ""}`;
      if (!hookRateLimit.take(rlKey)) {
        metrics.hookRequestsRateLimited.inc();
        writeJson(res, 429, { ok: false, error: "rate limited" });
        log.warn("hook rate limited", {
          cwd: payload.cwd,
          pid: payload.pid,
        });
        return;
      }

      const result = await handleHookSetStatus(payload);
      writeJson(res, 200, result);
      log.info("hook", {
        route: "POST /hook/set-status",
        cwd: payload.cwd,
        pid: payload.pid,
        status: payload.status,
        presence_id: result.presence_id,
        pending: result.pending ?? false,
        latency_ms: Date.now() - started,
      });

      if (result.ok && result.presence_id && !result.pending) {
        const presenceId = result.presence_id;
        // Background push: attach a handler so a failure cannot become an
        // unhandled promise rejection.
        void maybePushQueuedMessages(presenceId).catch((e: unknown) => {
          log.warn("queued push failed", {
            presence_id: presenceId,
            error: e instanceof Error ? e.message : String(e),
          });
        });
      }
    } catch (e) {
      const error = e instanceof Error ? e.message : String(e);
      // If the failure happened after the reply was written, a second
      // writeHead would throw inside this async listener.
      if (!res.headersSent) {
        writeJson(res, 500, { ok: false, error });
      }
      log.error("hook handler error", { error });
    }
  });
}
|
|
|
|
/**
 * Handle `POST /join`: enroll a peer into a mesh using an invite.
 *
 * Buffers the request body (answering 413 once it exceeds
 * `env.MAX_MESSAGE_BYTES`), validates the shape of the request, and delegates
 * the actual invite checks to `joinMesh`.
 *
 * Responses: 200 when `joinMesh` reports `ok`, 400 for a body that is not a
 * JSON object, for missing/mistyped fields, for a malformed `peer_pubkey`, or
 * when `joinMesh` reports a failure; 413 when oversized; 500 on an unexpected
 * failure.
 *
 * @param req     Incoming request carrying
 *                `{invite_token, invite_payload, peer_pubkey, display_name}`.
 * @param res     Response the JSON reply is written to.
 * @param started `Date.now()` value captured when the request arrived (latency logging).
 */
function handleJoinPost(
  req: IncomingMessage,
  res: ServerResponse,
  started: number,
): void {
  const chunks: Buffer[] = [];
  let total = 0;
  let aborted = false;

  const isNonEmptyString = (v: unknown): v is string =>
    typeof v === "string" && v.length > 0;

  req.on("data", (chunk: Buffer) => {
    if (aborted) return;
    total += chunk.length;
    if (total > env.MAX_MESSAGE_BYTES) {
      aborted = true;
      writeJson(res, 413, { ok: false, error: "payload too large" });
      req.destroy();
      return;
    }
    chunks.push(chunk);
  });

  req.on("end", async () => {
    if (aborted) return;
    try {
      // A body the client got wrong is a 400, not a server error.
      let parsed: unknown;
      try {
        parsed = JSON.parse(Buffer.concat(chunks).toString());
      } catch {
        writeJson(res, 400, { ok: false, error: "invalid JSON body" });
        return;
      }
      if (
        typeof parsed !== "object" ||
        parsed === null ||
        Array.isArray(parsed)
      ) {
        writeJson(res, 400, { ok: false, error: "JSON object body required" });
        return;
      }
      const {
        invite_token: inviteToken,
        invite_payload: invitePayload,
        peer_pubkey: peerPubkey,
        display_name: displayName,
      } = parsed as Record<string, unknown>;

      // The string fields must really be strings: RegExp.test() coerces its
      // argument, so e.g. a one-element array holding 64 hex chars would
      // otherwise pass the pubkey check and reach joinMesh as a non-string.
      if (
        !isNonEmptyString(inviteToken) ||
        !invitePayload ||
        !isNonEmptyString(peerPubkey) ||
        !isNonEmptyString(displayName)
      ) {
        writeJson(res, 400, {
          ok: false,
          error:
            "invite_token, invite_payload, peer_pubkey, display_name required",
        });
        return;
      }
      if (!/^[0-9a-f]{64}$/i.test(peerPubkey)) {
        writeJson(res, 400, {
          ok: false,
          error: "peer_pubkey must be 64 hex chars (32 bytes)",
        });
        return;
      }

      const result = await joinMesh({
        inviteToken,
        invitePayload: invitePayload as Parameters<
          typeof joinMesh
        >[0]["invitePayload"],
        peerPubkey,
        displayName,
      });
      writeJson(res, result.ok ? 200 : 400, result);
      log.info("join", {
        route: "POST /join",
        // Only a short prefix of the key is logged.
        pubkey: peerPubkey.slice(0, 12),
        ok: result.ok,
        error: !result.ok ? result.error : undefined,
        already_member:
          "alreadyMember" in result ? result.alreadyMember : false,
        latency_ms: Date.now() - started,
      });
    } catch (e) {
      const error = e instanceof Error ? e.message : String(e);
      // If the failure happened after the reply was written, a second
      // writeHead would throw inside this async listener.
      if (!res.headersSent) {
        writeJson(res, 500, { ok: false, error });
      }
      log.error("join handler error", { error });
    }
  });
}
|
|
|
|
/**
 * Complete a WebSocket upgrade for requests to `WS_PATH`; destroy the raw
 * socket for any other path.
 *
 * @param wss    WebSocket server (created with `noServer`) that performs the handshake.
 * @param req    The HTTP upgrade request.
 * @param socket Underlying network stream of the request.
 * @param head   First packet of the upgraded stream.
 */
function handleUpgrade(
  wss: WebSocketServer,
  req: IncomingMessage,
  socket: Duplex,
  head: Buffer,
): void {
  if (req.url === WS_PATH) {
    wss.handleUpgrade(req, socket, head, (client) => {
      // Hand the upgraded socket to the regular "connection" listeners.
      wss.emit("connection", client, req);
    });
    return;
  }
  socket.destroy();
}
|
|
|
|
// --- WS protocol handlers ---
|
|
|
|
/**
 * Record one more live connection for a mesh and refresh the
 * active-connections gauge.
 *
 * The caller (`handleHello`) inserts the peer into `connections` before
 * calling this, so `connections.size` already includes the new peer; adding
 * one more would leave the gauge one too high until the next disconnect.
 *
 * @param meshId Mesh whose connection count is incremented.
 * @returns The mesh's connection count after the increment.
 */
function incMeshCount(meshId: string): number {
  const n = (connectionsPerMesh.get(meshId) ?? 0) + 1;
  connectionsPerMesh.set(meshId, n);
  metrics.connectionsActive.set(connections.size);
  return n;
}
|
|
|
|
/**
 * Record one fewer live connection for a mesh, dropping the mesh's entry once
 * its count reaches zero, and refresh the active-connections gauge from the
 * registry size.
 *
 * @param meshId Mesh whose connection count is decremented.
 */
function decMeshCount(meshId: string): void {
  const remaining = (connectionsPerMesh.get(meshId) ?? 1) - 1;
  if (remaining <= 0) {
    connectionsPerMesh.delete(meshId);
  } else {
    connectionsPerMesh.set(meshId, remaining);
  }
  metrics.connectionsActive.set(connections.size);
}
|
|
|
|
/**
 * Best-effort delivery of an `error` frame to a client.
 *
 * @param ws      Socket to write to.
 * @param code    Machine-readable error code.
 * @param message Human-readable description.
 * @param id      Optional client-supplied id to echo back.
 */
function sendError(
  ws: WebSocket,
  code: string,
  message: string,
  id?: string,
): void {
  const frame: WSServerMessage = { type: "error", code, message, id };
  try {
    const encoded = JSON.stringify(frame);
    ws.send(encoded);
  } catch {
    // Deliberately ignored: the socket may already be closed.
  }
}
|
|
|
|
/**
 * Authenticate a `hello` frame and register the peer.
 *
 * Steps, in order: per-mesh capacity check (before any awaited work),
 * signature verification via `verifyHelloSignature`, membership lookup by
 * pubkey, presence creation, registry insert, then a background drain of
 * queued messages. Every rejection sends an error frame and closes the
 * socket with code 1008.
 *
 * The `hello_ack` is intentionally NOT sent here: the caller sends it after
 * storing the returned presence id, so frames that follow immediately do not
 * fail its "no_hello" check.
 *
 * @param ws    Socket the hello arrived on.
 * @param hello The parsed hello frame.
 * @returns The new presence id and the member's display name, or `null` when
 *          the hello was rejected (the socket has been closed).
 */
async function handleHello(
  ws: WebSocket,
  hello: Extract<WSClientMessage, { type: "hello" }>,
): Promise<{ presenceId: string; memberDisplayName: string } | null> {
  const { meshId, pubkey, sessionId, cwd } = hello;

  // Shared tail of every rejection path: error frame, policy-violation close.
  const reject = (code: string, message: string): null => {
    sendError(ws, code, message);
    ws.close(1008, code);
    return null;
  };

  // Capacity check BEFORE touching DB.
  const existing = connectionsPerMesh.get(meshId) ?? 0;
  const cap = env.MAX_CONNECTIONS_PER_MESH;
  if (existing >= cap) {
    metrics.connectionsRejected.inc({ reason: "capacity" });
    log.warn("mesh at capacity", { mesh_id: meshId, existing, cap });
    return reject("capacity", "mesh at connection capacity");
  }

  // Signature + skew check. Proves the client holds the secret key
  // for the pubkey they're claiming as identity.
  const sig = await verifyHelloSignature({
    meshId,
    memberId: hello.memberId,
    pubkey,
    timestamp: hello.timestamp,
    signature: hello.signature,
  });
  if (!sig.ok) {
    const { reason } = sig;
    metrics.connectionsRejected.inc({ reason });
    log.warn("hello sig rejected", {
      reason,
      mesh_id: meshId,
      pubkey: pubkey?.slice(0, 12),
    });
    return reject(reason, `hello rejected: ${reason}`);
  }

  const member = await findMemberByPubkey(meshId, pubkey);
  if (!member) {
    metrics.connectionsRejected.inc({ reason: "unauthorized" });
    return reject("unauthorized", "pubkey not found in mesh");
  }

  const presenceId = await connectPresence({
    memberId: member.id,
    sessionId,
    pid: hello.pid,
    cwd,
  });
  const peer: PeerConn = {
    ws,
    meshId,
    memberId: member.id,
    memberPubkey: pubkey,
    cwd,
  };
  connections.set(presenceId, peer);
  incMeshCount(meshId);

  log.info("ws hello", {
    mesh_id: meshId,
    member: member.displayName,
    presence_id: presenceId,
    session_id: sessionId,
  });

  // Deliver anything already queued for this member without blocking the ack.
  void maybePushQueuedMessages(presenceId);
  return { presenceId, memberDisplayName: member.displayName };
}
|
|
|
|
/**
 * Queue a message from a connected peer, acknowledge it, and nudge every
 * connected recipient in the same mesh to drain its queue.
 *
 * The ack is best-effort: once `queueMessage` has resolved the message is
 * accepted, so a failure to write the ack (e.g. the sender's socket went
 * away) is logged rather than thrown. Throwing here would skip the fan-out
 * and make the caller count an already-queued message as rejected.
 *
 * @param conn Registry record of the sending peer.
 * @param msg  The parsed `send` frame.
 */
async function handleSend(
  conn: PeerConn,
  msg: Extract<WSClientMessage, { type: "send" }>,
): Promise<void> {
  const messageId = await queueMessage({
    meshId: conn.meshId,
    senderMemberId: conn.memberId,
    targetSpec: msg.targetSpec,
    priority: msg.priority,
    nonce: msg.nonce,
    ciphertext: msg.ciphertext,
  });

  const ack: WSServerMessage = {
    type: "ack",
    id: msg.id ?? "",
    messageId,
    queued: true,
  };
  try {
    conn.ws.send(JSON.stringify(ack));
  } catch (e) {
    log.warn("ack send failed", {
      message_id: messageId,
      error: e instanceof Error ? e.message : String(e),
    });
  }

  // Fan-out over connected peers in the same mesh. "*" targets everyone;
  // any other targetSpec is compared against the peer's pubkey.
  for (const [pid, peer] of connections) {
    if (peer.meshId !== conn.meshId) continue;
    if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec)
      continue;
    void maybePushQueuedMessages(pid);
  }
}
|
|
|
|
/**
 * Wire up the protocol handlers for one freshly upgraded WebSocket.
 *
 * Lifecycle: the first frame must be a `hello`; once it succeeds the socket
 * is bound to a presence id and may send `send` / `set_status` frames. On
 * close the presence is removed from the registry and marked disconnected.
 *
 * Guards added around the handshake:
 * - Only one hello per socket. A repeated hello (or a second one arriving
 *   while the first is still being processed) is answered with an
 *   `already_hello` error; accepting it would register a second presence and
 *   orphan the first in the registry.
 * - If the socket is no longer OPEN when the hello finishes, its "close"
 *   event may already have fired while no presence id was bound, so the
 *   presence that was just registered is released immediately.
 * - Cleanup and heartbeat failures are logged instead of escaping the async
 *   listeners as unhandled rejections.
 *
 * @param ws The connected client socket.
 */
function handleConnection(ws: WebSocket): void {
  metrics.connectionsTotal.inc();
  let presenceId: string | null = null;
  let helloInFlight = false;

  const describe = (e: unknown): string =>
    e instanceof Error ? e.message : String(e);

  // Remove a presence from the registry and mark it disconnected. Never rejects.
  const releasePresence = async (id: string): Promise<void> => {
    const conn = connections.get(id);
    connections.delete(id);
    if (conn) decMeshCount(conn.meshId);
    try {
      await disconnectPresence(id);
      log.info("ws close", { presence_id: id });
    } catch (e) {
      log.warn("ws close cleanup failed", {
        presence_id: id,
        error: describe(e),
      });
    }
  };

  ws.on("message", async (raw) => {
    try {
      const msg = JSON.parse(raw.toString()) as WSClientMessage;

      if (msg.type === "hello") {
        if (presenceId || helloInFlight) {
          sendError(
            ws,
            "already_hello",
            "hello already completed or in progress",
          );
          return;
        }
        helloInFlight = true;
        let result: { presenceId: string; memberDisplayName: string } | null =
          null;
        try {
          result = await handleHello(ws, msg);
        } finally {
          helloInFlight = false;
        }
        if (!result) return;

        if (ws.readyState !== ws.OPEN) {
          // The socket went away during the handshake; nothing else will
          // clean this presence up, so do it now.
          await releasePresence(result.presenceId);
          return;
        }

        presenceId = result.presenceId;
        // Ack AFTER closure assignment — subsequent client messages
        // arriving immediately after will now see a non-null presenceId.
        try {
          ws.send(
            JSON.stringify({
              type: "hello_ack",
              presenceId: result.presenceId,
              memberDisplayName: result.memberDisplayName,
            }),
          );
        } catch {
          /* ws closed during hello */
        }
        return;
      }

      if (!presenceId) {
        sendError(ws, "no_hello", "must send hello first");
        return;
      }
      const conn = connections.get(presenceId);
      if (!conn) return;

      switch (msg.type) {
        case "send":
          await handleSend(conn, msg);
          break;
        case "set_status":
          await writeStatus(presenceId, msg.status, "manual", new Date());
          log.info("ws set_status", {
            presence_id: presenceId,
            status: msg.status,
          });
          break;
      }
    } catch (e) {
      metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
      log.warn("ws message error", {
        presence_id: presenceId,
        error: describe(e),
      });
    }
  });

  ws.on("close", async () => {
    if (presenceId) await releasePresence(presenceId);
  });

  ws.on("error", (err) => {
    log.warn("ws error", { error: err.message });
  });

  ws.on("pong", async () => {
    if (!presenceId) return;
    try {
      await heartbeat(presenceId);
    } catch (e) {
      log.warn("heartbeat failed", {
        presence_id: presenceId,
        error: describe(e),
      });
    }
  });
}
|
|
|
|
// --- Main ---
|
|
|
|
/**
 * Start the broker: one HTTP server that also accepts WebSocket upgrades,
 * plus the periodic timers (WS ping, rate-limit GC, queue-depth gauge), the
 * sweepers and the DB health probe. Installs SIGTERM/SIGINT handlers that
 * shut everything down and exit.
 *
 * Shutdown runs at most once even if several signals arrive, and always ends
 * in `process.exit`: exit code 0 on success, 1 if a shutdown step failed
 * (previously a rejected `stopSweepers()` was an unhandled rejection and the
 * exit call was never reached).
 *
 * NOTE(review): `process.exit` follows the socket closes immediately, so the
 * per-socket "close" handlers may not get to finish their DB update —
 * confirm that `stopSweepers()` (or another step) covers marking presences
 * disconnected.
 */
function main(): void {
  const wss = new WebSocketServer({
    noServer: true,
    maxPayload: env.MAX_MESSAGE_BYTES,
  });
  wss.on("connection", handleConnection);

  const http = createServer(handleHttpRequest);
  http.on("upgrade", (req, socket, head) =>
    handleUpgrade(wss, req, socket, head),
  );
  http.on("error", (err) => {
    log.error("http server error", { error: err.message });
    process.exit(1);
  });
  http.listen(PORT, "0.0.0.0", () => {
    const info = buildInfo();
    log.info("broker listening", {
      port: PORT,
      version: info.version,
      gitSha: info.gitSha,
      ws_path: WS_PATH,
      ttl_seconds: env.STATUS_TTL_SECONDS,
      hook_fresh_seconds: env.HOOK_FRESH_WINDOW_SECONDS,
      max_connections_per_mesh: env.MAX_CONNECTIONS_PER_MESH,
      max_message_bytes: env.MAX_MESSAGE_BYTES,
      hook_rate_limit_per_min: env.HOOK_RATE_LIMIT_PER_MIN,
    });
  });

  // WS heartbeat ping every 30s; clients reply with pong → bumps lastPingAt.
  const pingInterval = setInterval(() => {
    for (const { ws } of connections.values()) {
      if (ws.readyState === ws.OPEN) ws.ping();
    }
  }, 30_000);
  pingInterval.unref();

  // GC rate-limit buckets periodically.
  const rlSweep = setInterval(() => hookRateLimit.sweep(), 5 * 60_000);
  rlSweep.unref();

  // Queue depth gauge refresh (fires the metric; cheap COUNT query).
  const queueDepthTimer = setInterval(() => {
    refreshQueueDepth().catch((e) =>
      log.warn("queue depth refresh failed", {
        error: e instanceof Error ? e.message : String(e),
      }),
    );
  }, 30_000);
  queueDepthTimer.unref();

  startSweepers();
  startDbHealth();

  let shuttingDown = false;
  const shutdown = async (signal: string): Promise<void> => {
    // A second signal while shutdown is in progress must not re-run the steps.
    if (shuttingDown) return;
    shuttingDown = true;

    log.info("shutdown signal", { signal });
    let exitCode = 0;
    try {
      clearInterval(pingInterval);
      clearInterval(rlSweep);
      clearInterval(queueDepthTimer);
      stopDbHealth();
      await stopSweepers();
      for (const { ws } of connections.values()) {
        try {
          ws.close(1001, "shutting down");
        } catch {
          /* ignore */
        }
      }
      wss.close();
      http.close();
      log.info("shutdown complete");
    } catch (e) {
      exitCode = 1;
      log.error("shutdown failed", {
        error: e instanceof Error ? e.message : String(e),
      });
    }
    process.exit(exitCode);
  };

  process.on("SIGTERM", () => {
    void shutdown("SIGTERM");
  });
  process.on("SIGINT", () => {
    void shutdown("SIGINT");
  });
}

main();
|