fix(broker+cli): multi-session DM routing + broadcast self-loopback (v0.3.2)
Two related bugs surfaced in multi-session production use of 1.8.0: 1. Replies via `claudemesh send <from_id>` rejected with "no connected peer for target" when the original sender's session had rotated (Claude Code restart, /resume). Root cause: from_id carried the ephemeral session pubkey, which disappears the moment the session ends. Fix: handleSend pre-flight now also resolves the target pubkey against the persistent meshMember table and routes to the owning member's live session(s); MCP push channel now sets from_id to the stable member pubkey and exposes the ephemeral one under from_session_pubkey. 2. Broadcast/* and @group sends loopback'd to the sender's *sibling* sessions (same member, different session keypair), surfacing a spurious "tampered or wrong keypair" decrypt warning on the sender's own inboxes. Fix: broadcast/group fan-out now skips by memberPubkey, not just by presence_id, so the entire sender member is excluded — direct sends keep per-presence skip so a member can still DM their own sibling session intentionally. Push envelope now also carries senderMemberPubkey alongside senderPubkey so any other client of the WS channel can choose the right one.
This commit is contained in:
@@ -1874,13 +1874,22 @@ async function handleSend(
|
|||||||
!isTopicTargetEarly &&
|
!isTopicTargetEarly &&
|
||||||
!isBroadcastEarly &&
|
!isBroadcastEarly &&
|
||||||
msg.targetSpec !== "*";
|
msg.targetSpec !== "*";
|
||||||
|
// Resolved recipient member ids for direct sends — populated by the
|
||||||
|
// pre-flight finder below and reused by the fan-out loop so a stale
|
||||||
|
// session pubkey still routes to the owning member's live session.
|
||||||
|
const candidateMemberIds: string[] = [];
|
||||||
|
|
||||||
if (isDirectEarly) {
|
if (isDirectEarly) {
|
||||||
// Identify candidate recipient connections — anyone in the mesh whose
|
// Identify candidate recipient connections — anyone in the mesh whose
|
||||||
// member or session pubkey matches the target. Then check grants to
|
// member or session pubkey matches the target. Then check grants to
|
||||||
// see if at least one of them has granted the sender `dm`. Without
|
// see if at least one of them has granted the sender `dm`. Without
|
||||||
// this check, blocked DMs get queued and sit in the DB forever
|
// this check, blocked DMs get queued and sit in the DB forever
|
||||||
// (multicast marks delivered on queue; direct relies on drain-or-push).
|
// (multicast marks delivered on queue; direct relies on drain-or-push).
|
||||||
const candidateMemberIds: string[] = [];
|
//
|
||||||
|
// Replies often target a *session* pubkey that has since rotated
|
||||||
|
// (Claude Code restart, /resume, etc). When that happens we fall
|
||||||
|
// back to a member-pubkey lookup so the reply still finds the same
|
||||||
|
// peer's newest live session instead of bouncing with "not online".
|
||||||
for (const [, peer] of connections) {
|
for (const [, peer] of connections) {
|
||||||
if (peer.meshId !== conn.meshId) continue;
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
if (peer.ws === conn.ws) continue;
|
if (peer.ws === conn.ws) continue;
|
||||||
@@ -1888,6 +1897,35 @@ async function handleSend(
|
|||||||
candidateMemberIds.push(peer.memberId);
|
candidateMemberIds.push(peer.memberId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (candidateMemberIds.length === 0) {
|
||||||
|
// Fallback: target may be a stale session pubkey. Look up the
|
||||||
|
// owning member from the persistent member table and try again
|
||||||
|
// against the live sessions of that member.
|
||||||
|
try {
|
||||||
|
const [memberRow] = await db
|
||||||
|
.select({ id: meshMember.id, peerPubkey: meshMember.peerPubkey })
|
||||||
|
.from(meshMember)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(meshMember.meshId, conn.meshId),
|
||||||
|
isNull(meshMember.revokedAt),
|
||||||
|
eq(meshMember.peerPubkey, msg.targetSpec),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.limit(1);
|
||||||
|
if (memberRow) {
|
||||||
|
for (const [, peer] of connections) {
|
||||||
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
|
if (peer.ws === conn.ws) continue;
|
||||||
|
if (peer.memberId === memberRow.id) {
|
||||||
|
candidateMemberIds.push(peer.memberId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Soft-fail; the rejection below still fires if no candidate.
|
||||||
|
}
|
||||||
|
}
|
||||||
if (candidateMemberIds.length === 0) {
|
if (candidateMemberIds.length === 0) {
|
||||||
metrics.messagesRejectedTotal.inc({ reason: "no_recipient" });
|
metrics.messagesRejectedTotal.inc({ reason: "no_recipient" });
|
||||||
const errAck: WSServerMessage = {
|
const errAck: WSServerMessage = {
|
||||||
@@ -2007,6 +2045,7 @@ async function handleSend(
|
|||||||
messageId: persistedTopicMessageId ?? messageId,
|
messageId: persistedTopicMessageId ?? messageId,
|
||||||
meshId: conn.meshId,
|
meshId: conn.meshId,
|
||||||
senderPubkey: conn.sessionPubkey ?? conn.memberPubkey,
|
senderPubkey: conn.sessionPubkey ?? conn.memberPubkey,
|
||||||
|
senderMemberPubkey: conn.memberPubkey,
|
||||||
senderMemberId: conn.memberId,
|
senderMemberId: conn.memberId,
|
||||||
senderName: conn.displayName,
|
senderName: conn.displayName,
|
||||||
priority: msg.priority,
|
priority: msg.priority,
|
||||||
@@ -2050,21 +2089,40 @@ async function handleSend(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const [pid, peer] of connections) {
|
for (const [pid, peer] of connections) {
|
||||||
if (pid === senderPresenceId) continue;
|
|
||||||
if (peer.meshId !== conn.meshId) continue;
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
|
|
||||||
if (isBroadcast) {
|
if (isBroadcast) {
|
||||||
// broadcast — skip hidden peers
|
// broadcast/* — skip ALL sibling sessions of the sender's member,
|
||||||
|
// not just the originating presence_id. Sibling sessions share the
|
||||||
|
// member identity but hold a different ephemeral session keypair,
|
||||||
|
// so a self-fan-out can't decrypt the envelope and would surface
|
||||||
|
// a spurious "tampered or wrong keypair" warning in the sender's
|
||||||
|
// own inboxes.
|
||||||
|
if (peer.memberPubkey === conn.memberPubkey) continue;
|
||||||
if (!peer.visible) continue;
|
if (!peer.visible) continue;
|
||||||
} else if (groupName) {
|
} else if (groupName) {
|
||||||
// group routing — deliver only if peer is in the group; skip hidden
|
// group routing — same self-skip semantics as broadcast: don't
|
||||||
|
// ping your own member's sibling sessions; deliver only to other
|
||||||
|
// members in the group; skip hidden.
|
||||||
|
if (peer.memberPubkey === conn.memberPubkey) continue;
|
||||||
if (!peer.visible) continue;
|
if (!peer.visible) continue;
|
||||||
if (!peer.groups.some((g) => g.name === groupName)) continue;
|
if (!peer.groups.some((g) => g.name === groupName)) continue;
|
||||||
} else {
|
} else {
|
||||||
// direct routing — match by pubkey
|
// direct routing — keep the per-presence skip so a member CAN
|
||||||
if (peer.memberPubkey !== msg.targetSpec
|
// intentionally DM another session of themselves (e.g. mesh
|
||||||
&& peer.sessionPubkey !== msg.targetSpec)
|
// bridges), but match recipients by both ephemeral session
|
||||||
continue;
|
// pubkey AND stable member pubkey so replies addressed to a
|
||||||
|
// (now-rotated) old sessionPubkey still land on the latest live
|
||||||
|
// session of the same member. The pre-flight finder above also
|
||||||
|
// pre-resolves stale session pubkeys to their owning member, so
|
||||||
|
// candidateMemberIds is the source of truth here.
|
||||||
|
if (pid === senderPresenceId) continue;
|
||||||
|
const matchesByPubkey =
|
||||||
|
peer.memberPubkey === msg.targetSpec
|
||||||
|
|| peer.sessionPubkey === msg.targetSpec;
|
||||||
|
const matchesByResolvedMember =
|
||||||
|
candidateMemberIds.includes(peer.memberId);
|
||||||
|
if (!matchesByPubkey && !matchesByResolvedMember) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Per-peer capability check — silent drop if recipient hasn't granted
|
// Per-peer capability check — silent drop if recipient hasn't granted
|
||||||
|
|||||||
@@ -117,7 +117,13 @@ export interface WSPushMessage {
|
|||||||
type: "push";
|
type: "push";
|
||||||
messageId: string;
|
messageId: string;
|
||||||
meshId: string;
|
meshId: string;
|
||||||
|
/** Sender's *session* pubkey — ephemeral, rotates on session restart.
|
||||||
|
* DMs are sealed against the recipient's session key paired with this.
|
||||||
|
* For replies prefer `senderMemberPubkey` / `senderMemberId`. */
|
||||||
senderPubkey: string;
|
senderPubkey: string;
|
||||||
|
/** Sender's *member* pubkey — stable across reconnects/restarts.
|
||||||
|
* Use this as the canonical reply target. */
|
||||||
|
senderMemberPubkey?: string;
|
||||||
/** Stable mesh.member id of the sender — survives display-name changes,
|
/** Stable mesh.member id of the sender — survives display-name changes,
|
||||||
* use this as the canonical reply target when set. Optional for
|
* use this as the canonical reply target when set. Optional for
|
||||||
* legacy/non-topic broker paths that haven't been wired yet. */
|
* legacy/non-topic broker paths that haven't been wired yet. */
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "claudemesh-cli",
|
"name": "claudemesh-cli",
|
||||||
"version": "1.9.0",
|
"version": "1.9.1",
|
||||||
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
|
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"claude-code",
|
"claude-code",
|
||||||
|
|||||||
@@ -680,14 +680,23 @@ Your message mode is "${messageMode}".
|
|||||||
const prioBadge = msg.priority === "now" ? "[URGENT] " : msg.priority === "low" ? "[low] " : "";
|
const prioBadge = msg.priority === "now" ? "[URGENT] " : msg.priority === "low" ? "[low] " : "";
|
||||||
const kindBadge = msg.kind === "broadcast" ? " (broadcast)" : "";
|
const kindBadge = msg.kind === "broadcast" ? " (broadcast)" : "";
|
||||||
const content = `${prioBadge}${fromName}${kindBadge}: ${body}`;
|
const content = `${prioBadge}${fromName}${kindBadge}: ${body}`;
|
||||||
|
// `from_id` MUST be a stable replyable id. Older clients of this
|
||||||
|
// channel have been pasting from_id straight back into
|
||||||
|
// `claudemesh send <id>`; if from_id is the SESSION pubkey it
|
||||||
|
// bounces with "no connected peer" the moment the sender's
|
||||||
|
// session restarts. Send the MEMBER pubkey (stable across
|
||||||
|
// reconnects) as from_id, and keep the ephemeral session pubkey
|
||||||
|
// available under from_session_pubkey for crypto-aware callers.
|
||||||
|
const fromMemberPubkey = msg.senderMemberPubkey ?? fromPubkey;
|
||||||
try {
|
try {
|
||||||
await server.notification({
|
await server.notification({
|
||||||
method: "notifications/claude/channel",
|
method: "notifications/claude/channel",
|
||||||
params: {
|
params: {
|
||||||
content,
|
content,
|
||||||
meta: {
|
meta: {
|
||||||
from_id: fromPubkey,
|
from_id: fromMemberPubkey,
|
||||||
from_pubkey: fromPubkey,
|
from_pubkey: fromMemberPubkey,
|
||||||
|
from_session_pubkey: fromPubkey,
|
||||||
from_name: fromName,
|
from_name: fromName,
|
||||||
...(msg.senderMemberId ? { from_member_id: msg.senderMemberId } : {}),
|
...(msg.senderMemberId ? { from_member_id: msg.senderMemberId } : {}),
|
||||||
mesh_slug: client.meshSlug,
|
mesh_slug: client.meshSlug,
|
||||||
|
|||||||
@@ -100,7 +100,12 @@ export interface PeerInfo {
|
|||||||
export interface InboundPush {
|
export interface InboundPush {
|
||||||
messageId: string;
|
messageId: string;
|
||||||
meshId: string;
|
meshId: string;
|
||||||
|
/** Sender's *session* pubkey — ephemeral. Rotates on session restart.
|
||||||
|
* Used by crypto_box_open to verify the seal. Prefer the member
|
||||||
|
* pubkey for replies. */
|
||||||
senderPubkey: string;
|
senderPubkey: string;
|
||||||
|
/** Sender's *member* pubkey — stable. Use as the reply target. */
|
||||||
|
senderMemberPubkey?: string;
|
||||||
/** Stable mesh.member id of the sender — preferred id for replies. */
|
/** Stable mesh.member id of the sender — preferred id for replies. */
|
||||||
senderMemberId?: string;
|
senderMemberId?: string;
|
||||||
/** Sender's current display name (a join from the broker). */
|
/** Sender's current display name (a join from the broker). */
|
||||||
@@ -2036,6 +2041,7 @@ export class BrokerClient {
|
|||||||
messageId: String(msg.messageId ?? ""),
|
messageId: String(msg.messageId ?? ""),
|
||||||
meshId: String(msg.meshId ?? ""),
|
meshId: String(msg.meshId ?? ""),
|
||||||
senderPubkey,
|
senderPubkey,
|
||||||
|
...(msg.senderMemberPubkey ? { senderMemberPubkey: String(msg.senderMemberPubkey) } : {}),
|
||||||
...(msg.senderMemberId ? { senderMemberId: String(msg.senderMemberId) } : {}),
|
...(msg.senderMemberId ? { senderMemberId: String(msg.senderMemberId) } : {}),
|
||||||
...(msg.senderName ? { senderName: String(msg.senderName) } : {}),
|
...(msg.senderName ? { senderName: String(msg.senderName) } : {}),
|
||||||
...(msg.topic ? { topic: String(msg.topic) } : {}),
|
...(msg.topic ? { topic: String(msg.topic) } : {}),
|
||||||
|
|||||||
Reference in New Issue
Block a user