diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 82fedf1..b34947a 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -547,8 +547,9 @@ function sendError( code: string, message: string, id?: string, + reqId?: string, ): void { - const err: WSServerMessage = { type: "error", code, message, id }; + const err: WSServerMessage = { type: "error", code, message, id, ...(reqId ? { _reqId: reqId } : {}) }; try { ws.send(JSON.stringify(err)); } catch { @@ -727,6 +728,7 @@ function handleConnection(ws: WebSocket): void { ws.on("message", async (raw) => { try { const msg = JSON.parse(raw.toString()) as WSClientMessage; + const _reqId = (msg as any)._reqId as string | undefined; if (msg.type === "hello") { const result = await handleHello(ws, msg); if (!result) return; @@ -776,6 +778,7 @@ function handleConnection(ws: WebSocket): void { sessionId: p.sessionId, connectedAt: p.connectedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }; conn.ws.send(JSON.stringify(resp)); log.info("ws list_peers", { @@ -862,6 +865,7 @@ function handleConnection(ws: WebSocket): void { value: stateEntry.value, updatedBy: stateEntry.updatedBy, updatedAt: stateEntry.updatedAt.toISOString(), + ...(_reqId ? { _reqId } : {}), }); } else { sendToPeer(presenceId, { @@ -870,6 +874,7 @@ function handleConnection(ws: WebSocket): void { value: null, updatedBy: "", updatedAt: "", + ...(_reqId ? { _reqId } : {}), }); } log.info("ws get_state", { @@ -889,6 +894,7 @@ function handleConnection(ws: WebSocket): void { updatedBy: e.updatedBy, updatedAt: e.updatedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws list_state", { presence_id: presenceId, @@ -911,6 +917,7 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "memory_stored", id: memoryId, + ...(_reqId ? { _reqId } : {}), }); log.info("ws remember", { presence_id: presenceId, @@ -930,6 +937,7 @@ function handleConnection(ws: WebSocket): void { rememberedBy: m.rememberedBy, rememberedAt: m.rememberedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws recall", { presence_id: presenceId, @@ -946,6 +954,7 @@ function handleConnection(ws: WebSocket): void { id: fg.memoryId, messageId: fg.memoryId, queued: false, + ...(_reqId ? { _reqId } : {}), }); log.info("ws forget", { presence_id: presenceId, @@ -957,7 +966,7 @@ function handleConnection(ws: WebSocket): void { const gf = msg as Extract; const file = await getFile(conn.meshId, gf.fileId); if (!file) { - sendError(conn.ws, "not_found", "file not found"); + sendError(conn.ws, "not_found", "file not found", undefined, _reqId); break; } // Access control: if targetSpec is set, verify peer matches @@ -967,7 +976,7 @@ function handleConnection(ws: WebSocket): void { file.targetSpec === conn.sessionPubkey || file.targetSpec === "*"; if (!matches) { - sendError(conn.ws, "forbidden", "file not targeted at you"); + sendError(conn.ws, "forbidden", "file not targeted at you", undefined, _reqId); break; } } @@ -980,7 +989,7 @@ function handleConnection(ws: WebSocket): void { const isOwner = !!(file.ownerPubkey && peerPubkey === file.ownerPubkey); sealedKey = peerPubkey ? await getFileKey(gf.fileId, peerPubkey) : null; if (!sealedKey && !isOwner) { - sendError(conn.ws, "forbidden", "no decryption key for this file"); + sendError(conn.ws, "forbidden", "no decryption key for this file", undefined, _reqId); break; } } @@ -1007,6 +1016,7 @@ function handleConnection(ws: WebSocket): void { name: file.name, encrypted: file.encrypted, sealedKey: sealedKey ?? undefined, + ...(_reqId ? { _reqId } : {}), }); log.info("ws get_file", { presence_id: presenceId, @@ -1029,6 +1039,7 @@ function handleConnection(ws: WebSocket): void { persistent: f.persistent, encrypted: f.encrypted, })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws list_files", { presence_id: presenceId, @@ -1047,6 +1058,7 @@ function handleConnection(ws: WebSocket): void { peerName: a.peerName, accessedAt: a.accessedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws file_status", { presence_id: presenceId, @@ -1058,16 +1070,16 @@ function handleConnection(ws: WebSocket): void { const gfa = msg as { type: "grant_file_access"; fileId: string; peerPubkey: string; sealedKey: string }; const file = await getFile(conn.meshId, gfa.fileId); if (!file) { - sendError(conn.ws, "not_found", "file not found"); + sendError(conn.ws, "not_found", "file not found", undefined, _reqId); break; } const requestorPubkey = conn.sessionPubkey ?? conn.memberPubkey; if (file.ownerPubkey && file.ownerPubkey !== requestorPubkey) { - sendError(conn.ws, "forbidden", "only the file owner can grant access"); + sendError(conn.ws, "forbidden", "only the file owner can grant access", undefined, _reqId); break; } await grantFileKey(gfa.fileId, gfa.peerPubkey, gfa.sealedKey, requestorPubkey ?? undefined); - sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey }); + sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey, ...(_reqId ? { _reqId } : {}) }); log.info("ws grant_file_access", { presence_id: presenceId, file_id: gfa.fileId, peer: gfa.peerPubkey }); break; } @@ -1079,6 +1091,7 @@ function handleConnection(ws: WebSocket): void { id: df.fileId, messageId: df.fileId, queued: false, + ...(_reqId ? { _reqId } : {}), }); log.info("ws delete_file", { presence_id: presenceId, @@ -1099,7 +1112,7 @@ function handleConnection(ws: WebSocket): void { .from(messageQueue) .where(eq(messageQueue.id, ms.messageId)); if (!mqRow || mqRow.meshId !== conn.meshId) { - sendError(conn.ws, "not_found", "message not found"); + sendError(conn.ws, "not_found", "message not found", undefined, _reqId); break; } // Build per-recipient status from connected peers. @@ -1143,6 +1156,7 @@ function handleConnection(ws: WebSocket): void { delivered: !!mqRow.deliveredAt, deliveredAt: mqRow.deliveredAt?.toISOString() ?? null, recipients, + ...(_reqId ? { _reqId } : {}), }; sendToPeer(presenceId, resp); log.info("ws message_status", { @@ -1201,6 +1215,7 @@ function handleConnection(ws: WebSocket): void { tags: c.tags, updatedAt: c.updatedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws get_context", { presence_id: presenceId, @@ -1219,6 +1234,7 @@ function handleConnection(ws: WebSocket): void { tags: c.tags, updatedAt: c.updatedAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws list_contexts", { presence_id: presenceId, @@ -1243,6 +1259,7 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "task_created", id: taskId, + ...(_reqId ? { _reqId } : {}), }); log.info("ws create_task", { presence_id: presenceId, @@ -1263,7 +1280,7 @@ function handleConnection(ws: WebSocket): void { memberInfo?.displayName, ); if (!claimed) { - sendError(conn.ws, "task_not_claimable", "task is not open or does not exist"); + sendError(conn.ws, "task_not_claimable", "task is not open or does not exist", undefined, _reqId); break; } // Return updated task list so caller sees the change. @@ -1281,6 +1298,7 @@ function handleConnection(ws: WebSocket): void { tags: t.tags, createdAt: t.createdAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws claim_task", { presence_id: presenceId, @@ -1296,7 +1314,7 @@ function handleConnection(ws: WebSocket): void { cpt.result, ); if (!completed) { - sendError(conn.ws, "task_not_found", "task not found in this mesh"); + sendError(conn.ws, "task_not_found", "task not found in this mesh", undefined, _reqId); break; } // Return updated task list. @@ -1314,6 +1332,7 @@ function handleConnection(ws: WebSocket): void { tags: t.tags, createdAt: t.createdAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws complete_task", { presence_id: presenceId, @@ -1337,6 +1356,7 @@ function handleConnection(ws: WebSocket): void { tags: t.tags, createdAt: t.createdAt.toISOString(), })), + ...(_reqId ? { _reqId } : {}), }); log.info("ws list_tasks", { presence_id: presenceId, @@ -1362,6 +1382,7 @@ function handleConnection(ws: WebSocket): void { type: "stream_created", id: streamId, name: cs.name, + ...(_reqId ? { _reqId } : {}), }); log.info("ws create_stream", { presence_id: presenceId, @@ -1376,7 +1397,7 @@ function handleConnection(ws: WebSocket): void { if (!streamSubscriptions.has(key)) streamSubscriptions.set(key, new Set()); streamSubscriptions.get(key)!.add(presenceId); - sendToPeer(presenceId, { type: "subscribed", stream: sub.stream }); + sendToPeer(presenceId, { type: "subscribed", stream: sub.stream, ...(_reqId ? { _reqId } : {}) }); log.info("ws subscribe", { presence_id: presenceId, stream: sub.stream, @@ -1435,6 +1456,7 @@ function handleConnection(ws: WebSocket): void { subscriberCount: streamSubscriptions.get(key)?.size ?? 0, }; }), + ...(_reqId ? { _reqId } : {}), }); log.info("ws list_streams", { presence_id: presenceId, @@ -1476,6 +1498,7 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "vector_stored", id: pointId, + ...(_reqId ? { _reqId } : {}), }); log.info("ws vector_store", { presence_id: presenceId, @@ -1483,7 +1506,7 @@ function handleConnection(ws: WebSocket): void { point_id: pointId, }); } catch (e) { - sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e)); + sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e), undefined, _reqId); } break; } @@ -1518,12 +1541,14 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "vector_results", results: matches, + ...(_reqId ? { _reqId } : {}), }); } catch { // Collection may not exist yet — return empty results. sendToPeer(presenceId, { type: "vector_results", results: [], + ...(_reqId ? { _reqId } : {}), }); } log.info("ws vector_search", { @@ -1549,6 +1574,7 @@ function handleConnection(ws: WebSocket): void { id: vd.id, messageId: vd.id, queued: false, + ...(_reqId ? { _reqId } : {}), }); log.info("ws vector_delete", { presence_id: presenceId, @@ -1568,11 +1594,13 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "collection_list", collections: meshCollections, + ...(_reqId ? { _reqId } : {}), }); } catch { sendToPeer(presenceId, { type: "collection_list", collections: [], + ...(_reqId ? { _reqId } : {}), }); } log.info("ws list_collections", { @@ -1607,9 +1635,10 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "graph_result", records: gqRecords, + ...(_reqId ? { _reqId } : {}), }); } catch (gqErr) { - sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr)); + sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr), undefined, _reqId); } finally { await gqSession.close(); } @@ -1641,9 +1670,10 @@ function handleConnection(ws: WebSocket): void { sendToPeer(presenceId, { type: "graph_result", records: geRecords, + ...(_reqId ? { _reqId } : {}), }); } catch (geErr) { - sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr)); + sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr), undefined, _reqId); } finally { await geSession.close(); } @@ -1660,12 +1690,14 @@ function handleConnection(ws: WebSocket): void { const mq = msg as Extract; try { const result = await meshQuery(conn.meshId, mq.sql); - sendToPeer(presenceId, { type: "mesh_query_result", ...result }); + sendToPeer(presenceId, { type: "mesh_query_result", ...result, ...(_reqId ? { _reqId } : {}) }); } catch (e) { sendError( conn.ws, "query_error", e instanceof Error ? e.message : String(e), + undefined, + _reqId, ); } log.info("ws mesh_query", { @@ -1683,12 +1715,15 @@ function handleConnection(ws: WebSocket): void { columns: [], rows: [], rowCount: result.rowCount, + ...(_reqId ? { _reqId } : {}), }); } catch (e) { sendError( conn.ws, "execute_error", e instanceof Error ? e.message : String(e), + undefined, + _reqId, ); } log.info("ws mesh_execute", { @@ -1700,12 +1735,14 @@ function handleConnection(ws: WebSocket): void { case "mesh_schema": { try { const tables = await meshSchema(conn.meshId); - sendToPeer(presenceId, { type: "mesh_schema_result", tables }); + sendToPeer(presenceId, { type: "mesh_schema_result", tables, ...(_reqId ? { _reqId } : {}) }); } catch (e) { sendError( conn.ws, "schema_error", e instanceof Error ? e.message : String(e), + undefined, + _reqId, ); } log.info("ws mesh_schema", { presence_id: presenceId }); @@ -1746,6 +1783,7 @@ function handleConnection(ws: WebSocket): void { collections: [], yourName: selfPeer?.displayName ?? "unknown", yourGroups: peerConn?.groups ?? [], + ...(_reqId ? { _reqId } : {}), }); log.info("ws mesh_info", { presence_id: presenceId }); break; diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index c8cf16d..79308f3 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -161,6 +161,7 @@ export interface WSAckMessage { id: string; // echoes client-side correlation id messageId: string; queued: boolean; + _reqId?: string; } /** Broker → client: hello handshake acknowledgement. */ @@ -182,6 +183,7 @@ export interface WSPeersListMessage { sessionId: string; connectedAt: string; }>; + _reqId?: string; } /** Broker → client: a state key was changed by another peer. */ @@ -199,6 +201,7 @@ export interface WSStateResultMessage { value: unknown; updatedAt: string; updatedBy: string; + _reqId?: string; } /** Broker → client: response to list_state. */ @@ -210,12 +213,14 @@ export interface WSStateListMessage { updatedBy: string; updatedAt: string; }>; + _reqId?: string; } /** Broker → client: acknowledgement for a remember. */ export interface WSMemoryStoredMessage { type: "memory_stored"; id: string; + _reqId?: string; } /** Broker → client: response to recall. */ @@ -228,6 +233,7 @@ export interface WSMemoryResultsMessage { rememberedBy: string; rememberedAt: string; }>; + _reqId?: string; } // --- Vector storage messages --- @@ -299,6 +305,7 @@ export interface WSMeshSchemaMessage { export interface WSVectorStoredMessage { type: "vector_stored"; id: string; + _reqId?: string; } /** Broker → client: vector search results. */ @@ -310,18 +317,21 @@ export interface WSVectorResultsMessage { score: number; metadata?: Record; }>; + _reqId?: string; } /** Broker → client: list of vector collections. */ export interface WSCollectionListMessage { type: "collection_list"; collections: string[]; + _reqId?: string; } /** Broker → client: graph query results. */ export interface WSGraphResultMessage { type: "graph_result"; records: Array>; + _reqId?: string; } /** Broker → client: mesh SQL query results. */ @@ -330,6 +340,7 @@ export interface WSMeshQueryResultMessage { columns: string[]; rows: Array>; rowCount: number; + _reqId?: string; } /** Broker → client: mesh schema introspection results. */ @@ -339,6 +350,7 @@ export interface WSMeshSchemaResultMessage { name: string; columns: Array<{ name: string; type: string; nullable: boolean }>; }>; + _reqId?: string; } /** Client → broker: get full mesh overview. */ @@ -361,6 +373,7 @@ export interface WSMeshInfoResultMessage { collections: string[]; yourName: string; yourGroups: Array<{ name: string; role?: string }>; + _reqId?: string; } /** Client → broker: check delivery status of a message. */ @@ -381,6 +394,7 @@ export interface WSMessageStatusResultMessage { pubkey: string; status: "delivered" | "held" | "disconnected"; }>; + _reqId?: string; } // --- File sharing messages --- @@ -426,6 +440,7 @@ export interface WSFileUrlMessage { name: string; encrypted?: boolean; sealedKey?: string; + _reqId?: string; } /** Broker → client: list of files in the mesh. */ @@ -441,6 +456,7 @@ export interface WSFileListMessage { persistent: boolean; encrypted: boolean; }>; + _reqId?: string; } /** Broker → client: acknowledgement for grant_file_access. */ @@ -448,6 +464,7 @@ export interface WSGrantFileAccessOkMessage { type: "grant_file_access_ok"; fileId: string; peerPubkey: string; + _reqId?: string; } /** Broker → client: access log for a file. */ @@ -458,6 +475,7 @@ export interface WSFileStatusResultMessage { peerName: string; accessedAt: string; }>; + _reqId?: string; } // --- Context sharing messages --- @@ -499,6 +517,7 @@ export interface WSContextResultsMessage { tags: string[]; updatedAt: string; }>; + _reqId?: string; } /** Broker → client: response to list_contexts. */ @@ -510,6 +529,7 @@ export interface WSContextListMessage { tags: string[]; updatedAt: string; }>; + _reqId?: string; } // --- Task messages --- @@ -547,6 +567,7 @@ export interface WSListTasksMessage { export interface WSTaskCreatedMessage { type: "task_created"; id: string; + _reqId?: string; } /** Broker → client: response to list_tasks, claim_task, complete_task. */ @@ -563,6 +584,7 @@ export interface WSTaskListMessage { tags: string[]; createdAt: string; }>; + _reqId?: string; } // --- Stream messages --- @@ -602,6 +624,7 @@ export interface WSStreamCreatedMessage { type: "stream_created"; id: string; name: string; + _reqId?: string; } /** Broker → client: real-time data pushed from a stream. */ @@ -616,6 +639,7 @@ export interface WSStreamDataMessage { export interface WSSubscribedMessage { type: "subscribed"; stream: string; + _reqId?: string; } /** Broker → client: response to list_streams. */ @@ -628,6 +652,7 @@ export interface WSStreamListMessage { createdAt: string; subscriberCount: number; }>; + _reqId?: string; } /** Broker → client: structured error. */ @@ -636,6 +661,7 @@ export interface WSErrorMessage { code: string; message: string; id?: string; + _reqId?: string; } export type WSClientMessage =