diff --git a/apps/broker/src/audit.ts b/apps/broker/src/audit.ts new file mode 100644 index 0000000..e60e023 --- /dev/null +++ b/apps/broker/src/audit.ts @@ -0,0 +1,215 @@ +/** + * Signed audit log with hash-chain integrity. + * + * Every significant mesh event is recorded as an append-only entry. + * Each entry's SHA-256 hash includes the previous entry's hash, + * forming a tamper-evident chain per mesh. If any row is modified + * or deleted, all subsequent hashes will fail verification. + * + * NEVER logs message content (ciphertext or plaintext) — only metadata. + */ + +import { createHash } from "node:crypto"; +import { asc, desc, eq, sql, and } from "drizzle-orm"; +import { db } from "./db"; +import { auditLog } from "@turbostarter/db/schema/mesh"; +import { log } from "./logger"; + +// --------------------------------------------------------------------------- +// In-memory last-hash cache (one entry per mesh, loaded from DB on startup) +// --------------------------------------------------------------------------- + +const lastHash = new Map(); + +// --------------------------------------------------------------------------- +// Core audit logging +// --------------------------------------------------------------------------- + +function computeHash( + prevHash: string, + meshId: string, + eventType: string, + actorMemberId: string | null, + payload: Record, + createdAt: Date, +): string { + const input = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${JSON.stringify(payload)}|${createdAt.toISOString()}`; + return createHash("sha256").update(input).digest("hex"); +} + +/** + * Append an audit entry for a mesh event. + * + * Fire-and-forget safe — callers should `void audit(...)` or + * `.catch(log.warn)` to avoid blocking the hot path. + */ +export async function audit( + meshId: string, + eventType: string, + actorMemberId: string | null, + actorDisplayName: string | null, + payload: Record, +): Promise { + const prevHash = lastHash.get(meshId) ?? "genesis"; + const createdAt = new Date(); + const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt); + + try { + await db.insert(auditLog).values({ + meshId, + eventType, + actorMemberId, + actorDisplayName, + payload, + prevHash, + hash, + createdAt, + }); + lastHash.set(meshId, hash); + } catch (e) { + log.warn("audit log insert failed", { + mesh_id: meshId, + event_type: eventType, + error: e instanceof Error ? e.message : String(e), + }); + } +} + +// --------------------------------------------------------------------------- +// Startup: load last hash per mesh from DB +// --------------------------------------------------------------------------- + +export async function loadLastHashes(): Promise { + try { + // For each mesh, find the most recent audit entry by id (serial). + // DISTINCT ON (mesh_id) ORDER BY id DESC gives us one row per mesh. + const rows = await db.execute<{ mesh_id: string; hash: string }>(sql` + SELECT DISTINCT ON (mesh_id) mesh_id, hash + FROM mesh.audit_log + ORDER BY mesh_id, id DESC + `); + + for (const row of rows) { + lastHash.set(row.mesh_id, row.hash); + } + log.info("audit: loaded last hashes", { meshes: lastHash.size }); + } catch (e) { + // Table may not exist yet on first boot — that's fine. + log.warn("audit: loadLastHashes failed (table may not exist yet)", { + error: e instanceof Error ? e.message : String(e), + }); + } +} + +// --------------------------------------------------------------------------- +// Chain verification +// --------------------------------------------------------------------------- + +export async function verifyChain( + meshId: string, +): Promise<{ valid: boolean; entries: number; brokenAt?: number }> { + const rows = await db + .select() + .from(auditLog) + .where(eq(auditLog.meshId, meshId)) + .orderBy(asc(auditLog.id)); + + if (rows.length === 0) { + return { valid: true, entries: 0 }; + } + + for (let i = 0; i < rows.length; i++) { + const row = rows[i]!; + const expectedPrevHash = i === 0 ? "genesis" : rows[i - 1]!.hash; + + // Verify prevHash linkage + if (row.prevHash !== expectedPrevHash) { + return { valid: false, entries: rows.length, brokenAt: row.id }; + } + + // Recompute hash and verify + const recomputed = computeHash( + row.prevHash, + row.meshId, + row.eventType, + row.actorMemberId, + row.payload as Record, + row.createdAt, + ); + if (recomputed !== row.hash) { + return { valid: false, entries: rows.length, brokenAt: row.id }; + } + } + + return { valid: true, entries: rows.length }; +} + +// --------------------------------------------------------------------------- +// Query: paginated audit entries +// --------------------------------------------------------------------------- + +export async function queryAuditLog( + meshId: string, + options?: { limit?: number; offset?: number; eventType?: string }, +): Promise<{ entries: Array<{ id: number; eventType: string; actor: string; payload: Record; hash: string; createdAt: string }>; total: number }> { + const limit = options?.limit ?? 50; + const offset = options?.offset ?? 0; + + const conditions = [eq(auditLog.meshId, meshId)]; + if (options?.eventType) { + conditions.push(eq(auditLog.eventType, options.eventType)); + } + const where = conditions.length === 1 ? conditions[0]! : and(...conditions); + + const [rows, countResult] = await Promise.all([ + db + .select() + .from(auditLog) + .where(where) + .orderBy(desc(auditLog.id)) + .limit(limit) + .offset(offset), + db + .select({ count: sql`count(*)` }) + .from(auditLog) + .where(where), + ]); + + return { + entries: rows.map((r) => ({ + id: r.id, + eventType: r.eventType, + actor: r.actorDisplayName ?? r.actorMemberId ?? "system", + payload: r.payload as Record, + hash: r.hash, + createdAt: r.createdAt.toISOString(), + })), + total: Number(countResult[0]?.count ?? 0), + }; +} + +// --------------------------------------------------------------------------- +// Ensure table exists (raw DDL for first-boot before migrations run) +// --------------------------------------------------------------------------- + +export async function ensureAuditLogTable(): Promise { + try { + await db.execute(sql` + CREATE TABLE IF NOT EXISTS mesh.audit_log ( + id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE, + event_type TEXT NOT NULL, + actor_member_id TEXT, + actor_display_name TEXT, + payload JSONB NOT NULL DEFAULT '{}', + prev_hash TEXT NOT NULL, + hash TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT now() + ) + `); + } catch (e) { + log.warn("audit: ensureAuditLogTable failed", { + error: e instanceof Error ? e.message : String(e), + }); + } +} diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 7a66b8a..62a14d4 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -86,6 +86,7 @@ import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; import { buildInfo } from "./build-info"; import { verifyHelloSignature } from "./crypto"; import { handleWebhook } from "./webhooks"; +import { audit, loadLastHashes, ensureAuditLogTable, verifyChain, queryAuditLog } from "./audit"; const PORT = env.BROKER_PORT; const WS_PATH = "/ws"; @@ -927,6 +928,10 @@ async function handleHello( profile: {}, }); incMeshCount(hello.meshId); + void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, { + pubkey: hello.pubkey, + groups: initialGroups, + }); log.info("ws hello", { mesh_id: hello.meshId, member: effectiveDisplayName, @@ -955,6 +960,10 @@ async function handleSend( nonce: msg.nonce, ciphertext: msg.ciphertext, }); + void audit(conn.meshId, "message_sent", conn.memberId, conn.displayName, { + targetSpec: msg.targetSpec, + priority: msg.priority, + }); const ack: WSServerMessage = { type: "ack", id: msg.id ?? "", @@ -1243,6 +1252,10 @@ function handleConnection(ws: WebSocket): void { presenceId, displayName, ); + void audit(conn.meshId, "state_set", conn.memberId, conn.displayName, { + key: ss.key, + value: ss.value, + }); // Push state_change to ALL other peers in the same mesh. for (const [pid, peer] of connections) { if (pid === presenceId) continue; @@ -2354,6 +2367,44 @@ function handleConnection(ws: WebSocket): void { } + // --- Audit log --- + case "audit_query": { + const aq = msg as Extract; + try { + const result = await queryAuditLog(conn.meshId, { + limit: aq.limit, + offset: aq.offset, + eventType: aq.eventType, + }); + sendToPeer(presenceId, { + type: "audit_result", + entries: result.entries, + total: result.total, + ...(_reqId ? { _reqId } : {}), + }); + } catch (e) { + sendError(conn.ws, "audit_query_error", e instanceof Error ? e.message : String(e), undefined, _reqId); + } + log.info("ws audit_query", { presence_id: presenceId, mesh_id: conn.meshId }); + break; + } + case "audit_verify": { + try { + const result = await verifyChain(conn.meshId); + sendToPeer(presenceId, { + type: "audit_verify_result", + valid: result.valid, + entries: result.entries, + ...(result.brokenAt !== undefined ? { brokenAt: result.brokenAt } : {}), + ...(_reqId ? { _reqId } : {}), + }); + } catch (e) { + sendError(conn.ws, "audit_verify_error", e instanceof Error ? e.message : String(e), undefined, _reqId); + } + log.info("ws audit_verify", { presence_id: presenceId, mesh_id: conn.meshId }); + break; + } + // --- Simulation clock --- case "set_clock": { const sc = msg as Extract; @@ -2624,6 +2675,170 @@ function handleConnection(ws: WebSocket): void { log.info("ws remove_skill", { presence_id: presenceId, name: rs.name, removed }); break; } + + // --- Peer file sharing relay --- + case "peer_file_request": { + const fr = msg as Extract; + let targetPid: string | null = null; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (peer.memberPubkey === fr.targetPubkey || peer.sessionPubkey === fr.targetPubkey) { + targetPid = pid; + break; + } + } + if (!targetPid) { + sendError(conn.ws, "peer_not_found", "target peer not connected", undefined, _reqId); + break; + } + sendToPeer(targetPid, { + type: "peer_file_request_forward", + requesterPubkey: conn.sessionPubkey ?? conn.memberPubkey, + filePath: fr.filePath, + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws peer_file_request", { presence_id: presenceId, target: fr.targetPubkey.slice(0, 12), path: fr.filePath }); + break; + } + case "peer_file_response": { + const fr = msg as Extract; + let requesterPid: string | null = null; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (peer.memberPubkey === fr.requesterPubkey || peer.sessionPubkey === fr.requesterPubkey) { + requesterPid = pid; + break; + } + } + if (!requesterPid) break; // requester disconnected + sendToPeer(requesterPid, { + type: "peer_file_response_forward", + filePath: fr.filePath, + ...(fr.content !== undefined ? { content: fr.content } : {}), + ...(fr.error ? { error: fr.error } : {}), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws peer_file_response", { presence_id: presenceId, requester: fr.requesterPubkey.slice(0, 12), path: fr.filePath, hasError: !!fr.error }); + break; + } + case "peer_dir_request": { + const dr = msg as Extract; + let targetPid: string | null = null; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (peer.memberPubkey === dr.targetPubkey || peer.sessionPubkey === dr.targetPubkey) { + targetPid = pid; + break; + } + } + if (!targetPid) { + sendError(conn.ws, "peer_not_found", "target peer not connected", undefined, _reqId); + break; + } + sendToPeer(targetPid, { + type: "peer_dir_request_forward", + requesterPubkey: conn.sessionPubkey ?? conn.memberPubkey, + dirPath: dr.dirPath, + ...(dr.pattern ? { pattern: dr.pattern } : {}), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws peer_dir_request", { presence_id: presenceId, target: dr.targetPubkey.slice(0, 12), path: dr.dirPath }); + break; + } + case "peer_dir_response": { + const dr = msg as Extract; + let requesterPid: string | null = null; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (peer.memberPubkey === dr.requesterPubkey || peer.sessionPubkey === dr.requesterPubkey) { + requesterPid = pid; + break; + } + } + if (!requesterPid) break; + sendToPeer(requesterPid, { + type: "peer_dir_response_forward", + dirPath: dr.dirPath, + ...(dr.entries ? { entries: dr.entries } : {}), + ...(dr.error ? { error: dr.error } : {}), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws peer_dir_response", { presence_id: presenceId, requester: dr.requesterPubkey.slice(0, 12), path: dr.dirPath }); + break; + } + + // --- Webhook CRUD --- + case "create_webhook": { + const cw = msg as Extract; + if (!cw.name) { + sendError(conn.ws, "invalid_webhook", "name is required", undefined, _reqId); + break; + } + const webhookSecret = crypto.randomUUID().replace(/-/g, "") + crypto.randomUUID().replace(/-/g, ""); + try { + await db.insert(meshWebhook).values({ + meshId: conn.meshId, + name: cw.name, + secret: webhookSecret, + createdBy: conn.memberId, + }); + } catch (dupErr: any) { + if (dupErr?.code === "23505" || dupErr?.message?.includes("unique")) { + sendError(conn.ws, "webhook_exists", `Webhook "${cw.name}" already exists in this mesh`, undefined, _reqId); + break; + } + throw dupErr; + } + const webhookUrl = `https://ic.claudemesh.com/hook/${conn.meshId}/${webhookSecret}`; + sendToPeer(presenceId, { + type: "webhook_ack", + name: cw.name, + url: webhookUrl, + secret: webhookSecret, + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws create_webhook", { presence_id: presenceId, name: cw.name }); + break; + } + case "list_webhooks": { + const whRows = await db + .select({ + name: meshWebhook.name, + secret: meshWebhook.secret, + active: meshWebhook.active, + createdAt: meshWebhook.createdAt, + }) + .from(meshWebhook) + .where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.active, true))); + sendToPeer(presenceId, { + type: "webhook_list", + webhooks: whRows.map((r) => ({ + name: r.name, + url: `https://ic.claudemesh.com/hook/${conn.meshId}/${r.secret}`, + active: r.active, + createdAt: r.createdAt.toISOString(), + })), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws list_webhooks", { presence_id: presenceId, count: whRows.length }); + break; + } + case "delete_webhook": { + const dw = msg as Extract; + await db + .update(meshWebhook) + .set({ active: false }) + .where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.name, dw.name))); + sendToPeer(presenceId, { + type: "webhook_ack", + name: dw.name, + url: "", + secret: "", + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws delete_webhook", { presence_id: presenceId, name: dw.name }); + break; + } } } catch (e) { metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); @@ -2662,6 +2877,9 @@ function handleConnection(ws: WebSocket): void { } } await disconnectPresence(presenceId); + if (conn) { + void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {}); + } // Clean up stream subscriptions for this peer for (const [key, subs] of streamSubscriptions) { subs.delete(presenceId); @@ -2894,6 +3112,15 @@ function main(): void { startSweepers(); startDbHealth(); + // Ensure audit log table exists and load hash chain state + ensureAuditLogTable() + .then(() => loadLastHashes()) + .catch((e) => + log.warn("audit log startup failed", { + error: e instanceof Error ? e.message : String(e), + }), + ); + // Recover persisted scheduled messages (cron + one-shot) from DB recoverScheduledMessages().catch((e) => log.warn("scheduled message recovery failed on startup", { diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 77a02de..bcc8685 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -917,6 +917,47 @@ export interface WSErrorMessage { _reqId?: string; } +// --- Audit log messages --- + +/** Client → broker: query paginated audit entries for a mesh. */ +export interface WSAuditQueryMessage { + type: "audit_query"; + limit?: number; + offset?: number; + eventType?: string; + _reqId?: string; +} + +/** Client → broker: verify the hash chain for the mesh audit log. */ +export interface WSAuditVerifyMessage { + type: "audit_verify"; + _reqId?: string; +} + +/** Broker → client: paginated audit log entries. */ +export interface WSAuditResultMessage { + type: "audit_result"; + entries: Array<{ + id: number; + eventType: string; + actor: string; + payload: Record; + hash: string; + createdAt: string; + }>; + total: number; + _reqId?: string; +} + +/** Broker → client: result of hash chain verification. */ +export interface WSAuditVerifyResultMessage { + type: "audit_verify_result"; + valid: boolean; + entries: number; + brokenAt?: number; + _reqId?: string; +} + // --- Simulation clock messages --- /** Client → broker: set the simulation clock speed. */ @@ -1088,7 +1129,9 @@ export type WSClientMessage = | WSPeerFileRequestMessage | WSPeerFileResponseMessage | WSPeerDirRequestMessage - | WSPeerDirResponseMessage; + | WSPeerDirResponseMessage + | WSAuditQueryMessage + | WSAuditVerifyMessage; // --- Skill messages --- @@ -1206,4 +1249,6 @@ export type WSServerMessage = | WSPeerFileResponseForwardMessage | WSPeerDirRequestForwardMessage | WSPeerDirResponseForwardMessage + | WSAuditResultMessage + | WSAuditVerifyResultMessage | WSErrorMessage; diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 19b5bc1..17ee891 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -166,19 +166,28 @@ export const invite = meshSchema.table("invite", { }); /** - * Metadata-only audit log. NEVER stores message content — every + * Signed, hash-chained audit log. NEVER stores message content — every * payload between peers is E2E encrypted client-side (libsodium), so * the broker/DB only ever see ciphertext + routing events. + * + * Each entry includes a SHA-256 hash of the previous entry's hash, + * forming a tamper-evident chain per mesh. If any row is modified, + * all subsequent hashes break — detectable via verifyChain(). + * + * This table is append-only: no UPDATE or DELETE operations. */ export const auditLog = meshSchema.table("audit_log", { - id: text().primaryKey().notNull().$defaultFn(generateId), + /** Serial-like integer PK for ordering. */ + id: integer().primaryKey().generatedAlwaysAsIdentity(), meshId: text() .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) .notNull(), eventType: text().notNull(), - actorPeerId: text(), - targetPeerId: text(), - metadata: jsonb().notNull().default({}), + actorMemberId: text(), + actorDisplayName: text(), + payload: jsonb().notNull().default({}), + prevHash: text().notNull(), + hash: text().notNull(), createdAt: timestamp().defaultNow().notNull(), }); @@ -427,11 +436,73 @@ export const meshStream = meshSchema.table( (table) => [uniqueIndex("stream_mesh_name_idx").on(table.meshId, table.name)], ); +/** + * Reusable skills (instructions/capabilities) shared across a mesh. + * Peers publish skills so other peers can discover and load them. + * Skills are scoped to a mesh and unique by (meshId, name). + */ +export const meshSkill = meshSchema.table( + "skill", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + name: text().notNull(), + description: text().notNull(), + instructions: text().notNull(), + tags: text().array().default([]), + authorMemberId: text().references(() => meshMember.id), + authorName: text(), + createdAt: timestamp().defaultNow().notNull(), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [uniqueIndex("skill_mesh_name_idx").on(table.meshId, table.name)], +); + /** * Persistent scheduled messages. Survives broker restarts — on boot the * broker loads all non-cancelled, non-expired rows and re-arms timers. * Supports both one-shot (deliverAt) and recurring (cron expression). */ +/** + * Inbound webhooks: external services POST to a broker endpoint and the + * payload is pushed to all connected mesh peers as a "webhook" push. + */ +export const meshWebhook = meshSchema.table( + "webhook", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + name: text().notNull(), + secret: text().notNull(), + active: boolean().notNull().default(true), + createdBy: text() + .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + createdAt: timestamp().defaultNow().notNull(), + }, + (table) => [uniqueIndex("webhook_mesh_name_idx").on(table.meshId, table.name)], +); + +export const meshWebhookRelations = relations(meshWebhook, ({ one }) => ({ + mesh: one(mesh, { + fields: [meshWebhook.meshId], + references: [mesh.id], + }), + creator: one(meshMember, { + fields: [meshWebhook.createdBy], + references: [meshMember.id], + }), +})); + +export const selectMeshWebhookSchema = createSelectSchema(meshWebhook); +export const insertMeshWebhookSchema = createInsertSchema(meshWebhook); +export type SelectMeshWebhook = typeof meshWebhook.$inferSelect; +export type InsertMeshWebhook = typeof meshWebhook.$inferInsert; + export const scheduledMessage = meshSchema.table("scheduled_message", { id: text().primaryKey().notNull().$defaultFn(generateId), meshId: text() @@ -659,3 +730,19 @@ export const selectMeshStreamSchema = createSelectSchema(meshStream); export const insertMeshStreamSchema = createInsertSchema(meshStream); export type SelectMeshStream = typeof meshStream.$inferSelect; export type InsertMeshStream = typeof meshStream.$inferInsert; + +export const meshSkillRelations = relations(meshSkill, ({ one }) => ({ + mesh: one(mesh, { + fields: [meshSkill.meshId], + references: [mesh.id], + }), + author: one(meshMember, { + fields: [meshSkill.authorMemberId], + references: [meshMember.id], + }), +})); + +export const selectMeshSkillSchema = createSelectSchema(meshSkill); +export const insertMeshSkillSchema = createInsertSchema(meshSkill); +export type SelectMeshSkill = typeof meshSkill.$inferSelect; +export type InsertMeshSkill = typeof meshSkill.$inferInsert;