diff --git a/apps/broker/scripts/peer-a.ts b/apps/broker/scripts/peer-a.ts new file mode 100644 index 0000000..5c00314 --- /dev/null +++ b/apps/broker/scripts/peer-a.ts @@ -0,0 +1,73 @@ +#!/usr/bin/env bun +/** + * Smoke-test peer A (sender). + * + * Reads the seed JSON from /tmp/smoke-seed.json, connects to the + * broker, sends hello, then sends one direct message to peer B. + * Exits after 5s whether or not it gets anything back. + */ + +import { readFileSync } from "node:fs"; +import WebSocket from "ws"; + +const seed = JSON.parse(readFileSync("/tmp/smoke-seed.json", "utf-8")) as { + meshId: string; + peerA: { memberId: string; pubkey: string }; + peerB: { memberId: string; pubkey: string }; +}; + +const BROKER = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws"; +const ws = new WebSocket(BROKER); + +let helloAcked = false; + +ws.on("open", () => { + console.log("[peer-a] connected, sending hello"); + ws.send( + JSON.stringify({ + type: "hello", + meshId: seed.meshId, + memberId: seed.peerA.memberId, + pubkey: seed.peerA.pubkey, + sessionId: "peer-a-session", + pid: process.pid, + cwd: "/tmp/peer-a", + signature: "stub", + nonce: "stub", + }), + ); +}); + +ws.on("message", (raw) => { + const msg = JSON.parse(raw.toString()); + console.log("[peer-a] recv:", JSON.stringify(msg)); + if (!helloAcked) { + // Broker doesn't currently ack hello explicitly; first message we + // get is a push OR error. Assume success and fire our send after + // a short delay. + } +}); + +ws.on("error", (e) => console.error("[peer-a] error:", e.message)); +ws.on("close", () => console.log("[peer-a] closed")); + +// After a short delay to let hello complete, send the test message. +setTimeout(() => { + console.log("[peer-a] sending direct message to peer-b"); + ws.send( + JSON.stringify({ + type: "send", + targetSpec: seed.peerB.pubkey, + priority: "now", + nonce: "fake-nonce-aaa", + ciphertext: "hello-from-a", + id: "msg-1", + }), + ); +}, 500); + +setTimeout(() => { + console.log("[peer-a] done, closing"); + ws.close(); + process.exit(0); +}, 5000); diff --git a/apps/broker/scripts/peer-b.ts b/apps/broker/scripts/peer-b.ts new file mode 100644 index 0000000..ede2018 --- /dev/null +++ b/apps/broker/scripts/peer-b.ts @@ -0,0 +1,79 @@ +#!/usr/bin/env bun +/** + * Smoke-test peer B (receiver). + * + * Connects, sends hello, then waits up to 5s for a push from peer A. + * Exits 0 on successful receive with matching senderPubkey, 1 on + * timeout or mismatch. + */ + +import { readFileSync } from "node:fs"; +import WebSocket from "ws"; + +const seed = JSON.parse(readFileSync("/tmp/smoke-seed.json", "utf-8")) as { + meshId: string; + peerA: { memberId: string; pubkey: string }; + peerB: { memberId: string; pubkey: string }; +}; + +const BROKER = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws"; +const ws = new WebSocket(BROKER); + +let received = false; + +ws.on("open", () => { + console.log("[peer-b] connected, sending hello"); + ws.send( + JSON.stringify({ + type: "hello", + meshId: seed.meshId, + memberId: seed.peerB.memberId, + pubkey: seed.peerB.pubkey, + sessionId: "peer-b-session", + pid: process.pid, + cwd: "/tmp/peer-b", + signature: "stub", + nonce: "stub", + }), + ); +}); + +ws.on("message", (raw) => { + const msg = JSON.parse(raw.toString()) as { + type: string; + senderPubkey?: string; + ciphertext?: string; + code?: string; + message?: string; + }; + console.log("[peer-b] recv:", JSON.stringify(msg)); + if (msg.type === "push") { + if (msg.senderPubkey === seed.peerA.pubkey) { + console.log("[peer-b] ✓ got expected push from peer-a"); + received = true; + ws.close(); + process.exit(0); + } else { + console.error( + `[peer-b] ✗ wrong senderPubkey: got ${msg.senderPubkey}, expected ${seed.peerA.pubkey}`, + ); + ws.close(); + process.exit(1); + } + } + if (msg.type === "error") { + console.error(`[peer-b] ✗ broker error: ${msg.code} ${msg.message}`); + ws.close(); + process.exit(1); + } +}); + +ws.on("error", (e) => console.error("[peer-b] ws error:", e.message)); +ws.on("close", () => console.log("[peer-b] closed")); + +setTimeout(() => { + if (!received) { + console.error("[peer-b] ✗ timeout waiting for push (5s)"); + process.exit(1); + } +}, 5000); diff --git a/apps/broker/scripts/seed-test-mesh.ts b/apps/broker/scripts/seed-test-mesh.ts new file mode 100644 index 0000000..a795bfa --- /dev/null +++ b/apps/broker/scripts/seed-test-mesh.ts @@ -0,0 +1,88 @@ +#!/usr/bin/env bun +/** + * Seed a minimal "smoke-test" mesh with two members. + * + * Idempotent: safe to run repeatedly. Re-creates members by + * deleting any prior "smoke-test" mesh and its cascaded rows first. + * + * Outputs the meshId + both memberIds + both pubkeys as JSON (stdout) + * so peer-a.ts and peer-b.ts can read them before connecting. + */ + +import { eq } from "drizzle-orm"; +import { db } from "../src/db"; +import { mesh, meshMember } from "@turbostarter/db/schema/mesh"; +import { user } from "@turbostarter/db/schema/auth"; + +const USER_ID = "test-user-smoke"; +const MESH_SLUG = "smoke-test"; +const PEER_A_PUBKEY = "a".repeat(64); +const PEER_B_PUBKEY = "b".repeat(64); + +async function main() { + // Ensure the test user exists (re-usable across runs). + const [existingUser] = await db + .select({ id: user.id }) + .from(user) + .where(eq(user.id, USER_ID)); + if (!existingUser) { + await db.insert(user).values({ + id: USER_ID, + name: "Smoke Test User", + email: "smoke@claudemesh.test", + emailVerified: true, + }); + } + + // Drop any prior mesh with this slug (cascades to members). + await db.delete(mesh).where(eq(mesh.slug, MESH_SLUG)); + + // Fresh mesh + 2 members. + const [m] = await db + .insert(mesh) + .values({ + name: "Smoke Test", + slug: MESH_SLUG, + ownerUserId: USER_ID, + visibility: "private", + transport: "managed", + tier: "free", + }) + .returning({ id: mesh.id }); + if (!m) throw new Error("mesh insert failed"); + + const [peerA] = await db + .insert(meshMember) + .values({ + meshId: m.id, + userId: USER_ID, + peerPubkey: PEER_A_PUBKEY, + displayName: "peer-a", + role: "admin", + }) + .returning({ id: meshMember.id }); + const [peerB] = await db + .insert(meshMember) + .values({ + meshId: m.id, + userId: USER_ID, + peerPubkey: PEER_B_PUBKEY, + displayName: "peer-b", + role: "member", + }) + .returning({ id: meshMember.id }); + if (!peerA || !peerB) throw new Error("member insert failed"); + + const seed = { + meshId: m.id, + peerA: { memberId: peerA.id, pubkey: PEER_A_PUBKEY }, + peerB: { memberId: peerB.id, pubkey: PEER_B_PUBKEY }, + }; + console.log(JSON.stringify(seed, null, 2)); + process.exit(0); +} + +main().catch((e) => { + console.error("[seed] error:", e instanceof Error ? e.message : e); + process.exit(1); +}); diff --git a/apps/broker/scripts/smoke-test.sh b/apps/broker/scripts/smoke-test.sh new file mode 100755 index 0000000..c9d8aa7 --- /dev/null +++ b/apps/broker/scripts/smoke-test.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +# End-to-end smoke test for the broker. +# +# Flow: +# 1. Seed a test mesh with 2 members → writes /tmp/smoke-seed.json +# 2. Start peer B (receiver) in background +# 3. Start peer A (sender) +# 4. Wait for B → exit code is the test result +# +# Assumes: broker is running on ws://localhost:7900/ws, DATABASE_URL +# is in env. Run from the broker workspace: +# cd apps/broker && ./scripts/smoke-test.sh + +set -euo pipefail + +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +echo "── seeding test mesh ──" +bun "$DIR/seed-test-mesh.ts" > /tmp/smoke-seed.json +cat /tmp/smoke-seed.json + +echo "" +echo "── starting peer-b (receiver) ──" +bun "$DIR/peer-b.ts" & +B_PID=$! + +sleep 1 + +echo "" +echo "── starting peer-a (sender) ──" +bun "$DIR/peer-a.ts" + +echo "" +echo "── waiting for peer-b ──" +if wait $B_PID; then + echo "✓ smoke test PASSED" + exit 0 +else + echo "✗ smoke test FAILED" + exit 1 +fi diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index d15da28..3c359fe 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -14,7 +14,18 @@ * - Message envelopes are opaque ciphertext (client-side crypto). */ -import { and, asc, eq, inArray, isNull, lt, or, sql } from "drizzle-orm"; +import { + and, + asc, + desc, + eq, + gte, + inArray, + isNotNull, + isNull, + lt, + or, +} from "drizzle-orm"; import { db } from "./db"; import { meshMember as memberTable, @@ -154,7 +165,7 @@ export async function handleHookSetStatus( .select({ id: presence.id, status: presence.status }) .from(presence) .where(activeFilter) - .orderBy(sql`${presence.connectedAt} DESC`) + .orderBy(desc(presence.connectedAt)) .limit(1); row = r as { id: string; status: PeerStatus } | undefined; } @@ -197,10 +208,10 @@ export async function applyPendingHookStatus( eq(pendingStatus.pid, pid), eq(pendingStatus.cwd, cwd), isNull(pendingStatus.appliedAt), - sql`${pendingStatus.createdAt} >= ${cutoff}`, + gte(pendingStatus.createdAt, cutoff), ), ) - .orderBy(sql`${pendingStatus.createdAt} DESC`) + .orderBy(desc(pendingStatus.createdAt)) .limit(1); if (!row) return; await writeStatus(presenceId, row.status as PeerStatus, "hook", now); @@ -241,10 +252,7 @@ export async function sweepPendingStatuses(): Promise { await db .delete(pendingStatus) .where( - or( - lt(pendingStatus.createdAt, cutoff), - sql`${pendingStatus.appliedAt} IS NOT NULL`, - )!, + or(lt(pendingStatus.createdAt, cutoff), isNotNull(pendingStatus.appliedAt))!, ); }