feat(cli): claudemesh daemon — peer mesh runtime (v0.9.0)
Long-lived process that holds a persistent WS to the broker and exposes
a local IPC surface (UDS + bearer-auth TCP loopback). Implements the
v0.9.0 spec under .artifacts/specs/.
Core:
- daemon up | status | version | down | accept-host
- daemon outbox list [--failed|--pending|--inflight|--done|--aborted]
- daemon outbox requeue <id> [--new-client-id <id>]
- daemon install-service / uninstall-service (macOS launchd, Linux systemd)
IPC routes:
- /v1/version, /v1/health
- /v1/send (POST) — full §4.5.1 idempotency lookup table
- /v1/inbox (GET) — paged history
- /v1/events — SSE stream of message/peer_join/peer_leave/broker_status
- /v1/peers — broker passthrough
- /v1/profile — summary/status/visible/avatar/title/bio/capabilities
- /v1/outbox + /v1/outbox/requeue — operator recovery
Storage (SQLite via node:sqlite / bun:sqlite):
- outbox.db: pending/inflight/done/dead/aborted with audit columns
- inbox.db: dedupe by client_message_id, decrypts DMs via existing crypto
- BEGIN IMMEDIATE serialization for daemon-local accept races
Identity:
- host_fingerprint.json (machine-id || first-stable-mac)
- refuse-on-mismatch policy with `daemon accept-host` recovery
CLI integration:
- claudemesh send detects the daemon and routes through /v1/send when
present, falling back to bridge socket / cold path otherwise
Tests: 15-case coverage of the §4.5.1 IPC duplicate lookup table.
Spec arc preserved at .artifacts/specs/2026-05-03-daemon-{v1..v10}.md;
v0.9.0 implementation target locked at 2026-05-03-daemon-spec-v0.9.0.md;
deferred items at 2026-05-03-daemon-spec-broker-hardening-followups.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
186
apps/cli/src/daemon/drain.ts
Normal file
186
apps/cli/src/daemon/drain.ts
Normal file
@@ -0,0 +1,186 @@
|
||||
// Outbox drain worker. Walks `outbox.pending` rows, sends them to the
|
||||
// broker via DaemonBrokerClient, and transitions row state per spec §4.6.1.
|
||||
//
|
||||
// Lifecycle per row:
|
||||
// pending → inflight → done (broker accepted)
|
||||
// → pending+backoff (transient broker error)
|
||||
// → dead (permanent broker error or
|
||||
// attempt cap reached)
|
||||
//
|
||||
// Wakeable: insertPending in the IPC handler can call wake() to skip the
|
||||
// idle interval. We use a simple promise-replacing pattern instead of a
|
||||
// pollable signal.
|
||||
|
||||
import type { SqliteDb } from "./db/sqlite.js";
|
||||
import type { DaemonBrokerClient } from "./broker.js";
|
||||
import type { OutboxStatus } from "./db/outbox.js";
|
||||
|
||||
/** Longest time the worker sleeps between drain passes when it is not woken. */
const POLL_INTERVAL_MS = 500;

/** Attempt count at which a transiently failing row is marked dead instead of retried. */
const MAX_ATTEMPTS_PER_ROW = 25;

/** Base delay of the exponential retry backoff. */
const BACKOFF_BASE_MS = 500;

/**
 * Upper bound on the retry backoff. Also used as the `next_attempt_at`
 * offset stamped on a row when it is moved to `inflight`.
 */
const BACKOFF_CAP_MS = 30_000;
|
||||
|
||||
/** Columns read from the `outbox` table for each row a drain pass picks up. */
interface PendingRow {
  /** Outbox row key; used in every state-transition UPDATE. */
  id: string;
  /** Forwarded to the broker unchanged as `client_message_id`. */
  client_message_id: string;
  /** Raw fingerprint bytes; hex-encoded before being sent to the broker. */
  request_fingerprint: Uint8Array;
  /** Raw payload bytes; base64-encoded and sent as the message ciphertext. */
  payload: Uint8Array;
  /** Attempt count stored on the row at the time it was selected. */
  attempts: number;
}
|
||||
|
||||
/** Dependencies handed to {@link startDrainWorker}. */
export interface DrainOptions {
  /** Database that contains the `outbox` table the worker reads and updates. */
  db: SqliteDb;
  /** Broker client used to send each outbox row. */
  broker: DaemonBrokerClient;
  /**
   * Optional structured logger. When omitted the worker falls back to its
   * built-in logger, which writes one JSON object per line to stdout
   * (`info`) or stderr (`warn` / `error`).
   */
  log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
}
|
||||
|
||||
/** Control surface returned by {@link startDrainWorker}. */
export interface DrainHandle {
  /** Cuts the current idle wait short so the next drain pass starts promptly. */
  wake(): void;
  /** Tells the worker loop to stop and wakes it so it notices the request. */
  close(): Promise<void>;
}
|
||||
|
||||
/**
 * Starts the background outbox drain loop.
 *
 * The loop alternates between one drain pass and an idle wait of at most
 * `POLL_INTERVAL_MS`. Calling `wake()` on the returned handle ends the idle
 * wait early; a wake that arrives while a pass is running is remembered and
 * makes the following idle wait return immediately.
 *
 * @param opts Database, broker client and optional logger.
 * @returns A handle with `wake()` and `close()`. `close()` resolves only
 *   after the loop has exited, so callers can release the database safely
 *   once it settles.
 */
