diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 1482dac..0e85541 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -75,15 +75,15 @@ export class BrokerClient { private outbound: Array<() => void> = []; // closures that send once ws is open private pushHandlers = new Set(); private pushBuffer: InboundPush[] = []; - private listPeersResolvers: Array<(peers: PeerInfo[]) => void> = []; - private stateResolvers: Array<(result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void> = []; - private stateListResolvers: Array<(entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void> = []; - private memoryStoreResolvers: Array<(id: string | null) => void> = []; - private memoryRecallResolvers: Array<(memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void> = []; + private listPeersResolvers = new Map void; timer: NodeJS.Timeout }>(); + private stateResolvers = new Map void; timer: NodeJS.Timeout }>(); + private stateListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private memoryStoreResolvers = new Map void; timer: NodeJS.Timeout }>(); + private memoryRecallResolvers = new Map) => void; timer: NodeJS.Timeout }>(); private stateChangeHandlers = new Set<(change: { key: string; value: unknown; updatedBy: string }) => void>(); private sessionPubkey: string | null = null; private sessionSecretKey: string | null = null; - private grantFileAccessResolvers: Array<(ok: boolean) => void> = []; + private grantFileAccessResolvers = new Map void; timer: NodeJS.Timeout }>(); private closed = false; private reconnectAttempt = 0; private helloTimer: NodeJS.Timeout | null = null; @@ -116,6 +116,10 @@ export class BrokerClient { /** Session secret key hex (null before first connection). */ getSessionSecretKey(): string | null { return this.sessionSecretKey; } + private makeReqId(): string { + return Math.random().toString(36).slice(2) + Date.now().toString(36); + } + /** Open WS, send hello, resolve when hello_ack received. */ async connect(): Promise { if (this.closed) throw new Error("client is closed"); @@ -305,16 +309,11 @@ export class BrokerClient { async listPeers(): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.listPeersResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_peers" })); - // Timeout after 5s — return empty list rather than hang. - setTimeout(() => { - const idx = this.listPeersResolvers.indexOf(resolve); - if (idx !== -1) { - this.listPeersResolvers.splice(idx, 1); - resolve([]); - } - }, 5_000); + const reqId = this.makeReqId(); + this.listPeersResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.listPeersResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId })); }); } @@ -348,15 +347,11 @@ export class BrokerClient { async getState(key: string): Promise<{ key: string; value: unknown; updatedBy: string; updatedAt: string } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.stateResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "get_state", key })); - setTimeout(() => { - const idx = this.stateResolvers.indexOf(resolve); - if (idx !== -1) { - this.stateResolvers.splice(idx, 1); - resolve(null); - } - }, 5_000); + const reqId = this.makeReqId(); + this.stateResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.stateResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId })); }); } @@ -364,15 +359,11 @@ export class BrokerClient { async listState(): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.stateListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_state" })); - setTimeout(() => { - const idx = this.stateListResolvers.indexOf(resolve); - if (idx !== -1) { - this.stateListResolvers.splice(idx, 1); - resolve([]); - } - }, 5_000); + const reqId = this.makeReqId(); + this.stateListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.stateListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId })); }); } @@ -382,15 +373,11 @@ export class BrokerClient { async remember(content: string, tags?: string[]): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.memoryStoreResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "remember", content, tags })); - setTimeout(() => { - const idx = this.memoryStoreResolvers.indexOf(resolve); - if (idx !== -1) { - this.memoryStoreResolvers.splice(idx, 1); - resolve(null); - } - }, 5_000); + const reqId = this.makeReqId(); + this.memoryStoreResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.memoryStoreResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId })); }); } @@ -398,15 +385,11 @@ export class BrokerClient { async recall(query: string): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.memoryRecallResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "recall", query })); - setTimeout(() => { - const idx = this.memoryRecallResolvers.indexOf(resolve); - if (idx !== -1) { - this.memoryRecallResolvers.splice(idx, 1); - resolve([]); - } - }, 5_000); + const reqId = this.makeReqId(); + this.memoryRecallResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.memoryRecallResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId })); }); } @@ -417,33 +400,32 @@ export class BrokerClient { } /** Check delivery status of a sent message. */ - private messageStatusResolvers: Array<(result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void> = []; - private fileUrlResolvers: Array<(result: { url: string; name: string; encrypted?: boolean; sealedKey?: string } | null) => void> = []; - private fileListResolvers: Array<(files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void> = []; - private fileStatusResolvers: Array<(accesses: Array<{ peerName: string; accessedAt: string }>) => void> = []; - private vectorStoredResolvers: Array<(id: string | null) => void> = []; - private vectorResultsResolvers: Array<(results: Array<{ id: string; text: string; score: number; metadata?: Record }>) => void> = []; - private collectionListResolvers: Array<(collections: string[]) => void> = []; - private graphResultResolvers: Array<(rows: Array>) => void> = []; - private contextListResolvers: Array<(contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void> = []; - private contextResultsResolvers: Array<(contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void> = []; - private taskCreatedResolvers: Array<(id: string | null) => void> = []; - private taskListResolvers: Array<(tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void> = []; - private meshQueryResolvers: Array<(result: { columns: string[]; rows: Array>; rowCount: number } | null) => void> = []; - private meshSchemaResolvers: Array<(tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void> = []; - private streamCreatedResolvers: Array<(id: string | null) => void> = []; - private streamListResolvers: Array<(streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void> = []; + private messageStatusResolvers = new Map } | null) => void; timer: NodeJS.Timeout }>(); + private fileUrlResolvers = new Map void; timer: NodeJS.Timeout }>(); + private fileListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private fileStatusResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private vectorStoredResolvers = new Map void; timer: NodeJS.Timeout }>(); + private vectorResultsResolvers = new Map }>) => void; timer: NodeJS.Timeout }>(); + private collectionListResolvers = new Map void; timer: NodeJS.Timeout }>(); + private graphResultResolvers = new Map>) => void; timer: NodeJS.Timeout }>(); + private contextListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private contextResultsResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private taskCreatedResolvers = new Map void; timer: NodeJS.Timeout }>(); + private taskListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private meshQueryResolvers = new Map>; rowCount: number } | null) => void; timer: NodeJS.Timeout }>(); + private meshSchemaResolvers = new Map }>) => void; timer: NodeJS.Timeout }>(); + private streamCreatedResolvers = new Map void; timer: NodeJS.Timeout }>(); + private streamListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); private streamDataHandlers = new Set<(data: { stream: string; data: unknown; publishedBy: string }) => void>(); async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.messageStatusResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "message_status", messageId })); - setTimeout(() => { - const idx = this.messageStatusResolvers.indexOf(resolve); - if (idx !== -1) { this.messageStatusResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.messageStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.messageStatusResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "message_status", messageId, _reqId: reqId })); }); } @@ -453,15 +435,11 @@ export class BrokerClient { async getFile(fileId: string): Promise<{ url: string; name: string; encrypted?: boolean; sealedKey?: string } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.fileUrlResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "get_file", fileId })); - setTimeout(() => { - const idx = this.fileUrlResolvers.indexOf(resolve); - if (idx !== -1) { - this.fileUrlResolvers.splice(idx, 1); - resolve(null); - } - }, 5_000); + const reqId = this.makeReqId(); + this.fileUrlResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.fileUrlResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "get_file", fileId, _reqId: reqId })); }); } @@ -469,15 +447,11 @@ export class BrokerClient { async listFiles(query?: string, from?: string): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.fileListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_files", query, from })); - setTimeout(() => { - const idx = this.fileListResolvers.indexOf(resolve); - if (idx !== -1) { - this.fileListResolvers.splice(idx, 1); - resolve([]); - } - }, 5_000); + const reqId = this.makeReqId(); + this.fileListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.fileListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_files", query, from, _reqId: reqId })); }); } @@ -485,15 +459,11 @@ export class BrokerClient { async fileStatus(fileId: string): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.fileStatusResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "file_status", fileId })); - setTimeout(() => { - const idx = this.fileStatusResolvers.indexOf(resolve); - if (idx !== -1) { - this.fileStatusResolvers.splice(idx, 1); - resolve([]); - } - }, 5_000); + const reqId = this.makeReqId(); + this.fileStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.fileStatusResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "file_status", fileId, _reqId: reqId })); }); } @@ -547,13 +517,11 @@ export class BrokerClient { async grantFileAccess(fileId: string, peerPubkey: string, sealedKey: string): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false; return new Promise((resolve) => { - const resolvers = this.grantFileAccessResolvers; - resolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey })); - setTimeout(() => { - const idx = resolvers.indexOf(resolve); - if (idx !== -1) { resolvers.splice(idx, 1); resolve(false); } - }, 5_000); + const reqId = this.makeReqId(); + this.grantFileAccessResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.grantFileAccessResolvers.delete(reqId)) resolve(false); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey, _reqId: reqId })); }); } @@ -563,12 +531,11 @@ export class BrokerClient { async vectorStore(collection: string, text: string, metadata?: Record): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.vectorStoredResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata })); - setTimeout(() => { - const idx = this.vectorStoredResolvers.indexOf(resolve); - if (idx !== -1) { this.vectorStoredResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.vectorStoredResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.vectorStoredResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata, _reqId: reqId })); }); } @@ -576,12 +543,11 @@ export class BrokerClient { async vectorSearch(collection: string, query: string, limit?: number): Promise }>> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.vectorResultsResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit })); - setTimeout(() => { - const idx = this.vectorResultsResolvers.indexOf(resolve); - if (idx !== -1) { this.vectorResultsResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.vectorResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.vectorResultsResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit, _reqId: reqId })); }); } @@ -595,12 +561,11 @@ export class BrokerClient { async listCollections(): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.collectionListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_collections" })); - setTimeout(() => { - const idx = this.collectionListResolvers.indexOf(resolve); - if (idx !== -1) { this.collectionListResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.collectionListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.collectionListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_collections", _reqId: reqId })); }); } @@ -610,12 +575,11 @@ export class BrokerClient { async graphQuery(cypher: string): Promise>> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.graphResultResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "graph_query", cypher })); - setTimeout(() => { - const idx = this.graphResultResolvers.indexOf(resolve); - if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.graphResultResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "graph_query", cypher, _reqId: reqId })); }); } @@ -623,12 +587,11 @@ export class BrokerClient { async graphExecute(cypher: string): Promise>> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.graphResultResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "graph_execute", cypher })); - setTimeout(() => { - const idx = this.graphResultResolvers.indexOf(resolve); - if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.graphResultResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "graph_execute", cypher, _reqId: reqId })); }); } @@ -644,12 +607,11 @@ export class BrokerClient { async getContext(query: string): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.contextResultsResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "get_context", query })); - setTimeout(() => { - const idx = this.contextResultsResolvers.indexOf(resolve); - if (idx !== -1) { this.contextResultsResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.contextResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.contextResultsResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "get_context", query, _reqId: reqId })); }); } @@ -657,12 +619,11 @@ export class BrokerClient { async listContexts(): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.contextListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_contexts" })); - setTimeout(() => { - const idx = this.contextListResolvers.indexOf(resolve); - if (idx !== -1) { this.contextListResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.contextListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.contextListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_contexts", _reqId: reqId })); }); } @@ -672,12 +633,11 @@ export class BrokerClient { async createTask(title: string, assignee?: string, priority?: string, tags?: string[]): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.taskCreatedResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags })); - setTimeout(() => { - const idx = this.taskCreatedResolvers.indexOf(resolve); - if (idx !== -1) { this.taskCreatedResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.taskCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.taskCreatedResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags, _reqId: reqId })); }); } @@ -697,12 +657,11 @@ export class BrokerClient { async listTasks(status?: string, assignee?: string): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.taskListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee })); - setTimeout(() => { - const idx = this.taskListResolvers.indexOf(resolve); - if (idx !== -1) { this.taskListResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.taskListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.taskListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee, _reqId: reqId })); }); } @@ -712,12 +671,11 @@ export class BrokerClient { async meshQuery(sql: string): Promise<{ columns: string[]; rows: Array>; rowCount: number } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.meshQueryResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "mesh_query", sql })); - setTimeout(() => { - const idx = this.meshQueryResolvers.indexOf(resolve); - if (idx !== -1) { this.meshQueryResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.meshQueryResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.meshQueryResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "mesh_query", sql, _reqId: reqId })); }); } @@ -731,12 +689,11 @@ export class BrokerClient { async meshSchema(): Promise }>> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.meshSchemaResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "mesh_schema" })); - setTimeout(() => { - const idx = this.meshSchemaResolvers.indexOf(resolve); - if (idx !== -1) { this.meshSchemaResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.meshSchemaResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.meshSchemaResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "mesh_schema", _reqId: reqId })); }); } @@ -746,12 +703,11 @@ export class BrokerClient { async createStream(name: string): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.streamCreatedResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "create_stream", name })); - setTimeout(() => { - const idx = this.streamCreatedResolvers.indexOf(resolve); - if (idx !== -1) { this.streamCreatedResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.streamCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.streamCreatedResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "create_stream", name, _reqId: reqId })); }); } @@ -777,12 +733,11 @@ export class BrokerClient { async listStreams(): Promise> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; return new Promise((resolve) => { - this.streamListResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "list_streams" })); - setTimeout(() => { - const idx = this.streamListResolvers.indexOf(resolve); - if (idx !== -1) { this.streamListResolvers.splice(idx, 1); resolve([]); } - }, 5_000); + const reqId = this.makeReqId(); + this.streamListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.streamListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_streams", _reqId: reqId })); }); } @@ -799,17 +754,16 @@ export class BrokerClient { } // --- Mesh info --- - private meshInfoResolvers: Array<(result: Record | null) => void> = []; + private meshInfoResolvers = new Map | null) => void; timer: NodeJS.Timeout }>(); async meshInfo(): Promise | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { - this.meshInfoResolvers.push(resolve); - this.ws!.send(JSON.stringify({ type: "mesh_info" })); - setTimeout(() => { - const idx = this.meshInfoResolvers.indexOf(resolve); - if (idx !== -1) { this.meshInfoResolvers.splice(idx, 1); resolve(null); } - }, 5_000); + const reqId = this.makeReqId(); + this.meshInfoResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.meshInfoResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "mesh_info", _reqId: reqId })); }); } @@ -829,7 +783,33 @@ export class BrokerClient { // --- Internals --- + private resolveFromMap( + map: Map void; timer: NodeJS.Timeout }>, + reqId: string | undefined, + value: T, + ): boolean { + let entry = reqId ? map.get(reqId) : undefined; + if (!entry) { + // Fallback: oldest pending (FIFO, for brokers that don't echo _reqId) + const first = map.entries().next().value as [string, { resolve: (v: T) => void; timer: NodeJS.Timeout }] | undefined; + if (first) { + entry = first[1]; + map.delete(first[0]); + } + } else { + map.delete(reqId!); + } + if (entry) { + clearTimeout(entry.timer); + entry.resolve(value); + return true; + } + return false; + } + private handleServerMessage(msg: Record): void { + const msgReqId = msg._reqId as string | undefined; + if (msg.type === "ack") { const pending = this.pendingSends.get(String(msg.id ?? "")); if (pending) { @@ -843,8 +823,7 @@ export class BrokerClient { } if (msg.type === "peers_list") { const peers = (msg.peers as PeerInfo[]) ?? []; - const resolver = this.listPeersResolvers.shift(); - if (resolver) resolver(peers); + this.resolveFromMap(this.listPeersResolvers, msgReqId, peers); return; } if (msg.type === "push") { @@ -922,25 +901,21 @@ export class BrokerClient { // both, it would be consumed here by the next pending get_state resolver, // returning the wrong value (cross-contamination). The broker's set_state // handler was fixed to omit state_result; only get_state sends it. - const resolver = this.stateResolvers.shift(); - if (resolver) { - if (msg.key) { - resolver({ - key: String(msg.key), - value: msg.value, - updatedBy: String(msg.updatedBy ?? ""), - updatedAt: String(msg.updatedAt ?? ""), - }); - } else { - resolver(null); - } + if (msg.key) { + this.resolveFromMap(this.stateResolvers, msgReqId, { + key: String(msg.key), + value: msg.value, + updatedBy: String(msg.updatedBy ?? ""), + updatedAt: String(msg.updatedAt ?? ""), + }); + } else { + this.resolveFromMap(this.stateResolvers, msgReqId, null); } return; } if (msg.type === "state_list") { const entries = (msg.entries as Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) ?? []; - const resolver = this.stateListResolvers.shift(); - if (resolver) resolver(entries); + this.resolveFromMap(this.stateListResolvers, msgReqId, entries); return; } if (msg.type === "state_change") { @@ -955,131 +930,108 @@ export class BrokerClient { return; } if (msg.type === "memory_stored") { - const resolver = this.memoryStoreResolvers.shift(); - if (resolver) resolver(msg.id ? String(msg.id) : null); + this.resolveFromMap(this.memoryStoreResolvers, msgReqId, msg.id ? String(msg.id) : null); return; } if (msg.type === "memory_results") { const memories = (msg.memories as Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) ?? []; - const resolver = this.memoryRecallResolvers.shift(); - if (resolver) resolver(memories); + this.resolveFromMap(this.memoryRecallResolvers, msgReqId, memories); return; } if (msg.type === "message_status_result") { - const resolver = this.messageStatusResolvers.shift(); - if (resolver) resolver(msg as any); + this.resolveFromMap(this.messageStatusResolvers, msgReqId, msg as any); return; } if (msg.type === "file_url") { - const resolver = this.fileUrlResolvers.shift(); - if (resolver) { - if (msg.url) { - resolver({ - url: String(msg.url), - name: String(msg.name ?? ""), - encrypted: msg.encrypted ? true : undefined, - sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined, - }); - } else { - resolver(null); - } + if (msg.url) { + this.resolveFromMap(this.fileUrlResolvers, msgReqId, { + url: String(msg.url), + name: String(msg.name ?? ""), + encrypted: msg.encrypted ? true : undefined, + sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined, + }); + } else { + this.resolveFromMap(this.fileUrlResolvers, msgReqId, null); } return; } if (msg.type === "file_list") { const files = (msg.files as Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) ?? []; - const resolver = this.fileListResolvers.shift(); - if (resolver) resolver(files); + this.resolveFromMap(this.fileListResolvers, msgReqId, files); return; } if (msg.type === "file_status_result") { const accesses = (msg.accesses as Array<{ peerName: string; accessedAt: string }>) ?? []; - const resolver = this.fileStatusResolvers.shift(); - if (resolver) resolver(accesses); + this.resolveFromMap(this.fileStatusResolvers, msgReqId, accesses); return; } if (msg.type === "grant_file_access_ok") { - const resolver = this.grantFileAccessResolvers.shift(); - if (resolver) resolver(true); + this.resolveFromMap(this.grantFileAccessResolvers, msgReqId, true); return; } if (msg.type === "vector_stored") { - const resolver = this.vectorStoredResolvers.shift(); - if (resolver) resolver(msg.id ? String(msg.id) : null); + this.resolveFromMap(this.vectorStoredResolvers, msgReqId, msg.id ? String(msg.id) : null); return; } if (msg.type === "vector_results") { const results = (msg.results as Array<{ id: string; text: string; score: number; metadata?: Record }>) ?? []; - const resolver = this.vectorResultsResolvers.shift(); - if (resolver) resolver(results); + this.resolveFromMap(this.vectorResultsResolvers, msgReqId, results); return; } if (msg.type === "collection_list") { const collections = (msg.collections as string[]) ?? []; - const resolver = this.collectionListResolvers.shift(); - if (resolver) resolver(collections); + this.resolveFromMap(this.collectionListResolvers, msgReqId, collections); return; } if (msg.type === "graph_result") { // Broker sends { type: "graph_result", records: [...] } const rows = (msg.records as Array>) ?? []; - const resolver = this.graphResultResolvers.shift(); - if (resolver) resolver(rows); + this.resolveFromMap(this.graphResultResolvers, msgReqId, rows); return; } if (msg.type === "context_list") { const contexts = (msg.contexts as Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) ?? []; - const resolver = this.contextListResolvers.shift(); - if (resolver) resolver(contexts); + this.resolveFromMap(this.contextListResolvers, msgReqId, contexts); return; } if (msg.type === "context_results") { const contexts = (msg.contexts as Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) ?? []; - const resolver = this.contextResultsResolvers.shift(); - if (resolver) resolver(contexts); + this.resolveFromMap(this.contextResultsResolvers, msgReqId, contexts); return; } if (msg.type === "task_created") { - const resolver = this.taskCreatedResolvers.shift(); - if (resolver) resolver(msg.id ? String(msg.id) : null); + this.resolveFromMap(this.taskCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null); return; } if (msg.type === "task_list") { const tasks = (msg.tasks as Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) ?? []; - const resolver = this.taskListResolvers.shift(); - if (resolver) resolver(tasks); + this.resolveFromMap(this.taskListResolvers, msgReqId, tasks); return; } if (msg.type === "mesh_query_result") { - const resolver = this.meshQueryResolvers.shift(); - if (resolver) { - if (msg.columns) { - resolver({ - columns: (msg.columns as string[]) ?? [], - rows: (msg.rows as Array>) ?? [], - rowCount: (msg.rowCount as number) ?? 0, - }); - } else { - resolver(null); - } + if (msg.columns) { + this.resolveFromMap(this.meshQueryResolvers, msgReqId, { + columns: (msg.columns as string[]) ?? [], + rows: (msg.rows as Array>) ?? [], + rowCount: (msg.rowCount as number) ?? 0, + }); + } else { + this.resolveFromMap(this.meshQueryResolvers, msgReqId, null); } return; } if (msg.type === "mesh_schema_result") { const tables = (msg.tables as Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) ?? []; - const resolver = this.meshSchemaResolvers.shift(); - if (resolver) resolver(tables); + this.resolveFromMap(this.meshSchemaResolvers, msgReqId, tables); return; } if (msg.type === "stream_created") { - const resolver = this.streamCreatedResolvers.shift(); - if (resolver) resolver(msg.id ? String(msg.id) : null); + this.resolveFromMap(this.streamCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null); return; } if (msg.type === "stream_list") { const streams = (msg.streams as Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) ?? []; - const resolver = this.streamListResolvers.shift(); - if (resolver) resolver(streams); + this.resolveFromMap(this.streamListResolvers, msgReqId, streams); return; } if (msg.type === "stream_data") { @@ -1094,8 +1046,7 @@ export class BrokerClient { return; } if (msg.type === "mesh_info_result") { - const resolver = this.meshInfoResolvers.shift(); - if (resolver) resolver(msg as Record); + this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record); return; } if (msg.type === "error") { @@ -1116,37 +1067,40 @@ export class BrokerClient { if (!handledByPendingSend) { // Best-effort: unblock the first waiting resolver so callers don't // hang for 5s. We don't know which tool triggered the error, so we - // pop the first non-empty resolver queue in priority order. - if (this.stateResolvers.length > 0) { - this.stateResolvers.shift()!(null); - } else if (this.stateListResolvers.length > 0) { - this.stateListResolvers.shift()!([]); - } else if (this.memoryStoreResolvers.length > 0) { - this.memoryStoreResolvers.shift()!(null); - } else if (this.memoryRecallResolvers.length > 0) { - this.memoryRecallResolvers.shift()!([]); - } else if (this.fileUrlResolvers.length > 0) { - this.fileUrlResolvers.shift()!(null); - } else if (this.fileListResolvers.length > 0) { - this.fileListResolvers.shift()!([]); - } else if (this.fileStatusResolvers.length > 0) { - this.fileStatusResolvers.shift()!([]); - } else if (this.graphResultResolvers.length > 0) { - this.graphResultResolvers.shift()!([]); - } else if (this.vectorStoredResolvers.length > 0) { - this.vectorStoredResolvers.shift()!(null); - } else if (this.vectorResultsResolvers.length > 0) { - this.vectorResultsResolvers.shift()!([]); - } else if (this.taskListResolvers.length > 0) { - this.taskListResolvers.shift()!([]); - } else if (this.meshQueryResolvers.length > 0) { - this.meshQueryResolvers.shift()!(null); - } else if (this.contextResultsResolvers.length > 0) { - this.contextResultsResolvers.shift()!([]); - } else if (this.contextListResolvers.length > 0) { - this.contextListResolvers.shift()!([]); - } else if (this.streamListResolvers.length > 0) { - this.streamListResolvers.shift()!([]); + // pop the first non-empty resolver map in priority order. + const allMaps: Array<[Map void; timer: NodeJS.Timeout }>, unknown]> = [ + [this.stateResolvers, null], + [this.stateListResolvers, []], + [this.memoryStoreResolvers, null], + [this.memoryRecallResolvers, []], + [this.fileUrlResolvers, null], + [this.fileListResolvers, []], + [this.fileStatusResolvers, []], + [this.graphResultResolvers, []], + [this.vectorStoredResolvers, null], + [this.vectorResultsResolvers, []], + [this.taskListResolvers, []], + [this.meshQueryResolvers, null], + [this.contextResultsResolvers, []], + [this.contextListResolvers, []], + [this.streamListResolvers, []], + [this.messageStatusResolvers, null], + [this.grantFileAccessResolvers, false], + [this.collectionListResolvers, []], + [this.meshSchemaResolvers, []], + [this.taskCreatedResolvers, null], + [this.streamCreatedResolvers, null], + [this.listPeersResolvers, []], + [this.meshInfoResolvers, null], + ]; + for (const [map, defaultVal] of allMaps) { + const first = (map as Map).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined; + if (first) { + (map as Map).delete(first[0]); + clearTimeout(first[1].timer); + first[1].resolve(defaultVal); + break; // only pop one + } } } }