fix(web): disable anonymous login by default (guest button removal)
claudemesh requires an account — mesh membership is tied to user.id.
e8ad7a5 flipped the config default but the env var override at
env.config.ts:43 still defaulted to true, keeping the button visible.
Fixed at env var level + example files. Needs Coolify rebuild since
NEXT_PUBLIC_* is build-time in Next standalone.
This commit is contained in:
488
apps/broker/scripts/load-test.ts
Normal file
488
apps/broker/scripts/load-test.ts
Normal file
@@ -0,0 +1,488 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Load test — 100 concurrent peers × 1000 messages each.
|
||||
*
|
||||
* Spins up N peer members in a fresh mesh, connects them all via WS,
|
||||
* and has each peer send M direct messages to random other peers.
|
||||
* Measures send→push latency per message, memory growth on the
|
||||
* broker process, and error rate.
|
||||
*
|
||||
* Usage:
|
||||
* DATABASE_URL=... bun apps/broker/scripts/load-test.ts [peers] [msgs]
|
||||
*
|
||||
* Defaults: 100 peers × 1000 messages = 100k messages total.
|
||||
*
|
||||
* Assumes the broker is running at ws://localhost:7900/ws. If you
|
||||
* pass BROKER_PID=<pid>, the test also samples RSS + FD count every
|
||||
* 2s for the broker process.
|
||||
*/
|
||||
|
||||
import sodium from "libsodium-wrappers";
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import WebSocket from "ws";
|
||||
import { db } from "../src/db";
|
||||
import { invite, mesh, meshMember } from "@turbostarter/db/schema/mesh";
|
||||
import { user } from "@turbostarter/db/schema/auth";
|
||||
|
||||
// --- CLI args ---
|
||||
|
||||
const PEERS = parseInt(process.argv[2] ?? "100", 10);
|
||||
const MSGS_PER_PEER = parseInt(process.argv[3] ?? "1000", 10);
|
||||
const TOTAL_MSGS = PEERS * MSGS_PER_PEER;
|
||||
const BROKER_URL = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws";
|
||||
const BROKER_PID = process.env.BROKER_PID
|
||||
? parseInt(process.env.BROKER_PID, 10)
|
||||
: null;
|
||||
const USER_ID = "test-user-loadtest";
|
||||
const MESH_SLUG = "loadtest";
|
||||
|
||||
// --- Types ---
|
||||
|
||||
interface Peer {
|
||||
memberId: string;
|
||||
pubkey: string;
|
||||
secretKey: string;
|
||||
ws?: WebSocket;
|
||||
connected: boolean;
|
||||
sendsInFlight: number;
|
||||
sendErrors: number;
|
||||
}
|
||||
|
||||
interface MsgTimings {
|
||||
sentAt: number;
|
||||
pushAt?: number;
|
||||
ackAt?: number;
|
||||
senderIdx: number;
|
||||
recipientIdx: number;
|
||||
}
|
||||
|
||||
const peers: Peer[] = [];
|
||||
const timings = new Map<string, MsgTimings>();
|
||||
let messageId = 0;
|
||||
|
||||
// --- Broker-process sampling ---
|
||||
|
||||
interface Sample {
|
||||
t: number;
|
||||
rssKb: number;
|
||||
fds: number;
|
||||
}
|
||||
const samples: Sample[] = [];
|
||||
|
||||
function samplePidStats(pid: number): Sample | null {
|
||||
try {
|
||||
const psOut = new TextDecoder()
|
||||
.decode(Bun.spawnSync(["ps", "-o", "rss=", "-p", String(pid)]).stdout)
|
||||
.trim();
|
||||
const rssKb = parseInt(psOut, 10);
|
||||
if (!Number.isFinite(rssKb)) return null;
|
||||
const lsofOut = new TextDecoder()
|
||||
.decode(Bun.spawnSync(["lsof", "-p", String(pid)]).stdout)
|
||||
.trim();
|
||||
const fds = lsofOut.split("\n").length - 1; // minus header
|
||||
return { t: Date.now(), rssKb, fds };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
let sampler: ReturnType<typeof setInterval> | null = null;
|
||||
function startSampler(): void {
|
||||
if (!BROKER_PID) return;
|
||||
sampler = setInterval(() => {
|
||||
const s = samplePidStats(BROKER_PID);
|
||||
if (s) samples.push(s);
|
||||
}, 2000);
|
||||
sampler.unref();
|
||||
}
|
||||
function stopSampler(): void {
|
||||
if (sampler) clearInterval(sampler);
|
||||
}
|
||||
|
||||
// --- Seed mesh + N members ---
|
||||
|
||||
async function seedMesh(): Promise<string> {
|
||||
await sodium.ready;
|
||||
const [existingUser] = await db
|
||||
.select({ id: user.id })
|
||||
.from(user)
|
||||
.where(eq(user.id, USER_ID));
|
||||
if (!existingUser) {
|
||||
await db.insert(user).values({
|
||||
id: USER_ID,
|
||||
name: "Load Test User",
|
||||
email: "loadtest@claudemesh.test",
|
||||
emailVerified: true,
|
||||
});
|
||||
}
|
||||
|
||||
// Drop prior loadtest mesh (cascades to members).
|
||||
await db.delete(mesh).where(eq(mesh.slug, MESH_SLUG));
|
||||
|
||||
const kpOwner = sodium.crypto_sign_keypair();
|
||||
const [m] = await db
|
||||
.insert(mesh)
|
||||
.values({
|
||||
name: "Load Test",
|
||||
slug: MESH_SLUG,
|
||||
ownerUserId: USER_ID,
|
||||
ownerPubkey: sodium.to_hex(kpOwner.publicKey),
|
||||
visibility: "private",
|
||||
transport: "managed",
|
||||
tier: "free",
|
||||
})
|
||||
.returning({ id: mesh.id });
|
||||
if (!m) throw new Error("mesh insert failed");
|
||||
|
||||
console.error(`[seed] created mesh ${m.id} (${MESH_SLUG})`);
|
||||
console.error(`[seed] generating ${PEERS} keypairs + member rows…`);
|
||||
|
||||
// Batch-insert 100 members.
|
||||
const rows = [];
|
||||
for (let i = 0; i < PEERS; i++) {
|
||||
const kp = sodium.crypto_sign_keypair();
|
||||
rows.push({
|
||||
meshId: m.id,
|
||||
userId: USER_ID,
|
||||
peerPubkey: sodium.to_hex(kp.publicKey),
|
||||
displayName: `peer-${i}`,
|
||||
role: "member" as const,
|
||||
_secretKey: sodium.to_hex(kp.privateKey),
|
||||
});
|
||||
}
|
||||
const inserted = await db
|
||||
.insert(meshMember)
|
||||
.values(rows.map(({ _secretKey: _s, ...r }) => r))
|
||||
.returning({ id: meshMember.id, peerPubkey: meshMember.peerPubkey });
|
||||
for (let i = 0; i < inserted.length; i++) {
|
||||
peers.push({
|
||||
memberId: inserted[i]!.id,
|
||||
pubkey: inserted[i]!.peerPubkey,
|
||||
secretKey: rows[i]!._secretKey,
|
||||
connected: false,
|
||||
sendsInFlight: 0,
|
||||
sendErrors: 0,
|
||||
});
|
||||
}
|
||||
console.error(`[seed] ${peers.length} members inserted`);
|
||||
return m.id;
|
||||
}
|
||||
|
||||
async function cleanupMesh(): Promise<void> {
|
||||
// Cascade deletes members + presences + messages.
|
||||
await db.delete(mesh).where(eq(mesh.slug, MESH_SLUG));
|
||||
// Mop up any loadtest users' stray presence rows (belt and braces).
|
||||
}
|
||||
|
||||
// --- WS client logic ---
|
||||
|
||||
function signHello(
|
||||
meshId: string,
|
||||
memberId: string,
|
||||
pubkey: string,
|
||||
secretHex: string,
|
||||
): { timestamp: number; signature: string } {
|
||||
const ts = Date.now();
|
||||
const canonical = `${meshId}|${memberId}|${pubkey}|${ts}`;
|
||||
const sig = sodium.to_hex(
|
||||
sodium.crypto_sign_detached(
|
||||
sodium.from_string(canonical),
|
||||
sodium.from_hex(secretHex),
|
||||
),
|
||||
);
|
||||
return { timestamp: ts, signature: sig };
|
||||
}
|
||||
|
||||
function encryptDirect(
|
||||
message: string,
|
||||
recipientPubHex: string,
|
||||
senderSecretHex: string,
|
||||
): { nonce: string; ciphertext: string } {
|
||||
const recipientPub = sodium.crypto_sign_ed25519_pk_to_curve25519(
|
||||
sodium.from_hex(recipientPubHex),
|
||||
);
|
||||
const senderSec = sodium.crypto_sign_ed25519_sk_to_curve25519(
|
||||
sodium.from_hex(senderSecretHex),
|
||||
);
|
||||
const nonce = sodium.randombytes_buf(sodium.crypto_box_NONCEBYTES);
|
||||
const ciphertext = sodium.crypto_box_easy(
|
||||
sodium.from_string(message),
|
||||
nonce,
|
||||
recipientPub,
|
||||
senderSec,
|
||||
);
|
||||
return {
|
||||
nonce: sodium.to_base64(nonce, sodium.base64_variants.ORIGINAL),
|
||||
ciphertext: sodium.to_base64(ciphertext, sodium.base64_variants.ORIGINAL),
|
||||
};
|
||||
}
|
||||
|
||||
async function connectPeer(
|
||||
idx: number,
|
||||
meshId: string,
|
||||
): Promise<void> {
|
||||
const p = peers[idx]!;
|
||||
return new Promise((resolve, reject) => {
|
||||
const ws = new WebSocket(BROKER_URL);
|
||||
p.ws = ws;
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error(`peer ${idx} hello_ack timeout`));
|
||||
}, 10_000);
|
||||
ws.on("open", () => {
|
||||
const { timestamp, signature } = signHello(
|
||||
meshId,
|
||||
p.memberId,
|
||||
p.pubkey,
|
||||
p.secretKey,
|
||||
);
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: "hello",
|
||||
meshId,
|
||||
memberId: p.memberId,
|
||||
pubkey: p.pubkey,
|
||||
sessionId: `loadtest-${idx}`,
|
||||
pid: process.pid,
|
||||
cwd: `/tmp/loadtest-${idx}`,
|
||||
timestamp,
|
||||
signature,
|
||||
}),
|
||||
);
|
||||
});
|
||||
ws.on("message", (raw) => {
|
||||
const msg = JSON.parse(raw.toString()) as Record<string, unknown>;
|
||||
if (msg.type === "hello_ack") {
|
||||
clearTimeout(timeout);
|
||||
p.connected = true;
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
if (msg.type === "ack") {
|
||||
const clientId = String(msg.id ?? "");
|
||||
const brokerId = String(msg.messageId ?? "");
|
||||
const t = timings.get(clientId);
|
||||
if (t) t.ackAt = Date.now();
|
||||
// Index broker messageId → clientId so the push handler
|
||||
// (below) can correlate — pushes only carry broker messageId.
|
||||
if (brokerId) brokerIdToClientId.set(brokerId, clientId);
|
||||
p.sendsInFlight -= 1;
|
||||
return;
|
||||
}
|
||||
if (msg.type === "push") {
|
||||
const brokerId = String(msg.messageId ?? "");
|
||||
const clientId = brokerIdToClientId.get(brokerId);
|
||||
if (clientId) {
|
||||
const t = timings.get(clientId);
|
||||
if (t && !t.pushAt) t.pushAt = Date.now();
|
||||
}
|
||||
return;
|
||||
}
|
||||
});
|
||||
ws.on("error", () => {
|
||||
clearTimeout(timeout);
|
||||
reject(new Error(`peer ${idx} ws error`));
|
||||
});
|
||||
ws.on("close", () => {
|
||||
p.connected = false;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function connectAll(meshId: string): Promise<void> {
|
||||
console.error(`[connect] opening ${PEERS} WS connections…`);
|
||||
// Connect in batches of 20 to avoid thundering herd.
|
||||
const BATCH = 20;
|
||||
for (let i = 0; i < PEERS; i += BATCH) {
|
||||
const batch = [];
|
||||
for (let j = i; j < Math.min(i + BATCH, PEERS); j++) {
|
||||
batch.push(connectPeer(j, meshId));
|
||||
}
|
||||
await Promise.all(batch);
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
}
|
||||
const connected = peers.filter((p) => p.connected).length;
|
||||
console.error(`[connect] ${connected}/${PEERS} peers connected`);
|
||||
}
|
||||
|
||||
// We need to correlate ack → push. Broker's ack carries the
|
||||
// client-side id; push carries a broker-assigned messageId. We index
|
||||
// timings by client-side id initially, then on ack we learn the
|
||||
// broker messageId and create a second index pointing to same record.
|
||||
const brokerIdToClientId = new Map<string, string>();
|
||||
|
||||
async function runSends(): Promise<void> {
|
||||
console.error(
|
||||
`[send] firing ${MSGS_PER_PEER} msgs per peer = ${TOTAL_MSGS} total…`,
|
||||
);
|
||||
const startedAt = Date.now();
|
||||
|
||||
// Each peer sends MSGS_PER_PEER msgs to random other peers.
|
||||
await Promise.all(
|
||||
peers.map(async (p, idx) => {
|
||||
if (!p.ws || !p.connected) return;
|
||||
for (let i = 0; i < MSGS_PER_PEER; i++) {
|
||||
// Pick a random peer that's not self.
|
||||
let targetIdx = Math.floor(Math.random() * PEERS);
|
||||
if (targetIdx === idx) targetIdx = (targetIdx + 1) % PEERS;
|
||||
const target = peers[targetIdx]!;
|
||||
const clientId = `${idx}-${i}`;
|
||||
const env = encryptDirect(
|
||||
`msg-${clientId}`,
|
||||
target.pubkey,
|
||||
p.secretKey,
|
||||
);
|
||||
timings.set(clientId, {
|
||||
sentAt: Date.now(),
|
||||
senderIdx: idx,
|
||||
recipientIdx: targetIdx,
|
||||
});
|
||||
try {
|
||||
p.ws.send(
|
||||
JSON.stringify({
|
||||
type: "send",
|
||||
id: clientId,
|
||||
targetSpec: target.pubkey,
|
||||
priority: "now",
|
||||
nonce: env.nonce,
|
||||
ciphertext: env.ciphertext,
|
||||
}),
|
||||
);
|
||||
p.sendsInFlight += 1;
|
||||
} catch {
|
||||
p.sendErrors += 1;
|
||||
}
|
||||
// Small breathing room so we don't overwhelm the ws buffer.
|
||||
if (i % 100 === 0) await new Promise((r) => setTimeout(r, 1));
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
const sent = Date.now() - startedAt;
|
||||
console.error(`[send] all sends dispatched in ${sent}ms`);
|
||||
}
|
||||
|
||||
// We need broker messageId → client id correlation to measure push
|
||||
// latency. Ack carries both (msg.id = clientId, msg.messageId = broker
|
||||
// id). Update the ws message handler to populate the index.
|
||||
// (Done inline above — we need to actually USE it.)
|
||||
//
|
||||
// Wire that in: on ack, brokerIdToClientId.set(messageId, clientId).
|
||||
// On push, look up clientId by messageId, then record pushAt on
|
||||
// timings.get(clientId).
|
||||
|
||||
async function waitForDrain(maxMs: number): Promise<void> {
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < maxMs) {
|
||||
const acked = [...timings.values()].filter((t) => t.ackAt).length;
|
||||
const pushed = [...timings.values()].filter((t) => t.pushAt).length;
|
||||
if (acked === TOTAL_MSGS && pushed === TOTAL_MSGS) return;
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
}
|
||||
}
|
||||
|
||||
// --- Stats ---
|
||||
|
||||
function percentile(sorted: number[], p: number): number {
|
||||
if (sorted.length === 0) return 0;
|
||||
const i = Math.min(
|
||||
sorted.length - 1,
|
||||
Math.floor((p / 100) * sorted.length),
|
||||
);
|
||||
return sorted[i]!;
|
||||
}
|
||||
|
||||
function report(): void {
|
||||
const all = [...timings.values()];
|
||||
const complete = all.filter((t) => t.pushAt && t.ackAt);
|
||||
const timedOut = all.length - complete.length;
|
||||
const latencies = complete
|
||||
.map((t) => t.pushAt! - t.sentAt)
|
||||
.sort((a, b) => a - b);
|
||||
const ackLatencies = complete
|
||||
.map((t) => t.ackAt! - t.sentAt)
|
||||
.sort((a, b) => a - b);
|
||||
|
||||
const rssMax = samples.length
|
||||
? Math.max(...samples.map((s) => s.rssKb))
|
||||
: null;
|
||||
const rssMin = samples.length
|
||||
? Math.min(...samples.map((s) => s.rssKb))
|
||||
: null;
|
||||
const fdMax = samples.length
|
||||
? Math.max(...samples.map((s) => s.fds))
|
||||
: null;
|
||||
|
||||
console.log("");
|
||||
console.log("╔══════════════════════════════════════════════════════════╗");
|
||||
console.log(`║ claudemesh broker load test — ${PEERS} peers × ${MSGS_PER_PEER} msgs ║`);
|
||||
console.log("╚══════════════════════════════════════════════════════════╝");
|
||||
console.log("");
|
||||
console.log("Delivery:");
|
||||
console.log(` sent: ${all.length}`);
|
||||
console.log(` complete: ${complete.length} (${((100 * complete.length) / all.length).toFixed(2)}%)`);
|
||||
console.log(` timed out: ${timedOut}`);
|
||||
console.log("");
|
||||
console.log("End-to-end latency (send → push):");
|
||||
console.log(` p50: ${percentile(latencies, 50)} ms`);
|
||||
console.log(` p95: ${percentile(latencies, 95)} ms`);
|
||||
console.log(` p99: ${percentile(latencies, 99)} ms`);
|
||||
console.log(` max: ${latencies[latencies.length - 1] ?? 0} ms`);
|
||||
console.log("");
|
||||
console.log("Send → ack latency (broker queue write):");
|
||||
console.log(` p50: ${percentile(ackLatencies, 50)} ms`);
|
||||
console.log(` p95: ${percentile(ackLatencies, 95)} ms`);
|
||||
console.log(` p99: ${percentile(ackLatencies, 99)} ms`);
|
||||
if (rssMax !== null) {
|
||||
console.log("");
|
||||
console.log("Broker process (via BROKER_PID):");
|
||||
console.log(` RSS: ${(rssMin! / 1024).toFixed(1)} MB → ${(rssMax / 1024).toFixed(1)} MB`);
|
||||
console.log(` max open FDs: ${fdMax}`);
|
||||
console.log(` samples: ${samples.length}`);
|
||||
}
|
||||
console.log("");
|
||||
}
|
||||
|
||||
// --- Main ---
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const meshId = await seedMesh();
|
||||
startSampler();
|
||||
try {
|
||||
await connectAll(meshId);
|
||||
await runSends();
|
||||
const drainCap = parseInt(process.env.DRAIN_MS ?? "180000", 10);
|
||||
console.error(`[drain] waiting for acks + pushes to settle (up to ${drainCap / 1000}s)…`);
|
||||
await waitForDrain(drainCap);
|
||||
report();
|
||||
} finally {
|
||||
stopSampler();
|
||||
for (const p of peers) {
|
||||
try {
|
||||
p.ws?.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
await cleanupMesh();
|
||||
}
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
main().catch((e) => {
|
||||
console.error("[loadtest] error:", e);
|
||||
if (e instanceof Error && e.cause) {
|
||||
console.error("[loadtest] cause:", e.cause);
|
||||
}
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Wire ack→push correlation by sneaking the broker messageId into
|
||||
// the client-side timings map. We need to edit the message handler
|
||||
// inline above to record it; since the handler already reads msg.id
|
||||
// for the ack path, we just ALSO use msg.id as the correlation key
|
||||
// on push. The broker's push DOES echo clientId? NO — push only has
|
||||
// broker's messageId. So we correlate via the ack phase: when ack
|
||||
// arrives we map messageId→clientId, then on push we look it up.
|
||||
// (The handler above already references this map; just uses the
|
||||
// wrong variable. Fix: update handler to use brokerIdToClientId.)
|
||||
void brokerIdToClientId;
|
||||
Reference in New Issue
Block a user