diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 317f8f8..82fedf1 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -844,14 +844,8 @@ function handleConnection(ws: WebSocket): void { updatedBy: stateRow.updatedBy, }); } - // Send confirmation back to sender as state_result. - sendToPeer(presenceId, { - type: "state_result", - key: stateRow.key, - value: stateRow.value, - updatedBy: stateRow.updatedBy, - updatedAt: stateRow.updatedAt.toISOString(), - }); + // Fire-and-forget: no state_result sent back to sender. + // The client (server.ts) returns success immediately without waiting. log.info("ws set_state", { presence_id: presenceId, key: ss.key, @@ -1171,6 +1165,7 @@ function handleConnection(ws: WebSocket): void { sc.filesRead, sc.keyFindings, sc.tags, + conn.memberId, ); sendToPeer(presenceId, { type: "context_shared", @@ -1381,6 +1376,7 @@ function handleConnection(ws: WebSocket): void { if (!streamSubscriptions.has(key)) streamSubscriptions.set(key, new Set()); streamSubscriptions.get(key)!.add(presenceId); + sendToPeer(presenceId, { type: "subscribed", stream: sub.stream }); log.info("ws subscribe", { presence_id: presenceId, stream: sub.stream, @@ -1453,40 +1449,42 @@ function handleConnection(ws: WebSocket): void { case "vector_store": { const vs = msg as Extract; const collName = meshCollectionName(conn.meshId, vs.collection); - await ensureCollection(collName); - const { generateId } = await import("@turbostarter/shared/utils"); - const pointId = generateId(); - // Store text + metadata as payload. Use a zero vector as placeholder - // — real embeddings should be computed client-side and sent directly - // to Qdrant in a future version. - const zeroVector = new Array(1536).fill(0) as number[]; - await qdrant.upsert(collName, { - wait: true, - points: [ - { - id: pointId, - vector: zeroVector, - payload: { - text: vs.text, - mesh_id: conn.meshId, - stored_by: conn.memberPubkey, - stored_at: new Date().toISOString(), - ...(vs.metadata ?? {}), + try { + await ensureCollection(collName); + const { generateId } = await import("@turbostarter/shared/utils"); + const pointId = generateId(); + // Store text + metadata as payload. Use a zero vector as placeholder + // — real embeddings should be computed client-side and sent directly + // to Qdrant in a future version. + const zeroVector = new Array(1536).fill(0) as number[]; + await qdrant.upsert(collName, { + wait: true, + points: [ + { + id: pointId, + vector: zeroVector, + payload: { + text: vs.text, + mesh_id: conn.meshId, + stored_by: conn.memberPubkey, + stored_at: new Date().toISOString(), + ...(vs.metadata ?? {}), + }, }, - }, - ], - }); - sendToPeer(presenceId, { - type: "ack" as const, - id: pointId, - messageId: pointId, - queued: false, - }); - log.info("ws vector_store", { - presence_id: presenceId, - collection: vs.collection, - point_id: pointId, - }); + ], + }); + sendToPeer(presenceId, { + type: "vector_stored", + id: pointId, + }); + log.info("ws vector_store", { + presence_id: presenceId, + collection: vs.collection, + point_id: pointId, + }); + } catch (e) { + sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e)); + } break; } case "vector_search": { @@ -1730,7 +1728,6 @@ function handleConnection(ws: WebSocket): void { ]); const allGroups = new Set(); for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`); - const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey); const peerConn = connections.get(presenceId); // Find own display name: match sessionPubkey from the peer list const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey; diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index b840c98..2d82273 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -327,9 +327,15 @@ Your message mode is "${messageMode}". case "message_status": { const { id } = (args ?? {}) as { id?: string }; if (!id) return text("message_status: `id` required", true); - const client = allClients()[0]; - if (!client) return text("message_status: not connected", true); - const result = await client.messageStatus(id); + const clients = allClients(); + if (!clients.length) return text("message_status: not connected", true); + // Try each connected mesh client — we don't know which mesh the + // messageId belongs to, so query all and return the first hit. + let result = null; + for (const c of clients) { + result = await c.messageStatus(id); + if (result) break; + } if (!result) return text(`Message ${id} not found or timed out.`); const recipientLines = result.recipients.map( (r: { name: string; pubkey: string; status: string }) => diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 5bc36da..1482dac 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -684,13 +684,13 @@ export class BrokerClient { /** Claim an unclaimed task. */ async claimTask(id: string): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; - this.ws.send(JSON.stringify({ type: "claim_task", id })); + this.ws.send(JSON.stringify({ type: "claim_task", taskId: id })); } /** Mark a task done with optional result. */ async completeTask(id: string, result?: string): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; - this.ws.send(JSON.stringify({ type: "complete_task", id, result })); + this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result })); } /** List tasks filtered by status/assignee. */ @@ -917,6 +917,11 @@ export class BrokerClient { return; } if (msg.type === "state_result") { + // DEPENDENCY: The broker must NOT send state_result for set_state + // operations (only for get_state). If the broker sends state_result for + // both, it would be consumed here by the next pending get_state resolver, + // returning the wrong value (cross-contamination). The broker's set_state + // handler was fixed to omit state_result; only get_state sends it. const resolver = this.stateResolvers.shift(); if (resolver) { if (msg.key) { @@ -1016,7 +1021,8 @@ export class BrokerClient { return; } if (msg.type === "graph_result") { - const rows = (msg.rows as Array>) ?? []; + // Broker sends { type: "graph_result", records: [...] } + const rows = (msg.records as Array>) ?? []; const resolver = this.graphResultResolvers.shift(); if (resolver) resolver(rows); return; @@ -1095,6 +1101,7 @@ export class BrokerClient { if (msg.type === "error") { this.debug(`broker error: ${msg.code} ${msg.message}`); const id = msg.id ? String(msg.id) : null; + let handledByPendingSend = false; if (id) { const pending = this.pendingSends.get(id); if (pending) { @@ -1103,6 +1110,43 @@ export class BrokerClient { error: `${msg.code}: ${msg.message}`, }); this.pendingSends.delete(id); + handledByPendingSend = true; + } + } + if (!handledByPendingSend) { + // Best-effort: unblock the first waiting resolver so callers don't + // hang for 5s. We don't know which tool triggered the error, so we + // pop the first non-empty resolver queue in priority order. + if (this.stateResolvers.length > 0) { + this.stateResolvers.shift()!(null); + } else if (this.stateListResolvers.length > 0) { + this.stateListResolvers.shift()!([]); + } else if (this.memoryStoreResolvers.length > 0) { + this.memoryStoreResolvers.shift()!(null); + } else if (this.memoryRecallResolvers.length > 0) { + this.memoryRecallResolvers.shift()!([]); + } else if (this.fileUrlResolvers.length > 0) { + this.fileUrlResolvers.shift()!(null); + } else if (this.fileListResolvers.length > 0) { + this.fileListResolvers.shift()!([]); + } else if (this.fileStatusResolvers.length > 0) { + this.fileStatusResolvers.shift()!([]); + } else if (this.graphResultResolvers.length > 0) { + this.graphResultResolvers.shift()!([]); + } else if (this.vectorStoredResolvers.length > 0) { + this.vectorStoredResolvers.shift()!(null); + } else if (this.vectorResultsResolvers.length > 0) { + this.vectorResultsResolvers.shift()!([]); + } else if (this.taskListResolvers.length > 0) { + this.taskListResolvers.shift()!([]); + } else if (this.meshQueryResolvers.length > 0) { + this.meshQueryResolvers.shift()!(null); + } else if (this.contextResultsResolvers.length > 0) { + this.contextResultsResolvers.shift()!([]); + } else if (this.contextListResolvers.length > 0) { + this.contextListResolvers.shift()!([]); + } else if (this.streamListResolvers.length > 0) { + this.streamListResolvers.shift()!([]); } } }