feat(broker): m1 — two-phase claim/deliver + client_ack + role-tagged presence

Three correctness fixes on top of the m1 schema migration:

1) Fix the drainForMember claim-then-push race
   ----------------------------------------------------------------
   Previously the claim CTE set delivered_at = NOW() *before* the WS
   send. If readyState !== OPEN at push time, the row was marked
   delivered and the message dropped silently — at-most-once with no
   retry hook.

   The new flow:
     - claim sets (claimed_at, claim_id, claim_expires_at = NOW()+30s)
     - delivered_at stays NULL until the recipient acks
     - re-eligibility predicate now also accepts rows whose lease
       expired, so dropped pushes redeliver (at-least-once)

   Adds two helpers:
     - markDelivered() — scoped to (mesh_id, recipient pubkey) so a
       peer can only ack its own messages
     - sweepExpiredClaims() — clears expired (claimed_at, claim_id,
       claim_expires_at) every 15s, wired into startSweepers

2) Accept `client_ack` from recipients
   ----------------------------------------------------------------
   New WS message type handled in the dispatcher right after `send`.
   Lookups by clientMessageId or brokerMessageId; either is fine. Until
   the daemon (apps/cli, separate worktree) starts emitting acks, leases
   will simply expire and re-deliver — which is the desired retry
   behaviour.

3) Tag presence rows with `role`
   ----------------------------------------------------------------
   handleHello (member-keyed, used by the long-lived daemon WS) →
     role: 'control-plane'
   handleSessionHello (per-Claude-Code session WS) →
     role: 'session'

   listPeersInMesh exposes the new field; the peers_list response
   surfaces it. WSPeersListMessage type adds an optional `role` plus the
   long-undocumented `memberPubkey`. CLI-side filter swap from peerType
   to role lands in a follow-up worktree — that's why the CLI is
   untouched here per the M1 spec.

Typechecks clean (apps/broker tsc --noEmit, packages/db tsc --noEmit).
Test suite needs a real DB so wasn't run in this worktree; existing
dup-delivery and broker tests use drainForMember positionally and the
new claimerPresenceId arg is optional, so they should continue to pass.
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 18:10:25 +01:00
parent 5a8db796a0
commit b57e47ed65
3 changed files with 210 additions and 1 deletions

View File

