test(broker): smoke test for hello + direct message flow
Some checks failed
CI / Tests / 🧪 Test (push) Has been cancelled

Adds scripts/{seed-test-mesh,peer-a,peer-b,smoke-test}.ts|.sh that
prove an end-to-end message flow works against a real Postgres:

- seed-test-mesh.ts creates user+mesh+2 members with deterministic
  hex pubkeys ("aa..aa", "bb..bb"), writes seed JSON to stdout
- peer-a.ts sends hello then a direct "send" message to peer B's
  pubkey with fake ciphertext "hello-from-a"
- peer-b.ts sends hello, waits up to 5s for a push, asserts
  senderPubkey matches peer A, exits 0/1
- smoke-test.sh wires the three together

Verified flow: hello registers presence row → send queues into
mesh.message_queue → fanout matches connected peer by pubkey →
drainForMember joins on mesh.member for senderPubkey → push lands
with ciphertext + correct sender attribution.

Also fixes a date-serialization bug that blocked the first run:
applyPendingHookStatus used `sql${col} >= ${jsDate}` which passed
JS Date.toString() to Postgres (failed to parse). Replaced raw
sql`` template with typed gte/desc/isNotNull operators from
drizzle-orm. Same fix applied in sweepPendingStatuses.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-04 21:53:33 +01:00
parent 56b70ac54c
commit 76760c9b8c
5 changed files with 297 additions and 8 deletions

View File

@@ -0,0 +1,73 @@
#!/usr/bin/env bun
/**
* Smoke-test peer A (sender).
*
* Reads the seed JSON from /tmp/smoke-seed.json, connects to the
* broker, sends hello, then sends one direct message to peer B.
* Exits after 5s whether or not it gets anything back.
*/
import { readFileSync } from "node:fs";
import WebSocket from "ws";
const seed = JSON.parse(readFileSync("/tmp/smoke-seed.json", "utf-8")) as {
meshId: string;
peerA: { memberId: string; pubkey: string };
peerB: { memberId: string; pubkey: string };
};
const BROKER = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws";
const ws = new WebSocket(BROKER);
let helloAcked = false;
ws.on("open", () => {
console.log("[peer-a] connected, sending hello");
ws.send(
JSON.stringify({
type: "hello",
meshId: seed.meshId,
memberId: seed.peerA.memberId,
pubkey: seed.peerA.pubkey,
sessionId: "peer-a-session",
pid: process.pid,
cwd: "/tmp/peer-a",
signature: "stub",
nonce: "stub",
}),
);
});
ws.on("message", (raw) => {
const msg = JSON.parse(raw.toString());
console.log("[peer-a] recv:", JSON.stringify(msg));
if (!helloAcked) {
// Broker doesn't currently ack hello explicitly; first message we
// get is a push OR error. Assume success and fire our send after
// a short delay.
}
});
ws.on("error", (e) => console.error("[peer-a] error:", e.message));
ws.on("close", () => console.log("[peer-a] closed"));
// After a short delay to let hello complete, send the test message.
setTimeout(() => {
console.log("[peer-a] sending direct message to peer-b");
ws.send(
JSON.stringify({
type: "send",
targetSpec: seed.peerB.pubkey,
priority: "now",
nonce: "fake-nonce-aaa",
ciphertext: "hello-from-a",
id: "msg-1",
}),
);
}, 500);
setTimeout(() => {
console.log("[peer-a] done, closing");
ws.close();
process.exit(0);
}, 5000);

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env bun
/**
* Smoke-test peer B (receiver).
*
* Connects, sends hello, then waits up to 5s for a push from peer A.
* Exits 0 on successful receive with matching senderPubkey, 1 on
* timeout or mismatch.
*/
import { readFileSync } from "node:fs";
import WebSocket from "ws";
const seed = JSON.parse(readFileSync("/tmp/smoke-seed.json", "utf-8")) as {
meshId: string;
peerA: { memberId: string; pubkey: string };
peerB: { memberId: string; pubkey: string };
};
const BROKER = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws";
const ws = new WebSocket(BROKER);
let received = false;
ws.on("open", () => {
console.log("[peer-b] connected, sending hello");
ws.send(
JSON.stringify({
type: "hello",
meshId: seed.meshId,
memberId: seed.peerB.memberId,
pubkey: seed.peerB.pubkey,
sessionId: "peer-b-session",
pid: process.pid,
cwd: "/tmp/peer-b",
signature: "stub",
nonce: "stub",
}),
);
});
ws.on("message", (raw) => {
const msg = JSON.parse(raw.toString()) as {
type: string;
senderPubkey?: string;
ciphertext?: string;
code?: string;
message?: string;
};
console.log("[peer-b] recv:", JSON.stringify(msg));
if (msg.type === "push") {
if (msg.senderPubkey === seed.peerA.pubkey) {
console.log("[peer-b] ✓ got expected push from peer-a");
received = true;
ws.close();
process.exit(0);
} else {
console.error(
`[peer-b] ✗ wrong senderPubkey: got ${msg.senderPubkey}, expected ${seed.peerA.pubkey}`,
);
ws.close();
process.exit(1);
}
}
if (msg.type === "error") {
console.error(`[peer-b] ✗ broker error: ${msg.code} ${msg.message}`);
ws.close();
process.exit(1);
}
});
ws.on("error", (e) => console.error("[peer-b] ws error:", e.message));
ws.on("close", () => console.log("[peer-b] closed"));
setTimeout(() => {
if (!received) {
console.error("[peer-b] ✗ timeout waiting for push (5s)");
process.exit(1);
}
}, 5000);

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env bun
/**
* Seed a minimal "smoke-test" mesh with two members.
*
* Idempotent: safe to run repeatedly. Re-creates members by
* deleting any prior "smoke-test" mesh and its cascaded rows first.
*
* Outputs the meshId + both memberIds + both pubkeys as JSON (stdout)
* so peer-a.ts and peer-b.ts can read them before connecting.
*/
import { eq } from "drizzle-orm";
import { db } from "../src/db";
import { mesh, meshMember } from "@turbostarter/db/schema/mesh";
import { user } from "@turbostarter/db/schema/auth";
const USER_ID = "test-user-smoke";
const MESH_SLUG = "smoke-test";
const PEER_A_PUBKEY = "a".repeat(64);
const PEER_B_PUBKEY = "b".repeat(64);
async function main() {
// Ensure the test user exists (re-usable across runs).
const [existingUser] = await db
.select({ id: user.id })
.from(user)
.where(eq(user.id, USER_ID));
if (!existingUser) {
await db.insert(user).values({
id: USER_ID,
name: "Smoke Test User",
email: "smoke@claudemesh.test",
emailVerified: true,
});
}
// Drop any prior mesh with this slug (cascades to members).
await db.delete(mesh).where(eq(mesh.slug, MESH_SLUG));
// Fresh mesh + 2 members.
const [m] = await db
.insert(mesh)
.values({
name: "Smoke Test",
slug: MESH_SLUG,
ownerUserId: USER_ID,
visibility: "private",
transport: "managed",
tier: "free",
})
.returning({ id: mesh.id });
if (!m) throw new Error("mesh insert failed");
const [peerA] = await db
.insert(meshMember)
.values({
meshId: m.id,
userId: USER_ID,
peerPubkey: PEER_A_PUBKEY,
displayName: "peer-a",
role: "admin",
})
.returning({ id: meshMember.id });
const [peerB] = await db
.insert(meshMember)
.values({
meshId: m.id,
userId: USER_ID,
peerPubkey: PEER_B_PUBKEY,
displayName: "peer-b",
role: "member",
})
.returning({ id: meshMember.id });
if (!peerA || !peerB) throw new Error("member insert failed");
const seed = {
meshId: m.id,
peerA: { memberId: peerA.id, pubkey: PEER_A_PUBKEY },
peerB: { memberId: peerB.id, pubkey: PEER_B_PUBKEY },
};
console.log(JSON.stringify(seed, null, 2));
process.exit(0);
}
main().catch((e) => {
console.error("[seed] error:", e instanceof Error ? e.message : e);
process.exit(1);
});

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env bash
# End-to-end smoke test for the broker.
#
# Flow:
# 1. Seed a test mesh with 2 members → writes /tmp/smoke-seed.json
# 2. Start peer B (receiver) in background
# 3. Start peer A (sender)
# 4. Wait for B → exit code is the test result
#
# Assumes: broker is running on ws://localhost:7900/ws, DATABASE_URL
# is in env. Run from the broker workspace:
# cd apps/broker && ./scripts/smoke-test.sh
set -euo pipefail
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "── seeding test mesh ──"
bun "$DIR/seed-test-mesh.ts" > /tmp/smoke-seed.json
cat /tmp/smoke-seed.json
echo ""
echo "── starting peer-b (receiver) ──"
bun "$DIR/peer-b.ts" &
B_PID=$!
sleep 1
echo ""
echo "── starting peer-a (sender) ──"
bun "$DIR/peer-a.ts"
echo ""
echo "── waiting for peer-b ──"
if wait $B_PID; then
echo "✓ smoke test PASSED"
exit 0
else
echo "✗ smoke test FAILED"
exit 1
fi

