diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index f308320..80ed21f 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -93,7 +93,11 @@ interface PeerConn { memberId: string; memberPubkey: string; sessionPubkey: string | null; + displayName: string; cwd: string; + peerType?: "ai" | "human" | "connector"; + channel?: string; + model?: string; groups: Array<{ name: string; role?: string }>; } @@ -625,17 +629,21 @@ async function handleHello( cwd: hello.cwd, groups: initialGroups, }); + const effectiveDisplayName = hello.displayName || member.displayName; connections.set(presenceId, { ws, meshId: hello.meshId, memberId: member.id, memberPubkey: hello.pubkey, sessionPubkey: hello.sessionPubkey ?? null, + displayName: effectiveDisplayName, cwd: hello.cwd, + peerType: hello.peerType, + channel: hello.channel, + model: hello.model, groups: initialGroups, }); incMeshCount(hello.meshId); - const effectiveDisplayName = hello.displayName || member.displayName; log.info("ws hello", { mesh_id: hello.meshId, member: effectiveDisplayName, @@ -762,6 +770,32 @@ function handleConnection(ws: WebSocket): void { } catch { /* ws closed during hello */ } + // Broadcast peer_joined to all other peers in the same mesh. + const joinedConn = connections.get(presenceId); + if (joinedConn) { + const joinMsg: WSPushMessage = { + type: "push", + subtype: "system", + event: "peer_joined", + eventData: { + name: result.memberDisplayName, + pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey, + groups: joinedConn.groups, + }, + messageId: crypto.randomUUID(), + meshId: joinedConn.meshId, + senderPubkey: "system", + priority: "low", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + for (const [pid, peer] of connections) { + if (pid === presenceId) continue; + if (peer.meshId !== joinedConn.meshId) continue; + sendToPeer(pid, joinMsg); + } + } return; } if (!presenceId) { @@ -783,17 +817,32 @@ function handleConnection(ws: WebSocket): void { break; case "list_peers": { const peers = await listPeersInMesh(conn.meshId); + // Build a lookup from pubkey → in-memory PeerConn for metadata + const connByPubkey = new Map(); + for (const [, pc] of connections) { + if (pc.meshId === conn.meshId) { + connByPubkey.set(pc.memberPubkey, pc); + if (pc.sessionPubkey) connByPubkey.set(pc.sessionPubkey, pc); + } + } const resp: WSServerMessage = { type: "peers_list", - peers: peers.map((p) => ({ - pubkey: p.pubkey, - displayName: p.displayName, - status: p.status as "idle" | "working" | "dnd", - summary: p.summary, - groups: p.groups, - sessionId: p.sessionId, - connectedAt: p.connectedAt.toISOString(), - })), + peers: peers.map((p) => { + const pc = connByPubkey.get(p.pubkey); + return { + pubkey: p.pubkey, + displayName: p.displayName, + status: p.status as "idle" | "working" | "dnd", + summary: p.summary, + groups: p.groups, + sessionId: p.sessionId, + connectedAt: p.connectedAt.toISOString(), + cwd: pc?.cwd ?? p.cwd, + ...(pc?.peerType ? { peerType: pc.peerType } : {}), + ...(pc?.channel ? { channel: pc.channel } : {}), + ...(pc?.model ? { model: pc.model } : {}), + }; + }), ...(_reqId ? { _reqId } : {}), }; conn.ws.send(JSON.stringify(resp)); @@ -1905,7 +1954,30 @@ function handleConnection(ws: WebSocket): void { if (presenceId) { const conn = connections.get(presenceId); connections.delete(presenceId); - if (conn) decMeshCount(conn.meshId); + if (conn) { + decMeshCount(conn.meshId); + // Broadcast peer_left to remaining peers in the same mesh. + const leaveMsg: WSPushMessage = { + type: "push", + subtype: "system", + event: "peer_left", + eventData: { + name: conn.displayName, + pubkey: conn.sessionPubkey ?? conn.memberPubkey, + }, + messageId: crypto.randomUUID(), + meshId: conn.meshId, + senderPubkey: "system", + priority: "low", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + sendToPeer(pid, leaveMsg); + } + } await disconnectPresence(presenceId); // Clean up stream subscriptions for this peer for (const [key, subs] of streamSubscriptions) { diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 2c08e2e..40e1f85 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -57,6 +57,12 @@ export interface WSHelloMessage { sessionId: string; pid: number; cwd: string; + /** Peer type: ai session, human user, or external connector. */ + peerType?: "ai" | "human" | "connector"; + /** Channel the peer connected from (e.g. "claude-code", "telegram", "slack", "web"). */ + channel?: string; + /** AI model identifier (e.g. "opus-4", "sonnet-4"). */ + model?: string; /** Initial groups to join on connect. */ groups?: Array<{ name: string; role?: string }>; /** ms epoch; broker rejects if outside ±60s of its own clock. */ @@ -86,8 +92,13 @@ export interface WSPushMessage { nonce: string; ciphertext: string; createdAt: string; - /** Optional semantic tag — "reminder" when delivered by the scheduler. */ - subtype?: "reminder"; + /** Optional semantic tag — "reminder" when delivered by the scheduler, + * "system" for broker-originated topology events (peer join/leave). */ + subtype?: "reminder" | "system"; + /** Machine-readable event name (e.g. "peer_joined", "peer_left"). */ + event?: string; + /** Structured payload for the event. */ + eventData?: Record; } /** Client → broker: manual status override (dnd, forced idle). */ @@ -184,6 +195,10 @@ export interface WSPeersListMessage { groups: Array<{ name: string; role?: string }>; sessionId: string; connectedAt: string; + cwd?: string; + peerType?: "ai" | "human" | "connector"; + channel?: string; + model?: string; }>; _reqId?: string; } @@ -673,10 +688,14 @@ export interface WSScheduleMessage { type: "schedule"; to: string; message: string; - /** Unix timestamp (ms) when to deliver. */ + /** Unix timestamp (ms) when to deliver. Ignored for cron schedules. */ deliverAt: number; /** Optional semantic tag — "reminder" surfaces differently to the receiver. */ subtype?: "reminder"; + /** Standard 5-field cron expression for recurring delivery (e.g. "0 */2 * * *"). */ + cron?: string; + /** Whether this is a recurring schedule. Implied true when `cron` is set. */ + recurring?: boolean; _reqId?: string; } diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index c4fadba..a7b060b 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -990,6 +990,39 @@ Your message mode is "${messageMode}". client.onPush(async (msg) => { if (messageMode === "off") return; + // System events (peer join/leave) — always push, regardless of mode. + if (msg.subtype === "system" && msg.event) { + const eventName = msg.event; + const data = msg.eventData ?? {}; + let content: string; + if (eventName === "peer_joined") { + content = `[system] Peer "${data.name ?? "unknown"}" joined the mesh`; + } else if (eventName === "peer_left") { + content = `[system] Peer "${data.name ?? "unknown"}" left the mesh`; + } else { + content = `[system] ${eventName}: ${JSON.stringify(data)}`; + } + try { + await server.notification({ + method: "notifications/claude/channel", + params: { + content, + meta: { + kind: "system", + event: eventName, + mesh_slug: client.meshSlug, + mesh_id: client.meshId, + ...(Object.keys(data).length > 0 ? { eventData: data } : {}), + }, + }, + }); + process.stderr.write(`[claudemesh] system: ${content}\n`); + } catch (pushErr) { + process.stderr.write(`[claudemesh] system push FAILED: ${pushErr}\n`); + } + return; + } + const fromPubkey = msg.senderPubkey || ""; const fromName = fromPubkey ? await resolvePeerName(client, fromPubkey) diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 18504e5..f0e82a0 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -51,8 +51,13 @@ export interface InboundPush { /** Hint for UI: "direct" (crypto_box), "channel"/"broadcast" * (plaintext for now). */ kind: "direct" | "broadcast" | "channel" | "unknown"; - /** Optional semantic tag — "reminder" when fired by the scheduler. */ - subtype?: "reminder"; + /** Optional semantic tag — "reminder" when fired by the scheduler, + * "system" for broker-originated topology events. */ + subtype?: "reminder" | "system"; + /** Machine-readable event name (e.g. "peer_joined", "peer_left"). */ + event?: string; + /** Structured payload for the event. */ + eventData?: Record; } type PushHandler = (msg: InboundPush) => void; @@ -937,7 +942,9 @@ export class BrokerClient { receivedAt: new Date().toISOString(), plaintext, kind, - ...(msg.subtype ? { subtype: msg.subtype as "reminder" } : {}), + ...(msg.subtype ? { subtype: msg.subtype as "reminder" | "system" } : {}), + ...(msg.event ? { event: String(msg.event) } : {}), + ...(msg.eventData ? { eventData: msg.eventData as Record } : {}), }; this.pushBuffer.push(push); if (this.pushBuffer.length > 500) this.pushBuffer.shift();