feat(daemon): sprint 4 outbound routing + CLI thin-client + ambient mode
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Daemon outbox now stores resolved target_spec + crypto_box ciphertext
+ nonce per row. Drain worker is a forwarder; no per-row resolution at
drain time. Outbound routing is no longer a placeholder.

Schema additions (additive, NULL allowed for legacy rows): outbox.mesh,
target_spec, nonce, ciphertext, priority. v0.9.0 rows keep draining via
the broadcast fallback so existing in-flight rows finish cleanly.

IPC /v1/send resolves the user-friendly to (display name, hex prefix,
full pubkey, @group, *, #topicId) into a broker-format target_spec at
accept time. DMs encrypt via crypto_box; broadcast/topic/group base64
the plaintext. Hex prefixes (16+ chars) match against connected peers.

CLI thin-client routing extends trySendViaDaemon pattern to peer list
and skill list/get. Three new helpers in services/bridge/daemon-route.ts.

SKILL.md gains ambient mode section: after claudemesh install, raw
claude works for the daemon's attached mesh. Launch stays as the
override path.

Spec at .artifacts/specs/2026-05-04-v2-roadmap-completion.md orders
the remaining v2.0.0 work: multi-mesh daemon (1.26), CLI-to-thin-client
(1.27), mesh-to-workspace rename (1.28), HKDF identity (2.0).

Released as 1.25.0 on npm.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 01:36:16 +01:00
parent 6794aa8512
commit 0e3a5babd9
13 changed files with 482 additions and 23 deletions

View File

@@ -20,6 +20,12 @@ export interface OutboxRow {
aborted_at: number | null;
aborted_by: string | null;
superseded_by: string | null;
/** Sprint 4 routing: NULL on v0.9.0 rows, drained via broadcast fallback. */
mesh: string | null;
target_spec: string | null;
nonce: string | null;
ciphertext: string | null;
priority: string | null;
}
export function migrateOutbox(db: SqliteDb): void {
@@ -46,13 +52,35 @@ export function migrateOutbox(db: SqliteDb): void {
CREATE INDEX IF NOT EXISTS outbox_aborted
ON outbox(status, aborted_at) WHERE status = 'aborted';
`);
// v1.25.0 / Sprint 4: real outbound routing. Adds the broker-format
// target spec, mesh slug, and the already-encrypted ciphertext+nonce so
// the drain worker can dispatch each row without re-resolving names or
// re-running crypto. Existing rows from v0.9.0 land with NULLs and get
// drained via the legacy broadcast fallback (preserves no-regression).
const hasMesh = columnExists(db, "outbox", "mesh");
const hasTargetSpec = columnExists(db, "outbox", "target_spec");
const hasNonce = columnExists(db, "outbox", "nonce");
const hasCiphertext = columnExists(db, "outbox", "ciphertext");
const hasPriority = columnExists(db, "outbox", "priority");
if (!hasMesh) db.exec(`ALTER TABLE outbox ADD COLUMN mesh TEXT`);
if (!hasTargetSpec) db.exec(`ALTER TABLE outbox ADD COLUMN target_spec TEXT`);
if (!hasNonce) db.exec(`ALTER TABLE outbox ADD COLUMN nonce TEXT`);
if (!hasCiphertext) db.exec(`ALTER TABLE outbox ADD COLUMN ciphertext TEXT`);
if (!hasPriority) db.exec(`ALTER TABLE outbox ADD COLUMN priority TEXT`);
}
function columnExists(db: SqliteDb, table: string, column: string): boolean {
const rows = db.prepare(`PRAGMA table_info(${table})`).all<{ name: string }>();
return rows.some((r) => r.name === column);
}
export function findByClientId(db: SqliteDb, clientMessageId: string): OutboxRow | null {
const row = db.prepare(`
SELECT id, client_message_id, request_fingerprint, payload, enqueued_at,
attempts, next_attempt_at, status, last_error, delivered_at,
broker_message_id, aborted_at, aborted_by, superseded_by
broker_message_id, aborted_at, aborted_by, superseded_by,
mesh, target_spec, nonce, ciphertext, priority
FROM outbox WHERE client_message_id = ?
`).get<OutboxRow>(clientMessageId);
return row ?? null;
@@ -64,14 +92,21 @@ export interface InsertPendingInput {
request_fingerprint: Uint8Array;
payload: Uint8Array;
now: number;
/** Sprint 4: routing fields. Optional only for legacy/v0.9.0 callers. */
mesh?: string;
target_spec?: string;
nonce?: string;
ciphertext?: string;
priority?: string;
}
export function insertPending(db: SqliteDb, input: InsertPendingInput): void {
db.prepare(`
INSERT INTO outbox (
id, client_message_id, request_fingerprint, payload,
enqueued_at, attempts, next_attempt_at, status
) VALUES (?, ?, ?, ?, ?, 0, ?, 'pending')
enqueued_at, attempts, next_attempt_at, status,
mesh, target_spec, nonce, ciphertext, priority
) VALUES (?, ?, ?, ?, ?, 0, ?, 'pending', ?, ?, ?, ?, ?)
`).run(
input.id,
input.client_message_id,
@@ -79,6 +114,11 @@ export function insertPending(db: SqliteDb, input: InsertPendingInput): void {
input.payload,
input.now,
input.now,
input.mesh ?? null,
input.target_spec ?? null,
input.nonce ?? null,
input.ciphertext ?? null,
input.priority ?? null,
);
}
@@ -108,7 +148,8 @@ export function listOutbox(db: SqliteDb, p: ListOutboxParams = {}): OutboxRow[]
const sql = `
SELECT id, client_message_id, request_fingerprint, payload, enqueued_at,
attempts, next_attempt_at, status, last_error, delivered_at,
broker_message_id, aborted_at, aborted_by, superseded_by
broker_message_id, aborted_at, aborted_by, superseded_by,
mesh, target_spec, nonce, ciphertext, priority
FROM outbox
${where.length ? "WHERE " + where.join(" AND ") : ""}
ORDER BY enqueued_at DESC
@@ -122,7 +163,8 @@ export function findById(db: SqliteDb, id: string): OutboxRow | null {
return db.prepare(`
SELECT id, client_message_id, request_fingerprint, payload, enqueued_at,
attempts, next_attempt_at, status, last_error, delivered_at,
broker_message_id, aborted_at, aborted_by, superseded_by
broker_message_id, aborted_at, aborted_by, superseded_by,
mesh, target_spec, nonce, ciphertext, priority
FROM outbox WHERE id = ?
`).get<OutboxRow>(id) ?? null;
}

View File

@@ -26,6 +26,12 @@ interface PendingRow {
request_fingerprint: Uint8Array;
payload: Uint8Array;
attempts: number;
/** Sprint 4 routing fields. NULL on legacy v0.9.0 rows → broadcast fallback. */
target_spec: string | null;
nonce: string | null;
ciphertext: string | null;
priority: string | null;
mesh: string | null;
}
export interface DrainOptions {
@@ -80,7 +86,8 @@ export function startDrainWorker(opts: DrainOptions): DrainHandle {
async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"]>): Promise<void> {
const now = Date.now();
const rows = opts.db.prepare(`
SELECT id, client_message_id, request_fingerprint, payload, attempts
SELECT id, client_message_id, request_fingerprint, payload, attempts,
target_spec, nonce, ciphertext, priority, mesh
FROM outbox
WHERE status = 'pending' AND next_attempt_at <= ?
ORDER BY enqueued_at
@@ -93,21 +100,30 @@ async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"
if (markInflight(opts.db, row.id, now) === 0) continue; // raced with another drainer
const fpHex = bufferToHex(row.request_fingerprint);
// For v0.9.0-against-legacy-broker the daemon doesn't yet route by
// destination_kind/ref — we send the raw payload as a *self*-target so
// the broker accepts it for round-tripping. Sprint 4 reads the actual
// destination from the outbox row and encrypts/routes properly. The
// important thing here is that the row transitions correctly.
const sessionKeys = opts.broker.getSessionKeys();
const targetSpec = "*"; // broadcast — leaves shape valid pre-routing
const nonce = await randomNonce();
const ciphertext = Buffer.from(row.payload).toString("base64");
// Sprint 4: use the row's resolved target/ciphertext if present.
// Legacy v0.9.0 rows (NULL on these columns) fall back to the
// broadcast smoke-test shape so existing in-flight rows still drain.
let targetSpec: string;
let nonce: string;
let ciphertext: string;
let priority: "now" | "next" | "low";
if (row.target_spec && row.nonce && row.ciphertext) {
targetSpec = row.target_spec;
nonce = row.nonce;
ciphertext = row.ciphertext;
priority = (row.priority === "now" || row.priority === "low") ? row.priority : "next";
} else {
targetSpec = "*";
nonce = await randomNonce();
ciphertext = Buffer.from(row.payload).toString("base64");
priority = "next";
}
let res;
try {
res = await opts.broker.send({
targetSpec,
priority: "next",
priority,
nonce,
ciphertext,
client_message_id: row.client_message_id,
@@ -118,7 +134,6 @@ async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"
backoffPending(opts.db, row.id, row.attempts + 1, "exception", String(e));
continue;
}
void sessionKeys; // silence unused for now
if (res.ok) {
markDone(opts.db, row.id, res.messageId, Date.now());

View File

@@ -31,6 +31,14 @@ export interface SendRequest {
/** Destination kind + ref must be supplied by the IPC layer after parsing `to`. */
destination_kind: DestKind;
destination_ref: string;
/** Sprint 4: pre-resolved broker-format target (pubkey hex, "#topicId", @group, *). */
target_spec?: string;
/** Sprint 4: pre-encrypted ciphertext (base64). For DMs: crypto_box. For broadcast/topic: base64-of-plaintext. */
ciphertext?: string;
/** Sprint 4: nonce that pairs with ciphertext (base64). */
nonce?: string;
/** Sprint 4: which mesh this send is for (single-mesh daemon today; multi-mesh later). */
mesh?: string;
}
export type AcceptOutcome =
@@ -80,6 +88,11 @@ export function acceptSend(req: SendRequest, deps: AcceptDeps): AcceptOutcome {
request_fingerprint: fingerprint,
payload: body,
now,
mesh: req.mesh,
target_spec: req.target_spec,
nonce: req.nonce,
ciphertext: req.ciphertext,
priority: req.priority,
});
return { kind: "accepted_pending", status: 202, client_message_id: clientId };
}

View File

@@ -42,6 +42,10 @@ export interface IpcServerOptions {
broker?: DaemonBrokerClient;
/** Notify when a new outbox row was inserted (drains can wake). */
onPendingInserted?: () => void;
/** Mesh secret key (hex) used to encrypt outbound DMs at accept time. */
meshSecretKey?: string;
/** Mesh slug attached to this daemon — stamped on outbox rows for the drain. */
meshSlug?: string;
}
export interface IpcServerHandle {
@@ -64,6 +68,8 @@ export function startIpcServer(opts: IpcServerOptions): IpcServerHandle {
bus: opts.bus,
broker: opts.broker,
onPendingInserted: opts.onPendingInserted,
meshSecretKey: opts.meshSecretKey,
meshSlug: opts.meshSlug,
});
// --- UDS listener -------------------------------------------------------
@@ -123,6 +129,8 @@ function makeHandler(opts: {
bus?: EventBus;
broker?: DaemonBrokerClient;
onPendingInserted?: () => void;
meshSecretKey?: string;
meshSlug?: string;
}) {
const tokenBytes = Buffer.from(opts.localToken, "utf8");
@@ -363,6 +371,21 @@ function makeHandler(opts: {
respond(res, 400, { error: parsed.error });
return;
}
// Sprint 4: resolve `to` → broker-format target_spec and encrypt at
// accept time, then store ciphertext+nonce on the outbox row. This
// crystallises routing so the drain worker is just a forwarder.
if (opts.broker && opts.meshSecretKey) {
try {
const routed = await resolveAndEncrypt(parsed.req, opts.broker, opts.meshSecretKey, opts.meshSlug ?? null);
parsed.req.target_spec = routed.target_spec;
parsed.req.ciphertext = routed.ciphertext;
parsed.req.nonce = routed.nonce;
parsed.req.mesh = routed.mesh;
} catch (e) {
respond(res, 502, { error: "route_failed", detail: String(e) });
return;
}
}
const outcome = acceptSend(parsed.req, { db: opts.outboxDb });
switch (outcome.kind) {
case "accepted_pending":
@@ -481,6 +504,79 @@ function parseSendRequest(body: unknown, idempotencyHeader: string | string[] |
};
}
/**
* Sprint 4: resolve a user-friendly `to` (peer name, pubkey hex, @group, *,
* topic name, "#topicId") into a broker-format target_spec, and encrypt
* the plaintext payload appropriately for the destination kind.
*
* - DM by 64-char hex pubkey: target_spec = pubkey hex, ciphertext via
* crypto_box (recipient pubkey + sender session secret).
* - DM by display name: resolve via broker.listPeers, then same as above.
* - Group / broadcast / topic: target_spec = `@<group>` / `*` / `#<topicId>`,
* ciphertext = base64(plaintext) [matches the cold path's pre-encryption
* convention until topic crypto lands].
*/
async function resolveAndEncrypt(
req: SendRequest,
broker: DaemonBrokerClient,
meshSecretKey: string,
meshSlug: string | null,
): Promise<{ target_spec: string; ciphertext: string; nonce: string; mesh: string }> {
const { encryptDirect } = await import("~/services/crypto/box.js");
const { randomBytes } = await import("node:crypto");
const to = req.to.trim();
// Topic by id ("#<topicId>") — hex-like 20+ chars.
if (to.startsWith("#") && /^#[0-9a-z_-]{20,}$/i.test(to)) {
const ciphertext = Buffer.from(req.message, "utf8").toString("base64");
const nonce = randomBytes(24).toString("base64");
return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" };
}
// Group, broadcast — pass through. (Topic-by-name resolution happens
// when the daemon hooks topic_list later; not required for v1.25.0.)
if (to.startsWith("@") || to === "*") {
const ciphertext = Buffer.from(req.message, "utf8").toString("base64");
const nonce = randomBytes(24).toString("base64");
return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" };
}
// 64-char hex pubkey → DM directly.
if (/^[0-9a-f]{64}$/i.test(to)) {
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, to, senderSecret);
return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
// Hex prefix (16+ chars but <64) → resolve via peer list prefix match.
// Matches the ergonomics of `claudemesh peer list` which shows 16-char
// prefixes, so users naturally paste prefixes back.
const peers = await broker.listPeers().catch(() => []);
if (/^[0-9a-f]{16,63}$/i.test(to)) {
const matches = peers.filter((p) =>
p.pubkey.toLowerCase().startsWith(to.toLowerCase()) ||
(p.memberPubkey ?? "").toLowerCase().startsWith(to.toLowerCase()),
);
if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`);
if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`);
const recipient = matches[0]!.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
// Otherwise — display name.
const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase());
if (!match) throw new Error(`peer "${to}" not found`);
const recipient = match.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
function respond(res: ServerResponse, status: number, body: unknown) {
const json = JSON.stringify(body);
res.statusCode = status;

View File

@@ -159,6 +159,10 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
bus,
broker,
onPendingInserted: () => drain?.wake(),
// Sprint 4: IPC accept-send needs these to resolve targets and
// encrypt at accept time so the drain worker is just a forwarder.
meshSecretKey: mesh.secretKey,
meshSlug: mesh.slug,
});
try {