feat(cli): 1.34.7 → 1.34.13 — multi-session correctness train
Seven-ship sequence that took the daemon from "works for one session"
to "internally consistent for N sessions on one daemon." Architecture
invariant after 1.34.13: every shared store / channel scopes by
recipient (SSE demux at bind layer + token forwarding, inbox per-
recipient columns, outbox sender-session routing).
- 1.34.7 inbox flush + delete commands
- 1.34.8 seen_at column + TTL prune + first echo guard
- 1.34.9 broader echo guard + system-event polish + staleness warning
- 1.34.10 per-session SSE demux (SseFilterOptions) + universal daemon
(--mesh / --name deprecated) + daemon_started version stamp
- 1.34.11 inbox per-recipient column (storage half of 1.34.10)
- 1.34.12 daemon up detaches by default (logs to ~/.claudemesh/daemon/
daemon.log; service units explicitly pass --foreground)
- 1.34.13 MCP forwards session token on /v1/events — the actual fix
that activates 1.34.10's demux. Without this header the
daemon's session resolved null, filter was empty, every MCP
received the unfiltered global stream.
Roadmap entry at docs/roadmap.md captures the timeline + the four
known gaps tracked for follow-ups (launch env-var leak, broker
listPeers mesh-filter, kick on control-plane no-op, session caps as
first-class concept).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
|
||||
import type { SqliteDb } from "./db/sqlite.js";
|
||||
import type { DaemonBrokerClient } from "./broker.js";
|
||||
import type { SessionBrokerClient } from "./session-broker.js";
|
||||
import type { OutboxStatus } from "./db/outbox.js";
|
||||
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
@@ -32,6 +33,10 @@ interface PendingRow {
|
||||
ciphertext: string | null;
|
||||
priority: string | null;
|
||||
mesh: string | null;
|
||||
/** 1.34.0: hex pubkey of the originating session — drain prefers
|
||||
* routing via that session's WS so broker fan-out attributes the
|
||||
* push to the session pubkey. NULL on cold-path / pre-1.34.0 rows. */
|
||||
sender_session_pubkey: string | null;
|
||||
}
|
||||
|
||||
export interface DrainOptions {
|
||||
@@ -40,6 +45,20 @@ export interface DrainOptions {
|
||||
* broker keyed by its `mesh` column. Single-mesh daemons pass a
|
||||
* Map of size 1; multi-mesh daemons pass one entry per joined mesh. */
|
||||
brokers: Map<string, DaemonBrokerClient>;
|
||||
/**
|
||||
* 1.34.0: lookup for the per-session WS keyed by hex session pubkey.
|
||||
* When an outbox row has `sender_session_pubkey` set and this lookup
|
||||
* returns an open client, the drain routes via the session-WS so the
|
||||
* broker fan-out attributes the push to the session pubkey instead
|
||||
* of the daemon's stable member pubkey.
|
||||
*
|
||||
* Returning `undefined` (or an unopened client) signals "no session
|
||||
* WS available" — the drain backs off and retries; it does NOT fall
|
||||
* back to the daemon-WS, because the row was encrypted with the
|
||||
* session secret and would fail to decrypt on the recipient side
|
||||
* if attribution silently changed mid-flight.
|
||||
*/
|
||||
getSessionBrokerByPubkey?: (sessionPubkey: string) => SessionBrokerClient | undefined;
|
||||
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
|
||||
}
|
||||
|
||||
@@ -88,7 +107,8 @@ async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"
|
||||
const now = Date.now();
|
||||
const rows = opts.db.prepare(`
|
||||
SELECT id, client_message_id, request_fingerprint, payload, attempts,
|
||||
target_spec, nonce, ciphertext, priority, mesh
|
||||
target_spec, nonce, ciphertext, priority, mesh,
|
||||
sender_session_pubkey
|
||||
FROM outbox
|
||||
WHERE status = 'pending' AND next_attempt_at <= ?
|
||||
ORDER BY enqueued_at
|
||||
@@ -101,21 +121,34 @@ async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"
|
||||
if (markInflight(opts.db, row.id, now) === 0) continue; // raced with another drainer
|
||||
const fpHex = bufferToHex(row.request_fingerprint);
|
||||
|
||||
// v1.26.0: pick the broker keyed by the row's mesh. Legacy rows
|
||||
// (mesh=NULL) fall back to the only broker if there's exactly one;
|
||||
// otherwise mark dead because we don't know where to send them.
|
||||
let broker: DaemonBrokerClient | undefined;
|
||||
// v1.26.0: pick the daemon-WS broker keyed by the row's mesh.
|
||||
// Legacy rows (mesh=NULL) fall back to the only broker if there's
|
||||
// exactly one; otherwise mark dead because we don't know where to
|
||||
// send them.
|
||||
let daemonBroker: DaemonBrokerClient | undefined;
|
||||
if (row.mesh) {
|
||||
broker = opts.brokers.get(row.mesh);
|
||||
daemonBroker = opts.brokers.get(row.mesh);
|
||||
} else if (opts.brokers.size === 1) {
|
||||
broker = opts.brokers.values().next().value;
|
||||
daemonBroker = opts.brokers.values().next().value;
|
||||
}
|
||||
if (!broker) {
|
||||
if (!daemonBroker) {
|
||||
log("warn", "drain_no_broker_for_mesh", { id: row.id, mesh: row.mesh ?? "(null)" });
|
||||
markDead(opts.db, row.id, `no_broker_for_mesh:${row.mesh ?? "null"}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 1.34.0: when the row was written by an authenticated session,
|
||||
// dispatch via the matching SessionBrokerClient so broker fan-out
|
||||
// attributes the push to the session pubkey. Encryption is
|
||||
// session-secret based on those rows, so we MUST NOT silently fall
|
||||
// back to the daemon-WS — the recipient's decrypt would fail. If
|
||||
// the session-WS is closed (reconnecting / session terminated), we
|
||||
// back off and retry.
|
||||
let sessionBroker: SessionBrokerClient | undefined;
|
||||
if (row.sender_session_pubkey && opts.getSessionBrokerByPubkey) {
|
||||
sessionBroker = opts.getSessionBrokerByPubkey(row.sender_session_pubkey);
|
||||
}
|
||||
|
||||
// Sprint 4: use the row's resolved target/ciphertext if present.
|
||||
// Legacy v0.9.0 rows (NULL on these columns) fall back to the
|
||||
// broadcast smoke-test shape so existing in-flight rows still drain.
|
||||
@@ -135,16 +168,31 @@ async function drainOnce(opts: DrainOptions, log: NonNullable<DrainOptions["log"
|
||||
priority = "next";
|
||||
}
|
||||
|
||||
const sendArgs = {
|
||||
targetSpec,
|
||||
priority,
|
||||
nonce,
|
||||
ciphertext,
|
||||
client_message_id: row.client_message_id,
|
||||
request_fingerprint_hex: fpHex,
|
||||
};
|
||||
|
||||
let res;
|
||||
try {
|
||||
res = await broker.send({
|
||||
targetSpec,
|
||||
priority,
|
||||
nonce,
|
||||
ciphertext,
|
||||
client_message_id: row.client_message_id,
|
||||
request_fingerprint_hex: fpHex,
|
||||
});
|
||||
if (row.sender_session_pubkey) {
|
||||
// Session-attributed row. Require an open session-WS — see comment
|
||||
// above on why we don't fall back to the daemon-WS.
|
||||
if (!sessionBroker || !sessionBroker.isOpen()) {
|
||||
log("info", "drain_session_ws_not_ready", {
|
||||
id: row.id, session_pubkey: row.sender_session_pubkey.slice(0, 12),
|
||||
});
|
||||
backoffPending(opts.db, row.id, row.attempts + 1, "session_ws_not_open", "session_ws_not_open");
|
||||
continue;
|
||||
}
|
||||
res = await sessionBroker.send(sendArgs);
|
||||
} else {
|
||||
res = await daemonBroker.send(sendArgs);
|
||||
}
|
||||
} catch (e) {
|
||||
log("warn", "drain_send_threw", { id: row.id, err: String(e) });
|
||||
backoffPending(opts.db, row.id, row.attempts + 1, "exception", String(e));
|
||||
|
||||
Reference in New Issue
Block a user