View File

@@ -14,7 +14,18 @@
* - Message envelopes are opaque ciphertext (client-side crypto).
*/
import { and, asc, eq, inArray, isNull, lt, or, sql } from "drizzle-orm";
import {
and,
asc,
desc,
eq,
gte,
inArray,
isNotNull,
isNull,
lt,
or,
} from "drizzle-orm";
import { db } from "./db";
import {
meshMember as memberTable,
@@ -154,7 +165,7 @@ export async function handleHookSetStatus(
.select({ id: presence.id, status: presence.status })
.from(presence)
.where(activeFilter)
.orderBy(sql`${presence.connectedAt} DESC`)
.orderBy(desc(presence.connectedAt))
.limit(1);
row = r as { id: string; status: PeerStatus } | undefined;
}
@@ -197,10 +208,10 @@ export async function applyPendingHookStatus(
eq(pendingStatus.pid, pid),
eq(pendingStatus.cwd, cwd),
isNull(pendingStatus.appliedAt),
sql`${pendingStatus.createdAt} >= ${cutoff}`,
gte(pendingStatus.createdAt, cutoff),
),
)
.orderBy(sql`${pendingStatus.createdAt} DESC`)
.orderBy(desc(pendingStatus.createdAt))
.limit(1);
if (!row) return;
await writeStatus(presenceId, row.status as PeerStatus, "hook", now);
@@ -241,10 +252,7 @@ export async function sweepPendingStatuses(): Promise<void> {
await db
.delete(pendingStatus)
.where(
or(
lt(pendingStatus.createdAt, cutoff),
sql`${pendingStatus.appliedAt} IS NOT NULL`,
)!,
or(lt(pendingStatus.createdAt, cutoff), isNotNull(pendingStatus.appliedAt))!,
);
}