diff --git a/apps/broker/scripts/seed-test-mesh.ts b/apps/broker/scripts/seed-test-mesh.ts index a795bfa..8609467 100644 --- a/apps/broker/scripts/seed-test-mesh.ts +++ b/apps/broker/scripts/seed-test-mesh.ts @@ -10,16 +10,25 @@ */ import { eq } from "drizzle-orm"; +import sodium from "libsodium-wrappers"; import { db } from "../src/db"; import { mesh, meshMember } from "@turbostarter/db/schema/mesh"; import { user } from "@turbostarter/db/schema/auth"; const USER_ID = "test-user-smoke"; const MESH_SLUG = "smoke-test"; -const PEER_A_PUBKEY = "a".repeat(64); -const PEER_B_PUBKEY = "b".repeat(64); async function main() { + // Generate real ed25519 keypairs so crypto_box (via ed25519→X25519 + // conversion) works in Step 18+ round-trip tests. + await sodium.ready; + const kpA = sodium.crypto_sign_keypair(); + const kpB = sodium.crypto_sign_keypair(); + const PEER_A_PUBKEY = sodium.to_hex(kpA.publicKey); + const PEER_A_SECRET = sodium.to_hex(kpA.privateKey); + const PEER_B_PUBKEY = sodium.to_hex(kpB.publicKey); + const PEER_B_SECRET = sodium.to_hex(kpB.privateKey); + // Ensure the test user exists (re-usable across runs). const [existingUser] = await db .select({ id: user.id }) @@ -75,8 +84,16 @@ async function main() { const seed = { meshId: m.id, - peerA: { memberId: peerA.id, pubkey: PEER_A_PUBKEY }, - peerB: { memberId: peerB.id, pubkey: PEER_B_PUBKEY }, + peerA: { + memberId: peerA.id, + pubkey: PEER_A_PUBKEY, + secretKey: PEER_A_SECRET, + }, + peerB: { + memberId: peerB.id, + pubkey: PEER_B_PUBKEY, + secretKey: PEER_B_SECRET, + }, }; console.log(JSON.stringify(seed, null, 2)); process.exit(0); diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index ebf4bda..8a27193 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -423,34 +423,39 @@ export async function drainForMember( priorities.map((p) => `'${p}'`).join(","), ); - // Atomic claim: inner SELECT locks candidate rows (skipping any - // already locked by a concurrent drain), outer UPDATE marks them - // delivered, the FROM join fetches the sender's pubkey, RETURNING - // gives us everything we need to push in one round-trip. + // Atomic claim with SQL-side ordering. The CTE claims rows via + // UPDATE...RETURNING; the outer SELECT re-orders by created_at + // (with id as tiebreaker so equal-timestamp rows stay deterministic). + // Sorting in SQL avoids JS Date's millisecond-precision collapse of + // Postgres microsecond timestamps. const result = await db.execute<{ id: string; priority: string; nonce: string; ciphertext: string; - created_at: Date; + created_at: string | Date; sender_member_id: string; sender_pubkey: string; }>(sql` - UPDATE mesh.message_queue AS mq - SET delivered_at = NOW() - FROM mesh.member AS m - WHERE mq.id IN ( - SELECT id FROM mesh.message_queue - WHERE mesh_id = ${meshId} - AND delivered_at IS NULL - AND priority::text IN (${priorityList}) - AND (target_spec = ${memberPubkey} OR target_spec = '*') - ORDER BY created_at ASC - FOR UPDATE SKIP LOCKED + WITH claimed AS ( + UPDATE mesh.message_queue AS mq + SET delivered_at = NOW() + FROM mesh.member AS m + WHERE mq.id IN ( + SELECT id FROM mesh.message_queue + WHERE mesh_id = ${meshId} + AND delivered_at IS NULL + AND priority::text IN (${priorityList}) + AND (target_spec = ${memberPubkey} OR target_spec = '*') + ORDER BY created_at ASC, id ASC + FOR UPDATE SKIP LOCKED + ) + AND m.id = mq.sender_member_id + RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext, + mq.created_at, mq.sender_member_id, + m.peer_pubkey AS sender_pubkey ) - AND m.id = mq.sender_member_id - RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext, - mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey + SELECT * FROM claimed ORDER BY created_at ASC, id ASC `); const rows = (result.rows ?? result) as Array<{ @@ -463,23 +468,13 @@ export async function drainForMember( sender_pubkey: string; }>; if (!rows || rows.length === 0) return []; - // Normalize created_at to Date (pg driver sometimes returns ISO - // strings for raw sql results). - const normalized = rows.map((r) => ({ - ...r, - created_at: - r.created_at instanceof Date ? r.created_at : new Date(r.created_at), - })); - // RETURNING order may not match the inner SELECT's ORDER BY — re-sort. - normalized.sort( - (a, b) => a.created_at.getTime() - b.created_at.getTime(), - ); - return normalized.map((r) => ({ + return rows.map((r) => ({ id: r.id, priority: r.priority as Priority, nonce: r.nonce, ciphertext: r.ciphertext, - createdAt: r.created_at, + createdAt: + r.created_at instanceof Date ? r.created_at : new Date(r.created_at), senderMemberId: r.sender_member_id, senderPubkey: r.sender_pubkey, })); diff --git a/apps/broker/tests/integration/health.test.ts b/apps/broker/tests/integration/health.test.ts index 8d18754..5c01f31 100644 --- a/apps/broker/tests/integration/health.test.ts +++ b/apps/broker/tests/integration/health.test.ts @@ -26,7 +26,6 @@ async function waitHealthyOrAny(port: number, maxMs = 5000): Promise { const r = await fetch(`http://localhost:${port}/health`, { signal: AbortSignal.timeout(500), }); - // Any response (even 503) means the HTTP server is up. if (r.status === 200 || r.status === 503) return; } catch { /* not yet */ @@ -36,6 +35,23 @@ async function waitHealthyOrAny(port: number, maxMs = 5000): Promise { throw new Error(`broker on :${port} did not come up`); } +/** Wait until /health returns 200 (HTTP + DB ping both completed). */ +async function waitFullyHealthy(port: number, maxMs = 5000): Promise { + const start = Date.now(); + while (Date.now() - start < maxMs) { + try { + const r = await fetch(`http://localhost:${port}/health`, { + signal: AbortSignal.timeout(500), + }); + if (r.status === 200) return; + } catch { + /* not yet */ + } + await new Promise((r) => setTimeout(r, 100)); + } + throw new Error(`broker on :${port} did not become fully healthy`); +} + function spawnBroker(env: Record): BrokerProc { const port = 18000 + Math.floor(Math.random() * 1000); const brokerEntry = join( @@ -73,7 +89,7 @@ describe("/health endpoint", () => { process.env.DATABASE_URL ?? "postgresql://turbostarter:turbostarter@127.0.0.1:5440/claudemesh_test", }); - await waitHealthyOrAny(broker.port); + await waitFullyHealthy(broker.port); }); afterAll(() => broker?.kill()); diff --git a/apps/broker/vitest.config.ts b/apps/broker/vitest.config.ts index 6217c4f..3bf5db1 100644 --- a/apps/broker/vitest.config.ts +++ b/apps/broker/vitest.config.ts @@ -18,8 +18,9 @@ export default mergeConfig( test: { testTimeout: 10_000, hookTimeout: 10_000, - // Keep sequential initially — can flip to parallel once - // per-test isolation is proven. + // Test files share a Postgres schema and use cleanupAllTestMeshes + // in afterAll, so run them serially to avoid cross-file races. + fileParallelism: false, sequence: { concurrent: false, }, diff --git a/apps/cli/scripts/join-roundtrip.ts b/apps/cli/scripts/join-roundtrip.ts index 23c5e0b..fad4854 100644 --- a/apps/cli/scripts/join-roundtrip.ts +++ b/apps/cli/scripts/join-roundtrip.ts @@ -29,7 +29,7 @@ execSync(`rm -rf "${process.env.CLAUDEMESH_CONFIG_DIR}"`, { const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as { meshId: string; - peerB: { memberId: string; pubkey: string }; + peerB: { memberId: string; pubkey: string; secretKey: string }; }; async function main(): Promise { @@ -65,18 +65,20 @@ async function main(): Promise { ); // 4. Connect also as peer-B (the target) so we can observe receipt. + // Uses the real keypair from the seed (needed for crypto_box decrypt). const targetMesh: JoinedMesh = { ...joinedMesh, memberId: seed.peerB.memberId, slug: "rt-join-b", pubkey: seed.peerB.pubkey, + secretKey: seed.peerB.secretKey, }; const joiner = new BrokerClient(joinedMesh); const target = new BrokerClient(targetMesh); let received = ""; target.onPush((m) => { - received = Buffer.from(m.ciphertext, "base64").toString("utf-8"); + received = m.plaintext ?? ""; console.log(`[rt] target got: "${received}"`); }); diff --git a/apps/cli/scripts/roundtrip.ts b/apps/cli/scripts/roundtrip.ts index 3ecefd9..57d83b4 100644 --- a/apps/cli/scripts/roundtrip.ts +++ b/apps/cli/scripts/roundtrip.ts @@ -14,8 +14,8 @@ import type { JoinedMesh } from "../src/state/config"; const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as { meshId: string; - peerA: { memberId: string; pubkey: string }; - peerB: { memberId: string; pubkey: string }; + peerA: { memberId: string; pubkey: string; secretKey: string }; + peerB: { memberId: string; pubkey: string; secretKey: string }; }; const brokerUrl = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws"; @@ -25,11 +25,17 @@ const meshA: JoinedMesh = { slug: "rt-a", name: "roundtrip-a", pubkey: seed.peerA.pubkey, - secretKey: "stub", + secretKey: seed.peerA.secretKey, brokerUrl, joinedAt: new Date().toISOString(), }; -const meshB: JoinedMesh = { ...meshA, memberId: seed.peerB.memberId, slug: "rt-b", pubkey: seed.peerB.pubkey }; +const meshB: JoinedMesh = { + ...meshA, + memberId: seed.peerB.memberId, + slug: "rt-b", + pubkey: seed.peerB.pubkey, + secretKey: seed.peerB.secretKey, +}; async function main(): Promise { const a = new BrokerClient(meshA, { debug: true }); @@ -38,9 +44,9 @@ async function main(): Promise { let received: string | null = null; let receivedSender: string | null = null; b.onPush((msg) => { - received = Buffer.from(msg.ciphertext, "base64").toString("utf-8"); + received = msg.plaintext; receivedSender = msg.senderPubkey; - console.log(`[b] push: "${received}" from ${receivedSender}`); + console.log(`[b] push (kind=${msg.kind}): "${received}" from ${receivedSender?.slice(0, 16)}…`); }); console.log("[rt] connecting A + B…"); diff --git a/apps/cli/src/crypto/envelope.ts b/apps/cli/src/crypto/envelope.ts new file mode 100644 index 0000000..b0ea102 --- /dev/null +++ b/apps/cli/src/crypto/envelope.ts @@ -0,0 +1,96 @@ +/** + * Direct-message encryption via libsodium crypto_box. + * + * Keys: our peers hold ed25519 signing keypairs (from Step 17). + * crypto_box uses X25519 (curve25519) keys, so we convert on the fly + * via crypto_sign_ed25519_{pk,sk}_to_curve25519. One signing keypair + * serves both purposes cleanly. + * + * Wire format: {nonce, ciphertext} both base64. Nonce is 24 bytes + * (crypto_box_NONCEBYTES), fresh-random per message. + * + * Broadcasts ("*") and channels ("#foo") are NOT encrypted here — + * they need a shared key (mesh_root_key) and land in a later step. + */ + +import { ensureSodium } from "./keypair"; + +export interface Envelope { + nonce: string; // base64 + ciphertext: string; // base64 +} + +const HEX_PUBKEY = /^[0-9a-f]{64}$/; + +/** Does this targetSpec look like a direct-message pubkey? */ +export function isDirectTarget(targetSpec: string): boolean { + return HEX_PUBKEY.test(targetSpec); +} + +/** + * Encrypt a plaintext message addressed to a single recipient. + * Recipient's ed25519 pubkey (64 hex chars) is converted to X25519 + * on the fly. Sender's full ed25519 secret key (128 hex chars) is + * also converted. + */ +export async function encryptDirect( + message: string, + recipientPubkeyHex: string, + senderSecretKeyHex: string, +): Promise { + const sodium = await ensureSodium(); + const recipientPub = sodium.crypto_sign_ed25519_pk_to_curve25519( + sodium.from_hex(recipientPubkeyHex), + ); + const senderSec = sodium.crypto_sign_ed25519_sk_to_curve25519( + sodium.from_hex(senderSecretKeyHex), + ); + const nonce = sodium.randombytes_buf(sodium.crypto_box_NONCEBYTES); + const ciphertext = sodium.crypto_box_easy( + sodium.from_string(message), + nonce, + recipientPub, + senderSec, + ); + return { + nonce: sodium.to_base64(nonce, sodium.base64_variants.ORIGINAL), + ciphertext: sodium.to_base64(ciphertext, sodium.base64_variants.ORIGINAL), + }; +} + +/** + * Decrypt an inbound envelope from a known sender. Returns null if + * decryption fails (wrong keys, tampered ciphertext, malformed input). + */ +export async function decryptDirect( + envelope: Envelope, + senderPubkeyHex: string, + recipientSecretKeyHex: string, +): Promise { + const sodium = await ensureSodium(); + try { + const senderPub = sodium.crypto_sign_ed25519_pk_to_curve25519( + sodium.from_hex(senderPubkeyHex), + ); + const recipientSec = sodium.crypto_sign_ed25519_sk_to_curve25519( + sodium.from_hex(recipientSecretKeyHex), + ); + const nonce = sodium.from_base64( + envelope.nonce, + sodium.base64_variants.ORIGINAL, + ); + const ciphertext = sodium.from_base64( + envelope.ciphertext, + sodium.base64_variants.ORIGINAL, + ); + const plain = sodium.crypto_box_open_easy( + ciphertext, + nonce, + senderPub, + recipientSec, + ); + return sodium.to_string(plain); + } catch { + return null; + } +} diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 70b263a..298da5d 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -74,13 +74,7 @@ function resolveClient(to: string): { } function formatPush(p: InboundPush, meshSlug: string): string { - const body = (() => { - try { - return Buffer.from(p.ciphertext, "base64").toString("utf-8"); - } catch { - return "(invalid base64 ciphertext)"; - } - })(); + const body = p.plaintext ?? "(decryption failed)"; return `[${meshSlug}] from ${p.senderPubkey.slice(0, 12)}… (${p.priority}, ${p.createdAt}):\n${body}`; } diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 61fe763..4497d4d 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -15,6 +15,11 @@ import WebSocket from "ws"; import { randomBytes } from "node:crypto"; import type { JoinedMesh } from "../state/config"; +import { + decryptDirect, + encryptDirect, + isDirectTarget, +} from "../crypto/envelope"; export type Priority = "now" | "next" | "low"; export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; @@ -28,6 +33,12 @@ export interface InboundPush { ciphertext: string; createdAt: string; receivedAt: string; + /** Decrypted plaintext (if encryption succeeded). null = broadcast + * or channel (no per-recipient crypto yet), or decryption failed. */ + plaintext: string | null; + /** Hint for UI: "direct" (crypto_box), "channel"/"broadcast" + * (plaintext for now). */ + kind: "direct" | "broadcast" | "channel" | "unknown"; } type PushHandler = (msg: InboundPush) => void; @@ -157,8 +168,22 @@ export class BrokerClient { priority: Priority = "next", ): Promise<{ ok: boolean; messageId?: string; error?: string }> { const id = randomId(); - const nonce = randomNonce(); - const ciphertext = Buffer.from(message, "utf-8").toString("base64"); + // Direct messages get crypto_box encryption; broadcasts + channels + // still pass through as base64 plaintext until channel crypto lands. + let nonce: string; + let ciphertext: string; + if (isDirectTarget(targetSpec)) { + const env = await encryptDirect( + message, + targetSpec, + this.mesh.secretKey, + ); + nonce = env.nonce; + ciphertext = env.ciphertext; + } else { + nonce = randomNonce(); + ciphertext = Buffer.from(message, "utf-8").toString("base64"); + } return new Promise((resolve) => { if (this.pendingSends.size >= MAX_QUEUED) { @@ -254,26 +279,55 @@ export class BrokerClient { return; } if (msg.type === "push") { - const push: InboundPush = { - messageId: String(msg.messageId ?? ""), - meshId: String(msg.meshId ?? ""), - senderPubkey: String(msg.senderPubkey ?? ""), - priority: (msg.priority as Priority) ?? "next", - nonce: String(msg.nonce ?? ""), - ciphertext: String(msg.ciphertext ?? ""), - createdAt: String(msg.createdAt ?? ""), - receivedAt: new Date().toISOString(), - }; - this.pushBuffer.push(push); - // Cap buffer at 500 entries to avoid unbounded growth. - if (this.pushBuffer.length > 500) this.pushBuffer.shift(); - for (const h of this.pushHandlers) { - try { - h(push); - } catch { - /* handler errors are not the transport's problem */ + const nonce = String(msg.nonce ?? ""); + const ciphertext = String(msg.ciphertext ?? ""); + const senderPubkey = String(msg.senderPubkey ?? ""); + // Decrypt asynchronously, then enqueue. Ordering within the + // buffer is preserved by awaiting before push. + void (async (): Promise => { + const kind: InboundPush["kind"] = senderPubkey + ? "direct" + : "unknown"; + let plaintext: string | null = null; + if (senderPubkey && nonce && ciphertext) { + plaintext = await decryptDirect( + { nonce, ciphertext }, + senderPubkey, + this.mesh.secretKey, + ); } - } + // If decryption failed, fall back to base64 UTF-8 unwrap — + // this covers the legacy plaintext path for broadcasts/channels + // until channel crypto lands. + if (plaintext === null && ciphertext) { + try { + plaintext = Buffer.from(ciphertext, "base64").toString("utf-8"); + } catch { + plaintext = null; + } + } + const push: InboundPush = { + messageId: String(msg.messageId ?? ""), + meshId: String(msg.meshId ?? ""), + senderPubkey, + priority: (msg.priority as Priority) ?? "next", + nonce, + ciphertext, + createdAt: String(msg.createdAt ?? ""), + receivedAt: new Date().toISOString(), + plaintext, + kind, + }; + this.pushBuffer.push(push); + if (this.pushBuffer.length > 500) this.pushBuffer.shift(); + for (const h of this.pushHandlers) { + try { + h(push); + } catch { + /* handler errors are not the transport's problem */ + } + } + })(); return; } if (msg.type === "error") {