feat: broadcast system notifications on peer join/leave

When a peer connects or disconnects, the broker now broadcasts a
system push (subtype: "system") to all other peers in the same mesh.
The CLI formats these as [system] channel notifications so AI sessions
can react to topology changes without polling.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 23:28:49 +01:00
parent 5cb4cc4fe7
commit 453705a4e1
4 changed files with 148 additions and 17 deletions

View File

@@ -93,7 +93,11 @@ interface PeerConn {
memberId: string; memberId: string;
memberPubkey: string; memberPubkey: string;
sessionPubkey: string | null; sessionPubkey: string | null;
displayName: string;
cwd: string; cwd: string;
peerType?: "ai" | "human" | "connector";
channel?: string;
model?: string;
groups: Array<{ name: string; role?: string }>; groups: Array<{ name: string; role?: string }>;
} }
@@ -625,17 +629,21 @@ async function handleHello(
cwd: hello.cwd, cwd: hello.cwd,
groups: initialGroups, groups: initialGroups,
}); });
const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, { connections.set(presenceId, {
ws, ws,
meshId: hello.meshId, meshId: hello.meshId,
memberId: member.id, memberId: member.id,
memberPubkey: hello.pubkey, memberPubkey: hello.pubkey,
sessionPubkey: hello.sessionPubkey ?? null, sessionPubkey: hello.sessionPubkey ?? null,
displayName: effectiveDisplayName,
cwd: hello.cwd, cwd: hello.cwd,
peerType: hello.peerType,
channel: hello.channel,
model: hello.model,
groups: initialGroups, groups: initialGroups,
}); });
incMeshCount(hello.meshId); incMeshCount(hello.meshId);
const effectiveDisplayName = hello.displayName || member.displayName;
log.info("ws hello", { log.info("ws hello", {
mesh_id: hello.meshId, mesh_id: hello.meshId,
member: effectiveDisplayName, member: effectiveDisplayName,
@@ -762,6 +770,32 @@ function handleConnection(ws: WebSocket): void {
} catch { } catch {
/* ws closed during hello */ /* ws closed during hello */
} }
// Broadcast peer_joined to all other peers in the same mesh.
const joinedConn = connections.get(presenceId);
if (joinedConn) {
const joinMsg: WSPushMessage = {
type: "push",
subtype: "system",
event: "peer_joined",
eventData: {
name: result.memberDisplayName,
pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey,
groups: joinedConn.groups,
},
messageId: crypto.randomUUID(),
meshId: joinedConn.meshId,
senderPubkey: "system",
priority: "low",
nonce: "",
ciphertext: "",
createdAt: new Date().toISOString(),
};
for (const [pid, peer] of connections) {
if (pid === presenceId) continue;
if (peer.meshId !== joinedConn.meshId) continue;
sendToPeer(pid, joinMsg);
}
}
return; return;
} }
if (!presenceId) { if (!presenceId) {
@@ -783,17 +817,32 @@ function handleConnection(ws: WebSocket): void {
break; break;
case "list_peers": { case "list_peers": {
const peers = await listPeersInMesh(conn.meshId); const peers = await listPeersInMesh(conn.meshId);
// Build a lookup from pubkey → in-memory PeerConn for metadata
const connByPubkey = new Map<string, PeerConn>();
for (const [, pc] of connections) {
if (pc.meshId === conn.meshId) {
connByPubkey.set(pc.memberPubkey, pc);
if (pc.sessionPubkey) connByPubkey.set(pc.sessionPubkey, pc);
}
}
const resp: WSServerMessage = { const resp: WSServerMessage = {
type: "peers_list", type: "peers_list",
peers: peers.map((p) => ({ peers: peers.map((p) => {
pubkey: p.pubkey, const pc = connByPubkey.get(p.pubkey);
displayName: p.displayName, return {
status: p.status as "idle" | "working" | "dnd", pubkey: p.pubkey,
summary: p.summary, displayName: p.displayName,
groups: p.groups, status: p.status as "idle" | "working" | "dnd",
sessionId: p.sessionId, summary: p.summary,
connectedAt: p.connectedAt.toISOString(), groups: p.groups,
})), sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(),
cwd: pc?.cwd ?? p.cwd,
...(pc?.peerType ? { peerType: pc.peerType } : {}),
...(pc?.channel ? { channel: pc.channel } : {}),
...(pc?.model ? { model: pc.model } : {}),
};
}),
...(_reqId ? { _reqId } : {}), ...(_reqId ? { _reqId } : {}),
}; };
conn.ws.send(JSON.stringify(resp)); conn.ws.send(JSON.stringify(resp));
@@ -1905,7 +1954,30 @@ function handleConnection(ws: WebSocket): void {
if (presenceId) { if (presenceId) {
const conn = connections.get(presenceId); const conn = connections.get(presenceId);
connections.delete(presenceId); connections.delete(presenceId);
if (conn) decMeshCount(conn.meshId); if (conn) {
decMeshCount(conn.meshId);
// Broadcast peer_left to remaining peers in the same mesh.
const leaveMsg: WSPushMessage = {
type: "push",
subtype: "system",
event: "peer_left",
eventData: {
name: conn.displayName,
pubkey: conn.sessionPubkey ?? conn.memberPubkey,
},
messageId: crypto.randomUUID(),
meshId: conn.meshId,
senderPubkey: "system",
priority: "low",
nonce: "",
ciphertext: "",
createdAt: new Date().toISOString(),
};
for (const [pid, peer] of connections) {
if (peer.meshId !== conn.meshId) continue;
sendToPeer(pid, leaveMsg);
}
}
await disconnectPresence(presenceId); await disconnectPresence(presenceId);
// Clean up stream subscriptions for this peer // Clean up stream subscriptions for this peer
for (const [key, subs] of streamSubscriptions) { for (const [key, subs] of streamSubscriptions) {

View File

@@ -57,6 +57,12 @@ export interface WSHelloMessage {
sessionId: string; sessionId: string;
pid: number; pid: number;
cwd: string; cwd: string;
/** Peer type: ai session, human user, or external connector. */
peerType?: "ai" | "human" | "connector";
/** Channel the peer connected from (e.g. "claude-code", "telegram", "slack", "web"). */
channel?: string;
/** AI model identifier (e.g. "opus-4", "sonnet-4"). */
model?: string;
/** Initial groups to join on connect. */ /** Initial groups to join on connect. */
groups?: Array<{ name: string; role?: string }>; groups?: Array<{ name: string; role?: string }>;
/** ms epoch; broker rejects if outside ±60s of its own clock. */ /** ms epoch; broker rejects if outside ±60s of its own clock. */
@@ -86,8 +92,13 @@ export interface WSPushMessage {
nonce: string; nonce: string;
ciphertext: string; ciphertext: string;
createdAt: string; createdAt: string;
/** Optional semantic tag — "reminder" when delivered by the scheduler. */ /** Optional semantic tag — "reminder" when delivered by the scheduler,
subtype?: "reminder"; * "system" for broker-originated topology events (peer join/leave). */
subtype?: "reminder" | "system";
/** Machine-readable event name (e.g. "peer_joined", "peer_left"). */
event?: string;
/** Structured payload for the event. */
eventData?: Record<string, unknown>;
} }
/** Client → broker: manual status override (dnd, forced idle). */ /** Client → broker: manual status override (dnd, forced idle). */
@@ -184,6 +195,10 @@ export interface WSPeersListMessage {
groups: Array<{ name: string; role?: string }>; groups: Array<{ name: string; role?: string }>;
sessionId: string; sessionId: string;
connectedAt: string; connectedAt: string;
cwd?: string;
peerType?: "ai" | "human" | "connector";
channel?: string;
model?: string;
}>; }>;
_reqId?: string; _reqId?: string;
} }
@@ -673,10 +688,14 @@ export interface WSScheduleMessage {
type: "schedule"; type: "schedule";
to: string; to: string;
message: string; message: string;
/** Unix timestamp (ms) when to deliver. */ /** Unix timestamp (ms) when to deliver. Ignored for cron schedules. */
deliverAt: number; deliverAt: number;
/** Optional semantic tag — "reminder" surfaces differently to the receiver. */ /** Optional semantic tag — "reminder" surfaces differently to the receiver. */
subtype?: "reminder"; subtype?: "reminder";
/** Standard 5-field cron expression for recurring delivery (e.g. "0 */2 * * *"). */
cron?: string;
/** Whether this is a recurring schedule. Implied true when `cron` is set. */
recurring?: boolean;
_reqId?: string; _reqId?: string;
} }

View File

@@ -990,6 +990,39 @@ Your message mode is "${messageMode}".
client.onPush(async (msg) => { client.onPush(async (msg) => {
if (messageMode === "off") return; if (messageMode === "off") return;
// System events (peer join/leave) — always push, regardless of mode.
if (msg.subtype === "system" && msg.event) {
const eventName = msg.event;
const data = msg.eventData ?? {};
let content: string;
if (eventName === "peer_joined") {
content = `[system] Peer "${data.name ?? "unknown"}" joined the mesh`;
} else if (eventName === "peer_left") {
content = `[system] Peer "${data.name ?? "unknown"}" left the mesh`;
} else {
content = `[system] ${eventName}: ${JSON.stringify(data)}`;
}
try {
await server.notification({
method: "notifications/claude/channel",
params: {
content,
meta: {
kind: "system",
event: eventName,
mesh_slug: client.meshSlug,
mesh_id: client.meshId,
...(Object.keys(data).length > 0 ? { eventData: data } : {}),
},
},
});
process.stderr.write(`[claudemesh] system: ${content}\n`);
} catch (pushErr) {
process.stderr.write(`[claudemesh] system push FAILED: ${pushErr}\n`);
}
return;
}
const fromPubkey = msg.senderPubkey || ""; const fromPubkey = msg.senderPubkey || "";
const fromName = fromPubkey const fromName = fromPubkey
? await resolvePeerName(client, fromPubkey) ? await resolvePeerName(client, fromPubkey)

View File

@@ -51,8 +51,13 @@ export interface InboundPush {
/** Hint for UI: "direct" (crypto_box), "channel"/"broadcast" /** Hint for UI: "direct" (crypto_box), "channel"/"broadcast"
* (plaintext for now). */ * (plaintext for now). */
kind: "direct" | "broadcast" | "channel" | "unknown"; kind: "direct" | "broadcast" | "channel" | "unknown";
/** Optional semantic tag — "reminder" when fired by the scheduler. */ /** Optional semantic tag — "reminder" when fired by the scheduler,
subtype?: "reminder"; * "system" for broker-originated topology events. */
subtype?: "reminder" | "system";
/** Machine-readable event name (e.g. "peer_joined", "peer_left"). */
event?: string;
/** Structured payload for the event. */
eventData?: Record<string, unknown>;
} }
type PushHandler = (msg: InboundPush) => void; type PushHandler = (msg: InboundPush) => void;
@@ -937,7 +942,9 @@ export class BrokerClient {
receivedAt: new Date().toISOString(), receivedAt: new Date().toISOString(),
plaintext, plaintext,
kind, kind,
...(msg.subtype ? { subtype: msg.subtype as "reminder" } : {}), ...(msg.subtype ? { subtype: msg.subtype as "reminder" | "system" } : {}),
...(msg.event ? { event: String(msg.event) } : {}),
...(msg.eventData ? { eventData: msg.eventData as Record<string, unknown> } : {}),
}; };
this.pushBuffer.push(push); this.pushBuffer.push(push);
if (this.pushBuffer.length > 500) this.pushBuffer.shift(); if (this.pushBuffer.length > 500) this.pushBuffer.shift();