8 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
706e681d6e feat: 1.33.0 — m1 ship: peerRole rename + client_ack wired + version bump
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Resolves the merge of m1-broker-drain-race-and-presence-role and
m1-cli-lifecycle-and-role-peer-list into main:

* Rename wire-level role classification field `role` → `peerRole`
  to avoid collision with 1.31.5's top-level `role` lift of
  `profile.role` (user-supplied string consumed by the agent-vibes
  claudemesh skill). `peerRole` is the broker presence taxonomy
  (control-plane/session/service); top-level `role` keeps its 1.31.5
  semantics.
  - apps/broker/src/broker.ts (listPeersInMesh return)
  - apps/broker/src/index.ts (peers_list response)
  - apps/broker/src/types.ts (WSPeersListMessage)
  - apps/cli/src/commands/peers.ts (PeerRecord + filter + lift)

* Wire CLI client_ack emission: handleBrokerPush gains
  ackClientMessage callback; daemon-WS and session-WS each got a
  sendClientAck() method that frames {type:"client_ack",
  clientMessageId, brokerMessageId?} and forwards via the lifecycle
  helper. run.ts wires the callback into both onPush paths.
  Receiver dedupes against existing inbox row first then acks
  unconditionally — broker needs the ack regardless of dedupe to
  release its claim lease.
  - apps/cli/src/daemon/inbound.ts (ackClientMessage in InboundContext)
  - apps/cli/src/daemon/broker.ts + session-broker.ts (sendClientAck)
  - apps/cli/src/daemon/run.ts (wire-up)

* Version bump 1.32.1 → 1.33.0; CHANGELOG entry replaces "Unreleased"
  with full m1 description.

Verification: tsc clean across cli + broker; CLI 83/83 unit tests
pass; broker 50 unit tests pass (5 integration test files require a
live Postgres and were skipped — pre-existing infra gap, not a
regression). CLI bundle rebuilt; version 1.33.0 baked.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 18:17:45 +01:00
Alejandro Gutiérrez
c036f759c3 Merge m1-cli-lifecycle-and-role-peer-list into main
Milestone 1 CLI side:
- New apps/cli/src/daemon/ws-lifecycle.ts: connectWsWithBackoff helper
- DaemonBrokerClient + SessionBrokerClient refactored to use the helper
- DaemonBrokerClient: stray sessionPubkey + getSessionKeys() removed
- daemon-WS onPush no longer carries session secret (member-only decrypt)
- IPC send paths now sign with mesh member secret
- peers.ts: filters role==='control-plane' by default; --all opts in;
  JSON output exposes role field

NOTE: a follow-up commit on main renames the wire-level field 'role'
to 'peerRole' to avoid collision with 1.31.5's profile.role lift.
2026-05-04 18:11:47 +01:00
Alejandro Gutiérrez
54e00109ab Merge m1-broker-drain-race-and-presence-role into main
Milestone 1 broker side:
- Schema: claimedAt + claimId + claimExpiresAt on message_queue,
  role on presence (default 'session')
- Migration 0029_drain_lease_and_presence_role.sql
- drainForMember rewritten for two-phase claim/deliver with 30s lease
- New markDelivered() called on receipt of client_ack
- New sweepExpiredClaims() running every 15s
- handleHello sets role='control-plane', handleSessionHello sets 'session'
- listPeersInMesh returns role
- WSClientAckMessage type added; broker accepts and dispatches client_ack
2026-05-04 18:11:47 +01:00
Alejandro Gutiérrez
16c148a87f docs(specs): m1 — agentic-comms architecture spec (v1 + v2 frozen)
v1: initial 3-layer architecture proposal, reviewed by Codex GPT-5.2 (high)
v2: full end-state with hybrid P2P data plane, broker as coordination
    plane only, 6 layers, 8 architectural milestones, Codex-2 corrections
    (at-least-once requires client_ack, service_pubkey explicit, meta
    required in v2 envelope, streamId required for stream channel,
    explicit revocation flow). v2 is frozen for implementation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 18:11:29 +01:00
Alejandro Gutiérrez
b57e47ed65 feat(broker): m1 — two-phase claim/deliver + client_ack + role-tagged presence
Three correctness fixes on top of the m1 schema migration:

1) Fix the drainForMember claim-then-push race
   ----------------------------------------------------------------
   Previously the claim CTE set delivered_at = NOW() *before* the WS
   send. If readyState !== OPEN at push time, the row was marked
   delivered and the message dropped silently — at-most-once with no
   retry hook.

   The new flow:
     - claim sets (claimed_at, claim_id, claim_expires_at = NOW()+30s)
     - delivered_at stays NULL until the recipient acks
     - re-eligibility predicate now also accepts rows whose lease
       expired, so dropped pushes redeliver (at-least-once)

   Adds two helpers:
     - markDelivered() — scoped to (mesh_id, recipient pubkey) so a
       peer can only ack its own messages
     - sweepExpiredClaims() — clears expired (claimed_at, claim_id,
       claim_expires_at) every 15s, wired into startSweepers

2) Accept `client_ack` from recipients
   ----------------------------------------------------------------
   New WS message type handled in the dispatcher right after `send`.
   Looks up by clientMessageId or brokerMessageId; either works. Until
   the daemon (apps/cli, separate worktree) starts emitting acks, leases
   will simply expire and re-deliver — which is the desired retry
   behaviour.

3) Tag presence rows with `role`
   ----------------------------------------------------------------
   handleHello (member-keyed, used by the long-lived daemon WS) →
     role: 'control-plane'
   handleSessionHello (per-Claude-Code session WS) →
     role: 'session'

   listPeersInMesh exposes the new field; the peers_list response
   surfaces it. WSPeersListMessage type adds an optional `role` plus the
   long-undocumented `memberPubkey`. CLI-side filter swap from peerType
   to role lands in a follow-up worktree — that's why the CLI is
   untouched here per the M1 spec.

Typechecks clean (apps/broker tsc --noEmit, packages/db tsc --noEmit).
Test suite needs a real DB so wasn't run in this worktree; existing
dup-delivery and broker tests use drainForMember positionally and the
new claimerPresenceId arg is optional, so they should continue to pass.
2026-05-04 18:10:25 +01:00
Alejandro Gutiérrez
5a8db796a0 feat(db): m1 — message_queue claim lease + presence.role columns
Schema groundwork for v2 agentic-comms milestone 1.

mesh.message_queue gets three nullable columns (claimed_at, claim_id,
claim_expires_at) so drainForMember can move from "claim-and-deliver in
one UPDATE" to a two-phase claim/lease + recipient-ack model. This is
the at-least-once retry hook the broker has been missing.

mesh.presence gets a typed `role` column ('control-plane' | 'session'
| 'service') with default 'session' so legacy hellos keep working. The
CLI's hidden-daemon hack (peerType === 'claudemesh-daemon') will swap
to a role-based filter in a follow-up worktree.

Migration is hand-authored as 0029_*.sql to match the existing pattern
(drizzle-kit's _journal.json drifted long ago — the runtime migrator
in apps/broker/src/migrate.ts tracks files lexicographically via
mesh.__cmh_migrations, not the journal).
2026-05-04 18:10:04 +01:00
Alejandro Gutiérrez
dab80f475e refactor(cli): m1 lifecycle + role-aware peer list
Foundational cleanups before agentic-comms architecture work
(.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md).
All behavior-preserving.

1. Extract `connectWsWithBackoff` into apps/cli/src/daemon/ws-lifecycle.ts.
   Both DaemonBrokerClient and SessionBrokerClient now share one
   lifecycle implementation (connect, hello-handshake, ack-timeout,
   close + backoff reconnect). Each client provides its own buildHello
   / isHelloAck / onMessage hooks and keeps its own RPC bookkeeping
   (pendingAcks, peerListResolvers, onPush). Composition over
   inheritance per Codex's review; no protocol shape changes.

2. Drop daemon-WS ephemeral session pubkey. DaemonBrokerClient no
   longer mints + sends a per-reconnect ephemeral keypair in its
   hello. Session-targeted DMs land on SessionBrokerClient since
   1.32.1, not the member-keyed daemon-WS, so the field was
   vestigial. Send-encrypt path now signs DMs with the stable mesh
   member secret. handleBrokerPush invocations from daemon-WS only
   pass the member secret — session decryption is the session-WS's
   job.

3. Role-aware peer list. `peer list` now hides peers whose
   broker-emitted `role` is `'control-plane'`. `--all` opts back in.
   JSON output emits `role` at top level. Older brokers that don't
   emit role yet default to 'session', so legacy peer rows stay
   visible even before the broker-side change ships. Replaces
   the prior `peerType === 'claudemesh-daemon'` channel-name hack.

Typecheck + tests + build all green.
2026-05-04 18:08:32 +01:00
Alejandro Gutiérrez
a25102a79f fix(cli): 1.32.1 — DMs to session pubkeys finally land in inbox
SessionBrokerClient (daemon-side, since 1.30.0) was constructed
without a push handler and silently dropped every inbound `push` /
`inbound` frame. Header docstring claimed it handled "inbound DM
delivery for messages targeted at the session pubkey" but the
callback was never wired.

Net effect: any DM sent to a peer's session pubkey (everything
`peer list` returns now) was queued, broker-acked, marked
delivered_at on the broker, and thrown away by the recipient
daemon. inbox.db stayed at zero rows; `claudemesh inbox` reported
"no messages" no matter what arrived.

Two-session smoke surfaced this — sender outbox status=done with
broker_message_id, recipient inbox empty.

Fix: wire SessionBrokerClient to forward push/inbound frames to
the same handleBrokerPush the member-keyed broker already uses.
Pass the per-session secret key as sessionSecretKeyHex so
decryptOrFallback tries it first; member key remains the fallback
for legacy member-targeted traffic.

Verified end-to-end with two registered sessions sending in both
directions — inbox.db row count went 0 → 2.

Files: apps/cli/src/daemon/session-broker.ts,
apps/cli/src/daemon/run.ts. No broker change required.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 17:33:18 +01:00
16 changed files with 1936 additions and 360 deletions

@@ -0,0 +1,506 @@
---
title: claudemesh — full end-state architecture for agentic peer communication
status: draft (v2 — supersedes v1: removes time-boxed phasing, adds P2P data plane, applies Codex-2 correctness/scope-gap edits)
target: end-state (architectural milestones, not version timelines)
author: Alejandro + Claude (Codex GPT-5.2 cross-checked twice)
date: 2026-05-04
supersedes: 2026-05-04-agentic-comms-architecture.md (v1)
references:
- 2026-05-02-architecture-north-star.md (CLI-first commitment, push-pipe)
- 2026-05-04-per-session-presence.md (per-launch session pubkey + attestation)
- apps/cli/CHANGELOG.md (1.30.0 → 1.32.1 history)
---
# claudemesh — agentic peer communication, full end-state
## What this document is
The end-state architecture for claudemesh as a transport-agnostic agentic peer-comms platform. Not a release plan, not a sprint roadmap — the **shape** the system needs to converge on. Implementation order at the end is a *suggestion*, not a contract; time estimates are deliberately omitted because the surface is too cross-cutting to phase by weeks.
v1 of this spec (same date, no `-v2` suffix) treated the broker as the sole data plane. v2 corrects that: **the broker is a coordination plane (signaling, discovery, offline queue, fan-out, registry, revocation); the data plane is hybrid P2P** with broker fallback for the cases P2P can't cover. Closer to how Tailscale, libp2p, LiveKit, and modern WebRTC stacks work in production.
## TL;DR
- **Identity** — three keypair types (member, session, service) all rooted in a member's secret key. Member is durable, session is per-launch, service is a member-scoped delegate for non-Claude integrations. Every service has its own pubkey and explicit revocation.
- **Coordination plane** — broker handles signaling, peer discovery, offline message queue, group/topic fan-out, mesh state authority, revocation gossip. Always reachable.
- **Data plane** — hybrid:
  - **P2P first** (WebRTC data channels, future: QUIC) when both peers online + NAT-traversable.
  - **Broker-relayed** when peers are NAT-blocked, when one peer is offline, or for group/topic/broadcast where fan-out at the broker is structurally cheaper than N-way sender-side fan-out.
  - **Pure broker** for service identities that can't run a P2P stack (HTTP webhook senders, OpenAI Assistants, browser SDKs without WebRTC).
- **Channels** — typed envelope (dm, group, topic, rpc, system, stream). Channel type drives crypto, routing, and transport selection. `meta` is required in v2 envelope.
- **Transports** — pluggable adapters under one interface: WS-to-broker (today), WebRTC P2P, HTTP webhook, future LiveKit/QUIC/etc. Broker negotiates which adapter a peer pair uses.
- **Crypto** — every direct message is E2E encrypted to recipient's pubkey regardless of transport. Broker never sees plaintext. P2P doesn't get any extra trust just because it's direct.
- **Delivery** — at-least-once **requires receiver ack** before broker marks `delivered_at`. The retry path before that is best-effort with idempotent dedupe at the receiver.
The CLI-first commitment from the North Star spec stays intact. Every channel type and every transport is invocable from `claudemesh <verb>`. MCP serves only `claude/channel` mid-turn push.
---
## The forcing functions (why this shape, not a smaller one)
1. **Multi-session interconnect already broke** (1.30.0 → 1.32.1) because the per-session WS subsystem shipped without a push handler. That is a symptom of "broker is the data plane and we keep bolting on" thinking. Roles and transport adapters need to be formalized before the next bolt-on.
2. **Codex review surfaced a correctness bug** in `drainForMember`: the claim sets `delivered_at = NOW()` *before* the WS push succeeds; if `ws.readyState !== OPEN`, the row is marked delivered and the message is lost. That is at-most-once with no retry, and every channel or transport added later inherits it unless it is fixed at the foundation.
3. **The agentic-comms domain has standardized on hybrid P2P + central coordinator.** Tailscale (control plane + WireGuard P2P), LiveKit (signaling + SFU + P2P data channels), libp2p (DHT discovery + multi-transport), Iroh (gossip + QUIC P2P). Pure-broker is a 2010s pattern; pure-P2P is academic. Hybrid is the norm.
4. **claudemesh's pricing/economics demand P2P.** Every byte relayed through the broker is a hosting cost. Voice transcripts, file transfers, and real-time tool I/O are all bandwidth-heavy. A P2P data plane lets the broker scale linearly with peer count, not message volume.
5. **Privacy/sovereignty matters as the agent ecosystem grows.** "Your agents talk to my agents" should default to peer-to-peer paths when possible. Broker as relay is fine; broker as forced middleman is not.
---
## Audience for this architecture
| Peer type | Identity | Online presence | Data plane preference | Notes |
|---|---|---|---|---|
| **Claude Code session** | Per-launch session pubkey, member-attested | WS to broker (control + signaling) | P2P first, broker fallback | Mid-turn push via MCP `claude/channel` |
| **Daemon, no launch** (idle Mac with daemon running) | Member pubkey | WS to broker | Broker only (no P2P partner unless launched) | Receives broadcasts + member-targeted DMs |
| **Voice agent** (LiveKit, Pipecat) | Service identity, member-signed | LiveKit room + bridge | LiveKit room data channels intra-room; bridge over broker for cross-mesh | Side-car bridges room ↔ broker |
| **OpenAI Assistant / Anthropic Skill** | Service identity, scoped token | HTTP outbound, webhook inbound | Broker only (can't run P2P) | Daemon does delegated re-encryption |
| **Browser-based peer** (web dashboard, SDK) | Member or service identity | WS to broker, WebRTC for P2P | P2P-where-possible (browsers ARE WebRTC-native) | Full feature parity once on-mesh |
| **Webhook consumer** (Stripe-style passive) | Service identity | HTTP webhook inbound only | Broker only | Topic subscriptions; no inbound channel |
| **Bridge** (Slack, WhatsApp, IRC, Matrix) | Service identity per bridge + per-end-user delegated | WS to broker | Broker only for bridge ↔ broker; native protocol for bridge ↔ external | Trust delegated to bridge operator |
| **Cron / scheduled actor** | Member pubkey or service identity | Ephemeral; HTTP send only | Broker only | No long-lived connection |
| **CLI-only user** (no Claude Code) | Member pubkey | Ephemeral on each `claudemesh send` | Broker only | Command-line agent, queues via outbox |
Every row in this table works without changing the broker's coordination plane.
---
## Layer 1: Identity
Three keypair types, one auth model.
### Member identity (durable)
- Ed25519 keypair, generated at `claudemesh join <invite>`. Held in `~/.claudemesh/config.json` per mesh.
- The auth boundary — grants, kicks, bans operate on members.
- Used for hello signature on the daemon's control-plane WS.
- Used as cryptographic root of trust for sibling sessions and service identities.
### Session identity (ephemeral, per-launch)
- Ed25519 keypair generated by each `claudemesh launch`. Held in process memory only.
- Parent-signed attestation vouches for it (TTL 12h, broker cap 24h). Rotation = new launch.
- Used for hello signature on the per-session WS, and as routing key for DMs targeted at *this specific launched session*.
- Session secret never touches disk; lives only in the daemon's `sessionBrokers` map keyed by IPC token.
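To make the TTL rule above concrete, here is a minimal sketch of the time-window check a broker or peer might apply to a session attestation. The `SessionAttestation` field names and the `mintWindow` / `checkAttestationWindow` helpers are hypothetical; only the 12h TTL and the 24h broker cap come from this spec, and signature verification is deliberately left out.

```typescript
// Hypothetical attestation shape; field names are illustrative only.
interface SessionAttestation {
  sessionPubkey: string;
  memberPubkey: string;
  issuedAt: number; // ms since epoch
  expiresAt: number; // ms since epoch
}

const DEFAULT_TTL_MS = 12 * 60 * 60 * 1000; // TTL stated in the spec
const BROKER_CAP_MS = 24 * 60 * 60 * 1000; // broker-side cap stated in the spec

type WindowCheck = { ok: true } | { ok: false; reason: string };

// Builds an attestation window with the default TTL (signature omitted).
function mintWindow(sessionPubkey: string, memberPubkey: string, now: number): SessionAttestation {
  return { sessionPubkey, memberPubkey, issuedAt: now, expiresAt: now + DEFAULT_TTL_MS };
}

// Checks only the time window; a real verifier would also check the
// parent member's signature over the attestation.
function checkAttestationWindow(att: SessionAttestation, now: number): WindowCheck {
  const lifetime = att.expiresAt - att.issuedAt;
  if (lifetime <= 0) return { ok: false, reason: "non-positive lifetime" };
  if (lifetime > BROKER_CAP_MS) return { ok: false, reason: "exceeds broker cap" };
  if (now >= att.expiresAt) return { ok: false, reason: "expired" };
  return { ok: true };
}
```

Checking the cap before the expiry means an over-long attestation is rejected even while it is still nominally valid, which is one plausible reading of "broker cap".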
### Service identity (third type, additive)
For non-Claude integrations that can't or shouldn't use a per-launch session.
```
ServiceIdentity {
  service_id              // Stable string id ("openai-assistant-foo", "livekit-room-bar")
  service_pubkey          // Ed25519 pubkey — the cryptographic identity. crypto_box targets this.
  member_id               // The mesh member that owns this service (auth boundary)
  service_type            // "openai-assistant" | "livekit-room" | "webhook" | "voice-agent" | ...
  scopes                  // ["dm:read", "topic:write", "rpc:invoke", ...]
  attestation             // member-signed: { service_id, service_pubkey, scopes, expires_at, signature }
  transport_hint          // "ws" | "http-webhook" | "sse" | "livekit" — informs how the broker reaches it
  delegate_daemon_pubkey? // Optional. Set when the daemon holds the service's secret on its behalf.
}
```
Two flavors:
- **Holds-secret service** — has its own keypair (`service_pubkey` + service-secret kept by the service itself). Runs E2E crypto end-to-end. Voice agent side-cars, browser SDK, MQTT bridges.
- **Delegated service** — daemon holds the service-secret on the service's behalf. Senders still encrypt to `service_pubkey`; daemon decrypts on receipt and forwards plaintext (or re-signs) to the service via its `transport_hint`. Used by HTTP webhook consumers, OpenAI Assistants. Trust is in the daemon owner. `delegate_daemon_pubkey` records who's holding.
All three identity types resolve to a `member_id` for authorization. They differ in liveness (member = always; session = per-launch; service = scoped) and transport hint (member/session = WS-resident; service = polymorphic).
### Identity revocation (explicit)
v1 left this implicit. v2 makes it concrete:
- **CLI verb:** `claudemesh service revoke <service_id>` (also `claudemesh peer revoke <pubkey>` for member revocation).
- **Broker effect:** add a row to the `revocation` table with `(mesh_id, revoked_pubkey, revoked_at, revoked_by, reason?)`. Drop any active WS for that pubkey (close 4002 "revoked"). Reject future hellos.
- **Drain effect:** `drainForMember` checks revocation list at drain time; ciphertext-in-flight from the revoked sender is dropped (sender already broker-acked, but recipient never sees it).
- **Gossip:** revocation events publish on the `system` channel (highest priority). Online peers cache; offline peers see on reconnect. Required so P2P sessions also honor revoke (otherwise a revoked peer's stored attestations could keep working over direct paths).
- **Latency target:** <30s for online peers to receive and apply.
- **Expiry vs revoke distinction:** `expires_at` is graceful (predictable, scheduled rotation); revoke is emergency (leaked secret, fired employee, compromised host). Both use the same revocation table; `expires_at` enforces silently when reached, revoke is logged as an audit event.
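The peer-side half of the gossip rule can be sketched as a small local cache. This is an illustration under assumptions, not the broker's implementation: `RevocationCache` and `isIdentityUsable` are hypothetical names, and the camelCase fields simply mirror the revocation row described above.

```typescript
// Mirrors the revocation row described above; camelCase names are an assumption.
interface RevocationRow {
  meshId: string;
  revokedPubkey: string;
  revokedAt: number; // ms since epoch
  revokedBy: string;
  reason?: string;
}

// Local cache a peer could fill from `system`-channel revocation events.
class RevocationCache {
  private rows = new Map<string, RevocationRow>();

  private key(meshId: string, pubkey: string): string {
    return `${meshId}:${pubkey}`;
  }

  // Idempotent: replaying the same gossip event leaves the cache unchanged.
  apply(row: RevocationRow): void {
    const k = this.key(row.meshId, row.revokedPubkey);
    const existing = this.rows.get(k);
    if (!existing || row.revokedAt < existing.revokedAt) this.rows.set(k, row);
  }

  isRevoked(meshId: string, pubkey: string): boolean {
    return this.rows.has(this.key(meshId, pubkey));
  }
}

// An identity is usable only if it is neither expired nor revoked.
function isIdentityUsable(
  cache: RevocationCache,
  meshId: string,
  pubkey: string,
  expiresAt: number,
  now: number,
): boolean {
  if (now >= expiresAt) return false; // graceful, scheduled expiry
  return !cache.isRevoked(meshId, pubkey); // emergency revoke
}
```

Because the check runs locally against cached state, a P2P session can honor a revoke without asking the broker on every message.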
---
## Layer 2: Coordination plane (the broker, properly scoped)
The broker is **not** the data plane. Its real responsibilities:
1. **Mesh state authority** — member roster, group memberships, topic registry, service registrations, revocation list. Source of truth for who's in a mesh and what they can do.
2. **Peer discovery** — `list_peers` returns currently-online presences. Broker is the only system that knows which peers are reachable now and over which transports.
3. **Signaling for P2P upgrades** — when peer A wants to open a P2P connection to peer B, A sends a SDP offer through the broker; B responds with an SDP answer through the broker. Once the data channel is up, broker is out of the path. Same as WebRTC signaling.
4. **Offline message queue** — when recipient is offline, broker stores the (encrypted) message until they reconnect. P2P can't do this without an "always-on peer" model, which is awkward to bootstrap.
5. **Group / topic / broadcast fan-out** — broker is the cheap fan-out point. Sender publishes once; broker delivers to N recipients. P2P fan-out (gossipsub) is possible but adds significant complexity for a feature most meshes won't need at scale.
6. **TURN-style relay for NAT-blocked pairs** — when P2P negotiation fails (symmetric NAT, restrictive corporate firewall), broker carries the data. Functionally equivalent to TURN.
7. **Revocation gossip publisher** — broker pushes revocation events to all online peers via the `system` channel; peers cache them.
8. **Audit log + persistence layer** — encrypted message metadata for compliance. Bodies are E2E-encrypted, so audit is over (sender, recipient, channel, timestamp, size), not content.
The broker is **NOT**:
- The default path for online-online direct messages (P2P should win).
- The decryptor for any direct message (E2E means broker sees ciphertext only).
- A bottleneck on bulk data (file transfer, voice, screen share — these go P2P or fail).
- The sole identity authority for active sessions (P2P sessions verify attestations locally via cached mesh state).
### Two roles per mesh on the WS layer (Codex-1 correction, kept)
Within the broker's WS surface, the daemon holds two roles per mesh, not one connection per launch:
- **Control-plane connection** — one per mesh, member-keyed. Carries: signaling + outbox drain + RPCs + broadcast/member-targeted inbound + revocation gossip subscription.
- **Session connections** — N per mesh, session-keyed. Carries: presence row keyed on session pubkey + signaling for P2P upgrades involving this session + inbound for session-targeted DMs that arrive via broker fallback.
A peer who's purely on the broker (no P2P) functions exactly as today. A peer who upgrades to P2P with another peer keeps its broker WS for the other roles.
---
## Layer 3: Data plane (hybrid P2P + broker fallback)
The data plane is what carries actual message bodies. Three modes, selected per (sender, recipient, channel) tuple:
### Mode 1: Direct P2P (preferred when possible)
Two peers run a WebRTC data channel (or QUIC stream — pluggable, see Layer 4) between their daemons. Established via signaling through the broker; once up, broker is out of the path.
**When P2P is selected:**
- Both peers are online (have an active broker WS).
- Both peers' transports advertise P2P capability (WebRTC available; not a webhook-only service identity; not a browser without `RTCPeerConnection`).
- ICE negotiation succeeds (at least one candidate pair works — direct, server-reflexive, or peer-reflexive).
- Channel type is `dm`, `rpc`, or `stream` (the 1:1 cases).
**P2P session lifecycle:**
- Established lazily on first message (warm-up cost ~200ms; dominated by ICE + DTLS handshake). Subsequent messages reuse the channel.
- Idle timeout: 5min of no traffic → tear down. Re-established on next message.
- Hard timeout: 1h max regardless of activity, then re-handshake. Limits damage of compromised session keys.
- Either side can demote to broker-relay at any time; broker is the fallback always.
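The two timeouts above reduce to a small decision function. The sketch below is illustrative: `P2pSessionClock`, `SessionAction`, and `nextSessionAction` are hypothetical names, and only the 5min idle and 1h hard limits come from this spec.

```typescript
const IDLE_TIMEOUT_MS = 5 * 60 * 1000; // 5min of no traffic
const HARD_TIMEOUT_MS = 60 * 60 * 1000; // 1h max regardless of activity

// Hypothetical bookkeeping for one established P2P session.
interface P2pSessionClock {
  establishedAt: number; // ms since epoch
  lastTrafficAt: number; // ms since epoch
}

type SessionAction = "reuse" | "teardown-idle" | "rehandshake";

// The hard timeout is checked first because it applies even to busy sessions.
function nextSessionAction(s: P2pSessionClock, now: number): SessionAction {
  if (now - s.establishedAt >= HARD_TIMEOUT_MS) return "rehandshake";
  if (now - s.lastTrafficAt >= IDLE_TIMEOUT_MS) return "teardown-idle";
  return "reuse";
}
```

A send path would call this before each message and fall back to broker relay while a teardown or re-handshake is in progress.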
**Crypto on P2P:**
- DTLS handshake provides transport encryption (forward secrecy; recipient pubkey verified via cached attestation chain).
- Application-layer crypto_box ALSO runs on top — same as broker-relayed messages — so the wire format and decryption path are identical on the receiver side. Defense in depth, no special-case code.
### Mode 2: Broker-relayed (fallback)
The current path. Sender encrypts to recipient pubkey (member or session or service), pushes to broker via WS, broker queues, recipient pulls (or broker pushes to recipient's WS).
**When broker-relay is selected:**
- One peer offline → broker queues, delivers on reconnect.
- ICE negotiation fails → broker becomes the relay.
- Channel type is `group`, `topic`, or `broadcast` → broker fan-out is structurally cheaper than P2P fan-out for any group >2.
- Service identity at either end can't run P2P → broker is the only path.
**Crypto:** unchanged from today — E2E crypto_box, broker sees ciphertext only.
### Mode 3: Direct webhook (broker as broker, not as relay)
For service identities advertising `transport_hint: "http-webhook"`. Sender encrypts to service's `service_pubkey` (or to delegate-daemon's pubkey for delegated services), broker POSTs the ciphertext to the service's registered URL with HMAC signature + retry. No long-lived connection on the service side.
This is functionally a "broker queue, custom delivery transport" — broker still mediates, but delivery is HTTP not WS.
### Selection logic (deterministic, sender-side)
```
function pickTransport(sender, recipient, channel) -> Transport:
  if channel in [group, topic, broadcast]:
    return broker.relay            # fan-out semantics
  if recipient.transport_hint == "http-webhook":
    return broker.relay            # broker calls webhook
  if recipient is offline:
    return broker.queue            # store-and-forward
  if !recipient.capabilities.p2p:
    return broker.relay            # one-end can't P2P
  if !sender.capabilities.p2p:
    return broker.relay            # we can't P2P
  if has_active_p2p_session(sender, recipient):
    return p2p.session             # warm path
  attempt_p2p_handshake(sender, recipient, timeout=2s) ->
    if ok:   return p2p.session
    else:    return broker.relay   # fall through, log degraded
```
Policy lives in the daemon's send path. Broker doesn't know or care — it sees only the messages that actually go through it.
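The pseudocode above could translate to TypeScript roughly as follows. This is a sketch under assumptions: `PeerView`, `decideRoute`, and the injected `attemptHandshake` callback are hypothetical names, and the pure decision is split from the async handshake only so it can be unit-tested without a network.

```typescript
// "broadcast" appears in the pseudocode even though it is not a ChannelType.
type RoutedChannel = "dm" | "group" | "topic" | "rpc" | "system" | "stream" | "broadcast";
type Route = "broker.relay" | "broker.queue" | "p2p.session";

// Hypothetical view of a peer as seen by the sending daemon.
interface PeerView {
  online: boolean;
  p2pCapable: boolean;
  transportHint?: string;
}

// Pure part of the decision: everything that needs no network round trip.
function decideRoute(
  sender: PeerView,
  recipient: PeerView,
  channel: RoutedChannel,
  hasActiveSession: boolean,
): Route | "try-handshake" {
  if (channel === "group" || channel === "topic" || channel === "broadcast") return "broker.relay";
  if (recipient.transportHint === "http-webhook") return "broker.relay";
  if (!recipient.online) return "broker.queue";
  if (!recipient.p2pCapable) return "broker.relay";
  if (!sender.p2pCapable) return "broker.relay";
  if (hasActiveSession) return "p2p.session";
  return "try-handshake";
}

// Async wrapper: attempts the handshake with the 2s budget from the pseudocode.
async function pickTransport(
  sender: PeerView,
  recipient: PeerView,
  channel: RoutedChannel,
  hasActiveSession: boolean,
  attemptHandshake: (timeoutMs: number) => Promise<boolean>,
): Promise<Route> {
  const route = decideRoute(sender, recipient, channel, hasActiveSession);
  if (route !== "try-handshake") return route;
  const ok = await attemptHandshake(2000);
  return ok ? "p2p.session" : "broker.relay"; // degraded path on failure
}
```

The check order matches the pseudocode, so a webhook recipient that happens to be offline still routes to `broker.relay` rather than `broker.queue`.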
---
## Layer 4: Transport adapters (pluggable)
A transport adapter is an implementation of how *one peer pair* moves bytes. Defined by an interface; new adapters added without touching upper layers.
```typescript
interface PeerTransport {
  readonly kind: string; // "ws-broker" | "webrtc-p2p" | "http-webhook" | ...
  readonly capabilities: {
    p2p: boolean;
    bidirectional: boolean;
    midTurnPush: boolean;
    maxMessageBytes: number;
    streamingChunks: boolean;
  };
  open(opts: TransportOpenOpts): Promise<TransportSession>;
  send(envelope: Envelope): Promise<TransportSendResult>;
  inbound(): AsyncIterable<Envelope>;
  heartbeat(): Promise<boolean>;
  close(reason?: string): Promise<void>;
}
```
### Concrete adapters at end-state
1. **`WsBrokerTransport`** — current code. WebSocket to `wss://ic.claudemesh.com/ws`. Underpins both broker-relay (Mode 2) and signaling for P2P upgrades.
2. **`WebRtcP2pTransport`** — RTCPeerConnection + RTCDataChannel. Browser, Node (`node-datachannel` or similar), CLI all supported. Chunking handled at envelope layer for `stream` channel.
3. **`HttpWebhookTransport`** — outbound HTTP POST to broker `/v1/send`; inbound HTTP POST to a registered webhook URL. Unidirectional from peer's perspective. Mid-turn push: no.
4. **`LiveKitRoomTransport`** — for voice agents. Side-car bridges a LiveKit room to claudemesh. Maps a LiveKit participant → claudemesh service identity.
Future adapters TBD as concrete needs surface — no commitments here. (v1 listed MQTT/gRPC/SSE as future named adapters; v2 drops the named list per Codex-2 should-cut feedback.)
The peer's daemon advertises transport capabilities at hello time; broker stores them in the presence row; senders consult them via `list_peers` (capability fields added to the response).
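A sender consulting advertised capabilities might filter them as sketched below. The `PeerListRow` shape (a `transports` array on each `list_peers` row) and the `usableTransportKinds` helper are assumptions for illustration; the spec only states that capability fields are added to the response.

```typescript
// Same capability fields as PeerTransport.capabilities above.
interface TransportCapabilities {
  p2p: boolean;
  bidirectional: boolean;
  midTurnPush: boolean;
  maxMessageBytes: number;
  streamingChunks: boolean;
}

interface AdvertisedTransport {
  kind: string;
  capabilities: TransportCapabilities;
}

// Hypothetical list_peers row carrying the advertised transports.
interface PeerListRow {
  pubkey: string;
  transports: AdvertisedTransport[];
}

// What a particular send requires from the transport.
interface SendNeeds {
  bytes: number;
  requireP2p?: boolean;
  requireStreaming?: boolean;
}

// Returns the kinds of all advertised transports that satisfy the needs,
// in the order the peer advertised them.
function usableTransportKinds(row: PeerListRow, needs: SendNeeds): string[] {
  return row.transports
    .filter((t) => t.capabilities.maxMessageBytes >= needs.bytes)
    .filter((t) => !needs.requireP2p || t.capabilities.p2p)
    .filter((t) => !needs.requireStreaming || t.capabilities.streamingChunks)
    .map((t) => t.kind);
}
```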
---
## Layer 5: Channels (typed envelope)
Channels define **semantics**: what the message means, what crypto to apply, what delivery guarantees, what fan-out, what backpressure.
```typescript
type ChannelType =
  | "dm"      // 1:1 direct, encrypted to recipient pubkey, at-least-once with ack
  | "group"   // post to named group, per-recipient encrypt or symmetric, at-least-once with ack
  | "topic"   // pub/sub topic, persisted history, per-topic symmetric key, at-least-once with ack
  | "rpc"     // request/response with correlation id + timeout, exactly-once via dedupe
  | "system"  // peer_joined / peer_left / topology / lifecycle / revocation (broker-originated)
  | "stream"; // long-lived ordered chunks, idempotent per (stream_id, chunk_id)
interface Envelope {
  v: 2;
  channel: ChannelType;
  /** Routing target — meaning depends on channel:
   * dm: recipient pubkey (member, session, or service)
   * group: group name (e.g. "@admins")
   * topic: topic id (e.g. "#abc123")
   * rpc: recipient pubkey
   * system: ignored (sender-determined fan-out; broker fills in)
   * stream: recipient pubkey (the stream_id is in meta.streamId — see below) */
  target: string;
  /** Sender identity pubkey (member, session, or service). */
  from: string;
  /** Encrypted payload. Channel + recipient determines crypto recipe:
   * dm/rpc/stream: crypto_box to recipient pubkey
   * group: per-recipient seal (or symmetric in v3)
   * topic: per-topic symmetric key (v0.2.0 spec)
   * system: broker-signed, plaintext metadata (event has no body) */
  body: { nonce: string; ciphertext: string; bodyVersion: number };
  /** Required in v2 (was optional in v1). Even minimal envelopes must carry
   * clientMessageId for idempotent dedupe. */
  meta: {
    clientMessageId: string;           // REQUIRED — idempotency id (spec §4.2)
    requestFingerprint?: string;
    priority?: "now" | "next" | "low"; // dm: gates mid-turn push; group/topic: fan-out priority
    timeoutMs?: number;                // rpc only
    streamId?: string;                 // REQUIRED for channel:"stream"; identifies the stream
    streamChunkId?: number;            // stream only; monotonic; receiver dedupes
    streamTerminator?: boolean;        // stream only; signals end
    rpcCorrelationId?: string;         // rpc only; back-edge for response
    rpcResponse?: boolean;             // rpc only; this is a response, not request
    replyToId?: string;                // dm/topic threading
    mentions?: string[];               // dm/topic; @-callouts
    expiresAt?: number;                // any; broker drops past this; default 7d for queued
  };
  /** Sender Ed25519 signature over canonical bytes. Verified by recipient
   * (and by broker for system-message origin). */
  signature: string;
}
```
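The "REQUIRED" and "rpc only" / "stream only" annotations in the envelope could be enforced with a validator along these lines. This is a sketch: `WireEnvelope`, `WireMeta`, and `validateEnvelopeMeta` are hypothetical names, and rejecting channel-specific fields on the wrong channel is an assumption, since the spec does not say whether such fields are rejected or ignored.

```typescript
type ChannelType = "dm" | "group" | "topic" | "rpc" | "system" | "stream";

// Loosely typed input, as it might look straight off the wire.
interface WireMeta {
  clientMessageId?: string;
  timeoutMs?: number;
  streamId?: string;
  streamChunkId?: number;
  rpcCorrelationId?: string;
}

interface WireEnvelope {
  v: number;
  channel: ChannelType;
  meta?: WireMeta;
}

// Returns a list of human-readable problems; an empty list means valid.
function validateEnvelopeMeta(env: WireEnvelope): string[] {
  const errors: string[] = [];
  if (env.v !== 2) errors.push("v must be 2");
  const meta = env.meta;
  if (!meta) {
    errors.push("meta is required");
    return errors;
  }
  if (!meta.clientMessageId) errors.push("meta.clientMessageId is required");
  if (env.channel === "stream" && !meta.streamId) {
    errors.push("meta.streamId is required for stream");
  }
  // Assumption: channel-specific fields on the wrong channel are rejected.
  if (env.channel !== "stream" && meta.streamChunkId !== undefined) {
    errors.push("meta.streamChunkId is stream only");
  }
  if (env.channel !== "rpc" && meta.timeoutMs !== undefined) {
    errors.push("meta.timeoutMs is rpc only");
  }
  if (env.channel !== "rpc" && meta.rpcCorrelationId !== undefined) {
    errors.push("meta.rpcCorrelationId is rpc only");
  }
  return errors;
}
```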
### Stream concurrency
For `channel: "stream"`, **`meta.streamId` is required**. Two concurrent streams to the same recipient pubkey use distinct streamIds; receiver demuxes by `(from, streamId)`. Without this, multi-stream voice transcripts or file transfers from the same peer would collide.
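A receiver-side demultiplexer for this rule might look like the sketch below. `StreamDemux` and its methods are hypothetical; string payloads stand in for decrypted chunk bodies, and chunks are deduped per `(from, streamId, chunkId)` as the envelope comments describe.

```typescript
interface StreamState {
  chunks: Map<number, string>; // chunkId -> payload
  ended: boolean;
}

class StreamDemux {
  private streams = new Map<string, StreamState>();

  // JSON-encoding the pair avoids collisions between e.g. ("a", "b:c") and ("a:b", "c").
  private key(from: string, streamId: string): string {
    return JSON.stringify([from, streamId]);
  }

  // Returns true if the chunk was new, false if it was a duplicate.
  accept(from: string, streamId: string, chunkId: number, payload: string, terminator = false): boolean {
    const k = this.key(from, streamId);
    let state = this.streams.get(k);
    if (!state) {
      state = { chunks: new Map<number, string>(), ended: false };
      this.streams.set(k, state);
    }
    if (state.chunks.has(chunkId)) return false;
    state.chunks.set(chunkId, payload);
    if (terminator) state.ended = true;
    return true;
  }

  // Chunks received so far for one stream, ordered by chunk id.
  read(from: string, streamId: string): string[] {
    const state = this.streams.get(this.key(from, streamId));
    if (!state) return [];
    return Array.from(state.chunks.entries())
      .sort((a, b) => a[0] - b[0])
      .map((entry) => entry[1]);
  }

  isEnded(from: string, streamId: string): boolean {
    const state = this.streams.get(this.key(from, streamId));
    return state ? state.ended : false;
  }
}
```

Two interleaved streams from the same peer land in separate buffers, and a redelivered chunk is dropped without disturbing order.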
### Crypto by channel
- `dm`, `rpc`, `stream` → crypto_box(plaintext, recipient_pubkey, sender_secretkey). Receiver verifies attestation chain to ensure recipient_pubkey is a valid identity rooted in a current member.
- `group` → for now: per-recipient crypto_box (sender encrypts N times, broker fans out). Future: hybrid Curve25519 → AES-GCM with sender key wrap, like Signal Sender Keys.
- `topic` → per-topic symmetric key (already in v0.2.0 spec). Key rotation = new topic + members re-subscribe. Keys distributed via DM at join time, encrypted to each member's pubkey.
- `system` → broker is the signer; receivers verify against the broker's published Ed25519 pubkey. Plaintext bodies allowed since these are operational events.
### Delivery semantics (Codex-2 correction applied)
**At-least-once requires receiver ack.** Today's broker sets `delivered_at = NOW()` inside the claim CTE before WS push succeeds — that's at-most-once with no retry. The end-state behavior:
1. Sender's daemon writes to outbox (durable).
2. Drain worker sends to broker; broker acks with `client_message_id` echo (this is sender → broker delivery ack, NOT end-to-end).
3. Broker queues with `claimed_at` NULL, `delivered_at` NULL.
4. On recipient hello / push opportunity: broker claims by setting `claimed_at = NOW(), claim_id = <presenceId>` (lease 30s).
5. Broker `sendToPeer` writes to WS / P2P / webhook.
6. Receiver processes envelope and emits `client_ack { clientMessageId }` back to broker.
7. Broker sets `delivered_at = NOW()` ON ACK RECEIPT.
8. If lease expires without ack → broker re-eligible to claim and re-deliver.
9. Receiver dedupes by `clientMessageId` (idempotent insert into inbox).
Until the ack path is wired (the transitional state), the accurate label is **best-effort retry with idempotent dedupe**, not at-least-once. The combination of outbox, claim/lease, and dedupe upgrades to at-least-once once the ack path is in place.
`rpc` exactly-once follows the same path, with one addition: the response carries the `rpcCorrelationId`. The sender retries the request until a response is received OR `timeoutMs` elapses; receiver-side dedupe ensures the handler runs at most once.
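Steps 3 to 9 above can be modelled in memory to make the lease and ack transitions concrete. This is a hedged sketch, not the broker's SQL: `BrokerQueue` and `ReceiverInbox` are hypothetical names, time is passed in explicitly as milliseconds, and the real implementation would express `claim` as a single atomic statement against Postgres.

```typescript
// In-memory model of the queue row; field names mirror the spec's columns.
interface QueueRow {
  clientMessageId: string;
  claimedAt: number | null;
  claimId: string | null;
  deliveredAt: number | null;
}

const LEASE_MS = 30000; // 30s lease from step 4

class BrokerQueue {
  private rows = new Map<string, QueueRow>();

  /** Step 3: queue with claimed_at and delivered_at both NULL. */
  enqueue(clientMessageId: string): void {
    if (this.rows.has(clientMessageId)) return;
    this.rows.set(clientMessageId, { clientMessageId, claimedAt: null, claimId: null, deliveredAt: null });
  }

  /** Steps 4 and 8: claim rows that are unclaimed or whose lease has expired. */
  claim(presenceId: string, now: number): string[] {
    const claimed: string[] = [];
    this.rows.forEach((row) => {
      if (row.deliveredAt !== null) return;
      const leaseExpired = row.claimedAt !== null && now - row.claimedAt >= LEASE_MS;
      if (row.claimedAt === null || leaseExpired) {
        row.claimedAt = now;
        row.claimId = presenceId;
        claimed.push(row.clientMessageId);
      }
    });
    return claimed;
  }

  /** Step 7: delivered_at is set only when the receiver's client_ack arrives. */
  ack(clientMessageId: string, now: number): void {
    const row = this.rows.get(clientMessageId);
    if (row && row.deliveredAt === null) row.deliveredAt = now;
  }

  isDelivered(clientMessageId: string): boolean {
    const row = this.rows.get(clientMessageId);
    return !!row && row.deliveredAt !== null;
  }
}

class ReceiverInbox {
  private seen = new Set<string>();

  /** Step 9: idempotent insert. Returns true only the first time an id is seen. */
  insert(clientMessageId: string): boolean {
    if (this.seen.has(clientMessageId)) return false;
    this.seen.add(clientMessageId);
    return true;
  }
}
```

In this model a receiver would call `insert` and then ack regardless of the result, since the broker needs the ack to release its claim even when the message turned out to be a redelivery.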
### Mid-turn push
`channel: "dm"` with `meta.priority: "now"` and recipient is a launched Claude Code session → recipient's daemon emits `claude/channel` MCP push; the session's Claude Code reads it mid-turn. Other priorities deliver via `claudemesh inbox` poll or at next tool boundary.
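The gating rule reads as a three-way conjunction, which a short sketch makes explicit. `pickDeliveryPath`, `DeliveryInput`, and the `recipientIsLaunchedSession` flag are hypothetical names used only for illustration; how the daemon actually learns that the recipient is a launched session is not specified here.

```typescript
type Priority = "now" | "next" | "low";
type Channel = "dm" | "group" | "topic" | "rpc" | "system" | "stream";
type DeliveryPath = "mcp-push" | "inbox";

interface DeliveryInput {
  channel: Channel;
  priority?: Priority;                 // meta.priority
  recipientIsLaunchedSession: boolean; // assumption: resolved by the daemon beforehand
}

/** Mid-turn MCP push only for dm + priority "now" + launched session; everything else waits in the inbox. */
function pickDeliveryPath(input: DeliveryInput): DeliveryPath {
  const midTurn =
    input.channel === "dm" &&
    input.priority === "now" &&
    input.recipientIsLaunchedSession;
  return midTurn ? "mcp-push" : "inbox";
}
```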
### Reply threading + mentions
Uniform across `dm` and `topic`: `meta.replyToId` references the original message's `clientMessageId`. `meta.mentions` is an array of pubkeys (or `@<group>`) — UI/CLI surfaces them; broker doesn't enforce.
---
## Layer 6: Mesh state — broker authority + signed gossip
The mesh state (members, groups, topics, services, revocations, policies) needs both:
- **Authority** — single source of truth. The broker DB. Mutations (add member, revoke, change policy) go through broker, signed by mesh owner / admin.
- **Replication** — every peer needs a current-enough copy to authorize incoming P2P messages locally (otherwise revoke can't be enforced when peers chat directly).
End-state: broker publishes signed mesh-state-update events on the `system` channel; peers cache and apply. Conflict resolution is trivial because broker is authority — peers merge updates by version vector. Eventually consistent in seconds, not the open-ended convergence of CRDT-only systems.
For peer revocation specifically: revocation gossip is highest priority and must propagate within 30s to all online peers. Offline peers see it on reconnect.
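A peer-side cache for these updates might look like the sketch below. It is deliberately simplified: it swaps the version vector mentioned above for a single broker-assigned counter (a simplification that only works because the broker is the sole writer), skips broker signature verification, and does not handle gaps in the sequence. `MeshUpdate` and `MeshStateCache` are hypothetical names.

```typescript
// Assumption: each update carries a monotonically increasing broker version.
type MeshUpdate =
  | { version: number; kind: "member_added"; pubkey: string }
  | { version: number; kind: "member_revoked"; pubkey: string };

class MeshStateCache {
  private version = 0;
  private members = new Set<string>();

  /** Applies an update if it is newer than the cached state. Returns false for stale or duplicate updates. */
  apply(update: MeshUpdate): boolean {
    if (update.version <= this.version) return false;
    if (update.kind === "member_added") {
      this.members.add(update.pubkey);
    } else {
      this.members.delete(update.pubkey);
    }
    this.version = update.version;
    return true;
  }

  /** Local authorization check for an incoming P2P message: a set lookup. */
  isCurrentMember(pubkey: string): boolean {
    return this.members.has(pubkey);
  }
}
```

Because stale updates are rejected, a replayed `member_added` event cannot resurrect a peer after its revocation has been applied.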
---
## Crypto — what doesn't change vs what does
### Doesn't change
- Per-peer Ed25519 keypairs (member + session + service).
- crypto_box (Curve25519 + XSalsa20 + Poly1305) for DMs/RPC/stream.
- Parent-attestation flow for sessions and services.
### Does change (additive)
- DTLS layer underneath WebRTC P2P (transport-level encryption for fingerprint binding).
- Per-topic symmetric keys (v0.2.0 baseline; v2 makes it a hard requirement for topics).
- Broker signing key for `system` channel events (single Ed25519 keypair the broker holds; pubkey published in mesh state).
- Service identity attestations carry `service_pubkey` + `scopes`.
- Forward-secrecy for long-lived P2P sessions: post-handshake, derive a fresh symmetric key per session epoch (1h max); rotate.
---
## Migration order (architectural milestones, NO time estimates)
The end-state above doesn't ship in one PR. The following ordering minimizes regression risk and lets each milestone be useful on its own. **No weeks/sprints attached** — work proceeds when the prior milestone is stable.
### Milestone 1 — Foundational correctness
*Required before anything else. Without this, every later milestone inherits the bugs.*
- Extract `connectWsWithBackoff` helper. Refactor `DaemonBrokerClient` and `SessionBrokerClient` to use it. Eliminates the drift bug class.
- Drop daemon's stray `sessionPubkey` field (or rename + document).
- Tighten daemon-WS inbound filter — `*` broadcasts and member-targeted DMs only; session-targeted DMs land on session WS exclusively.
- Add `presence.role` column at broker (`control-plane | session | service`); list_peers + fan-out + reconnect honor it.
- **Fix broker drain race** — schema migration adds `claimed_at`, `claim_id`, `claim_expires_at` columns. Rewrite `drainForMember` for two-phase claim/deliver. Re-claim if `claimed_at` older than lease (30s).
- Receiver-side `client_ack` for at-least-once with ack (Codex-2 correction). Without ack wiring this stays at "best-effort retry with idempotent dedupe."
- Receiver-side dedupe: idempotent insert on `clientMessageId`; finished + made required for v2 envelopes.
### Milestone 2 — Capability advertisement + transport abstraction
*Sets up the interface. No new transport yet.*
- Define `PeerTransport` interface; refactor existing WS code to be the first implementation. No behavioral change.
- Add capabilities field to hello payload + presence row + `list_peers` response.
- Define `Envelope v2` schema with `meta` required + `streamId` requirement on `stream` channel. Broker accepts both v1 and v2 (v1 auto-upgraded server-side by inferring `channel` from `targetSpec` shape). Senders start emitting v2.
### Milestone 3 — Service identity + HTTP webhook transport
*First non-WS transport. Validates abstraction. Includes revocation.*
- Service identity registration: `claudemesh service register --type webhook --pubkey <hex> --scopes ...` mints attestation, stores broker-side. Service pubkey explicit in attestation.
- Service revocation: `claudemesh service revoke <service_id>` writes broker denylist + closes any active connections + publishes `system` revocation event.
- Add `HttpWebhookTransport` (broker-side outbound: POST with HMAC + retry; daemon-side inbound: HTTP server receives webhook callbacks → handleBrokerPush).
- Add `/v1/send` HTTP POST endpoint on broker (today broker is WS-only for sends).
- Demo: cron job using only `curl` posts to mesh; webhook subscriber receives.
- (`SseTransport` deferred — Codex-2 should-cut feedback. Pull in when concrete browser need arises.)
### Milestone 4 — Typed channels: rpc, stream, system
*Channel layer becomes real.*
- `channel: "rpc"` end-to-end: correlation id routing through any transport, response timeout, `claudemesh rpc <peer> <method> <args>` CLI verb.
- `channel: "stream"` end-to-end: chunked + ordered + idempotent, multi-stream demux via `meta.streamId`, `claudemesh stream <peer> <stream-id>` CLI verb.
- `channel: "system"` formalized (broker-signed events for peer_joined, peer_left, topology, revocation, mesh-state-updates).
### Milestone 5 — P2P data plane (WebRTC adapter)
*The big architectural shift. Broker becomes coordinator, not data path.*
- Add `WebRtcP2pTransport` adapter. Uses `node-datachannel` (or libdatachannel binding) on Node; native WebRTC in browser.
- Add signaling protocol over the existing broker WS:
- `p2p_offer` (sender → broker → recipient): SDP offer + ICE candidates.
- `p2p_answer` (recipient → broker → sender): SDP answer + ICE candidates.
- `p2p_candidate` (either way): trickle ICE candidates.
- All signaling messages are broker-attested (only valid sender/recipient pairs).
- Add `pickTransport()` policy in daemon send path.
- Add P2P session manager: warm-cache, idle timeout, hard timeout, demote-to-broker on failure.
- Tag broker-relayed messages that *could have* gone P2P with a metric, so degradation rate is observable.
### Milestone 6 — Mesh state replication + revocation gossip
*Required before P2P is safe at scale.*
- Broker publishes signed `system` events for all mesh state mutations.
- Peers subscribe; cache and apply.
- Revocation propagation latency target: <30s for online peers.
- P2P sessions verify peer identity against cached state on every message (cheap, just a map lookup).
### Milestone 7 — External integrations (proof points, parallel)
*One PoC per category to validate the architecture, opportunistically.*
- LiveKit side-car (validates LiveKit room transport).
- OpenAI Assistant (validates delegated-key crypto + webhook transport).
- WhatsApp / Slack bridge (validates human-bridge service identity).
- Browser SDK (validates browser as a peer; uses WebRTC adapter natively).
### Milestone 8 — Group/topic crypto upgrade
*Group fan-out crypto efficiency.*
- Sender Keys protocol for group: sender derives group key, encrypts content once, encrypts group key per-recipient. Avoids N-way encryption per message.
- Per-topic key rotation policy (member join → optional re-key; member leave → forced re-key).
### Beyond Milestone 8
- Future transport adapters as concrete needs surface (no commitments).
- Multi-broker federation (mesh spans multiple brokers; gossip across).
- Onion routing option for adversarial environments.
---
## Non-goals (explicit)
- **Replacing Slack / Discord / Matrix as a human chat product.** claudemesh is for agent coordination; humans participate via bridges or direct DMs but UX is CLI-first.
- **Pure-P2P with no central coordinator.** The broker stays — for offline queue, group fan-out, mesh authority, revocation. "P2P-first hybrid" is the commitment, not "P2P-only."
- **Replacing the MCP `claude/channel` push-pipe.** Mid-turn interrupt stays MCP. The data-plane changes don't touch the daemon-to-Claude-Code path.
- **Real-time media (audio/video) directly in claudemesh data channels.** Bandwidth-heavy media goes through dedicated stacks (LiveKit, WebRTC SFU). claudemesh metadata + signaling glues them.
---
## Open questions
1. **Mid-turn push when sender is on P2P session.** P2P delivery to recipient's daemon → daemon emits MCP push. Same shape as broker-delivered. Confirm that the MCP push respects per-session targeting when sibling sessions of the same member hold different session pubkeys.
2. **Browser peers and NAT traversal.** Browser ↔ browser via WebRTC works. Browser ↔ daemon (Node WebRTC binding) — needs testing under symmetric NAT. May require running a STUN server (Google's for now; eventually self-hosted). TURN fallback uses the broker WS.
3. **Backpressure on stream channel.** WebRTC data channels have built-in flow control. Broker-relayed streams need per-stream backpressure signaling to avoid OOM at the broker. Proposal: receiver advertises `stream_window_bytes` periodically; sender pauses once the advertised window is used up.
4. **Multi-region brokers.** Today single broker. If we add a second broker (or federation), how do peers in mesh A on broker 1 talk to peers in mesh A on broker 2? Out of scope here; separate spec when forced.
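The backpressure proposal in question 3 amounts to a credit window on the sender side. The sketch below is one possible reading, not a settled design: `StreamWindow` and its method names are hypothetical, and it assumes each `stream_window_bytes` advertisement replaces the remaining credit rather than adding to it.

```typescript
class StreamWindow {
  private remainingBytes: number;

  constructor(initialWindowBytes: number) {
    this.remainingBytes = initialWindowBytes;
  }

  /** Sender checks this before each chunk; false means pause until the next advertisement. */
  canSend(chunkBytes: number): boolean {
    return chunkBytes <= this.remainingBytes;
  }

  /** Deduct credit after a chunk has been handed to the transport. */
  onSent(chunkBytes: number): void {
    this.remainingBytes = Math.max(0, this.remainingBytes - chunkBytes);
  }

  /** Receiver's periodic stream_window_bytes advertisement resets the credit. */
  onWindowAdvertised(streamWindowBytes: number): void {
    this.remainingBytes = streamWindowBytes;
  }
}
```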
---
## Acknowledgements
**Codex-1 (initial architecture review of existing code) caught:**
- "Remove daemon-WS inbound entirely" idea silently loses broadcasts + member-targeted DMs whenever zero launches exist. Corrected → retained.
- Inheritance for the dup'd lifecycle would become a god class. Composition via helper kept.
- Drain race needs `claimed_at` + delivered-on-success; "check OPEN before claim" still drops on crash. Kept.
- Token-keyed registry is correct (token = auth boundary), not a smell. Kept.
**Codex-2 (single-pass review of v1 of this spec) caught:**
- At-least-once requires receiver ack, not just "set delivered_at on success." → Layer 5 delivery semantics rewritten to require client_ack.
- Service identity needs explicit `service_pubkey` field, included in attestation. → Added to ServiceIdentity definition.
- v2 envelope `meta` should be non-optional with `clientMessageId` always present. → meta is now required.
- Service identity needed explicit revocation/disable story. → New CLI verb `claudemesh service revoke`, broker denylist, system-channel gossip propagation.
- `streamId` location ambiguous; concurrent streams to same peer would collide. → `meta.streamId` made REQUIRED for `channel: "stream"`.
- Defer `SseTransport` from Milestone 3. → Done.
- Drop named future-adapter list (MQTT/gRPC) to avoid false commitments. → Done.
The hybrid P2P data plane, transport adapter abstraction, typed channel envelope, mesh state replication, and milestone reordering are mine. Codex's reviews were targeted at correctness/scope-gap/should-cut, not redesign.
**This spec is now frozen for implementation.** No further architectural drift; deviations during implementation surface as new spec-deltas with explicit rationale, not silent edits to this document.

@@ -0,0 +1,360 @@
---
title: claudemesh as agentic communication platform — architecture spec
status: draft
target: 2.0.0 (foundational cleanup) → 2.1.0 (transport adapters) → 2.2.0 (channel typing)
author: Alejandro + Claude (cross-checked with Codex GPT-5.2)
date: 2026-05-04
supersedes: none
references:
- 2026-05-02-architecture-north-star.md (CLI-first commitment, push-pipe)
- 2026-05-04-per-session-presence.md (per-launch session pubkey + attestation)
- apps/cli/CHANGELOG.md (1.30.0–1.32.1 history)
---
# claudemesh as agentic communication platform
## TL;DR
Today claudemesh is a **peer mesh for Claude Code sessions** — broker + CLI + per-session WS, encrypted DMs, peer list, mid-turn push via MCP. Tomorrow it has to be a **transport-agnostic agentic communication platform** that:
- treats Claude Code as **one channel type** among many (with first-class support for mid-turn interrupts via `claude/channel`)
- accepts **non-Claude agents** as peers — voice agents (LiveKit/Pipecat), OpenAI Assistants, raw HTTP webhook consumers, scheduled cron actors, human IM bridges
- exposes **typed channels** (DM, group, topic, RPC, system event, stream) so message semantics aren't shoved through one `targetSpec` string
- has a **pluggable transport layer** so a peer can join the mesh over WS, HTTP webhook, SSE, MQTT, or gRPC without changing the broker's data plane
- preserves **end-to-end encryption** as a non-negotiable for direct messages
This document specifies the architecture in three layers (identity, transport, channel), the foundational cleanup needed before adding any of it (Codex caught a few sharp issues), and the migration path that gets us there without a "v2 rewrite" event.
The CLI-first commitment from the North Star spec stays intact — every channel type and transport adapter must be invocable from `claudemesh <verb>` first, with MCP serving only `claude/channel` push.
---
## Why now
Three forcing functions:
1. **Multi-session interconnect already broke** (1.30.0 → 1.32.1). The per-session WS subsystem shipped without a push handler because the architecture assumed "one daemon WS per mesh handles everything" and then we bolted session WSes on top without finishing the inbound side. The shape is right; the wiring was incomplete. We need to formalize the role split before adding more transports.
2. **Codex review surfaced a correctness bug in the broker's drain.** `drainForMember` claims rows by setting `delivered_at = NOW()` *before* the WS push succeeds. If `ws.readyState !== OPEN` at push time, the row is marked delivered and the message is gone. This is at-most-once with no retry. Any future channel type or transport adapter inherits this bug if we don't fix it at the foundation.
3. **The agentic-comms market is becoming a thing.** Voice agents (LiveKit, Pipecat, ElevenLabs Conversational), OpenAI Assistants threads, MCP servers acting as autonomous workers, scheduled cron actors — they all need a "mesh" to coordinate. claudemesh has the right primitives (E2E crypto, peer presence, typed routing); it just needs the architecture to admit non-Claude peers without forking the codebase.
---
## Audience for this architecture
| Peer type | Identity | Transport | Channels they speak |
|---|---|---|---|
| **Claude Code session** (today) | Per-launch session pubkey, parent-attested by member key | WS to broker | DM, group, topic, system events; receives mid-turn push via MCP `claude/channel` |
| **Headless agent** (e.g. cron job, Hermes/OpenClaw worker) | Member pubkey (no per-launch session) | WS to broker, OR HTTP webhook outbound | DM, group, topic; no mid-turn push (polls inbox) |
| **Voice agent** (LiveKit/Pipecat call) | Service identity (signed by mesh owner) | WS to broker, possibly via TURN relay | DM (transcript stream), group (call participants), system events (call lifecycle) |
| **OpenAI Assistant / Anthropic Agent** (Skill SDK) | Service identity, OAuth-style scoped token | HTTP webhook (server-side push) OR WS | DM, RPC (tool-style request/response) |
| **Human via Slack/WhatsApp bridge** | Service identity for the bridge, end-user mapped via membership | WS (bridge to broker) | DM, topic |
| **Webhook consumer** (Stripe-style passive listener) | Service identity, scoped to one channel | HTTP webhook outbound only | Topic (subscribe to events) |
Every row in this table needs to work without changing the broker's data plane.
---
## Layer 1: Identity
### Today
Two identity types coexist:
- **Member identity** — stable Ed25519 keypair held in `~/.claudemesh/config.json`. One per joined mesh. Used for hello signature on the daemon's main WS; used as the cryptographic root of trust for sibling sessions.
- **Session identity** — ephemeral Ed25519 keypair generated per `claudemesh launch`. Parent-signed attestation vouches for it (TTL 12h, broker cap 24h). Used for hello signature on the per-session WS; used as the routing key for DMs targeted at *this specific launched session*.
This is enough for Claude Code peers. It's not enough for the audience table above.
### Proposed: third identity type — **service identity**
A service identity is what a non-Claude integration uses to authenticate:
```
ServiceIdentity {
member_id // The mesh member that owns this service (auth boundary)
service_id // Stable id for the service ("openai-assistant-foo", "livekit-room-bar")
service_type // "openai-assistant" | "livekit-room" | "webhook" | "voice-agent" | ...
scopes // ["dm:read", "topic:write", "rpc:invoke", ...]
attestation // member-signed: { service_id, scopes, expires_at, signature }
transport_hint // "ws" | "http-webhook" | "sse" — informs how the broker reaches it
}
```
**Three identity types, one auth model:**
- All identities resolve to a `member_id` (the auth boundary — grants, kicks, bans operate on members).
- Identities differ in *liveness* (member = always; session = per-launch; service = scoped/scheduled) and in *transport hint* (member/session = WS-resident; service = polymorphic).
**Backward compatibility:** existing member + session identities are unchanged. Service identity is additive.
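A broker-side scope check against the attestation could be as small as the following sketch. It covers only the `scopes` and `expires_at` fields from the definition above; verifying the member signature is out of scope here, and `isScopeAllowed` plus the choice of epoch milliseconds for `expires_at` are assumptions.

```typescript
interface ServiceAttestation {
  service_id: string;
  scopes: string[];   // e.g. ["dm:read", "topic:write", "rpc:invoke"]
  expires_at: number; // assumption: epoch milliseconds
}

/** True when the attestation is unexpired and grants the requested scope exactly. */
function isScopeAllowed(att: ServiceAttestation, scope: string, now: number): boolean {
  if (now >= att.expires_at) return false;
  return att.scopes.indexOf(scope) !== -1;
}
```

Exact string matching keeps the check predictable; wildcard scopes such as `topic:*` would need an explicit rule that this spec does not define.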
### Cryptographic implications
- E2E encryption (`crypto_box`) targets a public key. Member pubkey, session pubkey, service pubkey all work the same way.
- A service that can't hold a long-lived secret (e.g. OpenAI Assistant calling out via HTTPS) gets a **delegated identity** the daemon holds — sender encrypts to the daemon's per-member key, daemon re-encrypts and forwards over the service's webhook. This adds trust in the daemon, but it's the only way to bridge to non-crypto-native peers without giving them raw secrets.
---
## Layer 2: Transport
### Today
One transport: **WebSocket to broker** (`wss://ic.claudemesh.com/ws`). Everything goes through it — hello, send, push, RPC. The CLI's daemon holds two WS instances per mesh (member-keyed `DaemonBrokerClient` + per-launch `SessionBrokerClient`).
### Proposed: transport adapter interface
```typescript
interface BrokerTransport {
/** One-time hello + auth handshake. Identity is opaque to the transport. */
connect(opts: TransportConnectOpts): Promise<TransportSession>;
/** Send a typed envelope. Returns a delivery promise (ack or terminal failure). */
send(envelope: Envelope): Promise<SendResult>;
/** Stream of inbound envelopes. Pull-model so a transport can be a webhook,
* not just a long-lived socket. */
inbound(): AsyncIterable<Envelope>;
/** Close cleanly. */
close(reason?: string): Promise<void>;
/** Capabilities surfaced to the daemon — broker uses this to decide
* whether mid-turn push is possible, whether RPC blocks are
* supported, etc. */
capabilities: TransportCapabilities;
}
```
**Concrete adapters at v2.1.0:**
1. **`WsBrokerTransport`** — current WS implementation. The `DaemonBrokerClient` and `SessionBrokerClient` are recast as two roles using this transport with different hello payloads.
2. **`HttpWebhookTransport`** — for service identities that can't hold a WS open. Outbound: HTTP POST to the broker's `/v1/send`. Inbound: broker calls back to a registered webhook URL with retry + signature. Mid-turn push is not possible (degrades gracefully).
3. **`SseTransport`** — for browsers / restricted environments. Outbound: HTTP POST. Inbound: SSE stream from broker to client.
**Future adapters (v2.3+):**
4. **`LiveKitTransport`** — for voice agents. The "broker" is a LiveKit room; messages are LiveKit data-channel packets. Bridges to the central broker via a daemon side-car.
5. **`MqttTransport`** — for IoT / fleet scenarios.
6. **`GrpcTransport`** — for low-latency intra-cluster.
Any new adapter implements the same interface; broker logic is transport-agnostic at the API boundary.
### The two-role model (Codex's correction)
Even within one transport, the daemon holds **two roles per mesh**, not one connection per launch:
- **Control-plane connection** — one per mesh, member-keyed. Carries: outbox drain (one queue, can't race), `list_peers`/state/memory/skill RPCs, inbound for `*` broadcasts and member-targeted DMs (legacy traffic + zero-launch state).
- **Session connections** — N per mesh, session-keyed. Carries: presence row keyed on session pubkey, inbound for session-targeted DMs.
This is what we have today; the spec just makes the role split explicit. The mistake in 1.30.0–1.32.0 was treating session connections as "presence-only" instead of "second-class peers." 1.32.1 corrects that.
### Foundational cleanup (ship first, before any new transport)
1. **Extract `connectWsWithBackoff` helper** — current `DaemonBrokerClient` and `SessionBrokerClient` duplicate the WS lifecycle (open, hello, ack-timeout, close, backoff, reconnect). Codex's recommendation: composition, not inheritance. A single helper takes `{ url, buildHello, onMessage, onStatusChange }` and both clients call it. Eliminates the drift bug class that produced session_replaced thrashing.
2. **Drop the daemon's stray `sessionPubkey`** (`apps/cli/src/daemon/broker.ts:113`). It's a leftover from the era when the daemon WS was the only WS. The session role now owns session pubkeys. If we want the daemon itself to be addressable by a stable pubkey, rename it `daemonPubkey` and document it; today it's dead ballast.
3. **Tighten daemon-WS inbound filter, don't remove it** (Codex's correction to my prior take). Daemon WS should still receive `*` broadcasts and member-targeted DMs (legacy senders, zero-launch state). It should NOT decrypt session-targeted DMs (that's the session WS's job, and decryption requires the session secret which the daemon WS doesn't have anyway).
4. **Fix the broker drain race** (`apps/broker/src/broker.ts:2399-2402`). Add `claimed_at` + `claim_id` columns; claim sets `claimed_at = NOW()` (NOT `delivered_at`); push runs; `delivered_at = NOW()` is set ONLY after `ws.send` succeeds. Re-eligible if `claimed_at` is older than the lease timeout (e.g. 30s). Combined with `client_message_id` dedupe on the receiver side, this gives at-least-once semantics, which is what an agentic comms platform needs.
5. **Decouple presence-WS-role from session-WS-role at the broker.** Today `connectPresence` is called from both `handleHello` and `handleSessionHello`. The two paths diverge in identity (member vs session pubkey) and dedup key (sessionId in both cases). Make the role explicit on the presence row (`role: "control-plane" | "session" | "service"`) so list_peers, fan-out, and reconnect can reason about it. Hidden `claudemesh-daemon` rows in 1.32.0's `peer list` are a hack covering for missing typing.
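For item 1, the retry core of a shared helper can be sketched independently of any WebSocket library. This is not the proposed `connectWsWithBackoff` signature: the sketch takes an injected `connectOnce` function instead of `{ url, buildHello, onMessage, onStatusChange }`, and the capped exponential schedule, `BackoffOpts`, and `backoffDelayMs` are assumptions made for illustration.

```typescript
interface BackoffOpts {
  baseMs: number;      // delay before the first retry
  maxMs: number;       // upper bound on any single delay
  maxAttempts: number; // total connection attempts before giving up
}

/** Capped exponential delay: baseMs, 2*baseMs, 4*baseMs, ... up to maxMs. */
function backoffDelayMs(attempt: number, opts: BackoffOpts): number {
  return Math.min(opts.maxMs, opts.baseMs * Math.pow(2, attempt));
}

/** Calls connectOnce until it resolves or maxAttempts is reached, sleeping between failures. */
async function connectWithBackoff<T>(
  connectOnce: () => Promise<T>,
  opts: BackoffOpts,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown = new Error("maxAttempts must be at least 1");
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    try {
      return await connectOnce();
    } catch (err) {
      lastError = err;
      // No sleep after the final failed attempt.
      if (attempt < opts.maxAttempts - 1) {
        await sleep(backoffDelayMs(attempt, opts));
      }
    }
  }
  throw lastError;
}
```

Both clients would pass their own `connectOnce` (open socket, send hello, await ack), which is the composition-over-inheritance shape the item describes. A production helper would likely add jitter to the delay.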
---
## Layer 3: Channels
### Today
One channel type: **direct messages with target-spec routing**. `targetSpec` is a string that the broker pattern-matches:
- `<64-hex-pubkey>` → DM to that member or session
- `*` → broadcast to mesh
- `@<groupname>` → group post
- `#<topicId>` → topic post
This works but it's overloaded — the same `send` verb covers DMs, broadcasts, groups, topics, and (since v0.9) tagged messages. As we add agentic peers, the semantics matter and the routing key string can't carry them.
### Proposed: typed channel envelope
```typescript
type ChannelType =
| "dm" // 1:1 message, encrypted to recipient pubkey
| "group" // post to named group, encrypted per-recipient (today: base64 plaintext)
| "topic" // pub/sub topic, persisted, history available, per-topic symmetric key
| "rpc" // request/response, correlation id, timeout, structured result
| "system" // peer_joined / peer_left / topology / lifecycle events
| "stream"; // long-lived data stream (voice transcript, log tail, file transfer chunks)
interface Envelope {
/** Schema version. v1 = current opaque shape. v2 = this typed shape. */
v: 2;
/** What semantics the receiver should apply. */
channel: ChannelType;
/** Target — pubkey for dm, group name for group, topic id for topic, etc.
* Same wire format as today's targetSpec, but typed. */
target: string;
/** Sender identity (member, session, or service pubkey). */
from: string;
/** Encrypted payload + crypto envelope. Channel type drives crypto:
* - dm: crypto_box to recipient pubkey
* - group: per-recipient seal (today: plaintext)
* - topic: symmetric key (today: plaintext, v0.2.0+ adds per-topic key)
* - rpc / system / stream: same as DM (crypto_box) */
body: { nonce: string; ciphertext: string; bodyVersion: number };
/** Optional metadata, varies by channel type. */
meta?: {
/** Stable client-supplied id for dedupe (existing field, made required for v2). */
clientMessageId: string;
/** Sender's canonical fingerprint per spec §4.4 (existing field). */
requestFingerprint?: string;
/** dm/group: priority gate (now/next/low). rpc: timeout_ms. stream: chunk_id. */
priority?: "now" | "next" | "low";
timeoutMs?: number;
streamChunkId?: number;
/** dm/topic: replyTo for threading. */
replyToId?: string;
/** topic: mentions list (existing field). */
mentions?: string[];
/** rpc: correlation back-edge so the broker can route the response. */
rpcCorrelationId?: string;
};
/** Sender signature over (channel, target, from, nonce, ciphertext, meta). */
signature?: string;
}
```
**Why this matters for agentic peers:**
- A voice agent sending a partial transcript wants `channel: "stream"` semantics — high-frequency, small chunks, idempotent, no per-message ack required.
- An OpenAI Assistant calling a tool wants `channel: "rpc"` — request-response with timeout, correlation back-edge so the response routes.
- A scheduled cron actor reporting completion wants `channel: "topic"` — fire-and-forget, persisted history.
- Today all of these get bolted onto `dm` with conventions; v2 envelope makes them first-class.
### Claude Code channels — first-class support
Three specific channel features for Claude Code:
1. **Mid-turn interrupt** (`claude/channel` push). Already implemented via the MCP push-pipe. The new envelope makes it explicit: `channel: "dm"` with `meta.priority: "now"` triggers MCP push to a launched session. Other priorities deliver at next inbox poll.
2. **Reply threading** (`meta.replyToId`). Already partially supported on topics; v2 makes it work uniformly across `dm` and `topic`. The receiver Claude Code session sees a structured reply thread instead of flat history.
3. **Mentions** (`meta.mentions`). Already supported on topics; v2 surfaces them on `dm` too — useful for `@<peer>` callouts in groups even when the message body is encrypted.
### Backward compatibility
Envelope v1 (today's shape) stays accepted by the broker until v3.x. v1 envelopes are auto-upgraded server-side: `channel` inferred from `targetSpec` shape (`*` → group/broadcast, `#` → topic, hex → dm). Existing CLIs keep working.
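The server-side inference could be sketched as a single function over the `targetSpec` shapes listed under Layer 3. `inferChannel` is a hypothetical name; mapping `@<groupname>` to `group` follows the target-spec list rather than the sentence above, and throwing on an unrecognized shape is an assumption about error handling.

```typescript
type InferredChannel = "dm" | "group" | "topic";

/** Maps a v1 targetSpec string to the v2 channel it implies. */
function inferChannel(targetSpec: string): InferredChannel {
  if (targetSpec === "*") return "group";            // mesh-wide broadcast
  if (targetSpec.charAt(0) === "@") return "group";  // @<groupname>
  if (targetSpec.charAt(0) === "#") return "topic";  // #<topicId>
  if (/^[0-9a-fA-F]{64}$/.test(targetSpec)) return "dm"; // 64-hex pubkey
  throw new Error("unrecognized targetSpec: " + targetSpec);
}
```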
---
## Future integrations (concrete)
These are not part of v2.0 — they're the test cases the architecture must support:
### LiveKit voice agent
- Service identity: `livekit-room-<id>`, signed by mesh owner.
- Transport: dedicated daemon side-car hosts a LiveKit participant; data-channel packets bridge to the central broker via WS.
- Channels: `stream` for transcript chunks, `system` for call lifecycle (joined/left/muted), `dm` for sidebar text.
- E2E: per-call ephemeral keypair held by the side-car; participants' member keys are discovered via mesh peer list.
### OpenAI Assistant integration
- Service identity: `openai-assistant-<id>`, scoped to one or more topics + RPC.
- Transport: HTTP webhook out (broker → assistant API), HTTP POST in (assistant → broker `/v1/send`).
- Channels: `rpc` for tool-style invocations from claudemesh peers, `topic` for assistant-published events.
- Crypto: delegated to daemon (assistant can't hold a libsodium secret; daemon re-encrypts on its behalf).
### Generic webhook consumer (Stripe-style)
- Service identity: `webhook-<consumer-id>`, scoped to subscribed topics.
- Transport: HTTP webhook out only. No inbound — it's a passive sink.
- Channels: `topic` only.
- Crypto: not E2E; webhook bodies are signed (HMAC-SHA256, sender = mesh) but plaintext.
### Human-via-WhatsApp bridge
- Service identity: `whatsapp-bridge`, with member-mapping for each end-user.
- Transport: WS (bridge holds long connection to broker), bridges to WhatsApp Business API.
- Channels: `dm` (1:1 chat → WhatsApp DM), `topic` (claudemesh topic → WhatsApp group).
- E2E: bridge holds a per-end-user delegated key; not "true" E2E to the WhatsApp side, but signaled clearly in UX.
---
## Migration plan
### v2.0.0 — Foundational cleanup (no new external surface)
**Target: 1–2 weeks**
- [ ] Extract `connectWsWithBackoff` helper, refactor `DaemonBrokerClient` + `SessionBrokerClient` to use it.
- [ ] Drop daemon's stray `sessionPubkey` (or rename + document).
- [ ] Tighten daemon-WS inbound filter (broadcast + member-targeted only).
- [ ] Add `presence.role` column (`control-plane | session | service`); broker fan-out + list_peers honor it.
- [ ] **Fix drain race**: schema migration adds `claimed_at`, `claim_id`, `claim_expires_at` columns; rewrite `drainForMember` for two-phase claim/deliver; add re-claim path for stale leases.
- [ ] Receiver-side: harden `client_message_id` dedupe (already partial in 1.32.x; finish for at-least-once). Add idempotent insert that returns existing row on conflict.
**Success criteria:**
- Two-session smoke test still passes (1.32.1 baseline).
- Crash-mid-push test: kill broker between claim and send; verify message redelivers on broker restart + recipient reconnect.
- Reconnect storm test: 100 reconnect cycles per session over 60s; zero message loss.
### v2.1.0 — Transport adapter interface
**Target: 2–3 weeks after v2.0.0**
- [ ] Define `BrokerTransport` interface; refactor existing WS code to be the first implementation.
- [ ] Add `HttpWebhookTransport` adapter (broker side: outbound HTTP POST with retry + HMAC signature; daemon side: HTTP server that receives webhook callbacks and inserts into inbox).
- [ ] Add `/v1/send` HTTP endpoint on the broker (today the broker is WS-only for sends).
- [ ] Service identity registration flow: `claudemesh service register --type webhook --scopes dm:read,topic:write` mints attestation, stores it locally + on broker.
- [ ] Basic `SseTransport` for browser/CI use cases.
**Success criteria:**
- A scheduled cron job using only `curl` can send to the mesh (no daemon required).
- A webhook consumer subscribed to a topic receives messages within 5s of post.
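
The HMAC signature on outbound webhook POSTs could look roughly like the sketch below. The spec only says "HMAC signature"; the choice of SHA-256, the `timestamp.body` signing string, and the function names `signWebhookBody` / `verifyWebhookBody` are all assumptions made for illustration.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

/** Sign a raw webhook body with HMAC-SHA256 (assumed algorithm); returns lowercase hex. */
function signWebhookBody(secret: string, timestamp: string, body: string): string {
  // Binding the timestamp into the signed string lets the receiver reject stale replays.
  return createHmac("sha256", secret).update(`${timestamp}.${body}`).digest("hex");
}

/** Recompute the signature on the receiving side and compare in constant time. */
function verifyWebhookBody(
  secret: string,
  timestamp: string,
  body: string,
  signatureHex: string,
): boolean {
  const expected = Buffer.from(signWebhookBody(secret, timestamp, body), "hex");
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so check lengths first.
  if (expected.length !== received.length) return false;
  return timingSafeEqual(expected, received);
}
```

The receiver must verify against the raw request bytes, before any JSON parsing, since re-serialising the body can change it and invalidate the signature.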
### v2.2.0 — Typed channels (envelope v2)
**Target: 2-3 weeks after v2.1.0**
- [ ] Define `Envelope v2` schema; broker accepts both v1 and v2; sender-side code emits v2.
- [ ] `channel: "rpc"` end-to-end: correlation id routing, response timeout, `claudemesh rpc <peer> <method> <args>` CLI verb.
- [ ] `channel: "stream"` end-to-end: chunked delivery, ordered, idempotent, `claudemesh stream <peer> <stream-id>` CLI verb.
- [ ] Mid-turn push (`claude/channel`) honors `channel: "dm"` with `meta.priority: "now"` only.
- [ ] Mentions + replyToId surface uniformly across dm and topic.
**Success criteria:**
- Demo: a Claude Code session sends an `rpc` to another Claude Code session, gets a structured response.
- Demo: a voice-agent prototype sends `stream` chunks; another peer receives them in order with no gaps.
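
One possible shape for the typed-channel envelope, with the mid-turn push rule from the checklist expressed as a predicate. This is a sketch under assumptions: the spec names the channels, `meta.priority: "now"`, mentions and `replyToId`, but the field names `v`, `body` and `correlationId` and the helper `shouldPushMidTurn` are hypothetical.

```typescript
// Hypothetical Envelope v2 shape; only the channel names, meta.priority,
// mentions and replyToId come from the spec text.
type Channel = "dm" | "topic" | "rpc" | "stream";

interface EnvelopeV2 {
  v: 2;
  channel: Channel;
  body: string;
  replyToId?: string;
  mentions?: string[];
  /** Assumed field: correlation id used to route an rpc response to its request. */
  correlationId?: string;
  meta?: { priority?: string };
}

/** Mid-turn push is honoured only for dm envelopes marked priority "now". */
function shouldPushMidTurn(env: EnvelopeV2): boolean {
  return env.channel === "dm" && env.meta?.priority === "now";
}
```

Every other combination, including `rpc` or `stream` envelopes marked `"now"`, would wait for the normal delivery path.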
### v2.3+ — Concrete external integrations
**Target: opportunistic**
- LiveKit side-car (one PoC integration to validate the architecture).
- OpenAI Assistant integration (validate delegated-key crypto path).
- WhatsApp bridge (validate human-bridge service identity).
These are not on the critical path for the architecture; they prove it.
---
## Non-goals (explicit)
- **Replacing Slack / Discord.** claudemesh is for agent coordination. Human chat is a side-effect, not the headline.
- **Federation across multiple brokers.** v2.0 stays single-broker per mesh. Multi-broker (gossip / federation) is a separate spec, post-v3.
- **Sync-only / no-broker P2P.** Direct peer-to-peer (without the central broker) is a different architecture (libp2p, Iroh). Not in scope.
- **Replacing the MCP push-pipe.** Mid-turn interrupt stays MCP-based. The transport-adapter layer is broker-side; MCP is daemon-to-Claude-Code, untouched.
---
## Open questions
1. **How does a service identity prove liveness?** WS gives us implicit liveness via the connection. HTTP webhook services need an explicit heartbeat / health-check. Proposal: broker periodically POSTs to `<webhook>/health`; service is marked offline after 3 consecutive failures.
2. **RPC routing through offline peers — what's the failure mode?** If `claudemesh rpc <peer> ...` and the peer is offline, do we (a) queue and wait (DM semantics) or (b) fail fast (REST semantics)? Proposal: RPC fails fast with `peer_offline` after a 5s probe; explicit `--wait` flag opts into DM-style queue.
3. **Per-topic symmetric key rotation.** Existing v0.2.0 spec mentions per-topic keys. Rotation policy (when, who triggers, how members re-sync) is unsolved. Defer to a separate spec; v2.2.0 ships with one-shot keys (rotate by re-creating topic).
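
The liveness proposal in question 1 (offline after 3 consecutive failed health checks) reduces to a small counter. A minimal sketch, assuming a hypothetical `ServiceLiveness` class that is fed the outcome of each `<webhook>/health` probe; the probing itself is left out.

```typescript
const MAX_CONSECUTIVE_FAILURES = 3; // threshold from the proposal in question 1

class ServiceLiveness {
  private failures = new Map<string, number>();

  /** Record one health-check result; returns true while the service still counts as online. */
  record(serviceId: string, ok: boolean): boolean {
    // A single success resets the streak; only consecutive failures count.
    const next = ok ? 0 : (this.failures.get(serviceId) ?? 0) + 1;
    this.failures.set(serviceId, next);
    return next < MAX_CONSECUTIVE_FAILURES;
  }
}
```

Resetting on any success means a flaky service that fails two probes out of every three is never marked offline; whether that is acceptable is part of the open question.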
---
## Acknowledgements
Cross-checked with Codex (GPT-5.2, high reasoning) on the foundational cleanup section. Codex caught:
- The "remove daemon-WS inbound entirely" idea would silently lose broadcasts + member-targeted DMs whenever zero launches exist. Corrected.
- Using inheritance to share the duplicated lifecycle would produce a god class. Composition via a helper is the right call.
- The drain race needs a `claimed_at` + delivered-on-success fix; "check OPEN before claim" still drops on crash.
- Token-keyed registry is correct (token = auth boundary), not a smell.
The agentic-comms / typed-channels / transport-adapter layers are mine — Codex didn't touch those because the question I asked was about the existing architecture's smells, not the future roadmap.

@@ -369,8 +369,19 @@ export interface ConnectParams {
pid: number;
cwd: string;
groups?: Array<{ name: string; role?: string }>;
/**
* v2 agentic-comms (M1) — connection role.
* 'control-plane' — daemon WS (hidden from user-facing peer lists).
* 'session' — per-Claude-Code-session WS (default).
* 'service' — autonomous bots/services attached to the mesh.
* Optional for backwards compatibility; defaults to 'session'.
*/
role?: PresenceRole;
}
/** v2 agentic-comms (M1): typed connection roles. */
export type PresenceRole = "control-plane" | "session" | "service";
/** Create a presence row for a new WS connection. */
export async function connectPresence(
params: ConnectParams,
@@ -389,6 +400,7 @@ export async function connectPresence(
statusSource: "jsonl",
statusUpdatedAt: now,
groups: params.groups ?? [],
role: params.role ?? "session",
connectedAt: now,
lastPingAt: now,
})
@@ -431,6 +443,11 @@ export async function listPeersInMesh(
sessionId: string;
cwd: string;
connectedAt: Date;
/** v2 agentic-comms (M1): connection role. CLI uses this to hide
* control-plane daemons from user-facing lists. Wire-level field
* is `peerRole` to avoid collision with 1.31.5's top-level `role`
* lift of profile.role (user-supplied string like "lead"). */
peerRole: PresenceRole;
}>
> {
const rows = await db
@@ -445,6 +462,7 @@ export async function listPeersInMesh(
sessionId: presence.sessionId,
cwd: presence.cwd,
connectedAt: presence.connectedAt,
peerRole: presence.role,
})
.from(presence)
.innerJoin(memberTable, eq(presence.memberId, memberTable.id))
@@ -469,6 +487,7 @@ export async function listPeersInMesh(
sessionId: r.sessionId,
cwd: r.cwd,
connectedAt: r.connectedAt,
peerRole: (r.peerRole ?? "session") as PresenceRole,
}));
}
@@ -2311,6 +2330,22 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
* targetSpec routing: matches either the member's pubkey directly or
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
* config that lives outside this function.
*
* v2 agentic-comms (M1): two-phase claim/deliver with a 30s lease.
*
* The legacy implementation set `delivered_at = NOW()` in the same
* UPDATE that selected the row. If the recipient WS was no longer
* OPEN at push time, the message dropped silently (the row read as
* "delivered" so the next reconnect's drain skipped it).
*
* The new behaviour:
* - claim sets (claimed_at, claim_id, claim_expires_at = NOW() + 30s)
* - delivered_at stays NULL until the recipient acks via `client_ack`
* - re-eligibility predicate accepts rows whose claim has expired,
* so dropped pushes are redelivered (at-least-once)
*
* `claimerPresenceId` is recorded on the row purely for debugging — it
* never gates re-claim; expiry alone does.
*/
export async function drainForMember(
meshId: string,
@@ -2320,6 +2355,7 @@ export async function drainForMember(
sessionPubkey?: string,
excludeSenderSessionPubkey?: string,
memberGroups?: string[],
claimerPresenceId?: string,
): Promise<
Array<{
id: string;
@@ -2385,6 +2421,11 @@ export async function drainForMember(
// (with id as tiebreaker so equal-timestamp rows stay deterministic).
// Sorting in SQL avoids JS Date's millisecond-precision collapse of
// Postgres microsecond timestamps.
//
// v2 (M1): claim sets the lease columns, NOT delivered_at. Re-eligibility
// accepts unclaimed rows AND rows with an expired claim (NULL or past
// NOW()). delivered_at stays NULL until a `client_ack` lands.
const claimerId = claimerPresenceId ?? null;
const result = await db.execute<{
id: string;
priority: string;
@@ -2398,12 +2439,15 @@ export async function drainForMember(
}>(sql`
WITH claimed AS (
UPDATE mesh.message_queue AS mq
SET delivered_at = NOW()
SET claimed_at = NOW(),
claim_id = ${claimerId},
claim_expires_at = NOW() + INTERVAL '30 seconds'
FROM mesh.member AS m
WHERE mq.id IN (
SELECT id FROM mesh.message_queue
WHERE mesh_id = ${meshId}
AND delivered_at IS NULL
AND (claimed_at IS NULL OR claim_expires_at IS NULL OR claim_expires_at < NOW())
AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``})
${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
@@ -2445,11 +2489,93 @@ export async function drainForMember(
}));
}
/**
* v2 agentic-comms (M1): mark a message_queue row as delivered.
*
* Called when the recipient WS replies with a `client_ack` carrying the
* original `client_message_id` and/or the broker row id. Lookup is scoped
* to mesh_id plus target_spec (the recipient's member or session pubkey,
* '*', or any @group / #topic target); `recipientMemberId` is accepted but
* not used by the query. Returns the number of rows marked (0 = unknown
* id, already delivered, or wrong recipient).
*/
export async function markDelivered(params: {
meshId: string;
/** memberId of the WS that's claiming to have received this message. */
recipientMemberId: string;
recipientMemberPubkey: string;
recipientSessionPubkey?: string | null;
clientMessageId?: string | null;
brokerMessageId?: string | null;
}): Promise<number> {
const {
meshId,
recipientMemberPubkey,
recipientSessionPubkey,
clientMessageId,
brokerMessageId,
} = params;
if (!clientMessageId && !brokerMessageId) return 0;
// Match on whichever ids were supplied: broker row id OR clientMessageId.
// Scope to (mesh_id, target_spec ∈ {member-pubkey, session-pubkey, '*', @group, #topic}).
// Group/topic rows are matched by prefix only, without the membership
// expansion drainForMember does, so any peer in the mesh that knows the
// message id can ack them. Direct targets are limited to this recipient.
const result = await db.execute<{ id: string }>(sql`
UPDATE mesh.message_queue
SET delivered_at = NOW()
WHERE mesh_id = ${meshId}
AND delivered_at IS NULL
AND (
${brokerMessageId ? sql`id = ${brokerMessageId}` : sql`FALSE`}
OR ${clientMessageId ? sql`client_message_id = ${clientMessageId}` : sql`FALSE`}
)
AND (
target_spec = ${recipientMemberPubkey}
${recipientSessionPubkey ? sql`OR target_spec = ${recipientSessionPubkey}` : sql``}
OR target_spec = '*'
OR target_spec LIKE '@%'
OR target_spec LIKE '#%'
)
RETURNING id
`);
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
return rows.length;
}
/**
* v2 agentic-comms (M1): reap expired claims so dropped pushes redeliver.
*
* Runs every 15s. Clears (claimed_at, claim_id, claim_expires_at) on rows
* where the lease has expired and no `client_ack` arrived. The next
* `drainForMember` call will pick the row up again — at-least-once.
*
* Returns the number of rows reaped.
*/
export async function sweepExpiredClaims(): Promise<number> {
const result = await db.execute<{ id: string }>(sql`
UPDATE mesh.message_queue
SET claimed_at = NULL,
claim_id = NULL,
claim_expires_at = NULL
WHERE delivered_at IS NULL
AND claim_expires_at IS NOT NULL
AND claim_expires_at < NOW()
RETURNING id
`);
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>;
return rows.length;
}
// --- Lifecycle ---
let ttlTimer: ReturnType<typeof setInterval> | null = null;
let pendingTimer: ReturnType<typeof setInterval> | null = null;
let staleTimer: ReturnType<typeof setInterval> | null = null;
let claimSweepTimer: ReturnType<typeof setInterval> | null = null;
/** v2 agentic-comms (M1): how often we reap expired message claims. */
const CLAIM_SWEEP_INTERVAL_MS = 15_000;
/** Start background sweepers. Idempotent. */
export function startSweepers(): void {
@@ -2467,6 +2593,13 @@ export function startSweepers(): void {
console.error("[broker] stale presence sweep:", e),
);
}, 30_000);
claimSweepTimer = setInterval(() => {
sweepExpiredClaims()
.then((n) => {
if (n > 0) console.log(`[broker] expired claims swept: ${n}`);
})
.catch((e) => console.error("[broker] claim sweep:", e));
}, CLAIM_SWEEP_INTERVAL_MS);
// Orphan-message sweep every hour; cheap, rows are all >7d at deletion time.
setInterval(() => {
sweepOrphanMessages()
@@ -2480,9 +2613,11 @@ export async function stopSweepers(): Promise<void> {
if (ttlTimer) clearInterval(ttlTimer);
if (pendingTimer) clearInterval(pendingTimer);
if (staleTimer) clearInterval(staleTimer);
if (claimSweepTimer) clearInterval(claimSweepTimer);
ttlTimer = null;
pendingTimer = null;
staleTimer = null;
claimSweepTimer = null;
await db
.update(presence)
.set({ disconnectedAt: new Date() })

@@ -49,6 +49,7 @@ import {
listFiles,
listPeersInMesh,
listState,
markDelivered,
listTasks,
queueMessage,
recallMemory,
@@ -546,6 +547,7 @@ async function maybePushQueuedMessages(
conn.sessionPubkey ?? undefined,
excludeSenderSessionPubkey,
conn.groups.map((g) => g.name),
presenceId,
);
log.info("maybePush", {
presence_id: presenceId,
@@ -1772,6 +1774,11 @@ async function handleHello(
pid: hello.pid,
cwd: hello.cwd,
groups: initialGroups,
// v2 agentic-comms (M1): the regular member-keyed `hello` path is
// used by long-lived control-plane connections (claudemesh daemon,
// dashboard, automation). Per-Claude-Code sessions go through
// `session_hello` and get role='session'.
role: "control-plane",
});
const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, {
@@ -1796,6 +1803,7 @@ async function handleHello(
pubkey: hello.pubkey,
groups: initialGroups,
restored: !!saved,
role: "control-plane",
});
log.info("ws hello", {
mesh_id: hello.meshId,
@@ -1993,6 +2001,9 @@ async function handleSessionHello(
pid: hello.pid,
cwd: hello.cwd,
groups: initialGroups,
// v2 agentic-comms (M1): per-Claude-Code session WS — these are the
// user-facing peers shown in `claudemesh peer list`.
role: "session",
});
const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, {
@@ -2018,6 +2029,7 @@ async function handleSessionHello(
session_pubkey: hello.sessionPubkey,
groups: initialGroups,
via: "session_hello",
role: "session",
});
log.info("ws session_hello", {
mesh_id: hello.meshId,
@@ -2567,6 +2579,39 @@ function handleConnection(ws: WebSocket): void {
case "send":
await handleSend(conn, msg);
break;
case "client_ack": {
// v2 agentic-comms (M1): close out a previously pushed message.
// Lookup is scoped to (mesh_id, recipient pubkey) so a peer can
// only ack messages addressed to itself.
const ack = msg as Extract<WSClientMessage, { type: "client_ack" }>;
if (!ack.clientMessageId && !ack.brokerMessageId) {
// Nothing to do; don't error — the daemon may speculatively
// ack and we'd rather be lenient than break a CLI release.
break;
}
try {
const n = await markDelivered({
meshId: conn.meshId,
recipientMemberId: conn.memberId,
recipientMemberPubkey: conn.memberPubkey,
recipientSessionPubkey: conn.sessionPubkey ?? null,
clientMessageId: ack.clientMessageId ?? null,
brokerMessageId: ack.brokerMessageId ?? null,
});
log.debug("ws client_ack", {
presence_id: presenceId,
client_message_id: ack.clientMessageId,
broker_message_id: ack.brokerMessageId,
marked: n,
});
} catch (e) {
log.warn("ws client_ack failed", {
presence_id: presenceId,
error: e instanceof Error ? e.message : String(e),
});
}
break;
}
case "set_status":
await writeStatus(presenceId, msg.status, "manual", new Date());
log.info("ws set_status", {
@@ -2604,6 +2649,12 @@ function handleConnection(ws: WebSocket): void {
sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(),
cwd: pc?.cwd ?? p.cwd,
// v2 agentic-comms (M1): typed connection role. CLI uses
// this to hide control-plane daemons from user-facing
// peer lists (filter swap from peerType happens CLI-side).
// Wire field is `peerRole` to avoid collision with the
// 1.31.5 top-level `role` lift of profile.role.
peerRole: p.peerRole,
...(pc?.hostname ? { hostname: pc.hostname } : {}),
...(pc?.peerType ? { peerType: pc.peerType } : {}),
...(pc?.channel ? { channel: pc.channel } : {}),

@@ -224,6 +224,26 @@ export interface WSSetStatusMessage {
status: PeerStatus;
}
/**
* Client → broker: confirm receipt of a previously pushed envelope so the
* broker can mark the message_queue row delivered.
*
* v2 agentic-comms (M1): pairs with the two-phase claim/lease introduced
* in `drainForMember`. Without this ack, the lease expires after 30s and
* the message is re-claimed and re-pushed (at-least-once retry).
*
* Either id is accepted; daemons that track inbox dedupe by clientMessageId
* should send that one. brokerMessageId is the row primary key, useful when
* the original send didn't carry a client_message_id (legacy traffic).
*/
export interface WSClientAckMessage {
type: "client_ack";
/** Original caller-supplied idempotency id from the `send` envelope. */
clientMessageId?: string;
/** Broker-side row id (the `messageId` field on the inbound `push`). */
brokerMessageId?: string;
}
/** Client → broker: request list of connected peers in the same mesh. */
export interface WSListPeersMessage {
type: "list_peers";
@@ -518,6 +538,8 @@ export interface WSPeersListMessage {
type: "peers_list";
peers: Array<{
pubkey: string;
/** Stable member pubkey — present on M1+ broker responses. */
memberPubkey?: string;
displayName: string;
status: PeerStatus;
summary: string | null;
@@ -525,6 +547,13 @@ export interface WSPeersListMessage {
sessionId: string;
connectedAt: string;
cwd?: string;
/** v2 agentic-comms (M1): typed connection role. CLI uses this to
* filter control-plane daemons out of user-facing peer lists.
* Optional for clients talking to a pre-M1 broker. Wire field is
* `peerRole` to avoid collision with 1.31.5's top-level `role`
* (which is a lift of `profile.role`, the user-supplied string
* like "lead" / "reviewer" / "human"). */
peerRole?: "control-plane" | "session" | "service";
hostname?: string;
peerType?: "ai" | "human" | "connector";
channel?: string;
@@ -1417,6 +1446,7 @@ export type WSClientMessage =
| WSHelloMessage
| WSSessionHelloMessage
| WSSendMessage
| WSClientAckMessage
| WSSetStatusMessage
| WSListPeersMessage
| WSSetSummaryMessage

@@ -1,5 +1,149 @@
# Changelog
## 1.33.0 (2026-05-04) — Milestone 1: lifecycle cleanups + at-least-once with ack
First milestone of the agentic-comms architecture work
(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`).
Foundational correctness — no new external surface, but the wire
protocol grows two additions: a `peerRole` field on `peer list`
responses (presence classification) and a new client → broker
`client_ack` frame.
### Lifecycle helper extraction
`DaemonBrokerClient` and `SessionBrokerClient` now share a single
lifecycle implementation in `apps/cli/src/daemon/ws-lifecycle.ts`
(`connectWsWithBackoff`). Each client supplies `buildHello` /
`isHelloAck` / `onMessage` and keeps its own RPC bookkeeping; the
helper handles connect, hello-ack timeout, close + backoff reconnect.
Composition over inheritance per Codex's review. Eliminates the drift
bug class that produced 1.32.0/1.32.1 (lifecycle copies diverging
silently when one side gained a feature).
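
For orientation, the reconnect delay such a shared helper needs can be read off a capped schedule. The array below is the `BACKOFF_CAPS_MS` constant the daemon client carried before the extraction; how `connectWsWithBackoff` actually indexes it is not shown in this change, so `backoffDelayMs` is a hypothetical helper written under that assumption.

```typescript
// Schedule copied from the pre-1.33 DaemonBrokerClient constant.
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];

/** Delay before reconnect attempt `attempt` (0-based); clamps to the last cap. */
function backoffDelayMs(attempt: number): number {
  const idx = Math.min(Math.max(attempt, 0), BACKOFF_CAPS_MS.length - 1);
  return BACKOFF_CAPS_MS[idx] ?? 30_000;
}
```

Keeping this in one place is the point of the extraction: both clients get the same schedule, and a change to it cannot drift between two copies.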
### Daemon-WS no longer carries an ephemeral session keypair
Pre-1.33: every daemon-WS reconnect minted a fresh keypair, sent the
pubkey in the hello, and held the secret in memory for "session"
crypto. Vestigial since 1.30.0 introduced the per-launch
`SessionBrokerClient` that owns the real session pubkey. Daemon-WS
now uses the stable mesh member secret directly for outbound
encryption. Inbound on daemon-WS only attempts member-key decryption —
session decryption is the session-WS's job.
### `peerRole` wire field
The broker now emits a `peerRole` field on each `peer list` row —
`'control-plane' | 'session' | 'service'`. `control-plane` rows are
the daemon's own member-keyed presence (infrastructure), `session`
rows are launched Claude Code sessions, `service` rows are reserved
for v2.x service identities (HTTP webhook consumers, voice agents,
etc.).
The CLI hides `peerRole === 'control-plane'` rows from the human
renderer by default and exposes a `--all` flag for debugging. JSON
output emits `peerRole` on every row.
**Why `peerRole` and not just `role`:** 1.31.5 already lifted
`profile.role` (user-supplied string like "lead", "reviewer") to
top-level `role`, and the agent-vibes claudemesh skill consumes that
field. The presence classification is a different axis, so it gets
its own field name. `role` keeps its 1.31.5 semantics; `peerRole` is
the new field.
### `client_ack` and at-least-once delivery
The broker (M1 broker change) now uses two-phase claim/deliver:
`claimed_at` / `claim_id` / `claim_expires_at` columns track lease
ownership; `delivered_at` is set ONLY when the recipient acks. A 15s
sweeper clears the claim on rows whose 30s lease expired without ack, so
the next drain re-claims and re-pushes them.
The CLI side closes the loop: after `handleBrokerPush` lands a
message in `inbox.db` (or dedupes against an existing row), the
recipient daemon emits a `client_ack { type: "client_ack",
clientMessageId, brokerMessageId? }` frame on whichever WS the push
arrived on. Best-effort — if the WS is closed by ack time, the
broker's lease will naturally re-deliver, and the receiver dedupes
on `clientMessageId`.
Net behavior: at-least-once with idempotent dedupe. Net visible
change: zero, in the steady state. Crash-mid-push test (kill recipient
between broker claim and recipient ack) now redelivers instead of
silently dropping.
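
The receiver-side contract (dedupe first, then ack unconditionally) can be sketched as follows. This is an illustration only: `Inbox`, `handlePush` and the `Push` shape are hypothetical stand-ins for `inbox.db` and `handleBrokerPush`, whose real signatures are not reproduced here. Only the `client_ack` frame fields come from this change.

```typescript
interface Push {
  clientMessageId: string;
  brokerMessageId?: string;
  body: string;
}

interface ClientAck {
  type: "client_ack";
  clientMessageId: string;
  brokerMessageId?: string;
}

/** Stand-in for inbox.db: insert is idempotent on clientMessageId. */
class Inbox {
  private rows = new Map<string, Push>();

  insert(push: Push): { row: Push; inserted: boolean } {
    const existing = this.rows.get(push.clientMessageId);
    if (existing) return { row: existing, inserted: false };
    this.rows.set(push.clientMessageId, push);
    return { row: push, inserted: true };
  }

  get size(): number {
    return this.rows.size;
  }
}

/** Returns true if the push was new; acks either way so the broker releases its lease. */
function handlePush(inbox: Inbox, push: Push, sendAck: (ack: ClientAck) => void): boolean {
  const { inserted } = inbox.insert(push);
  sendAck({
    type: "client_ack",
    clientMessageId: push.clientMessageId,
    ...(push.brokerMessageId ? { brokerMessageId: push.brokerMessageId } : {}),
  });
  return inserted;
}
```

A redelivered push therefore produces a second ack but no second inbox row, which is what makes at-least-once delivery invisible in the steady state.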
### Files
- New: `apps/cli/src/daemon/ws-lifecycle.ts` (234 lines).
- Refactored: `apps/cli/src/daemon/broker.ts`, `session-broker.ts`,
`inbound.ts`, `run.ts`, `commands/peers.ts`, `ipc/server.ts`.
- Broker side (separate commit): drain race fix, `presence.role`
column, `client_ack` handler, lease sweeper.
- DB migration `0029_drain_lease_and_presence_role.sql` ships with
the broker change.
Foundational refactor before the agentic-comms architecture work
(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). Three
changes, all behavior-preserving:
- **`connectWsWithBackoff` helper** (`apps/cli/src/daemon/ws-lifecycle.ts`).
Both `DaemonBrokerClient` and `SessionBrokerClient` now share one
lifecycle implementation — connect, hello-handshake, ack-timeout,
close + backoff reconnect. Each client supplies `buildHello` /
`isHelloAck` / `onMessage` and keeps its own RPC bookkeeping
(pendingAcks, peerListResolvers, onPush, etc). Composition over
inheritance per Codex's review; no protocol shape changes.
- **Drop daemon-WS ephemeral session pubkey.** `DaemonBrokerClient` no
longer mints + sends a per-reconnect ephemeral keypair in its hello.
Session-targeted DMs land on `SessionBrokerClient` (since 1.32.1),
not the member-keyed daemon-WS, so the field was vestigial. The
daemon's send-encrypt path now signs DMs with the stable mesh member
secret. Inbound on daemon-WS only attempts member-key decryption —
session decryption is the session-WS's job.
- **Role-aware peer list.** `peer list` now hides peers whose
broker-emitted `role` is `'control-plane'` (the daemon's own
member-keyed presence). `--all` opts back in. JSON output emits
`role` at the top level. Older brokers that don't emit `role` yet
default to `'session'`, so legacy peer rows stay visible without
the broker-side change shipped first. Replaces the prior
`peerType === 'claudemesh-daemon'` channel-name hack.
## 1.32.1 (2026-05-04) — DMs to session pubkeys actually deliver now
Critical fix. Sessions launched via `claudemesh launch` (1.30.0+) hold a
per-launch session WebSocket on the broker, separate from the daemon's
member-keyed WS. The broker correctly fans direct messages targeted at a
session pubkey out over THAT session WS — but the daemon's
`SessionBrokerClient` was constructed without a push handler and silently
dropped every inbound `push` / `inbound` frame. The header docstring
even claimed it handled "inbound DM delivery for messages targeted at
the session pubkey"; the code never wired the callback.
Net effect since 1.30.0: any DM sent to a peer's session pubkey
(everything `peer list` returns these days, since session pubkey is the
canonical routing key) was queued, broker-acked, marked `delivered_at`
on the broker side, and then thrown away by the recipient daemon. The
local `inbox.db` stayed at zero rows forever and `claudemesh inbox`
reported "no messages" no matter what arrived.
Two-session smoke test that surfaced this: peer A sent "hola" to peer
B's session pubkey — sender outbox showed `status=done` with a
`broker_message_id`, recipient inbox stayed empty, both sides confused.
The fix wires `SessionBrokerClient` to forward `push` / `inbound` frames
to the same `handleBrokerPush` the member-keyed broker already uses. The
session's secret key (registered via `/v1/sessions/register`) is passed
as `sessionSecretKeyHex` so `decryptOrFallback` tries it first; the
parent member key remains the fallback for legacy member-targeted
traffic that happens to fan out here.
Files: `apps/cli/src/daemon/session-broker.ts`,
`apps/cli/src/daemon/run.ts`. No broker change required — the broker
half (queue + fan-out + sendToPeer on the session WS) was already
correct; only the daemon-side intake was missing.
## 1.32.0 (2026-05-04) — multi-session UX bundle
Nine UX bugs surfaced from a real two-session interconnect smoke test

@@ -1,6 +1,6 @@
{
"name": "claudemesh-cli",
"version": "1.32.0",
"version": "1.33.0",
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
"keywords": [
"claude-code",

@@ -21,14 +21,28 @@ export interface PeersFlags {
mesh?: string;
/** `true`/`undefined` = full record; comma-separated string = field projection. */
json?: boolean | string;
/** When false (default), hide claudemesh-daemon presence rows from the
* human renderer — they're infrastructure, not interactive peers, and
* confused users into thinking the daemon counted as a "peer". The
* JSON output still includes them so scripts that need a full inventory
* can opt in via --all (or just consume JSON). */
/** When false (default), hide control-plane presence rows from the
* human renderer — they're infrastructure (daemon-WS member-keyed
* presence), not interactive peers, and confused users into thinking
* the daemon counted as a "peer". The JSON output still includes them
* so scripts that need a full inventory can opt in via --all (or
* just consume JSON).
*
* Source of truth is the broker-side `peerRole` field
* (`'control-plane' | 'session' | 'service'`). Older brokers don't
* emit `peerRole` yet; this code falls back to treating a missing
* value as `'session'` so legacy peer rows stay visible. */
all?: boolean;
}
/**
* Broker-emitted peer classification, added 2026-05-04. Older brokers
* may omit it — treat missing as 'session' so legacy meshes still
* render their peers (and don't accidentally hide them all). The CLI
* never emits 'control-plane' on its own; that comes from the broker.
*/
export type PeerRole = "control-plane" | "session" | "service";
interface PeerRecord {
pubkey: string;
/** Stable member pubkey (independent of session). When sender shares
@@ -43,10 +57,20 @@ interface PeerRecord {
status?: string;
summary?: string;
groups: Array<{ name: string; role?: string }>;
/** Top-level convenience alias for `profile.role`. Lifted by the
* CLI so JSON consumers see role at the shape's top level instead
* of nested under profile. Same value either way. */
/** Top-level convenience alias for `profile.role`, lifted by the CLI
* since 1.31.5 so JSON consumers (the agent-vibes claudemesh skill,
* launched-session LLMs) see the user-supplied role string at the
* shape's top level. Same value as `profile.role`. Distinct from
* `peerRole` below — that's the broker's presence-class taxonomy. */
role?: string;
/** Broker-emitted presence classification: 'control-plane' | 'session'
* | 'service'. Source of truth for the --all visibility filter and
* the default-hide rule. Older brokers omit this; the CLI fills
* missing values with 'session' so legacy peer rows stay visible.
*
* Renamed from `role` to avoid collision with 1.31.5's profile.role
* lift above. Wire-level field on the broker is also `peerRole`. */
peerRole?: PeerRole;
peerType?: string;
channel?: string;
model?: string;
@@ -139,12 +163,14 @@ async function listPeersForMesh(slug: string): Promise<PeerRecord[]> {
* surfaced a sender's siblings as separate rows because they're separate
* presence rows; the cli just hadn't been making that visible.
*
* Also lifts `profile.role` to a top-level `role` field. The broker has
* always returned role nested under `profile.role`, but downstream JSON
* consumers (LLMs in launched sessions, jq pipelines, dashboards) kept
* missing it because nothing pointed at the nesting. A dedicated
* top-level alias makes the intent unmissable without breaking the
* `profile` object's shape for callers that already drill into it.
* Also normalizes the broker's `peerRole` classification: missing
* values (older brokers) default to 'session' so legacy peer rows stay
* visible under the default `--all=false` filter.
*
* And lifts `profile.role` to a top-level `role` field — the 1.31.5
* convenience alias for JSON consumers (skill SKILL.md, launched-session
* LLMs, jq pipelines). Same value as profile.role; distinct from
* peerRole (presence taxonomy).
*/
function annotateSelf(
peer: PeerRecord,
@@ -161,8 +187,15 @@ function annotateSelf(
selfSessionPubkey &&
peer.pubkey === selfSessionPubkey
);
const role = peer.profile?.role?.trim() || undefined;
return { ...peer, ...(role ? { role } : {}), isSelf, isThisSession };
const peerRole: PeerRole = peer.peerRole ?? "session";
const profileRole = peer.profile?.role?.trim() || undefined;
return {
...peer,
...(profileRole ? { role: profileRole } : {}),
peerRole,
isSelf,
isThisSession,
};
}
export async function runPeers(flags: PeersFlags): Promise<void> {
@@ -208,13 +241,18 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
continue;
}
// Hide claudemesh-daemon rows by default — they're infrastructure
// (the daemon's own member-keyed presence), not interactive peers,
// and they confused users into thinking the daemon counted as a
// Hide control-plane rows by default — they're infrastructure
// (daemon-WS member-keyed presence), not interactive peers, and
// they confused users into thinking the daemon counted as a
// separate peer. --all opts back in for debugging.
//
// Source of truth: broker-emitted `peerRole` field (added
// 2026-05-04). annotateSelf() filled in 'session' for older
// brokers that don't emit peerRole yet, so this filter is
// backwards-compatible by construction — legacy rows show up.
const visible = flags.all
? peers
: peers.filter((p) => p.channel !== "claudemesh-daemon");
: peers.filter((p) => p.peerRole !== "control-plane");
// Sort: this-session first, then your-other-sessions, then real
// peers. Within each group, idle/working ahead of dnd. Inside the
@@ -226,9 +264,9 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
return score(a) - score(b);
});
const hiddenDaemons = peers.length - visible.length;
const header = hiddenDaemons > 0
? `peers on ${slug} (${sorted.length}, ${hiddenDaemons} daemon hidden — use --all)`
const hiddenControlPlane = peers.length - visible.length;
const header = hiddenControlPlane > 0
? `peers on ${slug} (${sorted.length}, ${hiddenControlPlane} control-plane hidden — use --all)`
: `peers on ${slug} (${sorted.length})`;
render.section(header);

@@ -7,13 +7,19 @@
// - Wire envelope adds `client_message_id` (broker may ignore in legacy
// mode; Sprint 7 promotes it to authoritative dedupe).
// - Reconnect with exponential backoff, signaled to the drain worker.
import WebSocket from "ws";
//
// 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) now
// lives in `ws-lifecycle.ts`. This class supplies the daemon-WS hello
// content and routes incoming RPC replies / pushes; the helper handles
// the rest. The hello no longer carries an ephemeral `sessionPubkey` —
// session-targeted DMs land on the per-session WS (SessionBrokerClient)
// since 1.32.1, so this socket only needs the member identity.
import type { JoinedMesh } from "~/services/config/facade.js";
import { signHello } from "~/services/broker/hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";
export type ConnStatus = WsStatus;
export interface BrokerSendArgs {
/** Target as the broker expects it: peer name | pubkey | @group | * | topic. */
@@ -49,6 +55,8 @@ export interface PeerSummary {
hostname?: string;
peerType?: string;
channel?: string;
/** Broker-side classification, added 2026-05-04. Missing in older brokers. */
role?: "control-plane" | "session" | "service";
}
interface PendingPeerList {
@@ -84,9 +92,7 @@ export interface MemoryRow {
rememberedAt: string;
}
const HELLO_ACK_TIMEOUT_MS = 5_000;
const SEND_ACK_TIMEOUT_MS = 15_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export interface DaemonBrokerOptions {
displayName?: string;
@@ -96,12 +102,9 @@ export interface DaemonBrokerOptions {
}
export class DaemonBrokerClient {
private ws: WebSocket | null = null;
private lifecycle: WsLifecycle | null = null;
private _status: ConnStatus = "closed";
private closed = false;
private reconnectAttempt = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private helloTimer: NodeJS.Timeout | null = null;
private pendingAcks = new Map<string, PendingAck>();
private peerListResolvers = new Map<string, PendingPeerList>();
private skillListResolvers = new Map<string, { resolve: (rows: SkillSummary[]) => void; timer: NodeJS.Timeout }>();
@@ -110,8 +113,6 @@ export class DaemonBrokerClient {
private stateListResolvers = new Map<string, { resolve: (rows: StateRow[]) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers = new Map<string, { resolve: (rows: MemoryRow[]) => void; timer: NodeJS.Timeout }>();
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private opens: Array<() => void> = [];
private reqCounter = 0;
@@ -125,39 +126,25 @@ export class DaemonBrokerClient {
(this.opts.log ?? defaultLog)(level, msg, { mesh: this.mesh.slug, ...meta });
};
private setConnStatus(s: ConnStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run the hello handshake, resolve once the broker accepts. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return;
this.setConnStatus("connecting");
const ws = new WebSocket(this.mesh.brokerUrl);
this.ws = ws;
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
if (!this.sessionPubkey) {
const { generateKeypair } = await import("~/services/crypto/facade.js");
const kp = await generateKeypair();
this.sessionPubkey = kp.publicKey;
this.sessionSecretKey = kp.secretKey;
}
this.lifecycle = await connectWsWithBackoff({
url: this.mesh.brokerUrl,
buildHello: async () => {
const { timestamp, signature } = await signHello(
this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey,
);
ws.send(JSON.stringify({
return {
type: "hello",
meshId: this.mesh.meshId,
memberId: this.mesh.memberId,
pubkey: this.mesh.pubkey,
sessionPubkey: this.sessionPubkey,
// No `sessionPubkey` — daemon-WS is member-keyed only. The
// per-session presence WS (SessionBrokerClient) carries the
// ephemeral session pubkey. Spec §"Layer 1: Identity → Member identity".
displayName: this.opts.displayName,
sessionId: `daemon-${process.pid}`,
pid: process.pid,
@@ -167,35 +154,28 @@ export class DaemonBrokerClient {
channel: "claudemesh-daemon",
timestamp,
signature,
}));
this.helloTimer = setTimeout(() => {
this.log("warn", "broker_hello_ack_timeout");
try { ws.close(); } catch { /* ignore */ }
reject(new Error("hello_ack_timeout"));
}, HELLO_ACK_TIMEOUT_MS);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setConnStatus("open");
this.reconnectAttempt = 0;
// Flush deferred openers (drain worker, etc.)
};
},
isHelloAck: (msg) => msg.type === "hello_ack",
onMessage: (msg) => this.handleMessage(msg),
onStatusChange: (s) => {
this._status = s;
this.opts.onStatusChange?.(s);
if (s === "open") {
// Flush deferred openers (drain worker, etc.).
const queued = this.opens.slice();
this.opens.length = 0;
for (const fn of queued) { try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); } }
resolve();
return;
for (const fn of queued) {
try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); }
}
}
},
onBeforeReconnect: (code) => this.failPendingAcks(`broker_disconnected_${code}`),
log: (level, msg, meta) => this.log(level, `broker_${msg}`, meta),
});
}
private handleMessage(msg: Record<string, unknown>): void {
if (msg.type === "ack") {
// Broker shape: { type: "ack", id, messageId, queued, error? }
const id = String(msg.id ?? "");
@@ -293,30 +273,35 @@ export class DaemonBrokerClient {
this.opts.onPush?.(msg);
return;
}
});
}
ws.on("close", (code, reason) => {
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
this.failPendingAcks(`broker_disconnected_${code}`);
if (this.closed) { this.setConnStatus("closed"); return; }
this.setConnStatus("reconnecting");
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(() => this.connect().catch((err) => this.log("warn", "broker_reconnect_failed", { err: String(err) })), wait);
// First connection failure also rejects the original connect() promise.
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
});
/** True when the underlying socket is in the OPEN state, i.e. ready for direct sends. */
private isOpen(): boolean {
const sock = this.lifecycle?.ws;
return !!sock && sock.readyState === sock.OPEN;
}
ws.on("error", (err) => this.log("warn", "broker_ws_error", { err: err.message }));
/** v2 agentic-comms (M1): send `client_ack` back to the broker after
* successfully landing an inbound push in inbox.db. Broker uses the
* ack to set `delivered_at` (atomic at-least-once). Best-effort —
* if the WS isn't open, drop the ack; broker's 30s lease will
* re-deliver. */
sendClientAck(clientMessageId: string, brokerMessageId: string | null): void {
if (!this.isOpen()) return;
try {
this.lifecycle!.send({
type: "client_ack",
clientMessageId,
...(brokerMessageId ? { brokerMessageId } : {}),
});
} catch { /* drop; lease re-delivers */ }
}
/** Send one outbox row. Resolves on broker ack/timeout. */
send(req: BrokerSendArgs): Promise<BrokerSendResult> {
return new Promise<BrokerSendResult>((resolve) => {
const dispatch = () => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
if (!this.isOpen()) {
resolve({ ok: false, error: "broker_not_open", permanent: false });
return;
}
@@ -328,7 +313,7 @@ export class DaemonBrokerClient {
}, SEND_ACK_TIMEOUT_MS);
this.pendingAcks.set(id, { resolve, timer });
try {
this.ws.send(JSON.stringify({
this.lifecycle!.send({
type: "send",
id, // legacy correlation id
client_message_id: id, // forward-compat per spec §4.2
@@ -337,7 +322,7 @@ export class DaemonBrokerClient {
priority: req.priority,
nonce: req.nonce,
ciphertext: req.ciphertext,
}));
});
} catch (e) {
this.pendingAcks.delete(id);
clearTimeout(timer);
@@ -352,153 +337,149 @@ export class DaemonBrokerClient {
/** Ask the broker for the current peer list. */
async listPeers(timeoutMs = 5_000): Promise<PeerSummary[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<PeerSummary[]>((resolve) => {
const reqId = `pl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.peerListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.peerListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_peers", _reqId: reqId }); }
catch { this.peerListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** List mesh-published skills. Empty array on disconnect / timeout. */
async listSkills(query?: string, timeoutMs = 5_000): Promise<SkillSummary[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<SkillSummary[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.skillListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.skillListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_skills", query, _reqId: reqId }); }
catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Fetch one skill's full body. Null on not-found / disconnect / timeout. */
async getSkill(name: string, timeoutMs = 5_000): Promise<SkillFull | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<SkillFull | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.skillDataResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.skillDataResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "get_skill", name, _reqId: reqId }); }
catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** Read a single shared state row. Null on disconnect / timeout / not-found. */
async getState(key: string, timeoutMs = 5_000): Promise<StateRow | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<StateRow | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.stateGetResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.stateGetResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "get_state", key, _reqId: reqId }); }
catch { this.stateGetResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** List all shared state rows in the mesh. */
async listState(timeoutMs = 5_000): Promise<StateRow[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<StateRow[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.stateListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.stateListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_state", _reqId: reqId }); }
catch { this.stateListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Set a shared state value. Fire-and-forget. */
setState(key: string, value: unknown): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_state", key, value })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_state", key, value }); }
catch { /* ignore */ }
}
/** Store a memory in the mesh. Returns the assigned id, or null on timeout. */
async remember(content: string, tags?: string[], timeoutMs = 5_000): Promise<string | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<string | null>((resolve) => {
const reqId = `mr-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.memoryStoreResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "remember", content, tags, _reqId: reqId }); }
catch { this.memoryStoreResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** Search memories by relevance. */
async recall(query: string, timeoutMs = 5_000): Promise<MemoryRow[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<MemoryRow[]>((resolve) => {
const reqId = `mc-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.memoryRecallResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "recall", query, _reqId: reqId }); }
catch { this.memoryRecallResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Forget a memory by id. Fire-and-forget. */
forget(memoryId: string): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "forget", memoryId })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "forget", memoryId }); }
catch { /* ignore */ }
}
/** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */
setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_profile", ...profile })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_profile", ...profile }); }
catch { /* ignore */ }
}
setSummary(summary: string): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_summary", summary })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_summary", summary }); }
catch { /* ignore */ }
}
setStatus(status: "idle" | "working" | "dnd"): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_status", status })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_status", status }); }
catch { /* ignore */ }
}
setVisible(visible: boolean): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_visible", visible })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_visible", visible }); }
catch { /* ignore */ }
}
async close(): Promise<void> {
this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; }
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
this.failPendingAcks("daemon_shutdown");
try { this.ws?.close(); } catch { /* ignore */ }
this.setConnStatus("closed");
if (this.lifecycle) {
try { await this.lifecycle.close(); } catch { /* ignore */ }
this.lifecycle = null;
}
getSessionKeys(): { sessionPubkey: string; sessionSecretKey: string } | null {
if (!this.sessionPubkey || !this.sessionSecretKey) return null;
return { sessionPubkey: this.sessionPubkey, sessionSecretKey: this.sessionSecretKey };
this._status = "closed";
}
private failPendingAcks(reason: string) {

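The `client_ack` framing used by `sendClientAck` can be sketched without a live socket. This is a minimal sketch under stated assumptions: `ClientAckFrame`, `buildClientAck`, and `AckSink` are hypothetical names, and `AckSink` stands in for the lifecycle handle by recording serialized frames instead of writing to a WebSocket. The frame fields and the drop-when-not-open behaviour are taken from the diff.

```typescript
interface ClientAckFrame {
  type: "client_ack";
  clientMessageId: string;
  brokerMessageId?: string;
}

// Omits brokerMessageId entirely when it is null, as the diff does with a conditional spread.
function buildClientAck(
  clientMessageId: string,
  brokerMessageId: string | null,
): ClientAckFrame {
  return {
    type: "client_ack",
    clientMessageId,
    ...(brokerMessageId ? { brokerMessageId } : {}),
  };
}

// Hypothetical stand-in for the lifecycle handle: records what would be sent.
interface AckSink {
  isOpen: boolean;
  sent: string[];
}

// Best-effort: returns false and sends nothing when the socket is not open.
// The broker's claim lease is expected to re-deliver in that case.
function sendClientAck(
  sink: AckSink,
  clientMessageId: string,
  brokerMessageId: string | null,
): boolean {
  if (!sink.isOpen) return false;
  sink.sent.push(JSON.stringify(buildClientAck(clientMessageId, brokerMessageId)));
  return true;
}

const demoSink: AckSink = { isOpen: true, sent: [] };
sendClientAck(demoSink, "c1", "b1");
sendClientAck(demoSink, "c2", null);
console.log(demoSink.sent);
```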

@@ -18,6 +18,13 @@ export interface InboundContext {
/** Daemon's session secret key hex (rotates per connect). When the
* sender encrypted to our session pubkey, decrypt with this instead. */
sessionSecretKeyHex?: string;
/** v2 agentic-comms (M1): emit `client_ack` back to the broker after
* the message lands in inbox.db. Broker uses the ack to set
* `delivered_at` (atomic at-least-once). Without it, the broker's
* 30s lease expires and re-delivers — correct but noisy. The WS
* client owns this callback because it's the one that owns the
* socket; inbound.ts just signals "I accepted this id." */
ackClientMessage?: (clientMessageId: string, brokerMessageId: string | null) => void;
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
}
@@ -73,6 +80,12 @@ export async function handleBrokerPush(msg: Record<string, unknown>, ctx: Inboun
reply_to_id: replyToId,
});
// Whether the row was newly inserted or already existed (dedupe), the
// broker still wants to know we received and processed this message —
// ack regardless. Skipping ack on dedupe would leak: broker would
// re-deliver after lease, and the receiver would re-dedupe forever.
ctx.ackClientMessage?.(clientMessageId, brokerMessageId);
if (!inserted) return; // already had this id; no event
ctx.bus.publish("message", {

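The ordering in `handleBrokerPush` (insert with dedupe, ack unconditionally, publish only for new rows) can be illustrated with an in-memory stand-in. This is a sketch, not the real handler: `MemoryInbox`, `acceptPush`, and the `publish` callback are hypothetical, and the actual code writes to inbox.db and decrypts first. What it shows is that a duplicate delivery still produces an ack but no second event.

```typescript
type AckFn = (clientMessageId: string, brokerMessageId: string | null) => void;

// Hypothetical in-memory stand-in for inbox.db.
class MemoryInbox {
  private ids = new Set<string>();

  // Returns true when the id was newly inserted, false when it already existed.
  insertIfNew(clientMessageId: string): boolean {
    if (this.ids.has(clientMessageId)) return false;
    this.ids.add(clientMessageId);
    return true;
  }
}

function acceptPush(
  inbox: MemoryInbox,
  clientMessageId: string,
  brokerMessageId: string | null,
  ack: AckFn | undefined,
  publish: (id: string) => void,
): void {
  const inserted = inbox.insertIfNew(clientMessageId);
  // Ack whether or not the row was new, so the broker can release its claim.
  ack?.(clientMessageId, brokerMessageId);
  if (!inserted) return; // duplicate: no event
  publish(clientMessageId);
}

const demoInbox = new MemoryInbox();
const demoAcks: string[] = [];
const demoEvents: string[] = [];
for (let i = 0; i < 2; i++) {
  acceptPush(demoInbox, "m1", "b1", (id) => demoAcks.push(id), (id) => demoEvents.push(id));
}
console.log(demoAcks.length, demoEvents.length);
```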

@@ -870,11 +870,14 @@ async function resolveAndEncrypt(
return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" };
}
// 64-char hex pubkey → DM directly.
// 64-char hex pubkey → DM directly. Encrypt with the daemon's member
// secret: recipient decrypts using THEIR session pubkey's matching
// secret on their session-WS, so the sender side just needs any
// private key whose public counterpart is known to the recipient as
// "the sender". Member key is the stable choice and is what the
// recipient already trusts via mesh membership.
if (/^[0-9a-f]{64}$/i.test(to)) {
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, to, senderSecret);
const env = await encryptDirect(req.message, to, meshSecretKey);
return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
@@ -890,9 +893,7 @@ async function resolveAndEncrypt(
if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`);
if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`);
const recipient = matches[0]!.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
const env = await encryptDirect(req.message, recipient, meshSecretKey);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
@@ -900,9 +901,7 @@ async function resolveAndEncrypt(
const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase());
if (!match) throw new Error(`peer "${to}" not found`);
const recipient = match.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
const env = await encryptDirect(req.message, recipient, meshSecretKey);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}


@@ -127,7 +127,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
const meshConfigs = new Map<string, typeof cfg.meshes[number]>();
for (const mesh of meshes) {
meshConfigs.set(mesh.slug, mesh);
const broker = new DaemonBrokerClient(mesh, {
const broker: DaemonBrokerClient = new DaemonBrokerClient(mesh, {
displayName: opts.displayName,
onStatusChange: (s) => {
process.stdout.write(JSON.stringify({
@@ -136,13 +136,19 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
bus.publish("broker_status", { mesh: mesh.slug, status: s });
},
onPush: (m) => {
const sessionKeys = broker.getSessionKeys();
// Daemon-WS is member-keyed, not session-keyed. Session-targeted
// DMs land on the per-session WS (SessionBrokerClient) since
// 1.32.1 and decrypt with the session secret there. Anything that
// arrives here can only be member-keyed (broadcasts, member DMs,
// system events) — pass member secret only.
void handleBrokerPush(m, {
db: inboxDb,
bus,
meshSlug: mesh.slug,
recipientSecretKeyHex: mesh.secretKey,
sessionSecretKeyHex: sessionKeys?.sessionSecretKey,
// v2 agentic-comms (M1): client_ack closes the at-least-once
// loop. Broker holds the row claimed (not delivered) until ack.
ackClientMessage: (cmid, bmid) => broker.sendClientAck(cmid, bmid),
});
},
});
@@ -177,7 +183,14 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
sessionBrokers.delete(info.token);
prior.close().catch(() => { /* ignore */ });
}
const client = new SessionBrokerClient({
// 1.32.1 — wire push delivery. Messages targeted at the launched
// session's pubkey land on THIS WS, not on the member-keyed one,
// so without this forward they'd silently disappear (the bug that
// kept inbox.db at zero rows since 1.30.0). Decrypt prefers the
// session secret key; member key remains the fallback for legacy
// member-targeted traffic that happens to fan out here.
const sessionSecretKeyHex = info.presence.sessionSecretKey;
const client: SessionBrokerClient = new SessionBrokerClient({
mesh: meshConfig,
sessionPubkey: info.presence.sessionPubkey,
sessionSecretKey: info.presence.sessionSecretKey,
@@ -187,6 +200,17 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
...(info.role ? { role: info.role } : {}),
...(info.cwd ? { cwd: info.cwd } : {}),
pid: info.pid,
onPush: (m) => {
void handleBrokerPush(m, {
db: inboxDb,
bus,
meshSlug: meshConfig.slug,
recipientSecretKeyHex: meshConfig.secretKey,
sessionSecretKeyHex,
// v2 agentic-comms (M1): close the at-least-once loop.
ackClientMessage: (cmid, bmid) => client.sendClientAck(cmid, bmid),
});
},
});
sessionBrokers.set(info.token, client);
client.connect().catch((err) =>


@@ -15,8 +15,10 @@
* DaemonBrokerClient's job. Keeps the responsibility split clean
* and avoids two clients fighting over the same outbox row.
* - Does NOT carry list_peers / state / memory RPCs. This client is
* presence-only (and inbound DM delivery for messages targeted at
* the session pubkey).
* presence-only PLUS inbound DM delivery for messages targeted at
* the session pubkey — pushes are forwarded via the `onPush`
* callback to the daemon's shared handleBrokerPush, decrypted with
* this session's secret key.
*
* Old brokers reply with `unknown_message_type` on session_hello — we
* surface that as a one-shot `error` event and the daemon decides
@@ -24,15 +26,19 @@
* expected to be deployed first.
*
* Spec: .artifacts/specs/2026-05-04-per-session-presence.md.
*
* 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) lives
* in `ws-lifecycle.ts`. This class supplies session_hello content and
* routes the inbound onPush; the helper handles the rest.
*/
import { hostname as osHostname } from "node:os";
import WebSocket from "ws";
import type { JoinedMesh } from "~/services/config/facade.js";
import { signSessionHello } from "~/services/broker/session-hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type SessionBrokerStatus = "connecting" | "open" | "closed" | "reconnecting";
export type SessionBrokerStatus = WsStatus;
export interface ParentAttestation {
sessionPubkey: string;
@@ -62,19 +68,24 @@ export interface SessionBrokerOptions {
/** Pid of the launched session (NOT the daemon). */
pid: number;
onStatusChange?: (s: SessionBrokerStatus) => void;
/**
* Dispatch for inbound `push` / `inbound` frames. The broker fans messages targeted at
* a session pubkey out over the corresponding session WS — without
* this callback they hit the floor and the daemon's inbox.db never
* sees them. Wired in run.ts to a handleBrokerPush call that decrypts
* with this session's secret key (member key as fallback).
*/
onPush?: (msg: Record<string, unknown>) => void;
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
}
const HELLO_ACK_TIMEOUT_MS = 5_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export class SessionBrokerClient {
private ws: WebSocket | null = null;
private lifecycle: WsLifecycle | null = null;
private _status: SessionBrokerStatus = "closed";
private closed = false;
private reconnectAttempt = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private helloTimer: NodeJS.Timeout | null = null;
/** Set when the broker rejects session_hello with `unknown_message_type` —
* older brokers without the 1.30.0 surface. We stop retrying. */
private brokerUnsupported = false;
constructor(private opts: SessionBrokerOptions) {}
@@ -90,31 +101,21 @@ export class SessionBrokerClient {
});
};
private setStatus(s: SessionBrokerStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run session_hello, resolve once the broker accepts. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return;
this.setStatus("connecting");
const ws = new WebSocket(this.opts.mesh.brokerUrl);
this.ws = ws;
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
this.lifecycle = await connectWsWithBackoff({
url: this.opts.mesh.brokerUrl,
buildHello: async () => {
const { timestamp, signature } = await signSessionHello({
meshId: this.opts.mesh.meshId,
parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey,
sessionSecretKey: this.opts.sessionSecretKey,
});
ws.send(JSON.stringify({
return {
type: "session_hello",
meshId: this.opts.mesh.meshId,
parentMemberId: this.opts.mesh.memberId,
@@ -132,70 +133,66 @@ export class SessionBrokerClient {
...(this.opts.role ? { role: this.opts.role } : {}),
timestamp,
signature,
}));
this.helloTimer = setTimeout(() => {
this.log("warn", "session_hello_ack_timeout");
try { ws.close(); } catch { /* ignore */ }
reject(new Error("session_hello_ack_timeout"));
}, HELLO_ACK_TIMEOUT_MS);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setStatus("open");
this.reconnectAttempt = 0;
resolve();
return;
}
};
},
isHelloAck: (msg) => msg.type === "hello_ack",
onMessage: (msg) => {
if (msg.type === "error") {
// Older brokers respond with `unknown_message_type` to session_hello;
// surface that so the daemon can decide to skip per-session presence
// rather than churn through reconnects.
// rather than churn through reconnects. Setting `closed` halts the
// helper's reconnect loop on the next close.
this.log("warn", "broker_error", { code: msg.code, message: msg.message });
if (msg.code === "unknown_message_type") {
this.brokerUnsupported = true;
this.closed = true;
void this.lifecycle?.close();
}
return;
}
// push / inbound — presence-only client ignores them; the daemon's
// member-keyed client handles all DM decryption.
});
ws.on("close", (code, reason) => {
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
if (this.closed) { this.setStatus("closed"); return; }
this.setStatus("reconnecting");
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "session_broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(
() => this.connect().catch((err) => this.log("warn", "session_broker_reconnect_failed", { err: String(err) })),
wait,
);
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
// 1.32.1 — DMs targeted at the launched session's pubkey arrive
// here, NOT on the daemon's member-keyed WS. Forward to the
// daemon-level push handler so they land in inbox.db.
if (msg.type === "push" || msg.type === "inbound") {
this.opts.onPush?.(msg);
return;
}
},
onStatusChange: (s) => {
this._status = s;
this.opts.onStatusChange?.(s);
},
log: (level, msg, meta) => this.log(level, `session_broker_${msg}`, meta),
});
}
ws.on("error", (err) => this.log("warn", "session_broker_ws_error", { err: err.message }));
/** v2 agentic-comms (M1): send `client_ack` back to the broker after
* successfully landing an inbound push in inbox.db. Broker uses the
* ack to set `delivered_at`. Best-effort. */
sendClientAck(clientMessageId: string, brokerMessageId: string | null): void {
if (this._status !== "open" || !this.lifecycle) return;
try {
this.lifecycle.send({
type: "client_ack",
clientMessageId,
...(brokerMessageId ? { brokerMessageId } : {}),
});
} catch { /* drop; lease re-delivers */ }
}
async close(): Promise<void> {
this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; }
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
try { this.ws?.close(); } catch { /* ignore */ }
this.setStatus("closed");
if (this.lifecycle) {
try { await this.lifecycle.close(); } catch { /* ignore */ }
this.lifecycle = null;
}
this._status = "closed";
}
/** True when the broker rejected our session_hello as unknown — caller
* may want to skip per-session presence entirely on this mesh. */
get isBrokerUnsupported(): boolean { return this.brokerUnsupported; }
}
function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) {

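The routing that the session client's `onMessage` performs can be summarized as a pure classifier. This is a hedged sketch: `SessionRoute` and `routeSessionMessage` are hypothetical names, and the real handler acts on the message directly (logging, setting `brokerUnsupported`, calling `onPush`) rather than returning a label. The message types and the `unknown_message_type` code come from the diff.

```typescript
type SessionRoute = "forward_push" | "stop_unsupported" | "log_error" | "ignore";

function routeSessionMessage(msg: Record<string, unknown>): SessionRoute {
  if (msg.type === "error") {
    // Older brokers reject session_hello; the client stops reconnecting in that case.
    return msg.code === "unknown_message_type" ? "stop_unsupported" : "log_error";
  }
  // Session-targeted DMs arrive on this socket and are forwarded to the push handler.
  if (msg.type === "push" || msg.type === "inbound") return "forward_push";
  return "ignore";
}

console.log(routeSessionMessage({ type: "push" }));
console.log(routeSessionMessage({ type: "error", code: "unknown_message_type" }));
```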

@@ -0,0 +1,234 @@
/**
* Shared WS lifecycle helper for the daemon's two broker clients.
*
* Both `DaemonBrokerClient` (member-keyed, one per joined mesh) and
* `SessionBrokerClient` (session-keyed, one per launched session) used
* to inline the same connect/hello/ack-timeout/close-reconnect logic.
* They drifted apart subtly — different ack-timeout names, different
* reconnect log messages, slightly different status flips — and that's
* how 1.32.x bugs shipped (push handler attached to the wrong client,
* etc.).
*
* This helper owns ONLY the lifecycle:
* - new WebSocket(url), wire up open/message/close/error
* - on open → call buildHello() and send the result
* - start an ack-timeout timer; if it fires before the hello ack
* arrives, close the socket and reject the connect promise
* - on message, gate on isHelloAck(); when true, flip status to
* "open", clear the ack timer, resolve. All other messages are
* forwarded to onMessage()
* - on close, schedule a backoff reconnect (unless explicitly closed)
*
* Each client keeps its own concerns: DaemonBrokerClient still owns
* pendingAcks / peerListResolvers / etc; SessionBrokerClient still owns
* its onPush callback. The helper just hands them an open WS and a
* stable status field, and reconnects under their feet on disconnect.
*
* Composition over inheritance — callers receive a `WsLifecycle` handle
* with `send` / `close` / `status`, NOT a subclass.
*/
import WebSocket from "ws";
export type WsStatus = "connecting" | "open" | "closed" | "reconnecting";
export type WsLogLevel = "info" | "warn" | "error";
export type WsLog = (level: WsLogLevel, msg: string, meta?: Record<string, unknown>) => void;
export interface WsLifecycleOptions {
/** Broker URL (e.g. wss://ic.claudemesh.com/ws). */
url: string;
/**
* Build the hello frame to send right after the WS opens. Async because
* signing the hello may need libsodium initialization. Whatever this
* returns is JSON.stringified and sent verbatim — the helper does NOT
* inspect or modify it.
*/
buildHello: () => Promise<unknown>;
/**
* Returns true iff `msg` is the hello ack the helper should treat as
* "broker accepted us; flip status to open". Both daemon-WS and
* session-WS use `{ type: "hello_ack" }` today, but keeping this a
* predicate lets either client narrow further (e.g. on a `code` field)
* without leaking client-specific shape into the helper.
*/
isHelloAck: (msg: Record<string, unknown>) => boolean;
/**
* Called for every parsed message that is NOT the hello ack. The
* helper does NOT decide which messages are pushes vs RPCs vs errors;
* that's the caller's concern.
*/
onMessage: (msg: Record<string, unknown>) => void;
onStatusChange?: (s: WsStatus) => void;
/**
* How long to wait for the broker's hello ack before giving up and
* forcing a close. Defaults to 5s, the same as both pre-refactor clients.
*/
helloAckTimeoutMs?: number;
/**
* Reconnect backoff schedule. Defaults to [1s, 2s, 4s, 8s, 16s, 30s],
* which matches both pre-refactor clients exactly. Attempts past the
* end of the list reuse the last entry.
*/
backoffCapsMs?: readonly number[];
log?: WsLog;
/**
* Hook for the close path, called BEFORE the helper schedules a
* reconnect. Used by DaemonBrokerClient to fail its in-flight
* pendingAcks map with a "broker_disconnected_<code>" reason. The
* helper passes the raw close code so the caller can shape its
* rejection text. It also fires on an explicit close(), in which case
* no reconnect follows.
*
* Returns nothing; close handling continues regardless.
*/
onBeforeReconnect?: (code: number, reason: string) => void;
}
export interface WsLifecycle {
/** Current connection status. Updated synchronously before onStatusChange fires. */
readonly status: WsStatus;
/** Underlying socket. Exposed for callers that need OPEN-state checks
* before sending (mirrors the pre-refactor `this.ws.readyState` checks). */
readonly ws: WebSocket | null;
/** Send a JSON payload over the open WS. Throws if not open — callers
* that need queue-while-disconnected semantics should layer that
* themselves (DaemonBrokerClient does, via its `opens` deferred-fn array). */
send(payload: unknown): void;
/** Close the WS and stop reconnecting. Idempotent. */
close(): Promise<void>;
}
const DEFAULT_HELLO_ACK_TIMEOUT_MS = 5_000;
const DEFAULT_BACKOFF_CAPS_MS: readonly number[] = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
const defaultLog: WsLog = (level, msg, meta) => {
const line = JSON.stringify({ level, msg, ...meta, ts: new Date().toISOString() });
if (level === "info") process.stdout.write(line + "\n");
else process.stderr.write(line + "\n");
};
/**
* Connect a WebSocket with hello-handshake, ack-timeout, and reconnect
* with exponential backoff. Resolves once the broker accepts the hello;
* rejects if the first attempt closes before the ack lands, times out
* waiting for the ack, or fails to build the hello.
*
* Subsequent automatic reconnects are silent — they fire on the close
* handler's backoff timer and surface only via onStatusChange (and any
* caller-installed log).
*/
export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecycle> {
const helloAckTimeoutMs = opts.helloAckTimeoutMs ?? DEFAULT_HELLO_ACK_TIMEOUT_MS;
const backoffCapsMs = opts.backoffCapsMs ?? DEFAULT_BACKOFF_CAPS_MS;
const log: WsLog = opts.log ?? defaultLog;
let ws: WebSocket | null = null;
let status: WsStatus = "closed";
let closed = false;
let reconnectAttempt = 0;
let reconnectTimer: NodeJS.Timeout | null = null;
let helloTimer: NodeJS.Timeout | null = null;
const setStatus = (s: WsStatus) => {
if (status === s) return;
status = s;
opts.onStatusChange?.(s);
};
/**
* Open one WS attempt. Returns a promise that resolves on hello ack
* or rejects if the socket closes before we get one. Used by both the
* initial connect and the close-handler backoff timer. The timer only
* logs a rejection as "ws_reconnect_failed"; by then the close handler
* has already scheduled the next reconnect.
*/
const openOnce = (): Promise<void> => {
if (closed) return Promise.reject(new Error("client_closed"));
setStatus("connecting");
const sock = new WebSocket(opts.url);
ws = sock;
return new Promise<void>((resolve, reject) => {
sock.on("open", () => {
// Build and send the hello inside an async IIFE so that a sync
// throw or a rejected promise from buildHello() (or a failed
// send) rejects this connect attempt cleanly.
(async () => {
try {
const hello = await opts.buildHello();
sock.send(JSON.stringify(hello));
helloTimer = setTimeout(() => {
log("warn", "hello_ack_timeout", { url: opts.url });
try { sock.close(); } catch { /* ignore */ }
reject(new Error("hello_ack_timeout"));
}, helloAckTimeoutMs);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
})();
});
sock.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (opts.isHelloAck(msg)) {
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
setStatus("open");
reconnectAttempt = 0;
resolve();
// Don't forward hello_ack to onMessage — both pre-refactor
// clients consumed it inline and never delegated.
return;
}
opts.onMessage(msg);
});
sock.on("close", (code, reason) => {
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
const reasonStr = reason.toString("utf8");
opts.onBeforeReconnect?.(code, reasonStr);
if (closed) {
setStatus("closed");
return;
}
setStatus("reconnecting");
const wait = backoffCapsMs[Math.min(reconnectAttempt, backoffCapsMs.length - 1)] ?? 30_000;
reconnectAttempt++;
log("info", "ws_reconnect_scheduled", { url: opts.url, wait_ms: wait, code, reason: reasonStr });
reconnectTimer = setTimeout(
() => openOnce().catch((err) => log("warn", "ws_reconnect_failed", { url: opts.url, err: String(err) })),
wait,
);
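// Reject this attempt's promise. status is always "reconnecting"
// here, so the check below always passes; the reject is a no-op if
// the hello ack already resolved the promise, and otherwise surfaces
// the failure to the initial connect caller.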
if (status === "connecting" || status === "reconnecting") {
reject(new Error(`closed_before_hello_${code}`));
}
});
sock.on("error", (err) => log("warn", "ws_error", { url: opts.url, err: err.message }));
});
};
return openOnce().then(() => {
const handle: WsLifecycle = {
get status() { return status; },
get ws() { return ws; },
send(payload: unknown) {
if (!ws || ws.readyState !== ws.OPEN) {
throw new Error("ws_not_open");
}
ws.send(JSON.stringify(payload));
},
async close() {
closed = true;
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
try { ws?.close(); } catch { /* ignore */ }
setStatus("closed");
},
};
return handle;
});
}
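
The close handler picks its wait by clamping the attempt counter to the last index of the schedule. A minimal sketch of that selection in isolation, assuming the default schedule shown above; `backoffWaitMs` is a hypothetical helper name used only for illustration and does not exist in the file:

```typescript
// Illustrative only: mirrors the index-clamping expression from the close handler.
const DEFAULT_BACKOFF_CAPS_MS: readonly number[] = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];

// Hypothetical helper (not part of the file above): wait before reconnect attempt N.
function backoffWaitMs(attempt: number, caps: readonly number[] = DEFAULT_BACKOFF_CAPS_MS): number {
  // Clamp the attempt index to the last entry; fall back to 30s if the schedule is empty.
  return caps[Math.min(attempt, caps.length - 1)] ?? 30_000;
}

// Waits for the first eight consecutive failures.
const waits = Array.from({ length: 8 }, (_, attempt) => backoffWaitMs(attempt));
console.log(waits.join(","));
// prints "1000,2000,4000,8000,16000,30000,30000,30000"
```

Because the counter resets to 0 on every hello ack, a connection that flaps after a successful handshake starts again from the 1s entry rather than staying at the 30s cap.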

@@ -0,0 +1,48 @@
-- Milestone 1 (v2 agentic-comms architecture).
--
-- Two concerns rolled into one migration because both are tiny and both
-- ship together with the broker change in the same PR:
--
-- 1. message_queue claim/lease columns (drainForMember race fix)
-- --------------------------------------------------------------
-- Before this migration, drainForMember claimed rows by setting
-- `delivered_at = NOW()` inside the same UPDATE that selected them.
-- If the recipient WS was closed between claim-time and ws.send(),
-- the message was silently dropped — the row read as "delivered" so
-- the next reconnect's drain skipped it. At-most-once semantics with
-- no retry hook.
--
-- The fix moves to two-phase claim/deliver with a lease:
-- claimed_at — set when drainForMember picks the row
-- claim_id — presenceId of the claimer (debugging)
-- claim_expires_at — claimed_at + 30s; if no `client_ack` lands by
-- then, a sweeper clears the claim and the row
-- is re-eligible for a new drain (at-least-once).
--
-- `delivered_at` only gets set when the recipient WS replies with a
-- `client_ack` containing the original client_message_id. Until any
-- daemon emits `client_ack`, claims will simply expire and re-deliver
-- — which is the desired retry behaviour for unreliable transports.
--
-- 2. presence.role column
-- --------------------------------------------------------------
-- The CLI currently hides daemon connections from `peer list` by
-- matching `peerType === 'claudemesh-daemon'`, which is fragile and
-- overloads a free-form field. M1 introduces a typed `role` column on
-- presence with three documented values:
-- 'control-plane' — long-lived daemon WS (one per host)
-- 'session' — per-Claude-Code-session WS (default)
-- 'service' — autonomous bots/services attached to a mesh
--
-- Backfilled to 'session' (default) so legacy presence rows keep their
-- existing visibility. The two hello paths in the broker pass
-- 'control-plane' / 'session' explicitly. CLI-side filter swap
-- (peerType -> role) is a follow-up worktree.
ALTER TABLE "mesh"."message_queue"
ADD COLUMN "claimed_at" timestamp,
ADD COLUMN "claim_id" text,
ADD COLUMN "claim_expires_at" timestamp;
ALTER TABLE "mesh"."presence"
ADD COLUMN "role" text NOT NULL DEFAULT 'session';
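
A rough sketch of how a consumer might use the new `role` column to build a user-facing peer list. The row shape and the helper names (`PresenceRow`, `effectiveRole`, `visiblePeers`) are assumptions made for illustration, not the broker's or CLI's actual code; the only facts taken from the migration are the three role values and the 'session' default.

```typescript
// The three documented presence roles from the migration comment.
type PresenceRole = "control-plane" | "session" | "service";

// Hypothetical row shape for illustration; the real presence table has more columns.
interface PresenceRow {
  displayName: string;
  role?: PresenceRole; // undefined models a legacy hello that never sent a role
}

// Apply the column default: a row without a role is treated as a session.
function effectiveRole(row: PresenceRow): PresenceRole {
  return row.role ?? "session";
}

// Assumed visibility rule: hide control-plane (daemon) connections from peer lists.
function visiblePeers(rows: readonly PresenceRow[]): PresenceRow[] {
  return rows.filter((row) => effectiveRole(row) !== "control-plane");
}

const rows: PresenceRow[] = [
  { displayName: "host-daemon", role: "control-plane" },
  { displayName: "alice-session", role: "session" },
  { displayName: "legacy-peer" },
  { displayName: "build-bot", role: "service" },
];

const visibleNames = visiblePeers(rows).map((row) => row.displayName);
console.log(visibleNames.join(","));
// prints "alice-session,legacy-peer,build-bot"
```

Filtering on a typed value instead of matching a free-form `peerType` string means a new daemon variant stays hidden as long as it connects with 'control-plane'.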

@@ -326,6 +326,14 @@ export const presence = meshSchema.table("presence", {
statusUpdatedAt: timestamp().defaultNow().notNull(),
summary: text(),
groups: jsonb().$type<{ name: string; role?: string }[]>().default([]),
// v2 agentic-comms (M1): connection role for routing/visibility.
// 'control-plane' — long-lived daemon WS (claudemesh daemon),
// used for fan-out and presence orchestration.
// Hidden from user-facing peer lists.
// 'session' — per-Claude-Code session WS (default).
// 'service' — autonomous bots/services attached to the mesh.
// Always populated; default 'session' keeps legacy hellos working.
role: text().notNull().default("session"),
connectedAt: timestamp().defaultNow().notNull(),
lastPingAt: timestamp().defaultNow().notNull(),
disconnectedAt: timestamp(),
@@ -367,6 +375,14 @@ export const messageQueue = meshSchema.table("message_queue", {
// §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want
// to enforce idempotency on retries will read this column.
requestFingerprint: text("request_fingerprint"),
// v2 agentic-comms (M1): two-phase claim/deliver with lease.
// `drainForMember` claims a row by setting (claimedAt, claimId,
// claimExpiresAt), NOT deliveredAt. deliveredAt is set only after the
// recipient's WS replies with a `client_ack`. A periodic sweeper
// reaps expired claims so dropped pushes are redelivered (at-least-once).
claimedAt: timestamp(),
claimId: text("claim_id"),
claimExpiresAt: timestamp(),
});
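
The claim, ack, and sweep phases described in the comment above can be modelled in memory to show why an unacked push becomes eligible again. This is a sketch under stated assumptions, not the broker's `drainForMember`: the `QueueRow` shape, the function names, and the use of plain millisecond numbers for time are all hypothetical, the 30s lease comes from the migration comment, and the sketch accepts an ack even after the lease has expired, which the source does not specify.

```typescript
// Hypothetical in-memory model of one message_queue row; only lease-related fields.
interface QueueRow {
  id: string;
  deliveredAt: number | null;
  claimedAt: number | null;
  claimId: string | null;
  claimExpiresAt: number | null;
}

const LEASE_MS = 30_000; // the migration comment describes claimed_at + 30s

// Phase 1: claim every undelivered, unclaimed row. deliveredAt is NOT touched.
function claim(rows: QueueRow[], presenceId: string, now: number): QueueRow[] {
  const claimed = rows.filter((r) => r.deliveredAt === null && r.claimedAt === null);
  for (const r of claimed) {
    r.claimedAt = now;
    r.claimId = presenceId;
    r.claimExpiresAt = now + LEASE_MS;
  }
  return claimed;
}

// Phase 2: a client_ack for the row marks it delivered.
function ack(rows: QueueRow[], id: string, now: number): boolean {
  const row = rows.find((r) => r.id === id && r.deliveredAt === null);
  if (!row) return false;
  row.deliveredAt = now;
  return true;
}

// Sweeper: clear expired claims on undelivered rows so a later drain can retry them.
function sweep(rows: QueueRow[], now: number): number {
  let released = 0;
  for (const r of rows) {
    if (r.deliveredAt === null && r.claimExpiresAt !== null && r.claimExpiresAt <= now) {
      r.claimedAt = null;
      r.claimId = null;
      r.claimExpiresAt = null;
      released++;
    }
  }
  return released;
}

const queue: QueueRow[] = [
  { id: "m1", deliveredAt: null, claimedAt: null, claimId: null, claimExpiresAt: null },
];
const firstClaim = claim(queue, "presence-a", 0).length; // push sent, never acked
const duringLease = claim(queue, "presence-b", 1_000).length; // lease still held
const releasedEarly = sweep(queue, 29_999); // lease not yet expired
const releasedAtExpiry = sweep(queue, 30_000); // lease expired, claim cleared
const secondClaim = claim(queue, "presence-b", 31_000).length; // redelivery
const acked = ack(queue, "m1", 31_500);
const afterAck = claim(queue, "presence-b", 100_000).length; // delivered rows are skipped
console.log({ firstClaim, duringLease, releasedEarly, releasedAtExpiry, secondClaim, acked, afterAck });
```

The same row is handed out twice in this run, which is the at-least-once trade-off: a receiver has to dedupe on its side and still ack, otherwise the lease expires and the row comes back again.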
/**