export function startDrainWorker(opts: DrainOptions): DrainHandle {
  const log = opts.log ?? defaultLog;
  let stopped = false;
  let wakeResolve: (() => void) | null = null;
  let wakePromise = new Promise<void>((r) => { wakeResolve = r; });

  // Resolve the current wake promise at most once; later calls are no-ops
  // until the loop installs a fresh promise.
  const wake = () => {
    if (wakeResolve) {
      const r = wakeResolve;
      wakeResolve = null;
      r();
    }
  };

  // Waits for a wake signal or the poll interval, whichever comes first, and
  // always cancels the timer so an early wake does not leave it pending.
  const idle = (): Promise<void> => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<void>((resolve) => {
      timer = setTimeout(resolve, POLL_INTERVAL_MS);
    });
    return Promise.race([wakePromise, timeout]).finally(() => clearTimeout(timer));
  };

  const tick = async () => {
    while (!stopped) {
      try {
        await drainOnce(opts, log);
      } catch (e) {
        // A failed pass must not kill the loop; log it and try again later.
        log("warn", "drain_tick_failed", { err: String(e) });
      }
      await idle();
      // Install a fresh wake promise for the next iteration.
      wakePromise = new Promise<void>((r) => { wakeResolve = r; });
    }
  };

  // Keep the loop's promise so close() can wait for it to finish.
  const loop = tick();

  return {
    wake,
    close: async () => {
      stopped = true;
      wake();
      await loop;
    },
  };
}
|
||||
|
||||
/**
 * Runs a single drain pass over the outbox.
 *
 * Selects up to 32 `pending` rows whose `next_attempt_at` is due (oldest
 * `enqueued_at` first), claims each one by moving it to `inflight`, sends it
 * to the broker and records the outcome:
 *   - broker accepted           → `done`
 *   - permanent broker error    → `dead`
 *   - attempt cap reached       → `dead` (error prefixed with `max_attempts: `)
 *   - any other failure         → back to `pending` with backoff
 *
 * A thrown exception while preparing or sending a row is treated like a
 * transient failure, including the attempt cap.
 *
 * @param opts Database and broker client to use.
 * @param log  Logger for per-row warnings.
 */
async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"]>): Promise<void> {
  const now = Date.now();
  const rows = opts.db.prepare(`
    SELECT id, client_message_id, request_fingerprint, payload, attempts
      FROM outbox
     WHERE status = 'pending' AND next_attempt_at <= ?
     ORDER BY enqueued_at
     LIMIT 32
  `).all<PendingRow>(now);

  for (const row of rows) {
    if (markInflight(opts.db, row.id, now) === 0) continue; // raced with another drainer

    // markInflight already bumped the stored counter; mirror that value here.
    const attempts = row.attempts + 1;

    // For v0.9.0-against-legacy-broker the daemon doesn't yet route by
    // destination_kind/ref — we send the raw payload as a broadcast ("*")
    // so the request shape stays valid pre-routing. Sprint 4 reads the actual
    // destination from the outbox row and encrypts/routes properly. The
    // important thing here is that the row transitions correctly.
    let res;
    try {
      // Everything that can throw after the row was claimed lives inside this
      // try, so a failure always moves the row out of `inflight`.
      res = await opts.broker.send({
        targetSpec: "*",
        priority: "next",
        nonce: await randomNonce(),
        ciphertext: Buffer.from(row.payload).toString("base64"),
        client_message_id: row.client_message_id,
        request_fingerprint_hex: bufferToHex(row.request_fingerprint),
      });
    } catch (e) {
      const err = String(e);
      log("warn", "drain_send_threw", { id: row.id, err });
      if (attempts >= MAX_ATTEMPTS_PER_ROW) {
        // Enforce the attempt cap here too, otherwise a row whose send
        // always throws would be retried forever.
        log("warn", "drain_max_attempts", { id: row.id, err });
        markDead(opts.db, row.id, `max_attempts: ${err}`);
      } else {
        backoffPending(opts.db, row.id, attempts, "exception", err);
      }
      continue;
    }

    if (res.ok) {
      markDone(opts.db, row.id, res.messageId, Date.now());
    } else if (res.permanent) {
      log("warn", "drain_permanent_failure", { id: row.id, err: res.error });
      markDead(opts.db, row.id, res.error);
    } else if (attempts >= MAX_ATTEMPTS_PER_ROW) {
      log("warn", "drain_max_attempts", { id: row.id, err: res.error });
      markDead(opts.db, row.id, `max_attempts: ${res.error}`);
    } else {
      backoffPending(opts.db, row.id, attempts, "retry", res.error);
    }
  }
}
|
||||
|
||||
/**
 * Claims a pending row for sending.
 *
 * Moves the row to `inflight`, increments its attempt counter and sets
 * `next_attempt_at` to `now + BACKOFF_CAP_MS`. The update only applies while
 * the row is still `pending`.
 *
 * @param db  Database containing the `outbox` table.
 * @param id  Outbox row id.
 * @param now Timestamp in milliseconds the offset is added to.
 * @returns Number of rows changed: 0 when the row was no longer pending.
 */
