feat: 1.33.0 — m1 ship: peerRole rename + client_ack wired + version bump
Resolves the merge of m1-broker-drain-race-and-presence-role and
m1-cli-lifecycle-and-role-peer-list into main:
* Rename wire-level role classification field `role` → `peerRole`
to avoid collision with 1.31.5's top-level `role` lift of
`profile.role` (user-supplied string consumed by the agent-vibes
claudemesh skill). `peerRole` is the broker presence taxonomy
(control-plane/session/service); top-level `role` keeps its 1.31.5
semantics.
- apps/broker/src/broker.ts (listPeersInMesh return)
- apps/broker/src/index.ts (peers_list response)
- apps/broker/src/types.ts (WSPeersListMessage)
- apps/cli/src/commands/peers.ts (PeerRecord + filter + lift)
* Wire CLI client_ack emission: handleBrokerPush gains
ackClientMessage callback; daemon-WS and session-WS each got a
sendClientAck() method that frames {type:"client_ack",
clientMessageId, brokerMessageId?} and forwards via the lifecycle
helper. Run.ts wires the callback into both onPush paths.
Receiver dedupes against existing inbox row first then acks
unconditionally — broker needs the ack regardless of dedupe to
release its claim lease.
- apps/cli/src/daemon/inbound.ts (ackClientMessage in InboundContext)
- apps/cli/src/daemon/broker.ts + session-broker.ts (sendClientAck)
- apps/cli/src/daemon/run.ts (wire-up)
* Version bump 1.32.1 → 1.33.0; CHANGELOG entry replaces "Unreleased"
with full m1 description.
Verification: tsc clean across cli + broker; CLI 83/83 unit tests
pass; broker 50 unit tests pass (5 integration test files require a
live Postgres and were skipped — pre-existing infra gap, not a
regression). CLI bundle rebuilt; version 1.33.0 baked.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -281,6 +281,22 @@ export class DaemonBrokerClient {
|
||||
return !!sock && sock.readyState === sock.OPEN;
|
||||
}
|
||||
|
||||
/** v2 agentic-comms (M1): send `client_ack` back to the broker after
|
||||
* successfully landing an inbound push in inbox.db. Broker uses the
|
||||
* ack to set `delivered_at` (atomic at-least-once). Best-effort —
|
||||
* if the WS isn't open, drop the ack; broker's 30s lease will
|
||||
* re-deliver. */
|
||||
sendClientAck(clientMessageId: string, brokerMessageId: string | null): void {
|
||||
if (!this.isOpen()) return;
|
||||
try {
|
||||
this.lifecycle!.send({
|
||||
type: "client_ack",
|
||||
clientMessageId,
|
||||
...(brokerMessageId ? { brokerMessageId } : {}),
|
||||
});
|
||||
} catch { /* drop; lease re-delivers */ }
|
||||
}
|
||||
|
||||
/** Send one outbox row. Resolves on broker ack/timeout. */
|
||||
send(req: BrokerSendArgs): Promise<BrokerSendResult> {
|
||||
return new Promise<BrokerSendResult>((resolve) => {
|
||||
|
||||
@@ -18,6 +18,13 @@ export interface InboundContext {
|
||||
/** Daemon's session secret key hex (rotates per connect). When the
|
||||
* sender encrypted to our session pubkey, decrypt with this instead. */
|
||||
sessionSecretKeyHex?: string;
|
||||
/** v2 agentic-comms (M1): emit `client_ack` back to the broker after
|
||||
* the message lands in inbox.db. Broker uses the ack to set
|
||||
* `delivered_at` (atomic at-least-once). Without it, the broker's
|
||||
* 30s lease expires and re-delivers — correct but noisy. The WS
|
||||
* client owns this callback because it's the one that owns the
|
||||
* socket; inbound.ts just signals "I accepted this id." */
|
||||
ackClientMessage?: (clientMessageId: string, brokerMessageId: string | null) => void;
|
||||
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
|
||||
}
|
||||
|
||||
@@ -73,6 +80,12 @@ export async function handleBrokerPush(msg: Record<string, unknown>, ctx: Inboun
|
||||
reply_to_id: replyToId,
|
||||
});
|
||||
|
||||
// Whether the row was newly inserted or already existed (dedupe), the
|
||||
// broker still wants to know we received and processed this message —
|
||||
// ack regardless. Skipping ack on dedupe would leak: broker would
|
||||
// re-deliver after lease, and the receiver would re-dedupe forever.
|
||||
ctx.ackClientMessage?.(clientMessageId, brokerMessageId);
|
||||
|
||||
if (!inserted) return; // already had this id; no event
|
||||
|
||||
ctx.bus.publish("message", {
|
||||
|
||||
@@ -127,7 +127,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
|
||||
const meshConfigs = new Map<string, typeof cfg.meshes[number]>();
|
||||
for (const mesh of meshes) {
|
||||
meshConfigs.set(mesh.slug, mesh);
|
||||
const broker = new DaemonBrokerClient(mesh, {
|
||||
const broker: DaemonBrokerClient = new DaemonBrokerClient(mesh, {
|
||||
displayName: opts.displayName,
|
||||
onStatusChange: (s) => {
|
||||
process.stdout.write(JSON.stringify({
|
||||
@@ -146,6 +146,9 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
|
||||
bus,
|
||||
meshSlug: mesh.slug,
|
||||
recipientSecretKeyHex: mesh.secretKey,
|
||||
// v2 agentic-comms (M1): client_ack closes the at-least-once
|
||||
// loop. Broker holds the row claimed (not delivered) until ack.
|
||||
ackClientMessage: (cmid, bmid) => broker.sendClientAck(cmid, bmid),
|
||||
});
|
||||
},
|
||||
});
|
||||
@@ -187,7 +190,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
|
||||
// session secret key; member key remains the fallback for legacy
|
||||
// member-targeted traffic that happens to fan out here.
|
||||
const sessionSecretKeyHex = info.presence.sessionSecretKey;
|
||||
const client = new SessionBrokerClient({
|
||||
const client: SessionBrokerClient = new SessionBrokerClient({
|
||||
mesh: meshConfig,
|
||||
sessionPubkey: info.presence.sessionPubkey,
|
||||
sessionSecretKey: info.presence.sessionSecretKey,
|
||||
@@ -204,6 +207,8 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
|
||||
meshSlug: meshConfig.slug,
|
||||
recipientSecretKeyHex: meshConfig.secretKey,
|
||||
sessionSecretKeyHex,
|
||||
// v2 agentic-comms (M1): close the at-least-once loop.
|
||||
ackClientMessage: (cmid, bmid) => client.sendClientAck(cmid, bmid),
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
@@ -167,6 +167,20 @@ export class SessionBrokerClient {
|
||||
});
|
||||
}
|
||||
|
||||
/** v2 agentic-comms (M1): send `client_ack` back to the broker after
|
||||
* successfully landing an inbound push in inbox.db. Broker uses the
|
||||
* ack to set `delivered_at`. Best-effort. */
|
||||
sendClientAck(clientMessageId: string, brokerMessageId: string | null): void {
|
||||
if (this._status !== "open" || !this.lifecycle) return;
|
||||
try {
|
||||
this.lifecycle.send({
|
||||
type: "client_ack",
|
||||
clientMessageId,
|
||||
...(brokerMessageId ? { brokerMessageId } : {}),
|
||||
});
|
||||
} catch { /* drop; lease re-delivers */ }
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
this.closed = true;
|
||||
if (this.lifecycle) {
|
||||
|
||||
Reference in New Issue
Block a user