feat(crypto): client-side direct-message encryption with crypto_box
Direct messages between peers are now end-to-end encrypted. The
broker only ever sees {nonce, ciphertext} — plaintext lives on the
two endpoints.
apps/cli/src/crypto/envelope.ts:
- encryptDirect(message, recipientPubkeyHex, senderSecretKeyHex)
→ {nonce, ciphertext} via crypto_box_easy, 24-byte fresh nonce
- decryptDirect(envelope, senderPubkeyHex, recipientSecretKeyHex)
→ plaintext or null (null on MAC failure / malformed input)
- ed25519 keys (from Step 17) are converted to X25519 on the fly via
crypto_sign_ed25519_{pk,sk}_to_curve25519 — one signing keypair
covers both signing + encryption roles.
BrokerClient.send():
- if targetSpec is a 64-hex pubkey → encrypt via crypto_box
- else (broadcast "*" or channel "#foo") → base64-wrapped plaintext
(shared-key encryption for channels lands in a later step)
InboundPush now carries:
- plaintext: string | null (decrypted body, null if decryption failed
OR it's a non-direct message)
- kind: "direct" | "broadcast" | "channel" | "unknown"
MCP check_messages formatter reads plaintext directly.
side-fixes pulled in during 18a:
- apps/broker/scripts/seed-test-mesh.ts now generates real ed25519
keypairs (the previous "aaaa…" / "bbbb…" fillers weren't valid
curve points, so crypto_sign_ed25519_pk_to_curve25519 rejected
them). Seed output now includes secretKey for each peer.
- apps/broker/src/broker.ts drainForMember wraps the atomic claim in
a CTE + outer ORDER BY so FIFO ordering is SQL-sourced, not
JS-sorted (Postgres microsecond timestamps collapse to the same
Date.getTime() milliseconds otherwise).
- vitest.config.ts fileParallelism: false — test files share
DB state via cleanupAllTestMeshes afterAll, so running them in
parallel caused one file's cleanup to race another's inserts.
- integration/health.test.ts "returns 200" now uses waitFullyHealthy
(a 200-only waiter) instead of waitHealthyOrAny — prevents a race
with the startup DB ping.
verified live:
- apps/cli/scripts/roundtrip.ts (direct A→B): ciphertext in DB is
opaque bytes (not base64-plaintext), decrypted correctly on arrival
- apps/cli/scripts/join-roundtrip.ts (full join → encrypted send):
PASSED
- 48/48 broker tests green
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -423,34 +423,39 @@ export async function drainForMember(
|
||||
priorities.map((p) => `'${p}'`).join(","),
|
||||
);
|
||||
|
||||
// Atomic claim: inner SELECT locks candidate rows (skipping any
|
||||
// already locked by a concurrent drain), outer UPDATE marks them
|
||||
// delivered, the FROM join fetches the sender's pubkey, RETURNING
|
||||
// gives us everything we need to push in one round-trip.
|
||||
// Atomic claim with SQL-side ordering. The CTE claims rows via
|
||||
// UPDATE...RETURNING; the outer SELECT re-orders by created_at
|
||||
// (with id as tiebreaker so equal-timestamp rows stay deterministic).
|
||||
// Sorting in SQL avoids JS Date's millisecond-precision collapse of
|
||||
// Postgres microsecond timestamps.
|
||||
const result = await db.execute<{
|
||||
id: string;
|
||||
priority: string;
|
||||
nonce: string;
|
||||
ciphertext: string;
|
||||
created_at: Date;
|
||||
created_at: string | Date;
|
||||
sender_member_id: string;
|
||||
sender_pubkey: string;
|
||||
}>(sql`
|
||||
UPDATE mesh.message_queue AS mq
|
||||
SET delivered_at = NOW()
|
||||
FROM mesh.member AS m
|
||||
WHERE mq.id IN (
|
||||
SELECT id FROM mesh.message_queue
|
||||
WHERE mesh_id = ${meshId}
|
||||
AND delivered_at IS NULL
|
||||
AND priority::text IN (${priorityList})
|
||||
AND (target_spec = ${memberPubkey} OR target_spec = '*')
|
||||
ORDER BY created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
WITH claimed AS (
|
||||
UPDATE mesh.message_queue AS mq
|
||||
SET delivered_at = NOW()
|
||||
FROM mesh.member AS m
|
||||
WHERE mq.id IN (
|
||||
SELECT id FROM mesh.message_queue
|
||||
WHERE mesh_id = ${meshId}
|
||||
AND delivered_at IS NULL
|
||||
AND priority::text IN (${priorityList})
|
||||
AND (target_spec = ${memberPubkey} OR target_spec = '*')
|
||||
ORDER BY created_at ASC, id ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
AND m.id = mq.sender_member_id
|
||||
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
|
||||
mq.created_at, mq.sender_member_id,
|
||||
m.peer_pubkey AS sender_pubkey
|
||||
)
|
||||
AND m.id = mq.sender_member_id
|
||||
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
|
||||
mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey
|
||||
SELECT * FROM claimed ORDER BY created_at ASC, id ASC
|
||||
`);
|
||||
|
||||
const rows = (result.rows ?? result) as Array<{
|
||||
@@ -463,23 +468,13 @@ export async function drainForMember(
|
||||
sender_pubkey: string;
|
||||
}>;
|
||||
if (!rows || rows.length === 0) return [];
|
||||
// Normalize created_at to Date (pg driver sometimes returns ISO
|
||||
// strings for raw sql results).
|
||||
const normalized = rows.map((r) => ({
|
||||
...r,
|
||||
created_at:
|
||||
r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
|
||||
}));
|
||||
// RETURNING order may not match the inner SELECT's ORDER BY — re-sort.
|
||||
normalized.sort(
|
||||
(a, b) => a.created_at.getTime() - b.created_at.getTime(),
|
||||
);
|
||||
return normalized.map((r) => ({
|
||||
return rows.map((r) => ({
|
||||
id: r.id,
|
||||
priority: r.priority as Priority,
|
||||
nonce: r.nonce,
|
||||
ciphertext: r.ciphertext,
|
||||
createdAt: r.created_at,
|
||||
createdAt:
|
||||
r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
|
||||
senderMemberId: r.sender_member_id,
|
||||
senderPubkey: r.sender_pubkey,
|
||||
}));
|
||||
|
||||
Reference in New Issue
Block a user