Compare commits
4 Commits
04bf349e7d
...
d1ea1a0efa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1ea1a0efa | ||
|
|
cd389c6bdd | ||
|
|
758ea0e42c | ||
|
|
39b914bdce |
@@ -26,9 +26,11 @@ import {
|
||||
isNull,
|
||||
lt,
|
||||
or,
|
||||
sql,
|
||||
} from "drizzle-orm";
|
||||
import { db } from "./db";
|
||||
import {
|
||||
mesh,
|
||||
meshMember as memberTable,
|
||||
messageQueue,
|
||||
pendingStatus,
|
||||
@@ -389,10 +391,12 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
|
||||
|
||||
/**
|
||||
* Drain deliverable messages addressed to a specific member in a mesh.
|
||||
* Joins mesh.member so each envelope carries the sender's pubkey, which
|
||||
* the receiving client needs to identify who sent it. Marks drained
|
||||
* rows as delivered and returns the envelopes for WS push.
|
||||
* Atomically claims rows via UPDATE ... WHERE id IN (SELECT ... FOR
|
||||
* UPDATE SKIP LOCKED) — concurrent callers each claim DISJOINT sets,
|
||||
* so the same message can never be pushed twice (even under fan-out
|
||||
* racing with handleHello's own drain).
|
||||
*
|
||||
* Joins mesh.member so each envelope carries the sender's pubkey.
|
||||
* targetSpec routing: matches either the member's pubkey directly or
|
||||
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
|
||||
* config that lives outside this function.
|
||||
@@ -414,48 +418,70 @@ export async function drainForMember(
|
||||
}>
|
||||
> {
|
||||
const priorities = deliverablePriorities(status);
|
||||
const targetFilter = or(
|
||||
eq(messageQueue.targetSpec, memberPubkey),
|
||||
eq(messageQueue.targetSpec, "*"),
|
||||
)!;
|
||||
if (priorities.length === 0) return [];
|
||||
const priorityList = sql.raw(
|
||||
priorities.map((p) => `'${p}'`).join(","),
|
||||
);
|
||||
|
||||
const rows = await db
|
||||
.select({
|
||||
id: messageQueue.id,
|
||||
priority: messageQueue.priority,
|
||||
nonce: messageQueue.nonce,
|
||||
ciphertext: messageQueue.ciphertext,
|
||||
createdAt: messageQueue.createdAt,
|
||||
senderMemberId: messageQueue.senderMemberId,
|
||||
senderPubkey: memberTable.peerPubkey,
|
||||
})
|
||||
.from(messageQueue)
|
||||
.innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId))
|
||||
.where(
|
||||
and(
|
||||
eq(messageQueue.meshId, meshId),
|
||||
isNull(messageQueue.deliveredAt),
|
||||
inArray(messageQueue.priority, priorities),
|
||||
targetFilter,
|
||||
),
|
||||
// Atomic claim: inner SELECT locks candidate rows (skipping any
|
||||
// already locked by a concurrent drain), outer UPDATE marks them
|
||||
// delivered, the FROM join fetches the sender's pubkey, RETURNING
|
||||
// gives us everything we need to push in one round-trip.
|
||||
const result = await db.execute<{
|
||||
id: string;
|
||||
priority: string;
|
||||
nonce: string;
|
||||
ciphertext: string;
|
||||
created_at: Date;
|
||||
sender_member_id: string;
|
||||
sender_pubkey: string;
|
||||
}>(sql`
|
||||
UPDATE mesh.message_queue AS mq
|
||||
SET delivered_at = NOW()
|
||||
FROM mesh.member AS m
|
||||
WHERE mq.id IN (
|
||||
SELECT id FROM mesh.message_queue
|
||||
WHERE mesh_id = ${meshId}
|
||||
AND delivered_at IS NULL
|
||||
AND priority::text IN (${priorityList})
|
||||
AND (target_spec = ${memberPubkey} OR target_spec = '*')
|
||||
ORDER BY created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
.orderBy(asc(messageQueue.createdAt));
|
||||
AND m.id = mq.sender_member_id
|
||||
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
|
||||
mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey
|
||||
`);
|
||||
|
||||
if (rows.length === 0) return [];
|
||||
const now = new Date();
|
||||
const ids = rows.map((r) => r.id);
|
||||
await db
|
||||
.update(messageQueue)
|
||||
.set({ deliveredAt: now })
|
||||
.where(inArray(messageQueue.id, ids));
|
||||
return rows.map((r) => ({
|
||||
const rows = (result.rows ?? result) as Array<{
|
||||
id: string;
|
||||
priority: string;
|
||||
nonce: string;
|
||||
ciphertext: string;
|
||||
created_at: string | Date;
|
||||
sender_member_id: string;
|
||||
sender_pubkey: string;
|
||||
}>;
|
||||
if (!rows || rows.length === 0) return [];
|
||||
// Normalize created_at to Date (pg driver sometimes returns ISO
|
||||
// strings for raw sql results).
|
||||
const normalized = rows.map((r) => ({
|
||||
...r,
|
||||
created_at:
|
||||
r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
|
||||
}));
|
||||
// RETURNING order may not match the inner SELECT's ORDER BY — re-sort.
|
||||
normalized.sort(
|
||||
(a, b) => a.created_at.getTime() - b.created_at.getTime(),
|
||||
);
|
||||
return normalized.map((r) => ({
|
||||
id: r.id,
|
||||
priority: r.priority as Priority,
|
||||
nonce: r.nonce,
|
||||
ciphertext: r.ciphertext,
|
||||
createdAt: r.createdAt,
|
||||
senderMemberId: r.senderMemberId,
|
||||
senderPubkey: r.senderPubkey,
|
||||
createdAt: r.created_at,
|
||||
senderMemberId: r.sender_member_id,
|
||||
senderPubkey: r.sender_pubkey,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -489,6 +515,57 @@ export async function stopSweepers(): Promise<void> {
|
||||
.where(isNull(presence.disconnectedAt));
|
||||
}
|
||||
|
||||
/**
|
||||
* Enroll a new member in an existing mesh. Called by the CLI join
|
||||
* flow after invite-link parsing + keypair generation client-side.
|
||||
*
|
||||
* v0.1.0: trusts the request. Signature verification + invite-token
|
||||
* one-time-use tracking land in Step 18.
|
||||
*/
|
||||
export async function joinMesh(args: {
|
||||
meshId: string;
|
||||
peerPubkey: string;
|
||||
displayName: string;
|
||||
role: "admin" | "member";
|
||||
}): Promise<
|
||||
| { ok: true; memberId: string; alreadyMember?: boolean }
|
||||
| { ok: false; error: string }
|
||||
> {
|
||||
// Validate the mesh exists.
|
||||
const [m] = await db
|
||||
.select({ id: mesh.id })
|
||||
.from(mesh)
|
||||
.where(and(eq(mesh.id, args.meshId), isNull(mesh.archivedAt)));
|
||||
if (!m) return { ok: false, error: "mesh not found or archived" };
|
||||
|
||||
// Idempotency: same pubkey already a member → return existing id.
|
||||
const [existing] = await db
|
||||
.select({ id: memberTable.id })
|
||||
.from(memberTable)
|
||||
.where(
|
||||
and(
|
||||
eq(memberTable.meshId, args.meshId),
|
||||
eq(memberTable.peerPubkey, args.peerPubkey),
|
||||
isNull(memberTable.revokedAt),
|
||||
),
|
||||
);
|
||||
if (existing) {
|
||||
return { ok: true, memberId: existing.id, alreadyMember: true };
|
||||
}
|
||||
|
||||
const [row] = await db
|
||||
.insert(memberTable)
|
||||
.values({
|
||||
meshId: args.meshId,
|
||||
peerPubkey: args.peerPubkey,
|
||||
displayName: args.displayName,
|
||||
role: args.role,
|
||||
})
|
||||
.returning({ id: memberTable.id });
|
||||
if (!row) return { ok: false, error: "member insert failed" };
|
||||
return { ok: true, memberId: row.id };
|
||||
}
|
||||
|
||||
/**
|
||||
* Look up a member row by pubkey within a mesh. Used at WS handshake
|
||||
* to authenticate an incoming hello.
|
||||
|
||||
@@ -23,6 +23,7 @@ import {
|
||||
findMemberByPubkey,
|
||||
handleHookSetStatus,
|
||||
heartbeat,
|
||||
joinMesh,
|
||||
queueMessage,
|
||||
refreshQueueDepth,
|
||||
refreshStatusFromJsonl,
|
||||
@@ -149,6 +150,11 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "POST" && req.url === "/join") {
|
||||
handleJoinPost(req, res, started);
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(404);
|
||||
res.end("not found");
|
||||
log.debug("http", { route, status: 404, latency_ms: Date.now() - started });
|
||||
@@ -219,6 +225,83 @@ function handleHookPost(
|
||||
});
|
||||
}
|
||||
|
||||
function handleJoinPost(
|
||||
req: IncomingMessage,
|
||||
res: ServerResponse,
|
||||
started: number,
|
||||
): void {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
let aborted = false;
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
if (aborted) return;
|
||||
total += chunk.length;
|
||||
if (total > env.MAX_MESSAGE_BYTES) {
|
||||
aborted = true;
|
||||
writeJson(res, 413, { ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
req.on("end", async () => {
|
||||
if (aborted) return;
|
||||
try {
|
||||
const payload = JSON.parse(Buffer.concat(chunks).toString()) as {
|
||||
mesh_id?: string;
|
||||
peer_pubkey?: string;
|
||||
display_name?: string;
|
||||
role?: "admin" | "member";
|
||||
};
|
||||
// Minimal shape validation.
|
||||
if (
|
||||
!payload.mesh_id ||
|
||||
!payload.peer_pubkey ||
|
||||
!payload.display_name ||
|
||||
!payload.role
|
||||
) {
|
||||
writeJson(res, 400, {
|
||||
ok: false,
|
||||
error: "mesh_id, peer_pubkey, display_name, role required",
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!/^[0-9a-f]{64}$/i.test(payload.peer_pubkey)) {
|
||||
writeJson(res, 400, {
|
||||
ok: false,
|
||||
error: "peer_pubkey must be 64 hex chars (32 bytes)",
|
||||
});
|
||||
return;
|
||||
}
|
||||
const result = await joinMesh({
|
||||
meshId: payload.mesh_id,
|
||||
peerPubkey: payload.peer_pubkey,
|
||||
displayName: payload.display_name,
|
||||
role: payload.role,
|
||||
});
|
||||
writeJson(res, result.ok ? 200 : 400, result);
|
||||
log.info("join", {
|
||||
route: "POST /join",
|
||||
mesh_id: payload.mesh_id,
|
||||
pubkey: payload.peer_pubkey.slice(0, 12),
|
||||
ok: result.ok,
|
||||
already_member: "alreadyMember" in result ? result.alreadyMember : false,
|
||||
latency_ms: Date.now() - started,
|
||||
});
|
||||
} catch (e) {
|
||||
writeJson(res, 500, {
|
||||
ok: false,
|
||||
error: e instanceof Error ? e.message : String(e),
|
||||
});
|
||||
log.error("join handler error", {
|
||||
error: e instanceof Error ? e.message : String(e),
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function handleUpgrade(
|
||||
wss: WebSocketServer,
|
||||
req: IncomingMessage,
|
||||
|
||||
126
apps/broker/tests/dup-delivery.test.ts
Normal file
126
apps/broker/tests/dup-delivery.test.ts
Normal file
@@ -0,0 +1,126 @@
|
||||
/**
|
||||
* Concurrency regression: drainForMember must return DISJOINT row
|
||||
* sets when two callers race for the same member's queue.
|
||||
*
|
||||
* Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the
|
||||
* same undelivered rows, both sent push notifications, and only
|
||||
* after did they race to UPDATE delivered_at. Receivers saw
|
||||
* duplicate pushes for the same message id.
|
||||
*
|
||||
* After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR
|
||||
* UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows.
|
||||
*/
|
||||
|
||||
import { afterAll, afterEach, describe, expect, test } from "vitest";
|
||||
import { drainForMember, queueMessage } from "../src/broker";
|
||||
import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers";
|
||||
|
||||
afterAll(async () => {
|
||||
await cleanupAllTestMeshes();
|
||||
});
|
||||
|
||||
describe("drainForMember — concurrent callers", () => {
|
||||
let m: TestMesh;
|
||||
afterEach(async () => m && (await m.cleanup()));
|
||||
|
||||
test("two concurrent drains produce disjoint result sets", async () => {
|
||||
m = await setupTestMesh("dup-basic");
|
||||
// Queue 10 messages for peer-b.
|
||||
for (let i = 0; i < 10; i++) {
|
||||
await queueMessage({
|
||||
meshId: m.meshId,
|
||||
senderMemberId: m.peerA.memberId,
|
||||
targetSpec: m.peerB.pubkey,
|
||||
priority: "now",
|
||||
nonce: `n${i}`,
|
||||
ciphertext: `msg-${i}`,
|
||||
});
|
||||
}
|
||||
// Fire two drains in parallel.
|
||||
const [a, b] = await Promise.all([
|
||||
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
|
||||
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
|
||||
]);
|
||||
const idsA = new Set(a.map((r) => r.id));
|
||||
const idsB = new Set(b.map((r) => r.id));
|
||||
// No overlap.
|
||||
for (const id of idsA) expect(idsB.has(id)).toBe(false);
|
||||
// Union covers all 10.
|
||||
expect(idsA.size + idsB.size).toBe(10);
|
||||
});
|
||||
|
||||
test("six concurrent drains also partition cleanly", async () => {
|
||||
m = await setupTestMesh("dup-six");
|
||||
for (let i = 0; i < 20; i++) {
|
||||
await queueMessage({
|
||||
meshId: m.meshId,
|
||||
senderMemberId: m.peerA.memberId,
|
||||
targetSpec: m.peerB.pubkey,
|
||||
priority: "now",
|
||||
nonce: `n${i}`,
|
||||
ciphertext: `msg-${i}`,
|
||||
});
|
||||
}
|
||||
const drains = await Promise.all(
|
||||
Array.from({ length: 6 }).map(() =>
|
||||
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
|
||||
),
|
||||
);
|
||||
const allIds: string[] = [];
|
||||
for (const d of drains) for (const r of d) allIds.push(r.id);
|
||||
const unique = new Set(allIds);
|
||||
expect(allIds.length).toBe(20);
|
||||
expect(unique.size).toBe(20);
|
||||
});
|
||||
|
||||
test("after drain, subsequent drain returns empty", async () => {
|
||||
m = await setupTestMesh("dup-drain-empty");
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await queueMessage({
|
||||
meshId: m.meshId,
|
||||
senderMemberId: m.peerA.memberId,
|
||||
targetSpec: m.peerB.pubkey,
|
||||
priority: "now",
|
||||
nonce: `n${i}`,
|
||||
ciphertext: `msg-${i}`,
|
||||
});
|
||||
}
|
||||
const first = await drainForMember(
|
||||
m.meshId,
|
||||
m.peerB.memberId,
|
||||
m.peerB.pubkey,
|
||||
"idle",
|
||||
);
|
||||
expect(first).toHaveLength(3);
|
||||
const second = await drainForMember(
|
||||
m.meshId,
|
||||
m.peerB.memberId,
|
||||
m.peerB.pubkey,
|
||||
"idle",
|
||||
);
|
||||
expect(second).toHaveLength(0);
|
||||
});
|
||||
|
||||
test("FIFO ordering preserved within a single drain", async () => {
|
||||
m = await setupTestMesh("dup-fifo");
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await queueMessage({
|
||||
meshId: m.meshId,
|
||||
senderMemberId: m.peerA.memberId,
|
||||
targetSpec: m.peerB.pubkey,
|
||||
priority: "now",
|
||||
nonce: `n${i}`,
|
||||
ciphertext: `msg-${i}`,
|
||||
});
|
||||
}
|
||||
const drained = await drainForMember(
|
||||
m.meshId,
|
||||
m.peerB.memberId,
|
||||
m.peerB.pubkey,
|
||||
"idle",
|
||||
);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(drained[i]!.ciphertext).toBe(`msg-${i}`);
|
||||
}
|
||||
});
|
||||
});
|
||||
115
apps/cli/scripts/join-roundtrip.ts
Normal file
115
apps/cli/scripts/join-roundtrip.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Full join → connect → send round-trip.
|
||||
*
|
||||
* Uses a mesh already seeded in the DB (reads /tmp/cli-seed.json).
|
||||
* Creates a fresh invite link, runs the join command, connects with
|
||||
* the newly-generated member identity, sends a message to peer B,
|
||||
* asserts receipt.
|
||||
*/
|
||||
|
||||
// Run this script with CLAUDEMESH_CONFIG_DIR=/tmp/... set in env —
|
||||
// ESM imports hoist above statements, so we can't set process.env
|
||||
// after the `import { env }` side effect has already run.
|
||||
import { readFileSync } from "node:fs";
|
||||
import { execSync } from "node:child_process";
|
||||
import { BrokerClient } from "../src/ws/client";
|
||||
import type { JoinedMesh } from "../src/state/config";
|
||||
import { loadConfig, getConfigPath } from "../src/state/config";
|
||||
|
||||
if (!process.env.CLAUDEMESH_CONFIG_DIR) {
|
||||
console.error(
|
||||
"Run with: CLAUDEMESH_CONFIG_DIR=/tmp/claudemesh-join-test-rt bun scripts/join-roundtrip.ts",
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
execSync(`rm -rf "${process.env.CLAUDEMESH_CONFIG_DIR}"`, {
|
||||
stdio: "ignore",
|
||||
});
|
||||
|
||||
const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as {
|
||||
meshId: string;
|
||||
peerB: { memberId: string; pubkey: string };
|
||||
};
|
||||
|
||||
async function main(): Promise<void> {
|
||||
// 1. Build invite.
|
||||
const link = execSync("bun scripts/make-invite.ts").toString().trim();
|
||||
console.log("[rt] invite:", link.slice(0, 60) + "…");
|
||||
|
||||
// 2. Run `claudemesh join` with the same CONFIG_DIR.
|
||||
const joinOut = execSync(`bun src/index.ts join "${link}"`, {
|
||||
env: {
|
||||
...process.env,
|
||||
CLAUDEMESH_CONFIG_DIR: "/tmp/claudemesh-join-test-rt",
|
||||
},
|
||||
}).toString();
|
||||
console.log("[rt] join output (tail):");
|
||||
console.log(
|
||||
joinOut
|
||||
.split("\n")
|
||||
.slice(-7)
|
||||
.map((l) => " " + l)
|
||||
.join("\n"),
|
||||
);
|
||||
|
||||
// 3. Load the fresh config and connect as the new peer.
|
||||
console.log(`[rt] loading config from: ${getConfigPath()}`);
|
||||
const config = loadConfig();
|
||||
console.log(`[rt] loaded ${config.meshes.length} mesh(es)`);
|
||||
const joined = config.meshes.find((m) => m.slug === "rt-join");
|
||||
if (!joined) throw new Error("rt-join mesh not found in config");
|
||||
const joinedMesh: JoinedMesh = joined;
|
||||
console.log(
|
||||
`[rt] joined member_id=${joinedMesh.memberId} pubkey=${joinedMesh.pubkey.slice(0, 16)}…`,
|
||||
);
|
||||
|
||||
// 4. Connect also as peer-B (the target) so we can observe receipt.
|
||||
const targetMesh: JoinedMesh = {
|
||||
...joinedMesh,
|
||||
memberId: seed.peerB.memberId,
|
||||
slug: "rt-join-b",
|
||||
pubkey: seed.peerB.pubkey,
|
||||
};
|
||||
const joiner = new BrokerClient(joinedMesh);
|
||||
const target = new BrokerClient(targetMesh);
|
||||
|
||||
let received = "";
|
||||
target.onPush((m) => {
|
||||
received = Buffer.from(m.ciphertext, "base64").toString("utf-8");
|
||||
console.log(`[rt] target got: "${received}"`);
|
||||
});
|
||||
|
||||
await Promise.all([joiner.connect(), target.connect()]);
|
||||
console.log(`[rt] joiner=${joiner.status} target=${target.status}`);
|
||||
|
||||
const res = await joiner.send(
|
||||
seed.peerB.pubkey,
|
||||
"sent-by-newly-joined-peer",
|
||||
"now",
|
||||
);
|
||||
console.log("[rt] send result:", res);
|
||||
|
||||
for (let i = 0; i < 30 && !received; i++) {
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
}
|
||||
|
||||
joiner.close();
|
||||
target.close();
|
||||
|
||||
if (!res.ok) {
|
||||
console.error("✗ FAIL: send did not ack");
|
||||
process.exit(1);
|
||||
}
|
||||
if (received !== "sent-by-newly-joined-peer") {
|
||||
console.error(`✗ FAIL: receive mismatch: "${received}"`);
|
||||
process.exit(1);
|
||||
}
|
||||
console.log("✓ join → connect → send → receive FLOW PASSED");
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
main().catch((e) => {
|
||||
console.error("✗ FAIL:", e instanceof Error ? e.message : e);
|
||||
process.exit(1);
|
||||
});
|
||||
24
apps/cli/scripts/make-invite.ts
Normal file
24
apps/cli/scripts/make-invite.ts
Normal file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Build a test invite link from a seeded mesh (reads /tmp/cli-seed.json).
|
||||
* Writes the link to stdout.
|
||||
*/
|
||||
|
||||
import { readFileSync } from "node:fs";
|
||||
import { encodeInviteLink } from "../src/invite/parse";
|
||||
|
||||
const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as {
|
||||
meshId: string;
|
||||
};
|
||||
|
||||
const link = encodeInviteLink({
|
||||
v: 1,
|
||||
mesh_id: seed.meshId,
|
||||
mesh_slug: "rt-join",
|
||||
broker_url: process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws",
|
||||
expires_at: Math.floor(Date.now() / 1000) + 3600,
|
||||
mesh_root_key: "Y2xhdWRlbWVzaC10ZXN0LW1lc2gta2V5LWRldm9ubHk",
|
||||
role: "member",
|
||||
});
|
||||
|
||||
console.log(link);
|
||||
@@ -1,30 +1,90 @@
|
||||
/**
|
||||
* `claudemesh join <invite-link>` — parse a mesh invite link and
|
||||
* join the mesh.
|
||||
* `claudemesh join <invite-link>` — full join flow.
|
||||
*
|
||||
* STUB: real invite-link parsing + keypair generation + broker
|
||||
* enrollment lands in Step 17. For now this just validates the link
|
||||
* shape and tells the user what's coming.
|
||||
* 1. Parse + validate the ic://join/... link
|
||||
* 2. Generate a fresh ed25519 keypair (libsodium)
|
||||
* 3. POST /join to the broker → get member_id
|
||||
* 4. Persist the mesh + keypair to ~/.claudemesh/config.json (0600)
|
||||
* 5. Print success
|
||||
*
|
||||
* Signature verification + invite-token one-time-use land in Step 18.
|
||||
*/
|
||||
|
||||
export function runJoin(args: string[]): void {
|
||||
import { parseInviteLink } from "../invite/parse";
|
||||
import { enrollWithBroker } from "../invite/enroll";
|
||||
import { generateKeypair } from "../crypto/keypair";
|
||||
import { loadConfig, saveConfig, getConfigPath } from "../state/config";
|
||||
import { hostname } from "node:os";
|
||||
|
||||
export async function runJoin(args: string[]): Promise<void> {
|
||||
const link = args[0];
|
||||
if (!link) {
|
||||
console.error("Usage: claudemesh join <invite-link>");
|
||||
console.error("");
|
||||
console.error("Example: claudemesh join ic://join/BASE64URL...");
|
||||
console.error("Example: claudemesh join ic://join/eyJ2IjoxLC4uLn0");
|
||||
process.exit(1);
|
||||
}
|
||||
if (!link.startsWith("ic://join/")) {
|
||||
|
||||
// 1. Parse.
|
||||
let invite;
|
||||
try {
|
||||
invite = parseInviteLink(link);
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`claudemesh: invalid invite link. Expected ic://join/... got "${link}"`,
|
||||
`claudemesh: ${e instanceof Error ? e.message : String(e)}`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
console.log("claudemesh: join not yet implemented (Step 17).");
|
||||
console.log(` Invite link parsed: ${link.slice(0, 40)}...`);
|
||||
console.log(
|
||||
" Real flow will: verify sig, generate keypair, enroll member, persist to ~/.claudemesh/config.json",
|
||||
const { payload } = invite;
|
||||
console.log(`Joining mesh "${payload.mesh_slug}" (${payload.mesh_id})…`);
|
||||
|
||||
// 2. Generate keypair.
|
||||
const keypair = await generateKeypair();
|
||||
|
||||
// 3. Enroll with broker.
|
||||
const displayName = `${hostname()}-${process.pid}`;
|
||||
let enroll;
|
||||
try {
|
||||
enroll = await enrollWithBroker({
|
||||
brokerWsUrl: payload.broker_url,
|
||||
meshId: payload.mesh_id,
|
||||
peerPubkey: keypair.publicKey,
|
||||
displayName,
|
||||
role: payload.role,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`claudemesh: broker enrollment failed: ${e instanceof Error ? e.message : String(e)}`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// 4. Persist.
|
||||
const config = loadConfig();
|
||||
config.meshes = config.meshes.filter(
|
||||
(m) => m.slug !== payload.mesh_slug,
|
||||
);
|
||||
process.exit(0);
|
||||
config.meshes.push({
|
||||
meshId: payload.mesh_id,
|
||||
memberId: enroll.memberId,
|
||||
slug: payload.mesh_slug,
|
||||
name: payload.mesh_slug,
|
||||
pubkey: keypair.publicKey,
|
||||
secretKey: keypair.secretKey,
|
||||
brokerUrl: payload.broker_url,
|
||||
joinedAt: new Date().toISOString(),
|
||||
});
|
||||
saveConfig(config);
|
||||
|
||||
// 5. Report.
|
||||
console.log("");
|
||||
console.log(
|
||||
`✓ Joined "${payload.mesh_slug}" as ${displayName}${enroll.alreadyMember ? " (already a member — re-enrolled with same pubkey)" : ""}`,
|
||||
);
|
||||
console.log(` member id: ${enroll.memberId}`);
|
||||
console.log(` pubkey: ${keypair.publicKey.slice(0, 16)}…`);
|
||||
console.log(` broker: ${payload.broker_url}`);
|
||||
console.log(` config: ${getConfigPath()}`);
|
||||
console.log("");
|
||||
console.log("Restart Claude Code to pick up the new mesh.");
|
||||
}
|
||||
|
||||
36
apps/cli/src/crypto/keypair.ts
Normal file
36
apps/cli/src/crypto/keypair.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Ed25519 keypair generation using libsodium.
|
||||
*
|
||||
* We use libsodium-wrappers even in Step 17 (pre-crypto) so the key
|
||||
* format matches what Step 18's signing/encryption code will expect —
|
||||
* no migration needed later.
|
||||
*/
|
||||
|
||||
import sodium from "libsodium-wrappers";
|
||||
|
||||
let ready = false;
|
||||
|
||||
export async function ensureSodium(): Promise<typeof sodium> {
|
||||
if (!ready) {
|
||||
await sodium.ready;
|
||||
ready = true;
|
||||
}
|
||||
return sodium;
|
||||
}
|
||||
|
||||
export interface Ed25519Keypair {
|
||||
/** 32-byte public key, hex-encoded. */
|
||||
publicKey: string;
|
||||
/** 64-byte secret key (seed || publicKey), hex-encoded. */
|
||||
secretKey: string;
|
||||
}
|
||||
|
||||
/** Generate a fresh ed25519 keypair. */
|
||||
export async function generateKeypair(): Promise<Ed25519Keypair> {
|
||||
const s = await ensureSodium();
|
||||
const kp = s.crypto_sign_keypair();
|
||||
return {
|
||||
publicKey: s.to_hex(kp.publicKey),
|
||||
secretKey: s.to_hex(kp.privateKey),
|
||||
};
|
||||
}
|
||||
@@ -48,7 +48,7 @@ async function main(): Promise<void> {
|
||||
runInstall();
|
||||
return;
|
||||
case "join":
|
||||
runJoin(args);
|
||||
await runJoin(args);
|
||||
return;
|
||||
case "list":
|
||||
runList();
|
||||
|
||||
56
apps/cli/src/invite/enroll.ts
Normal file
56
apps/cli/src/invite/enroll.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
/**
|
||||
* Broker /join HTTP enrollment.
|
||||
*
|
||||
* Takes a parsed invite + freshly generated keypair, POSTs to the
|
||||
* broker, returns the member_id. Converts the broker's WSS URL to
|
||||
* HTTPS for the /join call (same host, different protocol).
|
||||
*/
|
||||
|
||||
export interface EnrollResult {
|
||||
memberId: string;
|
||||
alreadyMember: boolean;
|
||||
}
|
||||
|
||||
function wsToHttp(wsUrl: string): string {
|
||||
// wss://host/ws → https://host
|
||||
// ws://host:port/ws → http://host:port
|
||||
const u = new URL(wsUrl);
|
||||
const httpScheme = u.protocol === "wss:" ? "https:" : "http:";
|
||||
return `${httpScheme}//${u.host}`;
|
||||
}
|
||||
|
||||
export async function enrollWithBroker(args: {
|
||||
brokerWsUrl: string;
|
||||
meshId: string;
|
||||
peerPubkey: string;
|
||||
displayName: string;
|
||||
role: "admin" | "member";
|
||||
}): Promise<EnrollResult> {
|
||||
const base = wsToHttp(args.brokerWsUrl);
|
||||
const res = await fetch(`${base}/join`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
mesh_id: args.meshId,
|
||||
peer_pubkey: args.peerPubkey,
|
||||
display_name: args.displayName,
|
||||
role: args.role,
|
||||
}),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const body = (await res.json()) as {
|
||||
ok?: boolean;
|
||||
memberId?: string;
|
||||
error?: string;
|
||||
alreadyMember?: boolean;
|
||||
};
|
||||
if (!res.ok || !body.ok || !body.memberId) {
|
||||
throw new Error(
|
||||
`broker /join failed (${res.status}): ${body.error ?? "unknown"}`,
|
||||
);
|
||||
}
|
||||
return {
|
||||
memberId: body.memberId,
|
||||
alreadyMember: body.alreadyMember ?? false,
|
||||
};
|
||||
}
|
||||
81
apps/cli/src/invite/parse.ts
Normal file
81
apps/cli/src/invite/parse.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
/**
|
||||
* Invite-link parser for claudemesh `ic://join/<base64url(JSON)>` links.
|
||||
*
|
||||
* v0.1.0: parses + shape-validates + checks expiry. Signature
|
||||
* verification and one-time-use invite-token tracking land in Step 18.
|
||||
*/
|
||||
|
||||
import { z } from "zod";
|
||||
|
||||
const invitePayloadSchema = z.object({
|
||||
v: z.literal(1),
|
||||
mesh_id: z.string().min(1),
|
||||
mesh_slug: z.string().min(1),
|
||||
broker_url: z.string().min(1),
|
||||
expires_at: z.number().int().positive(),
|
||||
mesh_root_key: z.string().min(1),
|
||||
role: z.enum(["admin", "member"]),
|
||||
signature: z.string().optional(), // ed25519 b64, validated in Step 18
|
||||
});
|
||||
|
||||
export type InvitePayload = z.infer<typeof invitePayloadSchema>;
|
||||
|
||||
export interface ParsedInvite {
|
||||
payload: InvitePayload;
|
||||
raw: string; // the original ic://join/... string
|
||||
}
|
||||
|
||||
export function parseInviteLink(link: string): ParsedInvite {
|
||||
if (!link.startsWith("ic://join/")) {
|
||||
throw new Error(
|
||||
`invalid invite link: expected prefix "ic://join/", got "${link.slice(0, 20)}…"`,
|
||||
);
|
||||
}
|
||||
const encoded = link.slice("ic://join/".length);
|
||||
if (!encoded) throw new Error("invite link has no payload");
|
||||
|
||||
let json: string;
|
||||
try {
|
||||
json = Buffer.from(encoded, "base64url").toString("utf-8");
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`invite link base64 decode failed: ${e instanceof Error ? e.message : e}`,
|
||||
);
|
||||
}
|
||||
|
||||
let obj: unknown;
|
||||
try {
|
||||
obj = JSON.parse(json);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`invite link JSON parse failed: ${e instanceof Error ? e.message : e}`,
|
||||
);
|
||||
}
|
||||
|
||||
const parsed = invitePayloadSchema.safeParse(obj);
|
||||
if (!parsed.success) {
|
||||
throw new Error(
|
||||
`invite link shape invalid: ${parsed.error.issues.map((i) => i.path.join(".") + ": " + i.message).join("; ")}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Expiry check (unix seconds).
|
||||
const nowSeconds = Math.floor(Date.now() / 1000);
|
||||
if (parsed.data.expires_at < nowSeconds) {
|
||||
throw new Error(
|
||||
`invite expired: expires_at=${parsed.data.expires_at}, now=${nowSeconds}`,
|
||||
);
|
||||
}
|
||||
|
||||
return { payload: parsed.data, raw: link };
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode a payload back to an `ic://join/...` link. Used for testing
|
||||
* + for building links server-side once we add that flow.
|
||||
*/
|
||||
export function encodeInviteLink(payload: InvitePayload): string {
|
||||
const json = JSON.stringify(payload);
|
||||
const encoded = Buffer.from(json, "utf-8").toString("base64url");
|
||||
return `ic://join/${encoded}`;
|
||||
}
|
||||
@@ -6,7 +6,13 @@
|
||||
* and on every join/leave.
|
||||
*/
|
||||
|
||||
import { readFileSync, writeFileSync, existsSync, mkdirSync } from "node:fs";
|
||||
import {
|
||||
readFileSync,
|
||||
writeFileSync,
|
||||
existsSync,
|
||||
mkdirSync,
|
||||
chmodSync,
|
||||
} from "node:fs";
|
||||
import { homedir } from "node:os";
|
||||
import { join, dirname } from "node:path";
|
||||
import { z } from "zod";
|
||||
@@ -51,6 +57,12 @@ export function loadConfig(): Config {
|
||||
export function saveConfig(config: Config): void {
|
||||
mkdirSync(dirname(CONFIG_PATH), { recursive: true });
|
||||
writeFileSync(CONFIG_PATH, JSON.stringify(config, null, 2) + "\n", "utf-8");
|
||||
// Config holds ed25519 secret keys — restrict to owner read/write.
|
||||
try {
|
||||
chmodSync(CONFIG_PATH, 0o600);
|
||||
} catch {
|
||||
// Windows filesystems ignore chmod; that's fine.
|
||||
}
|
||||
}
|
||||
|
||||
export function getConfigPath(): string {
|
||||
|
||||
@@ -42,11 +42,16 @@ export default async function JoinPage({
|
||||
const invitation = await getInvitation({ id: invitationId });
|
||||
|
||||
if (invitation) {
|
||||
const { organization } = await handle(api.organizations[":id"].$get)({
|
||||
// tactical typecast: Hono RPC inference loses the response shape on this
|
||||
// route (no zod validator on the response). Proper fix is to add a
|
||||
// getOrganizationResponseSchema to packages/api and wire it into the
|
||||
// route's c.json() call.
|
||||
const res = (await handle(api.organizations[":id"].$get)({
|
||||
param: {
|
||||
id: invitation.organizationId,
|
||||
},
|
||||
});
|
||||
})) as { organization: Parameters<typeof Invitation>[0]["organization"] | null };
|
||||
const { organization } = res;
|
||||
|
||||
if (!organization) {
|
||||
return notFound();
|
||||
|
||||
Reference in New Issue
Block a user