4 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
d1ea1a0efa fix(web): typecast handle() response in auth/join page to unblock TS build
Some checks failed
CI / Tests / 🧪 Test (push) Has been cancelled
The Hono RPC client loses the response shape on /organizations/:id because
the route has no zod response validator on c.json(). Tactical cast at the
callsite unblocks the web Docker build. Proper fix is to add a
getOrganizationResponseSchema in packages/api and wire it into the route.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:43:24 +01:00
Alejandro Gutiérrez
cd389c6bdd fix(broker): atomic message claim to prevent duplicate delivery
drainForMember previously ran SELECT undelivered rows, THEN UPDATE
delivered_at. Two concurrent callers (e.g. WS fan-out on send +
handleHello's own drain for the target) could both SELECT the same
row before either UPDATEd, pushing the same envelope twice.

now: single atomic UPDATE ... FROM member ... WHERE id IN (
  SELECT id ... FOR UPDATE SKIP LOCKED
) RETURNING mq.*, m.peer_pubkey AS sender_pubkey.

FOR UPDATE SKIP LOCKED is the key primitive — concurrent callers
each claim DISJOINT sets, so a message can never be drained twice.
Union of all concurrent drains still covers every eligible row.

re-sorts RETURNING rows by created_at client-side (Postgres makes no
FIFO guarantee on the RETURNING clause's output order), and normalizes
created_at to Date since raw-sql results can come back as ISO strings.

regression: tests/dup-delivery.test.ts (4 tests)
- two concurrent drains produce disjoint result sets
- six concurrent drains partition cleanly (20 messages, each drained once)
- subsequent drain after success returns empty
- FIFO ordering preserved within a single drain

48/48 tests pass. Live round-trip no longer logs the double-push.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:39:48 +01:00
Alejandro Gutiérrez
758ea0e42c feat(cli): invite-link parsing + join flow + keypair generation
End-to-end join: user runs `claudemesh join ic://join/<base64>` and
walks away with a signed member record + persistent keypair.

new modules:
- src/crypto/keypair.ts: libsodium ed25519 keypair generation. Format
  is crypto_sign_keypair raw bytes, hex-encoded (32-byte pub, 64-byte
  secret = seed || pub). Same format libsodium will need in Step 18
  for sign/verify.
- src/invite/parse.ts: ic://join/<base64url(JSON)> parser with Zod
  shape validation + expiry check. encodeInviteLink helper for tests.
- src/invite/enroll.ts: POST /join to broker, converts ws:// to http://
  transparently.

rewritten join command wires them together:
  1. parse invite → 2. generate keypair → 3. POST /join → 4. persist
  config → 5. print success.

state/config.ts: saveConfig now chmods the file to 0600 after write,
since it holds ed25519 secret keys. No-op on Windows.

signature verification (step 18) + invite-token one-time-use tracking
are deferred. For now the invite link is a plain bearer token; any
client with the link can join.

verified end-to-end via apps/cli/scripts/join-roundtrip.ts:
  build invite → run join subprocess → load new config → connect as
  new member → send A→B → receive push. Flow passes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:36:32 +01:00
Alejandro Gutiérrez
39b914bdce feat(broker): add /join endpoint for peer self-registration
Single HTTP POST /join the CLI calls after parsing an invite link +
generating an ed25519 keypair client-side. Broker validates the mesh
exists + is not archived, inserts a mesh.member row (or returns the
existing id for idempotency), returns {ok, memberId, alreadyMember?}.

body: {mesh_id, peer_pubkey, display_name, role}
- peer_pubkey must be 64 hex chars (32 bytes)
- role is "admin" | "member"

v0.1.0 trusts the request — no invite-token validation, no ed25519
signature check. Both land in Step 18 alongside libsodium wrapping.

size cap enforced via MAX_MESSAGE_BYTES (shared with hook endpoint).
structured log line per enrollment with truncated pubkey + whether
it was a new member or re-enrolled existing one.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:36:16 +01:00
12 changed files with 731 additions and 56 deletions

View File

@@ -26,9 +26,11 @@ import {
isNull,
lt,
or,
sql,
} from "drizzle-orm";
import { db } from "./db";
import {
mesh,
meshMember as memberTable,
messageQueue,
pendingStatus,
@@ -389,10 +391,12 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
/**
* Drain deliverable messages addressed to a specific member in a mesh.
* Joins mesh.member so each envelope carries the sender's pubkey, which
* the receiving client needs to identify who sent it. Marks drained
* rows as delivered and returns the envelopes for WS push.
* Atomically claims rows via UPDATE ... WHERE id IN (SELECT ... FOR
* UPDATE SKIP LOCKED) — concurrent callers each claim DISJOINT sets,
* so the same message can never be pushed twice (even under fan-out
* racing with handleHello's own drain).
*
* Joins mesh.member so each envelope carries the sender's pubkey.
* targetSpec routing: matches either the member's pubkey directly or
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
* config that lives outside this function.
@@ -414,48 +418,70 @@ export async function drainForMember(
}>
> {
const priorities = deliverablePriorities(status);
const targetFilter = or(
eq(messageQueue.targetSpec, memberPubkey),
eq(messageQueue.targetSpec, "*"),
)!;
if (priorities.length === 0) return [];
const priorityList = sql.raw(
priorities.map((p) => `'${p}'`).join(","),
);
const rows = await db
.select({
id: messageQueue.id,
priority: messageQueue.priority,
nonce: messageQueue.nonce,
ciphertext: messageQueue.ciphertext,
createdAt: messageQueue.createdAt,
senderMemberId: messageQueue.senderMemberId,
senderPubkey: memberTable.peerPubkey,
})
.from(messageQueue)
.innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId))
.where(
and(
eq(messageQueue.meshId, meshId),
isNull(messageQueue.deliveredAt),
inArray(messageQueue.priority, priorities),
targetFilter,
),
// Atomic claim: inner SELECT locks candidate rows (skipping any
// already locked by a concurrent drain), outer UPDATE marks them
// delivered, the FROM join fetches the sender's pubkey, RETURNING
// gives us everything we need to push in one round-trip.
const result = await db.execute<{
id: string;
priority: string;
nonce: string;
ciphertext: string;
created_at: Date;
sender_member_id: string;
sender_pubkey: string;
}>(sql`
UPDATE mesh.message_queue AS mq
SET delivered_at = NOW()
FROM mesh.member AS m
WHERE mq.id IN (
SELECT id FROM mesh.message_queue
WHERE mesh_id = ${meshId}
AND delivered_at IS NULL
AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*')
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
)
.orderBy(asc(messageQueue.createdAt));
AND m.id = mq.sender_member_id
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey
`);
if (rows.length === 0) return [];
const now = new Date();
const ids = rows.map((r) => r.id);
await db
.update(messageQueue)
.set({ deliveredAt: now })
.where(inArray(messageQueue.id, ids));
return rows.map((r) => ({
const rows = (result.rows ?? result) as Array<{
id: string;
priority: string;
nonce: string;
ciphertext: string;
created_at: string | Date;
sender_member_id: string;
sender_pubkey: string;
}>;
if (!rows || rows.length === 0) return [];
// Normalize created_at to Date (pg driver sometimes returns ISO
// strings for raw sql results).
const normalized = rows.map((r) => ({
...r,
created_at:
r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
}));
// RETURNING order may not match the inner SELECT's ORDER BY — re-sort.
normalized.sort(
(a, b) => a.created_at.getTime() - b.created_at.getTime(),
);
return normalized.map((r) => ({
id: r.id,
priority: r.priority as Priority,
nonce: r.nonce,
ciphertext: r.ciphertext,
createdAt: r.createdAt,
senderMemberId: r.senderMemberId,
senderPubkey: r.senderPubkey,
createdAt: r.created_at,
senderMemberId: r.sender_member_id,
senderPubkey: r.sender_pubkey,
}));
}
@@ -489,6 +515,57 @@ export async function stopSweepers(): Promise<void> {
.where(isNull(presence.disconnectedAt));
}
/**
* Enroll a new member in an existing mesh. Called by the CLI join
* flow after invite-link parsing + keypair generation client-side.
*
* v0.1.0: trusts the request. Signature verification + invite-token
* one-time-use tracking land in Step 18.
*/
export async function joinMesh(args: {
meshId: string;
peerPubkey: string;
displayName: string;
role: "admin" | "member";
}): Promise<
| { ok: true; memberId: string; alreadyMember?: boolean }
| { ok: false; error: string }
> {
// Validate the mesh exists.
const [m] = await db
.select({ id: mesh.id })
.from(mesh)
.where(and(eq(mesh.id, args.meshId), isNull(mesh.archivedAt)));
if (!m) return { ok: false, error: "mesh not found or archived" };
// Idempotency: same pubkey already a member → return existing id.
const [existing] = await db
.select({ id: memberTable.id })
.from(memberTable)
.where(
and(
eq(memberTable.meshId, args.meshId),
eq(memberTable.peerPubkey, args.peerPubkey),
isNull(memberTable.revokedAt),
),
);
if (existing) {
return { ok: true, memberId: existing.id, alreadyMember: true };
}
const [row] = await db
.insert(memberTable)
.values({
meshId: args.meshId,
peerPubkey: args.peerPubkey,
displayName: args.displayName,
role: args.role,
})
.returning({ id: memberTable.id });
if (!row) return { ok: false, error: "member insert failed" };
return { ok: true, memberId: row.id };
}
/**
* Look up a member row by pubkey within a mesh. Used at WS handshake
* to authenticate an incoming hello.

View File

@@ -23,6 +23,7 @@ import {
findMemberByPubkey,
handleHookSetStatus,
heartbeat,
joinMesh,
queueMessage,
refreshQueueDepth,
refreshStatusFromJsonl,
@@ -149,6 +150,11 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
return;
}
if (req.method === "POST" && req.url === "/join") {
handleJoinPost(req, res, started);
return;
}
res.writeHead(404);
res.end("not found");
log.debug("http", { route, status: 404, latency_ms: Date.now() - started });
@@ -219,6 +225,83 @@ function handleHookPost(
});
}
function handleJoinPost(
req: IncomingMessage,
res: ServerResponse,
started: number,
): void {
const chunks: Buffer[] = [];
let total = 0;
let aborted = false;
req.on("data", (chunk: Buffer) => {
if (aborted) return;
total += chunk.length;
if (total > env.MAX_MESSAGE_BYTES) {
aborted = true;
writeJson(res, 413, { ok: false, error: "payload too large" });
req.destroy();
return;
}
chunks.push(chunk);
});
req.on("end", async () => {
if (aborted) return;
try {
const payload = JSON.parse(Buffer.concat(chunks).toString()) as {
mesh_id?: string;
peer_pubkey?: string;
display_name?: string;
role?: "admin" | "member";
};
// Minimal shape validation.
if (
!payload.mesh_id ||
!payload.peer_pubkey ||
!payload.display_name ||
!payload.role
) {
writeJson(res, 400, {
ok: false,
error: "mesh_id, peer_pubkey, display_name, role required",
});
return;
}
if (!/^[0-9a-f]{64}$/i.test(payload.peer_pubkey)) {
writeJson(res, 400, {
ok: false,
error: "peer_pubkey must be 64 hex chars (32 bytes)",
});
return;
}
const result = await joinMesh({
meshId: payload.mesh_id,
peerPubkey: payload.peer_pubkey,
displayName: payload.display_name,
role: payload.role,
});
writeJson(res, result.ok ? 200 : 400, result);
log.info("join", {
route: "POST /join",
mesh_id: payload.mesh_id,
pubkey: payload.peer_pubkey.slice(0, 12),
ok: result.ok,
already_member: "alreadyMember" in result ? result.alreadyMember : false,
latency_ms: Date.now() - started,
});
} catch (e) {
writeJson(res, 500, {
ok: false,
error: e instanceof Error ? e.message : String(e),
});
log.error("join handler error", {
error: e instanceof Error ? e.message : String(e),
});
}
});
}
function handleUpgrade(
wss: WebSocketServer,
req: IncomingMessage,

View File

@@ -0,0 +1,126 @@
/**
* Concurrency regression: drainForMember must return DISJOINT row
* sets when two callers race for the same member's queue.
*
* Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the
* same undelivered rows, both sent push notifications, and only
* after did they race to UPDATE delivered_at. Receivers saw
* duplicate pushes for the same message id.
*
* After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR
* UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows.
*/
import { afterAll, afterEach, describe, expect, test } from "vitest";
import { drainForMember, queueMessage } from "../src/broker";
import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers";
afterAll(async () => {
await cleanupAllTestMeshes();
});
describe("drainForMember — concurrent callers", () => {
let m: TestMesh;
afterEach(async () => m && (await m.cleanup()));
test("two concurrent drains produce disjoint result sets", async () => {
m = await setupTestMesh("dup-basic");
// Queue 10 messages for peer-b.
for (let i = 0; i < 10; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
// Fire two drains in parallel.
const [a, b] = await Promise.all([
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
]);
const idsA = new Set(a.map((r) => r.id));
const idsB = new Set(b.map((r) => r.id));
// No overlap.
for (const id of idsA) expect(idsB.has(id)).toBe(false);
// Union covers all 10.
expect(idsA.size + idsB.size).toBe(10);
});
test("six concurrent drains also partition cleanly", async () => {
m = await setupTestMesh("dup-six");
for (let i = 0; i < 20; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const drains = await Promise.all(
Array.from({ length: 6 }).map(() =>
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
),
);
const allIds: string[] = [];
for (const d of drains) for (const r of d) allIds.push(r.id);
const unique = new Set(allIds);
expect(allIds.length).toBe(20);
expect(unique.size).toBe(20);
});
test("after drain, subsequent drain returns empty", async () => {
m = await setupTestMesh("dup-drain-empty");
for (let i = 0; i < 3; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const first = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
expect(first).toHaveLength(3);
const second = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
expect(second).toHaveLength(0);
});
test("FIFO ordering preserved within a single drain", async () => {
m = await setupTestMesh("dup-fifo");
for (let i = 0; i < 5; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const drained = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
for (let i = 0; i < 5; i++) {
expect(drained[i]!.ciphertext).toBe(`msg-${i}`);
}
});
});

View File

@@ -0,0 +1,115 @@
#!/usr/bin/env bun
/**
* Full join → connect → send round-trip.
*
* Uses a mesh already seeded in the DB (reads /tmp/cli-seed.json).
* Creates a fresh invite link, runs the join command, connects with
* the newly-generated member identity, sends a message to peer B,
* asserts receipt.
*/
// Run this script with CLAUDEMESH_CONFIG_DIR=/tmp/... set in env —
// ESM imports hoist above statements, so we can't set process.env
// after the `import { env }` side effect has already run.
import { readFileSync } from "node:fs";
import { execSync } from "node:child_process";
import { BrokerClient } from "../src/ws/client";
import type { JoinedMesh } from "../src/state/config";
import { loadConfig, getConfigPath } from "../src/state/config";
if (!process.env.CLAUDEMESH_CONFIG_DIR) {
console.error(
"Run with: CLAUDEMESH_CONFIG_DIR=/tmp/claudemesh-join-test-rt bun scripts/join-roundtrip.ts",
);
process.exit(1);
}
execSync(`rm -rf "${process.env.CLAUDEMESH_CONFIG_DIR}"`, {
stdio: "ignore",
});
const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as {
meshId: string;
peerB: { memberId: string; pubkey: string };
};
async function main(): Promise<void> {
// 1. Build invite.
const link = execSync("bun scripts/make-invite.ts").toString().trim();
console.log("[rt] invite:", link.slice(0, 60) + "…");
// 2. Run `claudemesh join` with the same CONFIG_DIR.
const joinOut = execSync(`bun src/index.ts join "${link}"`, {
env: {
...process.env,
CLAUDEMESH_CONFIG_DIR: "/tmp/claudemesh-join-test-rt",
},
}).toString();
console.log("[rt] join output (tail):");
console.log(
joinOut
.split("\n")
.slice(-7)
.map((l) => " " + l)
.join("\n"),
);
// 3. Load the fresh config and connect as the new peer.
console.log(`[rt] loading config from: ${getConfigPath()}`);
const config = loadConfig();
console.log(`[rt] loaded ${config.meshes.length} mesh(es)`);
const joined = config.meshes.find((m) => m.slug === "rt-join");
if (!joined) throw new Error("rt-join mesh not found in config");
const joinedMesh: JoinedMesh = joined;
console.log(
`[rt] joined member_id=${joinedMesh.memberId} pubkey=${joinedMesh.pubkey.slice(0, 16)}`,
);
// 4. Connect also as peer-B (the target) so we can observe receipt.
const targetMesh: JoinedMesh = {
...joinedMesh,
memberId: seed.peerB.memberId,
slug: "rt-join-b",
pubkey: seed.peerB.pubkey,
};
const joiner = new BrokerClient(joinedMesh);
const target = new BrokerClient(targetMesh);
let received = "";
target.onPush((m) => {
received = Buffer.from(m.ciphertext, "base64").toString("utf-8");
console.log(`[rt] target got: "${received}"`);
});
await Promise.all([joiner.connect(), target.connect()]);
console.log(`[rt] joiner=${joiner.status} target=${target.status}`);
const res = await joiner.send(
seed.peerB.pubkey,
"sent-by-newly-joined-peer",
"now",
);
console.log("[rt] send result:", res);
for (let i = 0; i < 30 && !received; i++) {
await new Promise((r) => setTimeout(r, 100));
}
joiner.close();
target.close();
if (!res.ok) {
console.error("✗ FAIL: send did not ack");
process.exit(1);
}
if (received !== "sent-by-newly-joined-peer") {
console.error(`✗ FAIL: receive mismatch: "${received}"`);
process.exit(1);
}
console.log("✓ join → connect → send → receive FLOW PASSED");
process.exit(0);
}
main().catch((e) => {
console.error("✗ FAIL:", e instanceof Error ? e.message : e);
process.exit(1);
});

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env bun
/**
* Build a test invite link from a seeded mesh (reads /tmp/cli-seed.json).
* Writes the link to stdout.
*/
import { readFileSync } from "node:fs";
import { encodeInviteLink } from "../src/invite/parse";
const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as {
meshId: string;
};
const link = encodeInviteLink({
v: 1,
mesh_id: seed.meshId,
mesh_slug: "rt-join",
broker_url: process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws",
expires_at: Math.floor(Date.now() / 1000) + 3600,
mesh_root_key: "Y2xhdWRlbWVzaC10ZXN0LW1lc2gta2V5LWRldm9ubHk",
role: "member",
});
console.log(link);

View File

@@ -1,30 +1,90 @@
/**
* `claudemesh join <invite-link>` — parse a mesh invite link and
* join the mesh.
* `claudemesh join <invite-link>` — full join flow.
*
* STUB: real invite-link parsing + keypair generation + broker
* enrollment lands in Step 17. For now this just validates the link
* shape and tells the user what's coming.
* 1. Parse + validate the ic://join/... link
* 2. Generate a fresh ed25519 keypair (libsodium)
* 3. POST /join to the broker → get member_id
* 4. Persist the mesh + keypair to ~/.claudemesh/config.json (0600)
* 5. Print success
*
* Signature verification + invite-token one-time-use land in Step 18.
*/
export function runJoin(args: string[]): void {
import { parseInviteLink } from "../invite/parse";
import { enrollWithBroker } from "../invite/enroll";
import { generateKeypair } from "../crypto/keypair";
import { loadConfig, saveConfig, getConfigPath } from "../state/config";
import { hostname } from "node:os";
export async function runJoin(args: string[]): Promise<void> {
const link = args[0];
if (!link) {
console.error("Usage: claudemesh join <invite-link>");
console.error("");
console.error("Example: claudemesh join ic://join/BASE64URL...");
console.error("Example: claudemesh join ic://join/eyJ2IjoxLC4uLn0");
process.exit(1);
}
if (!link.startsWith("ic://join/")) {
// 1. Parse.
let invite;
try {
invite = parseInviteLink(link);
} catch (e) {
console.error(
`claudemesh: invalid invite link. Expected ic://join/... got "${link}"`,
`claudemesh: ${e instanceof Error ? e.message : String(e)}`,
);
process.exit(1);
}
console.log("claudemesh: join not yet implemented (Step 17).");
console.log(` Invite link parsed: ${link.slice(0, 40)}...`);
console.log(
" Real flow will: verify sig, generate keypair, enroll member, persist to ~/.claudemesh/config.json",
const { payload } = invite;
console.log(`Joining mesh "${payload.mesh_slug}" (${payload.mesh_id})…`);
// 2. Generate keypair.
const keypair = await generateKeypair();
// 3. Enroll with broker.
const displayName = `${hostname()}-${process.pid}`;
let enroll;
try {
enroll = await enrollWithBroker({
brokerWsUrl: payload.broker_url,
meshId: payload.mesh_id,
peerPubkey: keypair.publicKey,
displayName,
role: payload.role,
});
} catch (e) {
console.error(
`claudemesh: broker enrollment failed: ${e instanceof Error ? e.message : String(e)}`,
);
process.exit(1);
}
// 4. Persist.
const config = loadConfig();
config.meshes = config.meshes.filter(
(m) => m.slug !== payload.mesh_slug,
);
process.exit(0);
config.meshes.push({
meshId: payload.mesh_id,
memberId: enroll.memberId,
slug: payload.mesh_slug,
name: payload.mesh_slug,
pubkey: keypair.publicKey,
secretKey: keypair.secretKey,
brokerUrl: payload.broker_url,
joinedAt: new Date().toISOString(),
});
saveConfig(config);
// 5. Report.
console.log("");
console.log(
`✓ Joined "${payload.mesh_slug}" as ${displayName}${enroll.alreadyMember ? " (already a member — re-enrolled with same pubkey)" : ""}`,
);
console.log(` member id: ${enroll.memberId}`);
console.log(` pubkey: ${keypair.publicKey.slice(0, 16)}`);
console.log(` broker: ${payload.broker_url}`);
console.log(` config: ${getConfigPath()}`);
console.log("");
console.log("Restart Claude Code to pick up the new mesh.");
}

View File

@@ -0,0 +1,36 @@
/**
* Ed25519 keypair generation using libsodium.
*
* We use libsodium-wrappers even in Step 17 (pre-crypto) so the key
* format matches what Step 18's signing/encryption code will expect —
* no migration needed later.
*/
import sodium from "libsodium-wrappers";
let ready = false;
export async function ensureSodium(): Promise<typeof sodium> {
if (!ready) {
await sodium.ready;
ready = true;
}
return sodium;
}
export interface Ed25519Keypair {
/** 32-byte public key, hex-encoded. */
publicKey: string;
/** 64-byte secret key (seed || publicKey), hex-encoded. */
secretKey: string;
}
/** Generate a fresh ed25519 keypair. */
export async function generateKeypair(): Promise<Ed25519Keypair> {
const s = await ensureSodium();
const kp = s.crypto_sign_keypair();
return {
publicKey: s.to_hex(kp.publicKey),
secretKey: s.to_hex(kp.privateKey),
};
}

View File

@@ -48,7 +48,7 @@ async function main(): Promise<void> {
runInstall();
return;
case "join":
runJoin(args);
await runJoin(args);
return;
case "list":
runList();

View File

@@ -0,0 +1,56 @@
/**
* Broker /join HTTP enrollment.
*
* Takes a parsed invite + freshly generated keypair, POSTs to the
* broker, returns the member_id. Converts the broker's WSS URL to
* HTTPS for the /join call (same host, different protocol).
*/
export interface EnrollResult {
memberId: string;
alreadyMember: boolean;
}
function wsToHttp(wsUrl: string): string {
// wss://host/ws → https://host
// ws://host:port/ws → http://host:port
const u = new URL(wsUrl);
const httpScheme = u.protocol === "wss:" ? "https:" : "http:";
return `${httpScheme}//${u.host}`;
}
export async function enrollWithBroker(args: {
brokerWsUrl: string;
meshId: string;
peerPubkey: string;
displayName: string;
role: "admin" | "member";
}): Promise<EnrollResult> {
const base = wsToHttp(args.brokerWsUrl);
const res = await fetch(`${base}/join`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
mesh_id: args.meshId,
peer_pubkey: args.peerPubkey,
display_name: args.displayName,
role: args.role,
}),
signal: AbortSignal.timeout(10_000),
});
const body = (await res.json()) as {
ok?: boolean;
memberId?: string;
error?: string;
alreadyMember?: boolean;
};
if (!res.ok || !body.ok || !body.memberId) {
throw new Error(
`broker /join failed (${res.status}): ${body.error ?? "unknown"}`,
);
}
return {
memberId: body.memberId,
alreadyMember: body.alreadyMember ?? false,
};
}

View File

@@ -0,0 +1,81 @@
/**
* Invite-link parser for claudemesh `ic://join/<base64url(JSON)>` links.
*
* v0.1.0: parses + shape-validates + checks expiry. Signature
* verification and one-time-use invite-token tracking land in Step 18.
*/
import { z } from "zod";
const invitePayloadSchema = z.object({
v: z.literal(1),
mesh_id: z.string().min(1),
mesh_slug: z.string().min(1),
broker_url: z.string().min(1),
expires_at: z.number().int().positive(),
mesh_root_key: z.string().min(1),
role: z.enum(["admin", "member"]),
signature: z.string().optional(), // ed25519 b64, validated in Step 18
});
export type InvitePayload = z.infer<typeof invitePayloadSchema>;
export interface ParsedInvite {
payload: InvitePayload;
raw: string; // the original ic://join/... string
}
export function parseInviteLink(link: string): ParsedInvite {
if (!link.startsWith("ic://join/")) {
throw new Error(
`invalid invite link: expected prefix "ic://join/", got "${link.slice(0, 20)}…"`,
);
}
const encoded = link.slice("ic://join/".length);
if (!encoded) throw new Error("invite link has no payload");
let json: string;
try {
json = Buffer.from(encoded, "base64url").toString("utf-8");
} catch (e) {
throw new Error(
`invite link base64 decode failed: ${e instanceof Error ? e.message : e}`,
);
}
let obj: unknown;
try {
obj = JSON.parse(json);
} catch (e) {
throw new Error(
`invite link JSON parse failed: ${e instanceof Error ? e.message : e}`,
);
}
const parsed = invitePayloadSchema.safeParse(obj);
if (!parsed.success) {
throw new Error(
`invite link shape invalid: ${parsed.error.issues.map((i) => i.path.join(".") + ": " + i.message).join("; ")}`,
);
}
// Expiry check (unix seconds).
const nowSeconds = Math.floor(Date.now() / 1000);
if (parsed.data.expires_at < nowSeconds) {
throw new Error(
`invite expired: expires_at=${parsed.data.expires_at}, now=${nowSeconds}`,
);
}
return { payload: parsed.data, raw: link };
}
/**
* Encode a payload back to an `ic://join/...` link. Used for testing
* + for building links server-side once we add that flow.
*/
export function encodeInviteLink(payload: InvitePayload): string {
const json = JSON.stringify(payload);
const encoded = Buffer.from(json, "utf-8").toString("base64url");
return `ic://join/${encoded}`;
}

View File

@@ -6,7 +6,13 @@
* and on every join/leave.
*/
import { readFileSync, writeFileSync, existsSync, mkdirSync } from "node:fs";
import {
readFileSync,
writeFileSync,
existsSync,
mkdirSync,
chmodSync,
} from "node:fs";
import { homedir } from "node:os";
import { join, dirname } from "node:path";
import { z } from "zod";
@@ -51,6 +57,12 @@ export function loadConfig(): Config {
export function saveConfig(config: Config): void {
mkdirSync(dirname(CONFIG_PATH), { recursive: true });
writeFileSync(CONFIG_PATH, JSON.stringify(config, null, 2) + "\n", "utf-8");
// Config holds ed25519 secret keys — restrict to owner read/write.
try {
chmodSync(CONFIG_PATH, 0o600);
} catch {
// Windows filesystems ignore chmod; that's fine.
}
}
export function getConfigPath(): string {

View File

@@ -42,11 +42,16 @@ export default async function JoinPage({
const invitation = await getInvitation({ id: invitationId });
if (invitation) {
const { organization } = await handle(api.organizations[":id"].$get)({
// tactical typecast: Hono RPC inference loses the response shape on this
// route (no zod validator on the response). Proper fix is to add a
// getOrganizationResponseSchema to packages/api and wire it into the
// route's c.json() call.
const res = (await handle(api.organizations[":id"].$get)({
param: {
id: invitation.organizationId,
},
});
})) as { organization: Parameters<typeof Invitation>[0]["organization"] | null };
const { organization } = res;
if (!organization) {
return notFound();