@@ -369,8 +369,19 @@ export interface ConnectParams {
pid: number; pid: number;
cwd: string; cwd: string;
groups?: Array<{ name: string; role?: string }>; groups?: Array<{ name: string; role?: string }>;
/**
* v2 agentic-comms (M1) — connection role.
* 'control-plane' — daemon WS (hidden from user-facing peer lists).
* 'session' — per-Claude-Code-session WS (default).
* 'service' — autonomous bots/services attached to the mesh.
* Optional for backwards compatibility; defaults to 'session'.
*/
role?: PresenceRole;
} }
/** v2 agentic-comms (M1): typed connection roles. */
export type PresenceRole = "control-plane" | "session" | "service";
/** Create a presence row for a new WS connection. */ /** Create a presence row for a new WS connection. */
export async function connectPresence( export async function connectPresence(
params: ConnectParams, params: ConnectParams,
@@ -389,6 +400,7 @@ export async function connectPresence(
statusSource: "jsonl", statusSource: "jsonl",
statusUpdatedAt: now, statusUpdatedAt: now,
groups: params.groups ?? [], groups: params.groups ?? [],
role: params.role ?? "session",
connectedAt: now, connectedAt: now,
lastPingAt: now, lastPingAt: now,
}) })
@@ -431,6 +443,9 @@ export async function listPeersInMesh(
sessionId: string; sessionId: string;
cwd: string; cwd: string;
connectedAt: Date; connectedAt: Date;
/** v2 agentic-comms (M1): connection role. CLI uses this to hide
* control-plane daemons from user-facing lists. */
role: PresenceRole;
}> }>
> { > {
const rows = await db const rows = await db
@@ -445,6 +460,7 @@ export async function listPeersInMesh(
sessionId: presence.sessionId, sessionId: presence.sessionId,
cwd: presence.cwd, cwd: presence.cwd,
connectedAt: presence.connectedAt, connectedAt: presence.connectedAt,
role: presence.role,
}) })
.from(presence) .from(presence)
.innerJoin(memberTable, eq(presence.memberId, memberTable.id)) .innerJoin(memberTable, eq(presence.memberId, memberTable.id))
@@ -469,6 +485,7 @@ export async function listPeersInMesh(
sessionId: r.sessionId, sessionId: r.sessionId,
cwd: r.cwd, cwd: r.cwd,
connectedAt: r.connectedAt, connectedAt: r.connectedAt,
role: (r.role ?? "session") as PresenceRole,
})); }));
} }
@@ -2311,6 +2328,22 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
* targetSpec routing: matches either the member's pubkey directly or * targetSpec routing: matches either the member's pubkey directly or
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh * the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
* config that lives outside this function. * config that lives outside this function.
*
* v2 agentic-comms (M1): two-phase claim/deliver with a 30s lease.
*
* The legacy implementation set `delivered_at = NOW()` in the same
* UPDATE that selected the row. If the recipient WS was no longer
* OPEN at push time, the message dropped silently (the row read as
* "delivered" so the next reconnect's drain skipped it).
*
* The new behaviour:
* - claim sets (claimed_at, claim_id, claim_expires_at = NOW() + 30s)
* - delivered_at stays NULL until the recipient acks via `client_ack`
* - re-eligibility predicate accepts rows whose claim has expired,
* so dropped pushes are redelivered (at-least-once)
*
* `claimerPresenceId` is recorded on the row purely for debugging — it
* never gates re-claim; expiry alone does.
*/ */
export async function drainForMember( export async function drainForMember(
meshId: string, meshId: string,
@@ -2320,6 +2353,7 @@ export async function drainForMember(
sessionPubkey?: string, sessionPubkey?: string,
excludeSenderSessionPubkey?: string, excludeSenderSessionPubkey?: string,
memberGroups?: string[], memberGroups?: string[],
claimerPresenceId?: string,
): Promise< ): Promise<
Array<{ Array<{
id: string; id: string;
@@ -2385,6 +2419,11 @@ export async function drainForMember(
// (with id as tiebreaker so equal-timestamp rows stay deterministic). // (with id as tiebreaker so equal-timestamp rows stay deterministic).
// Sorting in SQL avoids JS Date's millisecond-precision collapse of // Sorting in SQL avoids JS Date's millisecond-precision collapse of
// Postgres microsecond timestamps. // Postgres microsecond timestamps.
//
// v2 (M1): claim sets the lease columns, NOT delivered_at. Re-eligibility
// accepts unclaimed rows AND rows with an expired claim (NULL or past
// NOW()). delivered_at stays NULL until a `client_ack` lands.
const claimerId = claimerPresenceId ?? null;
const result = await db.execute<{ const result = await db.execute<{
id: string; id: string;
priority: string; priority: string;
@@ -2398,12 +2437,15 @@ export async function drainForMember(
}>(sql` }>(sql`
WITH claimed AS ( WITH claimed AS (
UPDATE mesh.message_queue AS mq UPDATE mesh.message_queue AS mq
SET delivered_at = NOW() SET claimed_at = NOW(),
claim_id = ${claimerId},
claim_expires_at = NOW() + INTERVAL '30 seconds'
FROM mesh.member AS m FROM mesh.member AS m
WHERE mq.id IN ( WHERE mq.id IN (
SELECT id FROM mesh.message_queue SELECT id FROM mesh.message_queue
WHERE mesh_id = ${meshId} WHERE mesh_id = ${meshId}
AND delivered_at IS NULL AND delivered_at IS NULL
AND (claimed_at IS NULL OR claim_expires_at IS NULL OR claim_expires_at < NOW())
AND priority::text IN (${priorityList}) AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``}) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``})
${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``} ${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
@@ -2445,11 +2487,93 @@ export async function drainForMember(
})); }));
} }
/**
* v2 agentic-comms (M1): mark a message_queue row as delivered.
*
* Called when the recipient WS replies with a `client_ack` carrying the
* original `client_message_id`. Lookup is scoped to (mesh_id, member_id)
* so a malicious peer can't ack messages addressed to others. Returns
* the number of rows marked (0 = unknown id, already delivered, or wrong
* recipient).
*/
export async function markDelivered(params: {
meshId: string;
/** memberId of the WS that's claiming to have received this message. */
recipientMemberId: string;
recipientMemberPubkey: string;
recipientSessionPubkey?: string | null;
clientMessageId?: string | null;
brokerMessageId?: string | null;
}): Promise<number> {
const {
meshId,
recipientMemberPubkey,
recipientSessionPubkey,
clientMessageId,
brokerMessageId,
} = params;
if (!clientMessageId && !brokerMessageId) return 0;
// Prefer broker id when available; falls back to clientMessageId.
// Scope to (mesh_id, target_spec ∈ {member-pubkey, session-pubkey, '*', @group, #topic}).
// For minimal blast radius we only allow direct/broadcast acks here —
// group/topic acks would need the same membership expansion drainForMember
// does and we'd rather under-ack than over-ack (re-claim is cheap).
const result = await db.execute<{ id: string }>(sql`
UPDATE mesh.message_queue
SET delivered_at = NOW()
WHERE mesh_id = ${meshId}
AND delivered_at IS NULL
AND (
${brokerMessageId ? sql`id = ${brokerMessageId}` : sql`FALSE`}
OR ${clientMessageId ? sql`client_message_id = ${clientMessageId}` : sql`FALSE`}
)
AND (
target_spec = ${recipientMemberPubkey}
${recipientSessionPubkey ? sql`OR target_spec = ${recipientSessionPubkey}` : sql``}
OR target_spec = '*'
OR target_spec LIKE '@%'
OR target_spec LIKE '#%'
)
RETURNING id
`);
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
return rows.length;
}
/**
* v2 agentic-comms (M1): reap expired claims so dropped pushes redeliver.
*
* Runs every 15s. Clears (claimed_at, claim_id, claim_expires_at) on rows
* where the lease has expired and no `client_ack` arrived. The next
* `drainForMember` call will pick the row up again — at-least-once.
*
* Returns the number of rows reaped.
*/
export async function sweepExpiredClaims(): Promise<number> {
const result = await db.execute<{ id: string }>(sql`
UPDATE mesh.message_queue
SET claimed_at = NULL,
claim_id = NULL,
claim_expires_at = NULL
WHERE delivered_at IS NULL
AND claim_expires_at IS NOT NULL
AND claim_expires_at < NOW()
RETURNING id
`);
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
return rows.length;
}
// --- Lifecycle --- // --- Lifecycle ---
let ttlTimer: ReturnType<typeof setInterval> | null = null; let ttlTimer: ReturnType<typeof setInterval> | null = null;
let pendingTimer: ReturnType<typeof setInterval> | null = null; let pendingTimer: ReturnType<typeof setInterval> | null = null;
let staleTimer: ReturnType<typeof setInterval> | null = null; let staleTimer: ReturnType<typeof setInterval> | null = null;
let claimSweepTimer: ReturnType<typeof setInterval> | null = null;
/** v2 agentic-comms (M1): how often we reap expired message claims. */
const CLAIM_SWEEP_INTERVAL_MS = 15_000;
/** Start background sweepers. Idempotent. */ /** Start background sweepers. Idempotent. */
export function startSweepers(): void { export function startSweepers(): void {
@@ -2467,6 +2591,13 @@ export function startSweepers(): void {
console.error("[broker] stale presence sweep:", e), console.error("[broker] stale presence sweep:", e),
); );
}, 30_000); }, 30_000);
claimSweepTimer = setInterval(() => {
sweepExpiredClaims()
.then((n) => {
if (n > 0) console.log(`[broker] expired claims swept: ${n}`);
})
.catch((e) => console.error("[broker] claim sweep:", e));
}, CLAIM_SWEEP_INTERVAL_MS);
// Orphan-message sweep every hour; cheap, rows are all >7d at deletion time. // Orphan-message sweep every hour; cheap, rows are all >7d at deletion time.
setInterval(() => { setInterval(() => {
sweepOrphanMessages() sweepOrphanMessages()
@@ -2480,9 +2611,11 @@ export async function stopSweepers(): Promise<void> {
if (ttlTimer) clearInterval(ttlTimer); if (ttlTimer) clearInterval(ttlTimer);
if (pendingTimer) clearInterval(pendingTimer); if (pendingTimer) clearInterval(pendingTimer);
if (staleTimer) clearInterval(staleTimer); if (staleTimer) clearInterval(staleTimer);
if (claimSweepTimer) clearInterval(claimSweepTimer);
ttlTimer = null; ttlTimer = null;
pendingTimer = null; pendingTimer = null;
staleTimer = null; staleTimer = null;
claimSweepTimer = null;
await db await db
.update(presence) .update(presence)
.set({ disconnectedAt: new Date() }) .set({ disconnectedAt: new Date() })

View File

@@ -49,6 +49,7 @@ import {
listFiles, listFiles,
listPeersInMesh, listPeersInMesh,
listState, listState,
markDelivered,
listTasks, listTasks,
queueMessage, queueMessage,
recallMemory, recallMemory,
@@ -546,6 +547,7 @@ async function maybePushQueuedMessages(
conn.sessionPubkey ?? undefined, conn.sessionPubkey ?? undefined,
excludeSenderSessionPubkey, excludeSenderSessionPubkey,
conn.groups.map((g) => g.name), conn.groups.map((g) => g.name),
presenceId,
); );
log.info("maybePush", { log.info("maybePush", {
presence_id: presenceId, presence_id: presenceId,
@@ -1772,6 +1774,11 @@ async function handleHello(
pid: hello.pid, pid: hello.pid,
cwd: hello.cwd, cwd: hello.cwd,
groups: initialGroups, groups: initialGroups,
// v2 agentic-comms (M1): the regular member-keyed `hello` path is
// used by long-lived control-plane connections (claudemesh daemon,
// dashboard, automation). Per-Claude-Code sessions go through
// `session_hello` and get role='session'.
role: "control-plane",
}); });
const effectiveDisplayName = hello.displayName || member.displayName; const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, { connections.set(presenceId, {
@@ -1796,6 +1803,7 @@ async function handleHello(
pubkey: hello.pubkey, pubkey: hello.pubkey,
groups: initialGroups, groups: initialGroups,
restored: !!saved, restored: !!saved,
role: "control-plane",
}); });
log.info("ws hello", { log.info("ws hello", {
mesh_id: hello.meshId, mesh_id: hello.meshId,
@@ -1993,6 +2001,9 @@ async function handleSessionHello(
pid: hello.pid, pid: hello.pid,
cwd: hello.cwd, cwd: hello.cwd,
groups: initialGroups, groups: initialGroups,
// v2 agentic-comms (M1): per-Claude-Code session WS — these are the
// user-facing peers shown in `claudemesh peer list`.
role: "session",
}); });
const effectiveDisplayName = hello.displayName || member.displayName; const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, { connections.set(presenceId, {
@@ -2018,6 +2029,7 @@ async function handleSessionHello(
session_pubkey: hello.sessionPubkey, session_pubkey: hello.sessionPubkey,
groups: initialGroups, groups: initialGroups,
via: "session_hello", via: "session_hello",
role: "session",
}); });
log.info("ws session_hello", { log.info("ws session_hello", {
mesh_id: hello.meshId, mesh_id: hello.meshId,
@@ -2567,6 +2579,39 @@ function handleConnection(ws: WebSocket): void {
case "send": case "send":
await handleSend(conn, msg); await handleSend(conn, msg);
break; break;
case "client_ack": {
// v2 agentic-comms (M1): close out a previously pushed message.
// Lookup is scoped to (mesh_id, recipient pubkey) so a peer can
// only ack messages addressed to itself.
const ack = msg as Extract<WSClientMessage, { type: "client_ack" }>;
if (!ack.clientMessageId && !ack.brokerMessageId) {
// Nothing to do; don't error — the daemon may speculatively
// ack and we'd rather be lenient than break a CLI release.
break;
}
try {
const n = await markDelivered({
meshId: conn.meshId,
recipientMemberId: conn.memberId,
recipientMemberPubkey: conn.memberPubkey,
recipientSessionPubkey: conn.sessionPubkey ?? null,
clientMessageId: ack.clientMessageId ?? null,
brokerMessageId: ack.brokerMessageId ?? null,
});
log.debug("ws client_ack", {
presence_id: presenceId,
client_message_id: ack.clientMessageId,
broker_message_id: ack.brokerMessageId,
marked: n,
});
} catch (e) {
log.warn("ws client_ack failed", {
presence_id: presenceId,
error: e instanceof Error ? e.message : String(e),
});
}
break;
}
case "set_status": case "set_status":
await writeStatus(presenceId, msg.status, "manual", new Date()); await writeStatus(presenceId, msg.status, "manual", new Date());
log.info("ws set_status", { log.info("ws set_status", {
@@ -2604,6 +2649,10 @@ function handleConnection(ws: WebSocket): void {
sessionId: p.sessionId, sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(), connectedAt: p.connectedAt.toISOString(),
cwd: pc?.cwd ?? p.cwd, cwd: pc?.cwd ?? p.cwd,
// v2 agentic-comms (M1): typed connection role. CLI uses
// this to hide control-plane daemons from user-facing
// peer lists (filter swap from peerType happens CLI-side).
role: p.role,
...(pc?.hostname ? { hostname: pc.hostname } : {}), ...(pc?.hostname ? { hostname: pc.hostname } : {}),
...(pc?.peerType ? { peerType: pc.peerType } : {}), ...(pc?.peerType ? { peerType: pc.peerType } : {}),
...(pc?.channel ? { channel: pc.channel } : {}), ...(pc?.channel ? { channel: pc.channel } : {}),

View File

@@ -224,6 +224,26 @@ export interface WSSetStatusMessage {
status: PeerStatus; status: PeerStatus;
} }
/**
* Client → broker: confirm receipt of a previously pushed envelope so the
* broker can mark the message_queue row delivered.
*
* v2 agentic-comms (M1): pairs with the two-phase claim/lease introduced
* in `drainForMember`. Without this ack, the lease expires after 30s and
* the message is re-claimed and re-pushed (at-least-once retry).
*
* Either id is accepted; daemons that track inbox dedupe by clientMessageId
* should send that one. brokerMessageId is the row primary key, useful when
* the original send didn't carry a client_message_id (legacy traffic).
*/
export interface WSClientAckMessage {
type: "client_ack";
/** Original caller-supplied idempotency id from the `send` envelope. */
clientMessageId?: string;
/** Broker-side row id (the `messageId` field on the inbound `push`). */
brokerMessageId?: string;
}
/** Client → broker: request list of connected peers in the same mesh. */ /** Client → broker: request list of connected peers in the same mesh. */
export interface WSListPeersMessage { export interface WSListPeersMessage {
type: "list_peers"; type: "list_peers";
@@ -518,6 +538,8 @@ export interface WSPeersListMessage {
type: "peers_list"; type: "peers_list";
peers: Array<{ peers: Array<{
pubkey: string; pubkey: string;
/** Stable member pubkey — present on M1+ broker responses. */
memberPubkey?: string;
displayName: string; displayName: string;
status: PeerStatus; status: PeerStatus;
summary: string | null; summary: string | null;
@@ -525,6 +547,10 @@ export interface WSPeersListMessage {
sessionId: string; sessionId: string;
connectedAt: string; connectedAt: string;
cwd?: string; cwd?: string;
/** v2 agentic-comms (M1): typed connection role. CLI uses this to
* filter control-plane daemons out of user-facing peer lists.
* Optional for clients talking to a pre-M1 broker. */
role?: "control-plane" | "session" | "service";
hostname?: string; hostname?: string;
peerType?: "ai" | "human" | "connector"; peerType?: "ai" | "human" | "connector";
channel?: string; channel?: string;
@@ -1417,6 +1443,7 @@ export type WSClientMessage =
| WSHelloMessage | WSHelloMessage
| WSSessionHelloMessage | WSSessionHelloMessage
| WSSendMessage | WSSendMessage
| WSClientAckMessage
| WSSetStatusMessage | WSSetStatusMessage
| WSListPeersMessage | WSListPeersMessage
| WSSetSummaryMessage | WSSetSummaryMessage