3 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
59848f0d3e chore(cli): v0.6.6 — correlation ID refactor (resolver Maps + _reqId)
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
2026-04-07 14:31:29 +01:00
Alejandro Gutiérrez
d0fa1c028f fix(broker): echo _reqId in all WS responses for correlation ID routing
Extract _reqId from incoming WS messages and include it in every direct
response sendToPeer call and sendError call. Clients can now match
responses to requests by ID instead of relying on FIFO ordering.
Old clients without _reqId are unaffected (field simply omitted).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:28:30 +01:00
Alejandro Gutiérrez
8f925d9a9e fix(cli): correlation ID refactor — resolver Maps with _reqId + FIFO fallback
Replace all 22 resolver Array<fn> patterns with Map<reqId, {resolve, timer}>.
Outgoing messages now include _reqId; on response the broker's echoed _reqId
is used for exact matching, with FIFO fallback for brokers that don't echo it.
Add makeReqId() helper and resolveFromMap() utility. Error propagation block
updated to iterate Maps and pop the oldest entry across all queues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:25:51 +01:00
4 changed files with 334 additions and 316 deletions

View File

@@ -547,8 +547,9 @@ function sendError(
code: string,
message: string,
id?: string,
reqId?: string,
): void {
const err: WSServerMessage = { type: "error", code, message, id };
const err: WSServerMessage = { type: "error", code, message, id, ...(reqId ? { _reqId: reqId } : {}) };
try {
ws.send(JSON.stringify(err));
} catch {
@@ -727,6 +728,7 @@ function handleConnection(ws: WebSocket): void {
ws.on("message", async (raw) => {
try {
const msg = JSON.parse(raw.toString()) as WSClientMessage;
const _reqId = (msg as any)._reqId as string | undefined;
if (msg.type === "hello") {
const result = await handleHello(ws, msg);
if (!result) return;
@@ -776,6 +778,7 @@ function handleConnection(ws: WebSocket): void {
sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
log.info("ws list_peers", {
@@ -862,6 +865,7 @@ function handleConnection(ws: WebSocket): void {
value: stateEntry.value,
updatedBy: stateEntry.updatedBy,
updatedAt: stateEntry.updatedAt.toISOString(),
...(_reqId ? { _reqId } : {}),
});
} else {
sendToPeer(presenceId, {
@@ -870,6 +874,7 @@ function handleConnection(ws: WebSocket): void {
value: null,
updatedBy: "",
updatedAt: "",
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws get_state", {
@@ -889,6 +894,7 @@ function handleConnection(ws: WebSocket): void {
updatedBy: e.updatedBy,
updatedAt: e.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_state", {
presence_id: presenceId,
@@ -911,6 +917,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "memory_stored",
id: memoryId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws remember", {
presence_id: presenceId,
@@ -930,6 +937,7 @@ function handleConnection(ws: WebSocket): void {
rememberedBy: m.rememberedBy,
rememberedAt: m.rememberedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws recall", {
presence_id: presenceId,
@@ -946,6 +954,7 @@ function handleConnection(ws: WebSocket): void {
id: fg.memoryId,
messageId: fg.memoryId,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws forget", {
presence_id: presenceId,
@@ -957,7 +966,7 @@ function handleConnection(ws: WebSocket): void {
const gf = msg as Extract<WSClientMessage, { type: "get_file" }>;
const file = await getFile(conn.meshId, gf.fileId);
if (!file) {
sendError(conn.ws, "not_found", "file not found");
sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break;
}
// Access control: if targetSpec is set, verify peer matches
@@ -967,7 +976,7 @@ function handleConnection(ws: WebSocket): void {
file.targetSpec === conn.sessionPubkey ||
file.targetSpec === "*";
if (!matches) {
sendError(conn.ws, "forbidden", "file not targeted at you");
sendError(conn.ws, "forbidden", "file not targeted at you", undefined, _reqId);
break;
}
}
@@ -980,7 +989,7 @@ function handleConnection(ws: WebSocket): void {
const isOwner = !!(file.ownerPubkey && peerPubkey === file.ownerPubkey);
sealedKey = peerPubkey ? await getFileKey(gf.fileId, peerPubkey) : null;
if (!sealedKey && !isOwner) {
sendError(conn.ws, "forbidden", "no decryption key for this file");
sendError(conn.ws, "forbidden", "no decryption key for this file", undefined, _reqId);
break;
}
}
@@ -1007,6 +1016,7 @@ function handleConnection(ws: WebSocket): void {
name: file.name,
encrypted: file.encrypted,
sealedKey: sealedKey ?? undefined,
...(_reqId ? { _reqId } : {}),
});
log.info("ws get_file", {
presence_id: presenceId,
@@ -1029,6 +1039,7 @@ function handleConnection(ws: WebSocket): void {
persistent: f.persistent,
encrypted: f.encrypted,
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_files", {
presence_id: presenceId,
@@ -1047,6 +1058,7 @@ function handleConnection(ws: WebSocket): void {
peerName: a.peerName,
accessedAt: a.accessedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws file_status", {
presence_id: presenceId,
@@ -1058,16 +1070,16 @@ function handleConnection(ws: WebSocket): void {
const gfa = msg as { type: "grant_file_access"; fileId: string; peerPubkey: string; sealedKey: string };
const file = await getFile(conn.meshId, gfa.fileId);
if (!file) {
sendError(conn.ws, "not_found", "file not found");
sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break;
}
const requestorPubkey = conn.sessionPubkey ?? conn.memberPubkey;
if (file.ownerPubkey && file.ownerPubkey !== requestorPubkey) {
sendError(conn.ws, "forbidden", "only the file owner can grant access");
sendError(conn.ws, "forbidden", "only the file owner can grant access", undefined, _reqId);
break;
}
await grantFileKey(gfa.fileId, gfa.peerPubkey, gfa.sealedKey, requestorPubkey ?? undefined);
sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey });
sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey, ...(_reqId ? { _reqId } : {}) });
log.info("ws grant_file_access", { presence_id: presenceId, file_id: gfa.fileId, peer: gfa.peerPubkey });
break;
}
@@ -1079,6 +1091,7 @@ function handleConnection(ws: WebSocket): void {
id: df.fileId,
messageId: df.fileId,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws delete_file", {
presence_id: presenceId,
@@ -1099,7 +1112,7 @@ function handleConnection(ws: WebSocket): void {
.from(messageQueue)
.where(eq(messageQueue.id, ms.messageId));
if (!mqRow || mqRow.meshId !== conn.meshId) {
sendError(conn.ws, "not_found", "message not found");
sendError(conn.ws, "not_found", "message not found", undefined, _reqId);
break;
}
// Build per-recipient status from connected peers.
@@ -1143,6 +1156,7 @@ function handleConnection(ws: WebSocket): void {
delivered: !!mqRow.deliveredAt,
deliveredAt: mqRow.deliveredAt?.toISOString() ?? null,
recipients,
...(_reqId ? { _reqId } : {}),
};
sendToPeer(presenceId, resp);
log.info("ws message_status", {
@@ -1201,6 +1215,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws get_context", {
presence_id: presenceId,
@@ -1219,6 +1234,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_contexts", {
presence_id: presenceId,
@@ -1243,6 +1259,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "task_created",
id: taskId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws create_task", {
presence_id: presenceId,
@@ -1263,7 +1280,7 @@ function handleConnection(ws: WebSocket): void {
memberInfo?.displayName,
);
if (!claimed) {
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist");
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist", undefined, _reqId);
break;
}
// Return updated task list so caller sees the change.
@@ -1281,6 +1298,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws claim_task", {
presence_id: presenceId,
@@ -1296,7 +1314,7 @@ function handleConnection(ws: WebSocket): void {
cpt.result,
);
if (!completed) {
sendError(conn.ws, "task_not_found", "task not found in this mesh");
sendError(conn.ws, "task_not_found", "task not found in this mesh", undefined, _reqId);
break;
}
// Return updated task list.
@@ -1314,6 +1332,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws complete_task", {
presence_id: presenceId,
@@ -1337,6 +1356,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_tasks", {
presence_id: presenceId,
@@ -1362,6 +1382,7 @@ function handleConnection(ws: WebSocket): void {
type: "stream_created",
id: streamId,
name: cs.name,
...(_reqId ? { _reqId } : {}),
});
log.info("ws create_stream", {
presence_id: presenceId,
@@ -1376,7 +1397,7 @@ function handleConnection(ws: WebSocket): void {
if (!streamSubscriptions.has(key))
streamSubscriptions.set(key, new Set());
streamSubscriptions.get(key)!.add(presenceId);
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream });
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream, ...(_reqId ? { _reqId } : {}) });
log.info("ws subscribe", {
presence_id: presenceId,
stream: sub.stream,
@@ -1435,6 +1456,7 @@ function handleConnection(ws: WebSocket): void {
subscriberCount: streamSubscriptions.get(key)?.size ?? 0,
};
}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_streams", {
presence_id: presenceId,
@@ -1476,6 +1498,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "vector_stored",
id: pointId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws vector_store", {
presence_id: presenceId,
@@ -1483,7 +1506,7 @@ function handleConnection(ws: WebSocket): void {
point_id: pointId,
});
} catch (e) {
sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e));
sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e), undefined, _reqId);
}
break;
}
@@ -1518,12 +1541,14 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "vector_results",
results: matches,
...(_reqId ? { _reqId } : {}),
});
} catch {
// Collection may not exist yet — return empty results.
sendToPeer(presenceId, {
type: "vector_results",
results: [],
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws vector_search", {
@@ -1549,6 +1574,7 @@ function handleConnection(ws: WebSocket): void {
id: vd.id,
messageId: vd.id,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws vector_delete", {
presence_id: presenceId,
@@ -1568,11 +1594,13 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "collection_list",
collections: meshCollections,
...(_reqId ? { _reqId } : {}),
});
} catch {
sendToPeer(presenceId, {
type: "collection_list",
collections: [],
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws list_collections", {
@@ -1607,9 +1635,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "graph_result",
records: gqRecords,
...(_reqId ? { _reqId } : {}),
});
} catch (gqErr) {
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr));
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr), undefined, _reqId);
} finally {
await gqSession.close();
}
@@ -1641,9 +1670,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "graph_result",
records: geRecords,
...(_reqId ? { _reqId } : {}),
});
} catch (geErr) {
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr));
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr), undefined, _reqId);
} finally {
await geSession.close();
}
@@ -1660,12 +1690,14 @@ function handleConnection(ws: WebSocket): void {
const mq = msg as Extract<WSClientMessage, { type: "mesh_query" }>;
try {
const result = await meshQuery(conn.meshId, mq.sql);
sendToPeer(presenceId, { type: "mesh_query_result", ...result });
sendToPeer(presenceId, { type: "mesh_query_result", ...result, ...(_reqId ? { _reqId } : {}) });
} catch (e) {
sendError(
conn.ws,
"query_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_query", {
@@ -1683,12 +1715,15 @@ function handleConnection(ws: WebSocket): void {
columns: [],
rows: [],
rowCount: result.rowCount,
...(_reqId ? { _reqId } : {}),
});
} catch (e) {
sendError(
conn.ws,
"execute_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_execute", {
@@ -1700,12 +1735,14 @@ function handleConnection(ws: WebSocket): void {
case "mesh_schema": {
try {
const tables = await meshSchema(conn.meshId);
sendToPeer(presenceId, { type: "mesh_schema_result", tables });
sendToPeer(presenceId, { type: "mesh_schema_result", tables, ...(_reqId ? { _reqId } : {}) });
} catch (e) {
sendError(
conn.ws,
"schema_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_schema", { presence_id: presenceId });
@@ -1746,6 +1783,7 @@ function handleConnection(ws: WebSocket): void {
collections: [],
yourName: selfPeer?.displayName ?? "unknown",
yourGroups: peerConn?.groups ?? [],
...(_reqId ? { _reqId } : {}),
});
log.info("ws mesh_info", { presence_id: presenceId });
break;

View File

@@ -161,6 +161,7 @@ export interface WSAckMessage {
id: string; // echoes client-side correlation id
messageId: string;
queued: boolean;
_reqId?: string;
}
/** Broker → client: hello handshake acknowledgement. */
@@ -182,6 +183,7 @@ export interface WSPeersListMessage {
sessionId: string;
connectedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: a state key was changed by another peer. */
@@ -199,6 +201,7 @@ export interface WSStateResultMessage {
value: unknown;
updatedAt: string;
updatedBy: string;
_reqId?: string;
}
/** Broker → client: response to list_state. */
@@ -210,12 +213,14 @@ export interface WSStateListMessage {
updatedBy: string;
updatedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: acknowledgement for a remember. */
export interface WSMemoryStoredMessage {
type: "memory_stored";
id: string;
_reqId?: string;
}
/** Broker → client: response to recall. */
@@ -228,6 +233,7 @@ export interface WSMemoryResultsMessage {
rememberedBy: string;
rememberedAt: string;
}>;
_reqId?: string;
}
// --- Vector storage messages ---
@@ -299,6 +305,7 @@ export interface WSMeshSchemaMessage {
export interface WSVectorStoredMessage {
type: "vector_stored";
id: string;
_reqId?: string;
}
/** Broker → client: vector search results. */
@@ -310,18 +317,21 @@ export interface WSVectorResultsMessage {
score: number;
metadata?: Record<string, unknown>;
}>;
_reqId?: string;
}
/** Broker → client: list of vector collections. */
export interface WSCollectionListMessage {
type: "collection_list";
collections: string[];
_reqId?: string;
}
/** Broker → client: graph query results. */
export interface WSGraphResultMessage {
type: "graph_result";
records: Array<Record<string, unknown>>;
_reqId?: string;
}
/** Broker → client: mesh SQL query results. */
@@ -330,6 +340,7 @@ export interface WSMeshQueryResultMessage {
columns: string[];
rows: Array<Record<string, unknown>>;
rowCount: number;
_reqId?: string;
}
/** Broker → client: mesh schema introspection results. */
@@ -339,6 +350,7 @@ export interface WSMeshSchemaResultMessage {
name: string;
columns: Array<{ name: string; type: string; nullable: boolean }>;
}>;
_reqId?: string;
}
/** Client → broker: get full mesh overview. */
@@ -361,6 +373,7 @@ export interface WSMeshInfoResultMessage {
collections: string[];
yourName: string;
yourGroups: Array<{ name: string; role?: string }>;
_reqId?: string;
}
/** Client → broker: check delivery status of a message. */
@@ -381,6 +394,7 @@ export interface WSMessageStatusResultMessage {
pubkey: string;
status: "delivered" | "held" | "disconnected";
}>;
_reqId?: string;
}
// --- File sharing messages ---
@@ -426,6 +440,7 @@ export interface WSFileUrlMessage {
name: string;
encrypted?: boolean;
sealedKey?: string;
_reqId?: string;
}
/** Broker → client: list of files in the mesh. */
@@ -441,6 +456,7 @@ export interface WSFileListMessage {
persistent: boolean;
encrypted: boolean;
}>;
_reqId?: string;
}
/** Broker → client: acknowledgement for grant_file_access. */
@@ -448,6 +464,7 @@ export interface WSGrantFileAccessOkMessage {
type: "grant_file_access_ok";
fileId: string;
peerPubkey: string;
_reqId?: string;
}
/** Broker → client: access log for a file. */
@@ -458,6 +475,7 @@ export interface WSFileStatusResultMessage {
peerName: string;
accessedAt: string;
}>;
_reqId?: string;
}
// --- Context sharing messages ---
@@ -499,6 +517,7 @@ export interface WSContextResultsMessage {
tags: string[];
updatedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: response to list_contexts. */
@@ -510,6 +529,7 @@ export interface WSContextListMessage {
tags: string[];
updatedAt: string;
}>;
_reqId?: string;
}
// --- Task messages ---
@@ -547,6 +567,7 @@ export interface WSListTasksMessage {
export interface WSTaskCreatedMessage {
type: "task_created";
id: string;
_reqId?: string;
}
/** Broker → client: response to list_tasks, claim_task, complete_task. */
@@ -563,6 +584,7 @@ export interface WSTaskListMessage {
tags: string[];
createdAt: string;
}>;
_reqId?: string;
}
// --- Stream messages ---
@@ -602,6 +624,7 @@ export interface WSStreamCreatedMessage {
type: "stream_created";
id: string;
name: string;
_reqId?: string;
}
/** Broker → client: real-time data pushed from a stream. */
@@ -616,6 +639,7 @@ export interface WSStreamDataMessage {
export interface WSSubscribedMessage {
type: "subscribed";
stream: string;
_reqId?: string;
}
/** Broker → client: response to list_streams. */
@@ -628,6 +652,7 @@ export interface WSStreamListMessage {
createdAt: string;
subscriberCount: number;
}>;
_reqId?: string;
}
/** Broker → client: structured error. */
@@ -636,6 +661,7 @@ export interface WSErrorMessage {
code: string;
message: string;
id?: string;
_reqId?: string;
}
export type WSClientMessage =

View File

@@ -1,6 +1,6 @@
{
"name": "claudemesh-cli",
"version": "0.6.5",
"version": "0.6.6",
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
"keywords": [
"claude-code",

View File

@@ -75,15 +75,15 @@ export class BrokerClient {
private outbound: Array<() => void> = []; // closures that send once ws is open
private pushHandlers = new Set<PushHandler>();
private pushBuffer: InboundPush[] = [];
private listPeersResolvers: Array<(peers: PeerInfo[]) => void> = [];
private stateResolvers: Array<(result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void> = [];
private stateListResolvers: Array<(entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void> = [];
private memoryStoreResolvers: Array<(id: string | null) => void> = [];
private memoryRecallResolvers: Array<(memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void> = [];
private listPeersResolvers = new Map<string, { resolve: (peers: PeerInfo[]) => void; timer: NodeJS.Timeout }>();
private stateResolvers = new Map<string, { resolve: (result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void; timer: NodeJS.Timeout }>();
private stateListResolvers = new Map<string, { resolve: (entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers = new Map<string, { resolve: (memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void; timer: NodeJS.Timeout }>();
private stateChangeHandlers = new Set<(change: { key: string; value: unknown; updatedBy: string }) => void>();
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private grantFileAccessResolvers: Array<(ok: boolean) => void> = [];
private grantFileAccessResolvers = new Map<string, { resolve: (ok: boolean) => void; timer: NodeJS.Timeout }>();
private closed = false;
private reconnectAttempt = 0;
private helloTimer: NodeJS.Timeout | null = null;
@@ -116,6 +116,10 @@ export class BrokerClient {
/** Session secret key hex (null before first connection). */
getSessionSecretKey(): string | null { return this.sessionSecretKey; }
private makeReqId(): string {
return Math.random().toString(36).slice(2) + Date.now().toString(36);
}
/** Open WS, send hello, resolve when hello_ack received. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client is closed");
@@ -305,16 +309,11 @@ export class BrokerClient {
async listPeers(): Promise<PeerInfo[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.listPeersResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_peers" }));
// Timeout after 5s — return empty list rather than hang.
setTimeout(() => {
const idx = this.listPeersResolvers.indexOf(resolve);
if (idx !== -1) {
this.listPeersResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.listPeersResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.listPeersResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId }));
});
}
@@ -348,15 +347,11 @@ export class BrokerClient {
async getState(key: string): Promise<{ key: string; value: unknown; updatedBy: string; updatedAt: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.stateResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_state", key }));
setTimeout(() => {
const idx = this.stateResolvers.indexOf(resolve);
if (idx !== -1) {
this.stateResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.stateResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.stateResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId }));
});
}
@@ -364,15 +359,11 @@ export class BrokerClient {
async listState(): Promise<Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.stateListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_state" }));
setTimeout(() => {
const idx = this.stateListResolvers.indexOf(resolve);
if (idx !== -1) {
this.stateListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.stateListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.stateListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId }));
});
}
@@ -382,15 +373,11 @@ export class BrokerClient {
async remember(content: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.memoryStoreResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "remember", content, tags }));
setTimeout(() => {
const idx = this.memoryStoreResolvers.indexOf(resolve);
if (idx !== -1) {
this.memoryStoreResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.memoryStoreResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId }));
});
}
@@ -398,15 +385,11 @@ export class BrokerClient {
async recall(query: string): Promise<Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.memoryRecallResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "recall", query }));
setTimeout(() => {
const idx = this.memoryRecallResolvers.indexOf(resolve);
if (idx !== -1) {
this.memoryRecallResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.memoryRecallResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId }));
});
}
@@ -417,33 +400,32 @@ export class BrokerClient {
}
/** Check delivery status of a sent message. */
private messageStatusResolvers: Array<(result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void> = [];
private fileUrlResolvers: Array<(result: { url: string; name: string; encrypted?: boolean; sealedKey?: string } | null) => void> = [];
private fileListResolvers: Array<(files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void> = [];
private fileStatusResolvers: Array<(accesses: Array<{ peerName: string; accessedAt: string }>) => void> = [];
private vectorStoredResolvers: Array<(id: string | null) => void> = [];
private vectorResultsResolvers: Array<(results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void> = [];
private collectionListResolvers: Array<(collections: string[]) => void> = [];
private graphResultResolvers: Array<(rows: Array<Record<string, unknown>>) => void> = [];
private contextListResolvers: Array<(contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void> = [];
private contextResultsResolvers: Array<(contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void> = [];
private taskCreatedResolvers: Array<(id: string | null) => void> = [];
private taskListResolvers: Array<(tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void> = [];
private meshQueryResolvers: Array<(result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void> = [];
private meshSchemaResolvers: Array<(tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void> = [];
private streamCreatedResolvers: Array<(id: string | null) => void> = [];
private streamListResolvers: Array<(streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void> = [];
private messageStatusResolvers = new Map<string, { resolve: (result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void; timer: NodeJS.Timeout }>();
private fileUrlResolvers = new Map<string, { resolve: (result: { url: string; name: string; encrypted?: boolean; sealedKey?: string } | null) => void; timer: NodeJS.Timeout }>();
private fileListResolvers = new Map<string, { resolve: (files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void; timer: NodeJS.Timeout }>();
private fileStatusResolvers = new Map<string, { resolve: (accesses: Array<{ peerName: string; accessedAt: string }>) => void; timer: NodeJS.Timeout }>();
private vectorStoredResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private vectorResultsResolvers = new Map<string, { resolve: (results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void; timer: NodeJS.Timeout }>();
private collectionListResolvers = new Map<string, { resolve: (collections: string[]) => void; timer: NodeJS.Timeout }>();
private graphResultResolvers = new Map<string, { resolve: (rows: Array<Record<string, unknown>>) => void; timer: NodeJS.Timeout }>();
private contextListResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private contextResultsResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private taskCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private taskListResolvers = new Map<string, { resolve: (tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void; timer: NodeJS.Timeout }>();
private meshQueryResolvers = new Map<string, { resolve: (result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void; timer: NodeJS.Timeout }>();
private meshSchemaResolvers = new Map<string, { resolve: (tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void; timer: NodeJS.Timeout }>();
private streamCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private streamListResolvers = new Map<string, { resolve: (streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void; timer: NodeJS.Timeout }>();
private streamDataHandlers = new Set<(data: { stream: string; data: unknown; publishedBy: string }) => void>();
async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.messageStatusResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "message_status", messageId }));
setTimeout(() => {
const idx = this.messageStatusResolvers.indexOf(resolve);
if (idx !== -1) { this.messageStatusResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.messageStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.messageStatusResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "message_status", messageId, _reqId: reqId }));
});
}
@@ -453,15 +435,11 @@ export class BrokerClient {
async getFile(fileId: string): Promise<{ url: string; name: string; encrypted?: boolean; sealedKey?: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.fileUrlResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_file", fileId }));
setTimeout(() => {
const idx = this.fileUrlResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileUrlResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileUrlResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileUrlResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_file", fileId, _reqId: reqId }));
});
}
@@ -469,15 +447,11 @@ export class BrokerClient {
async listFiles(query?: string, from?: string): Promise<Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.fileListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_files", query, from }));
setTimeout(() => {
const idx = this.fileListResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_files", query, from, _reqId: reqId }));
});
}
@@ -485,15 +459,11 @@ export class BrokerClient {
async fileStatus(fileId: string): Promise<Array<{ peerName: string; accessedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.fileStatusResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "file_status", fileId }));
setTimeout(() => {
const idx = this.fileStatusResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileStatusResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileStatusResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "file_status", fileId, _reqId: reqId }));
});
}
@@ -547,13 +517,11 @@ export class BrokerClient {
async grantFileAccess(fileId: string, peerPubkey: string, sealedKey: string): Promise<boolean> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false;
return new Promise((resolve) => {
const resolvers = this.grantFileAccessResolvers;
resolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey }));
setTimeout(() => {
const idx = resolvers.indexOf(resolve);
if (idx !== -1) { resolvers.splice(idx, 1); resolve(false); }
}, 5_000);
const reqId = this.makeReqId();
this.grantFileAccessResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.grantFileAccessResolvers.delete(reqId)) resolve(false);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey, _reqId: reqId }));
});
}
@@ -563,12 +531,11 @@ export class BrokerClient {
async vectorStore(collection: string, text: string, metadata?: Record<string, unknown>): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.vectorStoredResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata }));
setTimeout(() => {
const idx = this.vectorStoredResolvers.indexOf(resolve);
if (idx !== -1) { this.vectorStoredResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.vectorStoredResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.vectorStoredResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata, _reqId: reqId }));
});
}
@@ -576,12 +543,11 @@ export class BrokerClient {
async vectorSearch(collection: string, query: string, limit?: number): Promise<Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.vectorResultsResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit }));
setTimeout(() => {
const idx = this.vectorResultsResolvers.indexOf(resolve);
if (idx !== -1) { this.vectorResultsResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.vectorResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.vectorResultsResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit, _reqId: reqId }));
});
}
@@ -595,12 +561,11 @@ export class BrokerClient {
async listCollections(): Promise<string[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.collectionListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_collections" }));
setTimeout(() => {
const idx = this.collectionListResolvers.indexOf(resolve);
if (idx !== -1) { this.collectionListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.collectionListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.collectionListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_collections", _reqId: reqId }));
});
}
@@ -610,12 +575,11 @@ export class BrokerClient {
async graphQuery(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.graphResultResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "graph_query", cypher }));
setTimeout(() => {
const idx = this.graphResultResolvers.indexOf(resolve);
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.graphResultResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "graph_query", cypher, _reqId: reqId }));
});
}
@@ -623,12 +587,11 @@ export class BrokerClient {
async graphExecute(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.graphResultResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "graph_execute", cypher }));
setTimeout(() => {
const idx = this.graphResultResolvers.indexOf(resolve);
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.graphResultResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "graph_execute", cypher, _reqId: reqId }));
});
}
@@ -644,12 +607,11 @@ export class BrokerClient {
async getContext(query: string): Promise<Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.contextResultsResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_context", query }));
setTimeout(() => {
const idx = this.contextResultsResolvers.indexOf(resolve);
if (idx !== -1) { this.contextResultsResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.contextResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.contextResultsResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_context", query, _reqId: reqId }));
});
}
@@ -657,12 +619,11 @@ export class BrokerClient {
async listContexts(): Promise<Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.contextListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_contexts" }));
setTimeout(() => {
const idx = this.contextListResolvers.indexOf(resolve);
if (idx !== -1) { this.contextListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.contextListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.contextListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_contexts", _reqId: reqId }));
});
}
@@ -672,12 +633,11 @@ export class BrokerClient {
async createTask(title: string, assignee?: string, priority?: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.taskCreatedResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags }));
setTimeout(() => {
const idx = this.taskCreatedResolvers.indexOf(resolve);
if (idx !== -1) { this.taskCreatedResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.taskCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.taskCreatedResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags, _reqId: reqId }));
});
}
@@ -697,12 +657,11 @@ export class BrokerClient {
async listTasks(status?: string, assignee?: string): Promise<Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.taskListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee }));
setTimeout(() => {
const idx = this.taskListResolvers.indexOf(resolve);
if (idx !== -1) { this.taskListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.taskListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.taskListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee, _reqId: reqId }));
});
}
@@ -712,12 +671,11 @@ export class BrokerClient {
async meshQuery(sql: string): Promise<{ columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.meshQueryResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_query", sql }));
setTimeout(() => {
const idx = this.meshQueryResolvers.indexOf(resolve);
if (idx !== -1) { this.meshQueryResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.meshQueryResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshQueryResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_query", sql, _reqId: reqId }));
});
}
@@ -731,12 +689,11 @@ export class BrokerClient {
async meshSchema(): Promise<Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.meshSchemaResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_schema" }));
setTimeout(() => {
const idx = this.meshSchemaResolvers.indexOf(resolve);
if (idx !== -1) { this.meshSchemaResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.meshSchemaResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshSchemaResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_schema", _reqId: reqId }));
});
}
@@ -746,12 +703,11 @@ export class BrokerClient {
async createStream(name: string): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.streamCreatedResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "create_stream", name }));
setTimeout(() => {
const idx = this.streamCreatedResolvers.indexOf(resolve);
if (idx !== -1) { this.streamCreatedResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.streamCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.streamCreatedResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "create_stream", name, _reqId: reqId }));
});
}
@@ -777,12 +733,11 @@ export class BrokerClient {
async listStreams(): Promise<Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.streamListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_streams" }));
setTimeout(() => {
const idx = this.streamListResolvers.indexOf(resolve);
if (idx !== -1) { this.streamListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.streamListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.streamListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_streams", _reqId: reqId }));
});
}
@@ -799,17 +754,16 @@ export class BrokerClient {
}
// --- Mesh info ---
private meshInfoResolvers: Array<(result: Record<string, unknown> | null) => void> = [];
private meshInfoResolvers = new Map<string, { resolve: (result: Record<string, unknown> | null) => void; timer: NodeJS.Timeout }>();
async meshInfo(): Promise<Record<string, unknown> | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.meshInfoResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_info" }));
setTimeout(() => {
const idx = this.meshInfoResolvers.indexOf(resolve);
if (idx !== -1) { this.meshInfoResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.meshInfoResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshInfoResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_info", _reqId: reqId }));
});
}
@@ -829,7 +783,33 @@ export class BrokerClient {
// --- Internals ---
private resolveFromMap<T>(
map: Map<string, { resolve: (v: T) => void; timer: NodeJS.Timeout }>,
reqId: string | undefined,
value: T,
): boolean {
let entry = reqId ? map.get(reqId) : undefined;
if (!entry) {
// Fallback: oldest pending (FIFO, for brokers that don't echo _reqId)
const first = map.entries().next().value as [string, { resolve: (v: T) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
entry = first[1];
map.delete(first[0]);
}
} else {
map.delete(reqId!);
}
if (entry) {
clearTimeout(entry.timer);
entry.resolve(value);
return true;
}
return false;
}
private handleServerMessage(msg: Record<string, unknown>): void {
const msgReqId = msg._reqId as string | undefined;
if (msg.type === "ack") {
const pending = this.pendingSends.get(String(msg.id ?? ""));
if (pending) {
@@ -843,8 +823,7 @@ export class BrokerClient {
}
if (msg.type === "peers_list") {
const peers = (msg.peers as PeerInfo[]) ?? [];
const resolver = this.listPeersResolvers.shift();
if (resolver) resolver(peers);
this.resolveFromMap(this.listPeersResolvers, msgReqId, peers);
return;
}
if (msg.type === "push") {
@@ -922,25 +901,21 @@ export class BrokerClient {
// both, it would be consumed here by the next pending get_state resolver,
// returning the wrong value (cross-contamination). The broker's set_state
// handler was fixed to omit state_result; only get_state sends it.
const resolver = this.stateResolvers.shift();
if (resolver) {
if (msg.key) {
resolver({
key: String(msg.key),
value: msg.value,
updatedBy: String(msg.updatedBy ?? ""),
updatedAt: String(msg.updatedAt ?? ""),
});
} else {
resolver(null);
}
if (msg.key) {
this.resolveFromMap(this.stateResolvers, msgReqId, {
key: String(msg.key),
value: msg.value,
updatedBy: String(msg.updatedBy ?? ""),
updatedAt: String(msg.updatedAt ?? ""),
});
} else {
this.resolveFromMap(this.stateResolvers, msgReqId, null);
}
return;
}
if (msg.type === "state_list") {
const entries = (msg.entries as Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) ?? [];
const resolver = this.stateListResolvers.shift();
if (resolver) resolver(entries);
this.resolveFromMap(this.stateListResolvers, msgReqId, entries);
return;
}
if (msg.type === "state_change") {
@@ -955,131 +930,108 @@ export class BrokerClient {
return;
}
if (msg.type === "memory_stored") {
const resolver = this.memoryStoreResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.memoryStoreResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "memory_results") {
const memories = (msg.memories as Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) ?? [];
const resolver = this.memoryRecallResolvers.shift();
if (resolver) resolver(memories);
this.resolveFromMap(this.memoryRecallResolvers, msgReqId, memories);
return;
}
if (msg.type === "message_status_result") {
const resolver = this.messageStatusResolvers.shift();
if (resolver) resolver(msg as any);
this.resolveFromMap(this.messageStatusResolvers, msgReqId, msg as any);
return;
}
if (msg.type === "file_url") {
const resolver = this.fileUrlResolvers.shift();
if (resolver) {
if (msg.url) {
resolver({
url: String(msg.url),
name: String(msg.name ?? ""),
encrypted: msg.encrypted ? true : undefined,
sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined,
});
} else {
resolver(null);
}
if (msg.url) {
this.resolveFromMap(this.fileUrlResolvers, msgReqId, {
url: String(msg.url),
name: String(msg.name ?? ""),
encrypted: msg.encrypted ? true : undefined,
sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined,
});
} else {
this.resolveFromMap(this.fileUrlResolvers, msgReqId, null);
}
return;
}
if (msg.type === "file_list") {
const files = (msg.files as Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) ?? [];
const resolver = this.fileListResolvers.shift();
if (resolver) resolver(files);
this.resolveFromMap(this.fileListResolvers, msgReqId, files);
return;
}
if (msg.type === "file_status_result") {
const accesses = (msg.accesses as Array<{ peerName: string; accessedAt: string }>) ?? [];
const resolver = this.fileStatusResolvers.shift();
if (resolver) resolver(accesses);
this.resolveFromMap(this.fileStatusResolvers, msgReqId, accesses);
return;
}
if (msg.type === "grant_file_access_ok") {
const resolver = this.grantFileAccessResolvers.shift();
if (resolver) resolver(true);
this.resolveFromMap(this.grantFileAccessResolvers, msgReqId, true);
return;
}
if (msg.type === "vector_stored") {
const resolver = this.vectorStoredResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.vectorStoredResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "vector_results") {
const results = (msg.results as Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) ?? [];
const resolver = this.vectorResultsResolvers.shift();
if (resolver) resolver(results);
this.resolveFromMap(this.vectorResultsResolvers, msgReqId, results);
return;
}
if (msg.type === "collection_list") {
const collections = (msg.collections as string[]) ?? [];
const resolver = this.collectionListResolvers.shift();
if (resolver) resolver(collections);
this.resolveFromMap(this.collectionListResolvers, msgReqId, collections);
return;
}
if (msg.type === "graph_result") {
// Broker sends { type: "graph_result", records: [...] }
const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
const resolver = this.graphResultResolvers.shift();
if (resolver) resolver(rows);
this.resolveFromMap(this.graphResultResolvers, msgReqId, rows);
return;
}
if (msg.type === "context_list") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextListResolvers.shift();
if (resolver) resolver(contexts);
this.resolveFromMap(this.contextListResolvers, msgReqId, contexts);
return;
}
if (msg.type === "context_results") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextResultsResolvers.shift();
if (resolver) resolver(contexts);
this.resolveFromMap(this.contextResultsResolvers, msgReqId, contexts);
return;
}
if (msg.type === "task_created") {
const resolver = this.taskCreatedResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.taskCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "task_list") {
const tasks = (msg.tasks as Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) ?? [];
const resolver = this.taskListResolvers.shift();
if (resolver) resolver(tasks);
this.resolveFromMap(this.taskListResolvers, msgReqId, tasks);
return;
}
if (msg.type === "mesh_query_result") {
const resolver = this.meshQueryResolvers.shift();
if (resolver) {
if (msg.columns) {
resolver({
columns: (msg.columns as string[]) ?? [],
rows: (msg.rows as Array<Record<string, unknown>>) ?? [],
rowCount: (msg.rowCount as number) ?? 0,
});
} else {
resolver(null);
}
if (msg.columns) {
this.resolveFromMap(this.meshQueryResolvers, msgReqId, {
columns: (msg.columns as string[]) ?? [],
rows: (msg.rows as Array<Record<string, unknown>>) ?? [],
rowCount: (msg.rowCount as number) ?? 0,
});
} else {
this.resolveFromMap(this.meshQueryResolvers, msgReqId, null);
}
return;
}
if (msg.type === "mesh_schema_result") {
const tables = (msg.tables as Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) ?? [];
const resolver = this.meshSchemaResolvers.shift();
if (resolver) resolver(tables);
this.resolveFromMap(this.meshSchemaResolvers, msgReqId, tables);
return;
}
if (msg.type === "stream_created") {
const resolver = this.streamCreatedResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.streamCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "stream_list") {
const streams = (msg.streams as Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) ?? [];
const resolver = this.streamListResolvers.shift();
if (resolver) resolver(streams);
this.resolveFromMap(this.streamListResolvers, msgReqId, streams);
return;
}
if (msg.type === "stream_data") {
@@ -1094,8 +1046,7 @@ export class BrokerClient {
return;
}
if (msg.type === "mesh_info_result") {
const resolver = this.meshInfoResolvers.shift();
if (resolver) resolver(msg as Record<string, unknown>);
this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record<string, unknown>);
return;
}
if (msg.type === "error") {
@@ -1116,37 +1067,40 @@ export class BrokerClient {
if (!handledByPendingSend) {
// Best-effort: unblock the first waiting resolver so callers don't
// hang for 5s. We don't know which tool triggered the error, so we
// pop the first non-empty resolver queue in priority order.
if (this.stateResolvers.length > 0) {
this.stateResolvers.shift()!(null);
} else if (this.stateListResolvers.length > 0) {
this.stateListResolvers.shift()!([]);
} else if (this.memoryStoreResolvers.length > 0) {
this.memoryStoreResolvers.shift()!(null);
} else if (this.memoryRecallResolvers.length > 0) {
this.memoryRecallResolvers.shift()!([]);
} else if (this.fileUrlResolvers.length > 0) {
this.fileUrlResolvers.shift()!(null);
} else if (this.fileListResolvers.length > 0) {
this.fileListResolvers.shift()!([]);
} else if (this.fileStatusResolvers.length > 0) {
this.fileStatusResolvers.shift()!([]);
} else if (this.graphResultResolvers.length > 0) {
this.graphResultResolvers.shift()!([]);
} else if (this.vectorStoredResolvers.length > 0) {
this.vectorStoredResolvers.shift()!(null);
} else if (this.vectorResultsResolvers.length > 0) {
this.vectorResultsResolvers.shift()!([]);
} else if (this.taskListResolvers.length > 0) {
this.taskListResolvers.shift()!([]);
} else if (this.meshQueryResolvers.length > 0) {
this.meshQueryResolvers.shift()!(null);
} else if (this.contextResultsResolvers.length > 0) {
this.contextResultsResolvers.shift()!([]);
} else if (this.contextListResolvers.length > 0) {
this.contextListResolvers.shift()!([]);
} else if (this.streamListResolvers.length > 0) {
this.streamListResolvers.shift()!([]);
// pop the first non-empty resolver map in priority order.
const allMaps: Array<[Map<string, { resolve: (v: any) => void; timer: NodeJS.Timeout }>, unknown]> = [
[this.stateResolvers, null],
[this.stateListResolvers, []],
[this.memoryStoreResolvers, null],
[this.memoryRecallResolvers, []],
[this.fileUrlResolvers, null],
[this.fileListResolvers, []],
[this.fileStatusResolvers, []],
[this.graphResultResolvers, []],
[this.vectorStoredResolvers, null],
[this.vectorResultsResolvers, []],
[this.taskListResolvers, []],
[this.meshQueryResolvers, null],
[this.contextResultsResolvers, []],
[this.contextListResolvers, []],
[this.streamListResolvers, []],
[this.messageStatusResolvers, null],
[this.grantFileAccessResolvers, false],
[this.collectionListResolvers, []],
[this.meshSchemaResolvers, []],
[this.taskCreatedResolvers, null],
[this.streamCreatedResolvers, null],
[this.listPeersResolvers, []],
[this.meshInfoResolvers, null],
];
for (const [map, defaultVal] of allMaps) {
const first = (map as Map<string, any>).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
(map as Map<string, any>).delete(first[0]);
clearTimeout(first[1].timer);
first[1].resolve(defaultVal);
break; // only pop one
}
}
}
}