Merge m1-broker-drain-race-and-presence-role into main
Milestone 1 broker side: - Schema: claimedAt + claimId + claimExpiresAt on message_queue, role on presence (default 'session') - Migration 0029_drain_lease_and_presence_role.sql - drainForMember rewritten for two-phase claim/deliver with 30s lease - New markDelivered() called on receipt of client_ack - New sweepExpiredClaims() running every 15s - handleHello sets role='control-plane', handleSessionHello sets 'session' - listPeersInMesh returns role - WSClientAckMessage type added; broker accepts and dispatches client_ack
This commit is contained in:
@@ -369,8 +369,19 @@ export interface ConnectParams {
|
|||||||
pid: number;
|
pid: number;
|
||||||
cwd: string;
|
cwd: string;
|
||||||
groups?: Array<{ name: string; role?: string }>;
|
groups?: Array<{ name: string; role?: string }>;
|
||||||
|
/**
|
||||||
|
* v2 agentic-comms (M1) — connection role.
|
||||||
|
* 'control-plane' — daemon WS (hidden from user-facing peer lists).
|
||||||
|
* 'session' — per-Claude-Code-session WS (default).
|
||||||
|
* 'service' — autonomous bots/services attached to the mesh.
|
||||||
|
* Optional for backwards compatibility; defaults to 'session'.
|
||||||
|
*/
|
||||||
|
role?: PresenceRole;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** v2 agentic-comms (M1): typed connection roles. */
|
||||||
|
export type PresenceRole = "control-plane" | "session" | "service";
|
||||||
|
|
||||||
/** Create a presence row for a new WS connection. */
|
/** Create a presence row for a new WS connection. */
|
||||||
export async function connectPresence(
|
export async function connectPresence(
|
||||||
params: ConnectParams,
|
params: ConnectParams,
|
||||||
@@ -389,6 +400,7 @@ export async function connectPresence(
|
|||||||
statusSource: "jsonl",
|
statusSource: "jsonl",
|
||||||
statusUpdatedAt: now,
|
statusUpdatedAt: now,
|
||||||
groups: params.groups ?? [],
|
groups: params.groups ?? [],
|
||||||
|
role: params.role ?? "session",
|
||||||
connectedAt: now,
|
connectedAt: now,
|
||||||
lastPingAt: now,
|
lastPingAt: now,
|
||||||
})
|
})
|
||||||
@@ -431,6 +443,9 @@ export async function listPeersInMesh(
|
|||||||
sessionId: string;
|
sessionId: string;
|
||||||
cwd: string;
|
cwd: string;
|
||||||
connectedAt: Date;
|
connectedAt: Date;
|
||||||
|
/** v2 agentic-comms (M1): connection role. CLI uses this to hide
|
||||||
|
* control-plane daemons from user-facing lists. */
|
||||||
|
role: PresenceRole;
|
||||||
}>
|
}>
|
||||||
> {
|
> {
|
||||||
const rows = await db
|
const rows = await db
|
||||||
@@ -445,6 +460,7 @@ export async function listPeersInMesh(
|
|||||||
sessionId: presence.sessionId,
|
sessionId: presence.sessionId,
|
||||||
cwd: presence.cwd,
|
cwd: presence.cwd,
|
||||||
connectedAt: presence.connectedAt,
|
connectedAt: presence.connectedAt,
|
||||||
|
role: presence.role,
|
||||||
})
|
})
|
||||||
.from(presence)
|
.from(presence)
|
||||||
.innerJoin(memberTable, eq(presence.memberId, memberTable.id))
|
.innerJoin(memberTable, eq(presence.memberId, memberTable.id))
|
||||||
@@ -469,6 +485,7 @@ export async function listPeersInMesh(
|
|||||||
sessionId: r.sessionId,
|
sessionId: r.sessionId,
|
||||||
cwd: r.cwd,
|
cwd: r.cwd,
|
||||||
connectedAt: r.connectedAt,
|
connectedAt: r.connectedAt,
|
||||||
|
role: (r.role ?? "session") as PresenceRole,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2311,6 +2328,22 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
|
|||||||
* targetSpec routing: matches either the member's pubkey directly or
|
* targetSpec routing: matches either the member's pubkey directly or
|
||||||
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
|
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
|
||||||
* config that lives outside this function.
|
* config that lives outside this function.
|
||||||
|
*
|
||||||
|
* v2 agentic-comms (M1): two-phase claim/deliver with a 30s lease.
|
||||||
|
*
|
||||||
|
* The legacy implementation set `delivered_at = NOW()` in the same
|
||||||
|
* UPDATE that selected the row. If the recipient WS was no longer
|
||||||
|
* OPEN at push time, the message dropped silently (the row read as
|
||||||
|
* "delivered" so the next reconnect's drain skipped it).
|
||||||
|
*
|
||||||
|
* The new behaviour:
|
||||||
|
* - claim sets (claimed_at, claim_id, claim_expires_at = NOW() + 30s)
|
||||||
|
* - delivered_at stays NULL until the recipient acks via `client_ack`
|
||||||
|
* - re-eligibility predicate accepts rows whose claim has expired,
|
||||||
|
* so dropped pushes are redelivered (at-least-once)
|
||||||
|
*
|
||||||
|
* `claimerPresenceId` is recorded on the row purely for debugging — it
|
||||||
|
* never gates re-claim; expiry alone does.
|
||||||
*/
|
*/
|
||||||
export async function drainForMember(
|
export async function drainForMember(
|
||||||
meshId: string,
|
meshId: string,
|
||||||
@@ -2320,6 +2353,7 @@ export async function drainForMember(
|
|||||||
sessionPubkey?: string,
|
sessionPubkey?: string,
|
||||||
excludeSenderSessionPubkey?: string,
|
excludeSenderSessionPubkey?: string,
|
||||||
memberGroups?: string[],
|
memberGroups?: string[],
|
||||||
|
claimerPresenceId?: string,
|
||||||
): Promise<
|
): Promise<
|
||||||
Array<{
|
Array<{
|
||||||
id: string;
|
id: string;
|
||||||
@@ -2385,6 +2419,11 @@ export async function drainForMember(
|
|||||||
// (with id as tiebreaker so equal-timestamp rows stay deterministic).
|
// (with id as tiebreaker so equal-timestamp rows stay deterministic).
|
||||||
// Sorting in SQL avoids JS Date's millisecond-precision collapse of
|
// Sorting in SQL avoids JS Date's millisecond-precision collapse of
|
||||||
// Postgres microsecond timestamps.
|
// Postgres microsecond timestamps.
|
||||||
|
//
|
||||||
|
// v2 (M1): claim sets the lease columns, NOT delivered_at. Re-eligibility
|
||||||
|
// accepts unclaimed rows AND rows with an expired claim (NULL or past
|
||||||
|
// NOW()). delivered_at stays NULL until a `client_ack` lands.
|
||||||
|
const claimerId = claimerPresenceId ?? null;
|
||||||
const result = await db.execute<{
|
const result = await db.execute<{
|
||||||
id: string;
|
id: string;
|
||||||
priority: string;
|
priority: string;
|
||||||
@@ -2398,12 +2437,15 @@ export async function drainForMember(
|
|||||||
}>(sql`
|
}>(sql`
|
||||||
WITH claimed AS (
|
WITH claimed AS (
|
||||||
UPDATE mesh.message_queue AS mq
|
UPDATE mesh.message_queue AS mq
|
||||||
SET delivered_at = NOW()
|
SET claimed_at = NOW(),
|
||||||
|
claim_id = ${claimerId},
|
||||||
|
claim_expires_at = NOW() + INTERVAL '30 seconds'
|
||||||
FROM mesh.member AS m
|
FROM mesh.member AS m
|
||||||
WHERE mq.id IN (
|
WHERE mq.id IN (
|
||||||
SELECT id FROM mesh.message_queue
|
SELECT id FROM mesh.message_queue
|
||||||
WHERE mesh_id = ${meshId}
|
WHERE mesh_id = ${meshId}
|
||||||
AND delivered_at IS NULL
|
AND delivered_at IS NULL
|
||||||
|
AND (claimed_at IS NULL OR claim_expires_at IS NULL OR claim_expires_at < NOW())
|
||||||
AND priority::text IN (${priorityList})
|
AND priority::text IN (${priorityList})
|
||||||
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``})
|
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``})
|
||||||
${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
|
${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
|
||||||
@@ -2445,11 +2487,93 @@ export async function drainForMember(
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* v2 agentic-comms (M1): mark a message_queue row as delivered.
|
||||||
|
*
|
||||||
|
* Called when the recipient WS replies with a `client_ack` carrying the
|
||||||
|
* original `client_message_id`. Lookup is scoped to (mesh_id, member_id)
|
||||||
|
* so a malicious peer can't ack messages addressed to others. Returns
|
||||||
|
* the number of rows marked (0 = unknown id, already delivered, or wrong
|
||||||
|
* recipient).
|
||||||
|
*/
|
||||||
|
export async function markDelivered(params: {
|
||||||
|
meshId: string;
|
||||||
|
/** memberId of the WS that's claiming to have received this message. */
|
||||||
|
recipientMemberId: string;
|
||||||
|
recipientMemberPubkey: string;
|
||||||
|
recipientSessionPubkey?: string | null;
|
||||||
|
clientMessageId?: string | null;
|
||||||
|
brokerMessageId?: string | null;
|
||||||
|
}): Promise<number> {
|
||||||
|
const {
|
||||||
|
meshId,
|
||||||
|
recipientMemberPubkey,
|
||||||
|
recipientSessionPubkey,
|
||||||
|
clientMessageId,
|
||||||
|
brokerMessageId,
|
||||||
|
} = params;
|
||||||
|
if (!clientMessageId && !brokerMessageId) return 0;
|
||||||
|
|
||||||
|
// Prefer broker id when available; falls back to clientMessageId.
|
||||||
|
// Scope to (mesh_id, target_spec ∈ {member-pubkey, session-pubkey, '*', @group, #topic}).
|
||||||
|
// For minimal blast radius we only allow direct/broadcast acks here —
|
||||||
|
// group/topic acks would need the same membership expansion drainForMember
|
||||||
|
// does and we'd rather under-ack than over-ack (re-claim is cheap).
|
||||||
|
const result = await db.execute<{ id: string }>(sql`
|
||||||
|
UPDATE mesh.message_queue
|
||||||
|
SET delivered_at = NOW()
|
||||||
|
WHERE mesh_id = ${meshId}
|
||||||
|
AND delivered_at IS NULL
|
||||||
|
AND (
|
||||||
|
${brokerMessageId ? sql`id = ${brokerMessageId}` : sql`FALSE`}
|
||||||
|
OR ${clientMessageId ? sql`client_message_id = ${clientMessageId}` : sql`FALSE`}
|
||||||
|
)
|
||||||
|
AND (
|
||||||
|
target_spec = ${recipientMemberPubkey}
|
||||||
|
${recipientSessionPubkey ? sql`OR target_spec = ${recipientSessionPubkey}` : sql``}
|
||||||
|
OR target_spec = '*'
|
||||||
|
OR target_spec LIKE '@%'
|
||||||
|
OR target_spec LIKE '#%'
|
||||||
|
)
|
||||||
|
RETURNING id
|
||||||
|
`);
|
||||||
|
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
|
||||||
|
return rows.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* v2 agentic-comms (M1): reap expired claims so dropped pushes redeliver.
|
||||||
|
*
|
||||||
|
* Runs every 15s. Clears (claimed_at, claim_id, claim_expires_at) on rows
|
||||||
|
* where the lease has expired and no `client_ack` arrived. The next
|
||||||
|
* `drainForMember` call will pick the row up again — at-least-once.
|
||||||
|
*
|
||||||
|
* Returns the number of rows reaped.
|
||||||
|
*/
|
||||||
|
export async function sweepExpiredClaims(): Promise<number> {
|
||||||
|
const result = await db.execute<{ id: string }>(sql`
|
||||||
|
UPDATE mesh.message_queue
|
||||||
|
SET claimed_at = NULL,
|
||||||
|
claim_id = NULL,
|
||||||
|
claim_expires_at = NULL
|
||||||
|
WHERE delivered_at IS NULL
|
||||||
|
AND claim_expires_at IS NOT NULL
|
||||||
|
AND claim_expires_at < NOW()
|
||||||
|
RETURNING id
|
||||||
|
`);
|
||||||
|
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
|
||||||
|
return rows.length;
|
||||||
|
}
|
||||||
|
|
||||||
// --- Lifecycle ---
|
// --- Lifecycle ---
|
||||||
|
|
||||||
let ttlTimer: ReturnType<typeof setInterval> | null = null;
|
let ttlTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
let pendingTimer: ReturnType<typeof setInterval> | null = null;
|
let pendingTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
let staleTimer: ReturnType<typeof setInterval> | null = null;
|
let staleTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
let claimSweepTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
|
/** v2 agentic-comms (M1): how often we reap expired message claims. */
|
||||||
|
const CLAIM_SWEEP_INTERVAL_MS = 15_000;
|
||||||
|
|
||||||
/** Start background sweepers. Idempotent. */
|
/** Start background sweepers. Idempotent. */
|
||||||
export function startSweepers(): void {
|
export function startSweepers(): void {
|
||||||
@@ -2467,6 +2591,13 @@ export function startSweepers(): void {
|
|||||||
console.error("[broker] stale presence sweep:", e),
|
console.error("[broker] stale presence sweep:", e),
|
||||||
);
|
);
|
||||||
}, 30_000);
|
}, 30_000);
|
||||||
|
claimSweepTimer = setInterval(() => {
|
||||||
|
sweepExpiredClaims()
|
||||||
|
.then((n) => {
|
||||||
|
if (n > 0) console.log(`[broker] expired claims swept: ${n}`);
|
||||||
|
})
|
||||||
|
.catch((e) => console.error("[broker] claim sweep:", e));
|
||||||
|
}, CLAIM_SWEEP_INTERVAL_MS);
|
||||||
// Orphan-message sweep every hour; cheap, rows are all >7d at deletion time.
|
// Orphan-message sweep every hour; cheap, rows are all >7d at deletion time.
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
sweepOrphanMessages()
|
sweepOrphanMessages()
|
||||||
@@ -2480,9 +2611,11 @@ export async function stopSweepers(): Promise<void> {
|
|||||||
if (ttlTimer) clearInterval(ttlTimer);
|
if (ttlTimer) clearInterval(ttlTimer);
|
||||||
if (pendingTimer) clearInterval(pendingTimer);
|
if (pendingTimer) clearInterval(pendingTimer);
|
||||||
if (staleTimer) clearInterval(staleTimer);
|
if (staleTimer) clearInterval(staleTimer);
|
||||||
|
if (claimSweepTimer) clearInterval(claimSweepTimer);
|
||||||
ttlTimer = null;
|
ttlTimer = null;
|
||||||
pendingTimer = null;
|
pendingTimer = null;
|
||||||
staleTimer = null;
|
staleTimer = null;
|
||||||
|
claimSweepTimer = null;
|
||||||
await db
|
await db
|
||||||
.update(presence)
|
.update(presence)
|
||||||
.set({ disconnectedAt: new Date() })
|
.set({ disconnectedAt: new Date() })
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ import {
|
|||||||
listFiles,
|
listFiles,
|
||||||
listPeersInMesh,
|
listPeersInMesh,
|
||||||
listState,
|
listState,
|
||||||
|
markDelivered,
|
||||||
listTasks,
|
listTasks,
|
||||||
queueMessage,
|
queueMessage,
|
||||||
recallMemory,
|
recallMemory,
|
||||||
@@ -546,6 +547,7 @@ async function maybePushQueuedMessages(
|
|||||||
conn.sessionPubkey ?? undefined,
|
conn.sessionPubkey ?? undefined,
|
||||||
excludeSenderSessionPubkey,
|
excludeSenderSessionPubkey,
|
||||||
conn.groups.map((g) => g.name),
|
conn.groups.map((g) => g.name),
|
||||||
|
presenceId,
|
||||||
);
|
);
|
||||||
log.info("maybePush", {
|
log.info("maybePush", {
|
||||||
presence_id: presenceId,
|
presence_id: presenceId,
|
||||||
@@ -1772,6 +1774,11 @@ async function handleHello(
|
|||||||
pid: hello.pid,
|
pid: hello.pid,
|
||||||
cwd: hello.cwd,
|
cwd: hello.cwd,
|
||||||
groups: initialGroups,
|
groups: initialGroups,
|
||||||
|
// v2 agentic-comms (M1): the regular member-keyed `hello` path is
|
||||||
|
// used by long-lived control-plane connections (claudemesh daemon,
|
||||||
|
// dashboard, automation). Per-Claude-Code sessions go through
|
||||||
|
// `session_hello` and get role='session'.
|
||||||
|
role: "control-plane",
|
||||||
});
|
});
|
||||||
const effectiveDisplayName = hello.displayName || member.displayName;
|
const effectiveDisplayName = hello.displayName || member.displayName;
|
||||||
connections.set(presenceId, {
|
connections.set(presenceId, {
|
||||||
@@ -1796,6 +1803,7 @@ async function handleHello(
|
|||||||
pubkey: hello.pubkey,
|
pubkey: hello.pubkey,
|
||||||
groups: initialGroups,
|
groups: initialGroups,
|
||||||
restored: !!saved,
|
restored: !!saved,
|
||||||
|
role: "control-plane",
|
||||||
});
|
});
|
||||||
log.info("ws hello", {
|
log.info("ws hello", {
|
||||||
mesh_id: hello.meshId,
|
mesh_id: hello.meshId,
|
||||||
@@ -1993,6 +2001,9 @@ async function handleSessionHello(
|
|||||||
pid: hello.pid,
|
pid: hello.pid,
|
||||||
cwd: hello.cwd,
|
cwd: hello.cwd,
|
||||||
groups: initialGroups,
|
groups: initialGroups,
|
||||||
|
// v2 agentic-comms (M1): per-Claude-Code session WS — these are the
|
||||||
|
// user-facing peers shown in `claudemesh peer list`.
|
||||||
|
role: "session",
|
||||||
});
|
});
|
||||||
const effectiveDisplayName = hello.displayName || member.displayName;
|
const effectiveDisplayName = hello.displayName || member.displayName;
|
||||||
connections.set(presenceId, {
|
connections.set(presenceId, {
|
||||||
@@ -2018,6 +2029,7 @@ async function handleSessionHello(
|
|||||||
session_pubkey: hello.sessionPubkey,
|
session_pubkey: hello.sessionPubkey,
|
||||||
groups: initialGroups,
|
groups: initialGroups,
|
||||||
via: "session_hello",
|
via: "session_hello",
|
||||||
|
role: "session",
|
||||||
});
|
});
|
||||||
log.info("ws session_hello", {
|
log.info("ws session_hello", {
|
||||||
mesh_id: hello.meshId,
|
mesh_id: hello.meshId,
|
||||||
@@ -2567,6 +2579,39 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
case "send":
|
case "send":
|
||||||
await handleSend(conn, msg);
|
await handleSend(conn, msg);
|
||||||
break;
|
break;
|
||||||
|
case "client_ack": {
|
||||||
|
// v2 agentic-comms (M1): close out a previously pushed message.
|
||||||
|
// Lookup is scoped to (mesh_id, recipient pubkey) so a peer can
|
||||||
|
// only ack messages addressed to itself.
|
||||||
|
const ack = msg as Extract<WSClientMessage, { type: "client_ack" }>;
|
||||||
|
if (!ack.clientMessageId && !ack.brokerMessageId) {
|
||||||
|
// Nothing to do; don't error — the daemon may speculatively
|
||||||
|
// ack and we'd rather be lenient than break a CLI release.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const n = await markDelivered({
|
||||||
|
meshId: conn.meshId,
|
||||||
|
recipientMemberId: conn.memberId,
|
||||||
|
recipientMemberPubkey: conn.memberPubkey,
|
||||||
|
recipientSessionPubkey: conn.sessionPubkey ?? null,
|
||||||
|
clientMessageId: ack.clientMessageId ?? null,
|
||||||
|
brokerMessageId: ack.brokerMessageId ?? null,
|
||||||
|
});
|
||||||
|
log.debug("ws client_ack", {
|
||||||
|
presence_id: presenceId,
|
||||||
|
client_message_id: ack.clientMessageId,
|
||||||
|
broker_message_id: ack.brokerMessageId,
|
||||||
|
marked: n,
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
log.warn("ws client_ack failed", {
|
||||||
|
presence_id: presenceId,
|
||||||
|
error: e instanceof Error ? e.message : String(e),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
case "set_status":
|
case "set_status":
|
||||||
await writeStatus(presenceId, msg.status, "manual", new Date());
|
await writeStatus(presenceId, msg.status, "manual", new Date());
|
||||||
log.info("ws set_status", {
|
log.info("ws set_status", {
|
||||||
@@ -2604,6 +2649,10 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
sessionId: p.sessionId,
|
sessionId: p.sessionId,
|
||||||
connectedAt: p.connectedAt.toISOString(),
|
connectedAt: p.connectedAt.toISOString(),
|
||||||
cwd: pc?.cwd ?? p.cwd,
|
cwd: pc?.cwd ?? p.cwd,
|
||||||
|
// v2 agentic-comms (M1): typed connection role. CLI uses
|
||||||
|
// this to hide control-plane daemons from user-facing
|
||||||
|
// peer lists (filter swap from peerType happens CLI-side).
|
||||||
|
role: p.role,
|
||||||
...(pc?.hostname ? { hostname: pc.hostname } : {}),
|
...(pc?.hostname ? { hostname: pc.hostname } : {}),
|
||||||
...(pc?.peerType ? { peerType: pc.peerType } : {}),
|
...(pc?.peerType ? { peerType: pc.peerType } : {}),
|
||||||
...(pc?.channel ? { channel: pc.channel } : {}),
|
...(pc?.channel ? { channel: pc.channel } : {}),
|
||||||
|
|||||||
@@ -224,6 +224,26 @@ export interface WSSetStatusMessage {
|
|||||||
status: PeerStatus;
|
status: PeerStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Client → broker: confirm receipt of a previously pushed envelope so the
|
||||||
|
* broker can mark the message_queue row delivered.
|
||||||
|
*
|
||||||
|
* v2 agentic-comms (M1): pairs with the two-phase claim/lease introduced
|
||||||
|
* in `drainForMember`. Without this ack, the lease expires after 30s and
|
||||||
|
* the message is re-claimed and re-pushed (at-least-once retry).
|
||||||
|
*
|
||||||
|
* Either id is accepted; daemons that track inbox dedupe by clientMessageId
|
||||||
|
* should send that one. brokerMessageId is the row primary key, useful when
|
||||||
|
* the original send didn't carry a client_message_id (legacy traffic).
|
||||||
|
*/
|
||||||
|
export interface WSClientAckMessage {
|
||||||
|
type: "client_ack";
|
||||||
|
/** Original caller-supplied idempotency id from the `send` envelope. */
|
||||||
|
clientMessageId?: string;
|
||||||
|
/** Broker-side row id (the `messageId` field on the inbound `push`). */
|
||||||
|
brokerMessageId?: string;
|
||||||
|
}
|
||||||
|
|
||||||
/** Client → broker: request list of connected peers in the same mesh. */
|
/** Client → broker: request list of connected peers in the same mesh. */
|
||||||
export interface WSListPeersMessage {
|
export interface WSListPeersMessage {
|
||||||
type: "list_peers";
|
type: "list_peers";
|
||||||
@@ -518,6 +538,8 @@ export interface WSPeersListMessage {
|
|||||||
type: "peers_list";
|
type: "peers_list";
|
||||||
peers: Array<{
|
peers: Array<{
|
||||||
pubkey: string;
|
pubkey: string;
|
||||||
|
/** Stable member pubkey — present on M1+ broker responses. */
|
||||||
|
memberPubkey?: string;
|
||||||
displayName: string;
|
displayName: string;
|
||||||
status: PeerStatus;
|
status: PeerStatus;
|
||||||
summary: string | null;
|
summary: string | null;
|
||||||
@@ -525,6 +547,10 @@ export interface WSPeersListMessage {
|
|||||||
sessionId: string;
|
sessionId: string;
|
||||||
connectedAt: string;
|
connectedAt: string;
|
||||||
cwd?: string;
|
cwd?: string;
|
||||||
|
/** v2 agentic-comms (M1): typed connection role. CLI uses this to
|
||||||
|
* filter control-plane daemons out of user-facing peer lists.
|
||||||
|
* Optional for clients talking to a pre-M1 broker. */
|
||||||
|
role?: "control-plane" | "session" | "service";
|
||||||
hostname?: string;
|
hostname?: string;
|
||||||
peerType?: "ai" | "human" | "connector";
|
peerType?: "ai" | "human" | "connector";
|
||||||
channel?: string;
|
channel?: string;
|
||||||
@@ -1417,6 +1443,7 @@ export type WSClientMessage =
|
|||||||
| WSHelloMessage
|
| WSHelloMessage
|
||||||
| WSSessionHelloMessage
|
| WSSessionHelloMessage
|
||||||
| WSSendMessage
|
| WSSendMessage
|
||||||
|
| WSClientAckMessage
|
||||||
| WSSetStatusMessage
|
| WSSetStatusMessage
|
||||||
| WSListPeersMessage
|
| WSListPeersMessage
|
||||||
| WSSetSummaryMessage
|
| WSSetSummaryMessage
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
-- Milestone 1 (v2 agentic-comms architecture).
|
||||||
|
--
|
||||||
|
-- Two concerns rolled into one migration because both are tiny and both
|
||||||
|
-- ship together with the broker change in the same PR:
|
||||||
|
--
|
||||||
|
-- 1. message_queue claim/lease columns (drainForMember race fix)
|
||||||
|
-- --------------------------------------------------------------
|
||||||
|
-- Before this migration, drainForMember claimed rows by setting
|
||||||
|
-- `delivered_at = NOW()` inside the same UPDATE that selected them.
|
||||||
|
-- If the recipient WS was closed between claim-time and ws.send(),
|
||||||
|
-- the message was silently dropped — the row read as "delivered" so
|
||||||
|
-- the next reconnect's drain skipped it. At-most-once semantics with
|
||||||
|
-- no retry hook.
|
||||||
|
--
|
||||||
|
-- The fix moves to two-phase claim/deliver with a lease:
|
||||||
|
-- claimed_at — set when drainForMember picks the row
|
||||||
|
-- claim_id — presenceId of the claimer (debugging)
|
||||||
|
-- claim_expires_at — claimed_at + 30s; if no `client_ack` lands by
|
||||||
|
-- then, a sweeper clears the claim and the row
|
||||||
|
-- is re-eligible for a new drain (at-least-once).
|
||||||
|
--
|
||||||
|
-- `delivered_at` only gets set when the recipient WS replies with a
|
||||||
|
-- `client_ack` containing the original client_message_id. Until any
|
||||||
|
-- daemon emits `client_ack`, claims will simply expire and re-deliver
|
||||||
|
-- — which is the desired retry behaviour for unreliable transports.
|
||||||
|
--
|
||||||
|
-- 2. presence.role column
|
||||||
|
-- --------------------------------------------------------------
|
||||||
|
-- The CLI currently hides daemon connections from `peer list` by
|
||||||
|
-- matching `peerType === 'claudemesh-daemon'`, which is fragile and
|
||||||
|
-- overloads a free-form field. M1 introduces a typed `role` column on
|
||||||
|
-- presence with three documented values:
|
||||||
|
-- 'control-plane' — long-lived daemon WS (one per host)
|
||||||
|
-- 'session' — per-Claude-Code-session WS (default)
|
||||||
|
-- 'service' — autonomous bots/services attached to a mesh
|
||||||
|
--
|
||||||
|
-- Backfilled to 'session' (default) so legacy presence rows keep their
|
||||||
|
-- existing visibility. The two hello paths in the broker pass
|
||||||
|
-- 'control-plane' / 'session' explicitly. CLI-side filter swap
|
||||||
|
-- (peerType -> role) is a follow-up worktree.
|
||||||
|
|
||||||
|
ALTER TABLE "mesh"."message_queue"
|
||||||
|
ADD COLUMN "claimed_at" timestamp,
|
||||||
|
ADD COLUMN "claim_id" text,
|
||||||
|
ADD COLUMN "claim_expires_at" timestamp;
|
||||||
|
|
||||||
|
ALTER TABLE "mesh"."presence"
|
||||||
|
ADD COLUMN "role" text NOT NULL DEFAULT 'session';
|
||||||
@@ -326,6 +326,14 @@ export const presence = meshSchema.table("presence", {
|
|||||||
statusUpdatedAt: timestamp().defaultNow().notNull(),
|
statusUpdatedAt: timestamp().defaultNow().notNull(),
|
||||||
summary: text(),
|
summary: text(),
|
||||||
groups: jsonb().$type<{ name: string; role?: string }[]>().default([]),
|
groups: jsonb().$type<{ name: string; role?: string }[]>().default([]),
|
||||||
|
// v2 agentic-comms (M1): connection role for routing/visibility.
|
||||||
|
// 'control-plane' — long-lived daemon WS (claudemesh daemon),
|
||||||
|
// used for fan-out and presence orchestration.
|
||||||
|
// Hidden from user-facing peer lists.
|
||||||
|
// 'session' — per-Claude-Code session WS (default).
|
||||||
|
// 'service' — autonomous bots/services attached to the mesh.
|
||||||
|
// Always populated; default 'session' keeps legacy hellos working.
|
||||||
|
role: text().notNull().default("session"),
|
||||||
connectedAt: timestamp().defaultNow().notNull(),
|
connectedAt: timestamp().defaultNow().notNull(),
|
||||||
lastPingAt: timestamp().defaultNow().notNull(),
|
lastPingAt: timestamp().defaultNow().notNull(),
|
||||||
disconnectedAt: timestamp(),
|
disconnectedAt: timestamp(),
|
||||||
@@ -367,6 +375,14 @@ export const messageQueue = meshSchema.table("message_queue", {
|
|||||||
// §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want
|
// §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want
|
||||||
// to enforce idempotency on retries will read this column.
|
// to enforce idempotency on retries will read this column.
|
||||||
requestFingerprint: text("request_fingerprint"),
|
requestFingerprint: text("request_fingerprint"),
|
||||||
|
// v2 agentic-comms (M1): two-phase claim/deliver with lease.
|
||||||
|
// `drainForMember` claims a row by setting (claimedAt, claimId,
|
||||||
|
// claimExpiresAt) — NOT deliveredAt. The recipient's WS only marks
|
||||||
|
// deliveredAt after replying with a `client_ack`. A periodic sweeper
|
||||||
|
// reaps expired claims so dropped pushes are redelivered (at-least-once).
|
||||||
|
claimedAt: timestamp(),
|
||||||
|
claimId: text("claim_id"),
|
||||||
|
claimExpiresAt: timestamp(),
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user