function markInflight(db: SqliteDb, id: string, now: number): number {
  const nextAttemptAt = now + BACKOFF_CAP_MS;
  const claim = db.prepare(`
    UPDATE outbox
       SET status = 'inflight', attempts = attempts + 1, next_attempt_at = ?
     WHERE id = ? AND status = 'pending'
  `);
  const outcome = claim.run(nextAttemptAt, id);
  return Number(outcome.changes);
}
|
||||
|
||||
/**
 * Records a successful delivery: sets the row to `done`, stores the delivery
 * time and the broker's message id, and clears `last_error`.
 *
 * @param db              Database containing the `outbox` table.
 * @param id              Outbox row id.
 * @param brokerMessageId Message id returned by the broker.
 * @param now             Delivery timestamp in milliseconds.
 */
function markDone(db: SqliteDb, id: string, brokerMessageId: string, now: number) {
  const finalize = db.prepare(`
    UPDATE outbox
       SET status = 'done', delivered_at = ?, broker_message_id = ?, last_error = NULL
     WHERE id = ?
  `);
  finalize.run(now, brokerMessageId, id);
}
|
||||
|
||||
/**
 * Marks a row as `dead` and stores the reason in `last_error`.
 *
 * @param db  Database containing the `outbox` table.
 * @param id  Outbox row id.
 * @param err Error text to persist on the row.
 */
function markDead(db: SqliteDb, id: string, err: string) {
  const bury = db.prepare(`UPDATE outbox SET status = 'dead', last_error = ? WHERE id = ?`);
  bury.run(err, id);
}
|
||||
|
||||
/**
 * Returns a row to `pending` and schedules its next attempt.
 *
 * The delay is `BACKOFF_BASE_MS * 2^min(attempts, 12)`, clamped to
 * `BACKOFF_CAP_MS`, measured from the current time.
 *
 * @param db       Database containing the `outbox` table.
 * @param id       Outbox row id.
 * @param attempts Attempt count to store on the row; also drives the delay.
 * @param _kind    Failure category supplied by the caller; currently unused.
 * @param err      Error text to persist in `last_error`.
 */
function backoffPending(db: SqliteDb, id: string, attempts: number, _kind: string, err: string) {
  const exponent = Math.min(attempts, 12);
  const uncapped = BACKOFF_BASE_MS * (2 ** exponent);
  const delayMs = Math.min(BACKOFF_CAP_MS, uncapped);
  const nextAttemptAt = Date.now() + delayMs;

  const reschedule = db.prepare(`
    UPDATE outbox
       SET status = 'pending', attempts = ?, next_attempt_at = ?, last_error = ?
     WHERE id = ?
  `);
  reschedule.run(attempts, nextAttemptAt, err, id);
}
|
||||
|
||||
/**
 * Encodes bytes as a lowercase hex string, two characters per byte.
 *
 * @param b Bytes to encode.
 * @returns Hex string of length `2 * b.length`; empty for empty input.
 */
function bufferToHex(b: Uint8Array): string {
  const pairs = Array.from(b, (byte) => byte.toString(16).padStart(2, "0"));
  return pairs.join("");
}
|
||||
|
||||
/**
 * Generates a fresh 24-byte random nonce.
 *
 * `node:crypto` is loaded with a dynamic import at call time.
 *
 * @returns The nonce encoded as base64.
 */
async function randomNonce(): Promise<string> {
  const nodeCrypto = await import("node:crypto");
  const bytes = nodeCrypto.randomBytes(24);
  return bytes.toString("base64");
}
|
||||
|
||||
/**
 * Fallback logger: emits one JSON object per line.
 *
 * The object holds `level` and `msg`, then the keys of `meta` (which may
 * override those two), then an ISO-8601 `ts` that always wins over a `ts`
 * key in `meta`. `info` lines go to stdout; everything else goes to stderr.
 *
 * @param level Severity of the entry.
 * @param msg   Event name or message.
 * @param meta  Optional extra fields merged into the entry.
 */
function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) {
  const entry = { level, msg, ...meta, ts: new Date().toISOString() };
  const sink = level === "info" ? process.stdout : process.stderr;
  sink.write(JSON.stringify(entry) + "\n");
}
|
||||
|
||||
// The OutboxStatus import is otherwise unreferenced in this module. Using it
// once here keeps strict tsc from flagging it as unused.
const _statuses: OutboxStatus[] = [
  "pending",
  "inflight",
  "done",
  "dead",
  "aborted",
];
void _statuses;
|
||||
Reference in New Issue
Block a user