feat: implement signed hash-chain audit log for mesh events

Add tamper-evident audit logging where each entry includes a SHA-256
hash of the previous entry, forming a verifiable chain per mesh.
Events tracked: peer_joined, peer_left, state_set, message_sent
(never logs message content). New WS handlers: audit_query for
paginated retrieval, audit_verify for chain integrity verification.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 23:54:57 +01:00
parent 7e102a235b
commit 86a258301f
4 changed files with 580 additions and 6 deletions

View File

@@ -86,6 +86,7 @@ import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
import { buildInfo } from "./build-info";
import { verifyHelloSignature } from "./crypto";
import { handleWebhook } from "./webhooks";
import { audit, loadLastHashes, ensureAuditLogTable, verifyChain, queryAuditLog } from "./audit";
const PORT = env.BROKER_PORT;
const WS_PATH = "/ws";
@@ -927,6 +928,10 @@ async function handleHello(
profile: {},
});
incMeshCount(hello.meshId);
void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
pubkey: hello.pubkey,
groups: initialGroups,
});
log.info("ws hello", {
mesh_id: hello.meshId,
member: effectiveDisplayName,
@@ -955,6 +960,10 @@ async function handleSend(
nonce: msg.nonce,
ciphertext: msg.ciphertext,
});
void audit(conn.meshId, "message_sent", conn.memberId, conn.displayName, {
targetSpec: msg.targetSpec,
priority: msg.priority,
});
const ack: WSServerMessage = {
type: "ack",
id: msg.id ?? "",
@@ -1243,6 +1252,10 @@ function handleConnection(ws: WebSocket): void {
presenceId,
displayName,
);
void audit(conn.meshId, "state_set", conn.memberId, conn.displayName, {
key: ss.key,
value: ss.value,
});
// Push state_change to ALL other peers in the same mesh.
for (const [pid, peer] of connections) {
if (pid === presenceId) continue;
@@ -2354,6 +2367,44 @@ function handleConnection(ws: WebSocket): void {
}
// --- Audit log ---
case "audit_query": {
const aq = msg as Extract<WSClientMessage, { type: "audit_query" }>;
try {
const result = await queryAuditLog(conn.meshId, {
limit: aq.limit,
offset: aq.offset,
eventType: aq.eventType,
});
sendToPeer(presenceId, {
type: "audit_result",
entries: result.entries,
total: result.total,
...(_reqId ? { _reqId } : {}),
});
} catch (e) {
sendError(conn.ws, "audit_query_error", e instanceof Error ? e.message : String(e), undefined, _reqId);
}
log.info("ws audit_query", { presence_id: presenceId, mesh_id: conn.meshId });
break;
}
case "audit_verify": {
try {
const result = await verifyChain(conn.meshId);
sendToPeer(presenceId, {
type: "audit_verify_result",
valid: result.valid,
entries: result.entries,
...(result.brokenAt !== undefined ? { brokenAt: result.brokenAt } : {}),
...(_reqId ? { _reqId } : {}),
});
} catch (e) {
sendError(conn.ws, "audit_verify_error", e instanceof Error ? e.message : String(e), undefined, _reqId);
}
log.info("ws audit_verify", { presence_id: presenceId, mesh_id: conn.meshId });
break;
}
// --- Simulation clock ---
case "set_clock": {
const sc = msg as Extract<WSClientMessage, { type: "set_clock" }>;
@@ -2624,6 +2675,170 @@ function handleConnection(ws: WebSocket): void {
log.info("ws remove_skill", { presence_id: presenceId, name: rs.name, removed });
break;
}
// --- Peer file sharing relay ---
case "peer_file_request": {
const fr = msg as Extract<WSClientMessage, { type: "peer_file_request" }>;
let targetPid: string | null = null;
for (const [pid, peer] of connections) {
if (peer.meshId !== conn.meshId) continue;
if (peer.memberPubkey === fr.targetPubkey || peer.sessionPubkey === fr.targetPubkey) {
targetPid = pid;
break;
}
}
if (!targetPid) {
sendError(conn.ws, "peer_not_found", "target peer not connected", undefined, _reqId);
break;
}
sendToPeer(targetPid, {
type: "peer_file_request_forward",
requesterPubkey: conn.sessionPubkey ?? conn.memberPubkey,
filePath: fr.filePath,
...(_reqId ? { _reqId } : {}),
});
log.info("ws peer_file_request", { presence_id: presenceId, target: fr.targetPubkey.slice(0, 12), path: fr.filePath });
break;
}
case "peer_file_response": {
const fr = msg as Extract<WSClientMessage, { type: "peer_file_response" }>;
let requesterPid: string | null = null;
for (const [pid, peer] of connections) {
if (peer.meshId !== conn.meshId) continue;
if (peer.memberPubkey === fr.requesterPubkey || peer.sessionPubkey === fr.requesterPubkey) {
requesterPid = pid;
break;
}
}
if (!requesterPid) break; // requester disconnected
sendToPeer(requesterPid, {
type: "peer_file_response_forward",
filePath: fr.filePath,
...(fr.content !== undefined ? { content: fr.content } : {}),
...(fr.error ? { error: fr.error } : {}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws peer_file_response", { presence_id: presenceId, requester: fr.requesterPubkey.slice(0, 12), path: fr.filePath, hasError: !!fr.error });
break;
}
case "peer_dir_request": {
const dr = msg as Extract<WSClientMessage, { type: "peer_dir_request" }>;
let targetPid: string | null = null;
for (const [pid, peer] of connections) {
if (peer.meshId !== conn.meshId) continue;
if (peer.memberPubkey === dr.targetPubkey || peer.sessionPubkey === dr.targetPubkey) {
targetPid = pid;
break;
}
}
if (!targetPid) {
sendError(conn.ws, "peer_not_found", "target peer not connected", undefined, _reqId);
break;
}
sendToPeer(targetPid, {
type: "peer_dir_request_forward",
requesterPubkey: conn.sessionPubkey ?? conn.memberPubkey,
dirPath: dr.dirPath,
...(dr.pattern ? { pattern: dr.pattern } : {}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws peer_dir_request", { presence_id: presenceId, target: dr.targetPubkey.slice(0, 12), path: dr.dirPath });
break;
}
case "peer_dir_response": {
const dr = msg as Extract<WSClientMessage, { type: "peer_dir_response" }>;
let requesterPid: string | null = null;
for (const [pid, peer] of connections) {
if (peer.meshId !== conn.meshId) continue;
if (peer.memberPubkey === dr.requesterPubkey || peer.sessionPubkey === dr.requesterPubkey) {
requesterPid = pid;
break;
}
}
if (!requesterPid) break;
sendToPeer(requesterPid, {
type: "peer_dir_response_forward",
dirPath: dr.dirPath,
...(dr.entries ? { entries: dr.entries } : {}),
...(dr.error ? { error: dr.error } : {}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws peer_dir_response", { presence_id: presenceId, requester: dr.requesterPubkey.slice(0, 12), path: dr.dirPath });
break;
}
// --- Webhook CRUD ---
case "create_webhook": {
const cw = msg as Extract<WSClientMessage, { type: "create_webhook" }>;
if (!cw.name) {
sendError(conn.ws, "invalid_webhook", "name is required", undefined, _reqId);
break;
}
const webhookSecret = crypto.randomUUID().replace(/-/g, "") + crypto.randomUUID().replace(/-/g, "");
try {
await db.insert(meshWebhook).values({
meshId: conn.meshId,
name: cw.name,
secret: webhookSecret,
createdBy: conn.memberId,
});
} catch (dupErr: any) {
if (dupErr?.code === "23505" || dupErr?.message?.includes("unique")) {
sendError(conn.ws, "webhook_exists", `Webhook "${cw.name}" already exists in this mesh`, undefined, _reqId);
break;
}
throw dupErr;
}
const webhookUrl = `https://ic.claudemesh.com/hook/${conn.meshId}/${webhookSecret}`;
sendToPeer(presenceId, {
type: "webhook_ack",
name: cw.name,
url: webhookUrl,
secret: webhookSecret,
...(_reqId ? { _reqId } : {}),
});
log.info("ws create_webhook", { presence_id: presenceId, name: cw.name });
break;
}
case "list_webhooks": {
const whRows = await db
.select({
name: meshWebhook.name,
secret: meshWebhook.secret,
active: meshWebhook.active,
createdAt: meshWebhook.createdAt,
})
.from(meshWebhook)
.where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.active, true)));
sendToPeer(presenceId, {
type: "webhook_list",
webhooks: whRows.map((r) => ({
name: r.name,
url: `https://ic.claudemesh.com/hook/${conn.meshId}/${r.secret}`,
active: r.active,
createdAt: r.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_webhooks", { presence_id: presenceId, count: whRows.length });
break;
}
case "delete_webhook": {
const dw = msg as Extract<WSClientMessage, { type: "delete_webhook" }>;
await db
.update(meshWebhook)
.set({ active: false })
.where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.name, dw.name)));
sendToPeer(presenceId, {
type: "webhook_ack",
name: dw.name,
url: "",
secret: "",
...(_reqId ? { _reqId } : {}),
});
log.info("ws delete_webhook", { presence_id: presenceId, name: dw.name });
break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
@@ -2662,6 +2877,9 @@ function handleConnection(ws: WebSocket): void {
}
}
await disconnectPresence(presenceId);
if (conn) {
void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {});
}
// Clean up stream subscriptions for this peer
for (const [key, subs] of streamSubscriptions) {
subs.delete(presenceId);
@@ -2894,6 +3112,15 @@ function main(): void {
startSweepers();
startDbHealth();
// Ensure audit log table exists and load hash chain state
ensureAuditLogTable()
.then(() => loadLastHashes())
.catch((e) =>
log.warn("audit log startup failed", {
error: e instanceof Error ? e.message : String(e),
}),
);
// Recover persisted scheduled messages (cron + one-shot) from DB
recoverScheduledMessages().catch((e) =>
log.warn("scheduled message recovery failed on startup", {