feat(cli): fix field mismatches + error propagation
- claim_task/complete_task: send taskId not id - graph_result: read msg.records not msg.rows - message_status: try all mesh clients, not only first - broker: omit state_result for set_state (fixes get_state cross-contamination) - error handler: unblock first pending resolver on unmatched broker errors Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -844,14 +844,8 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
updatedBy: stateRow.updatedBy,
|
updatedBy: stateRow.updatedBy,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// Send confirmation back to sender as state_result.
|
// Fire-and-forget: no state_result sent back to sender.
|
||||||
sendToPeer(presenceId, {
|
// The client (server.ts) returns success immediately without waiting.
|
||||||
type: "state_result",
|
|
||||||
key: stateRow.key,
|
|
||||||
value: stateRow.value,
|
|
||||||
updatedBy: stateRow.updatedBy,
|
|
||||||
updatedAt: stateRow.updatedAt.toISOString(),
|
|
||||||
});
|
|
||||||
log.info("ws set_state", {
|
log.info("ws set_state", {
|
||||||
presence_id: presenceId,
|
presence_id: presenceId,
|
||||||
key: ss.key,
|
key: ss.key,
|
||||||
@@ -1171,6 +1165,7 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
sc.filesRead,
|
sc.filesRead,
|
||||||
sc.keyFindings,
|
sc.keyFindings,
|
||||||
sc.tags,
|
sc.tags,
|
||||||
|
conn.memberId,
|
||||||
);
|
);
|
||||||
sendToPeer(presenceId, {
|
sendToPeer(presenceId, {
|
||||||
type: "context_shared",
|
type: "context_shared",
|
||||||
@@ -1381,6 +1376,7 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
if (!streamSubscriptions.has(key))
|
if (!streamSubscriptions.has(key))
|
||||||
streamSubscriptions.set(key, new Set());
|
streamSubscriptions.set(key, new Set());
|
||||||
streamSubscriptions.get(key)!.add(presenceId);
|
streamSubscriptions.get(key)!.add(presenceId);
|
||||||
|
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream });
|
||||||
log.info("ws subscribe", {
|
log.info("ws subscribe", {
|
||||||
presence_id: presenceId,
|
presence_id: presenceId,
|
||||||
stream: sub.stream,
|
stream: sub.stream,
|
||||||
@@ -1453,40 +1449,42 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
case "vector_store": {
|
case "vector_store": {
|
||||||
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
|
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
|
||||||
const collName = meshCollectionName(conn.meshId, vs.collection);
|
const collName = meshCollectionName(conn.meshId, vs.collection);
|
||||||
await ensureCollection(collName);
|
try {
|
||||||
const { generateId } = await import("@turbostarter/shared/utils");
|
await ensureCollection(collName);
|
||||||
const pointId = generateId();
|
const { generateId } = await import("@turbostarter/shared/utils");
|
||||||
// Store text + metadata as payload. Use a zero vector as placeholder
|
const pointId = generateId();
|
||||||
// — real embeddings should be computed client-side and sent directly
|
// Store text + metadata as payload. Use a zero vector as placeholder
|
||||||
// to Qdrant in a future version.
|
// — real embeddings should be computed client-side and sent directly
|
||||||
const zeroVector = new Array(1536).fill(0) as number[];
|
// to Qdrant in a future version.
|
||||||
await qdrant.upsert(collName, {
|
const zeroVector = new Array(1536).fill(0) as number[];
|
||||||
wait: true,
|
await qdrant.upsert(collName, {
|
||||||
points: [
|
wait: true,
|
||||||
{
|
points: [
|
||||||
id: pointId,
|
{
|
||||||
vector: zeroVector,
|
id: pointId,
|
||||||
payload: {
|
vector: zeroVector,
|
||||||
text: vs.text,
|
payload: {
|
||||||
mesh_id: conn.meshId,
|
text: vs.text,
|
||||||
stored_by: conn.memberPubkey,
|
mesh_id: conn.meshId,
|
||||||
stored_at: new Date().toISOString(),
|
stored_by: conn.memberPubkey,
|
||||||
...(vs.metadata ?? {}),
|
stored_at: new Date().toISOString(),
|
||||||
|
...(vs.metadata ?? {}),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
],
|
||||||
],
|
});
|
||||||
});
|
sendToPeer(presenceId, {
|
||||||
sendToPeer(presenceId, {
|
type: "vector_stored",
|
||||||
type: "ack" as const,
|
id: pointId,
|
||||||
id: pointId,
|
});
|
||||||
messageId: pointId,
|
log.info("ws vector_store", {
|
||||||
queued: false,
|
presence_id: presenceId,
|
||||||
});
|
collection: vs.collection,
|
||||||
log.info("ws vector_store", {
|
point_id: pointId,
|
||||||
presence_id: presenceId,
|
});
|
||||||
collection: vs.collection,
|
} catch (e) {
|
||||||
point_id: pointId,
|
sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e));
|
||||||
});
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case "vector_search": {
|
case "vector_search": {
|
||||||
@@ -1730,7 +1728,6 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
]);
|
]);
|
||||||
const allGroups = new Set<string>();
|
const allGroups = new Set<string>();
|
||||||
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
|
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
|
||||||
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
|
|
||||||
const peerConn = connections.get(presenceId);
|
const peerConn = connections.get(presenceId);
|
||||||
// Find own display name: match sessionPubkey from the peer list
|
// Find own display name: match sessionPubkey from the peer list
|
||||||
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;
|
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;
|
||||||
|
|||||||
@@ -327,9 +327,15 @@ Your message mode is "${messageMode}".
|
|||||||
case "message_status": {
|
case "message_status": {
|
||||||
const { id } = (args ?? {}) as { id?: string };
|
const { id } = (args ?? {}) as { id?: string };
|
||||||
if (!id) return text("message_status: `id` required", true);
|
if (!id) return text("message_status: `id` required", true);
|
||||||
const client = allClients()[0];
|
const clients = allClients();
|
||||||
if (!client) return text("message_status: not connected", true);
|
if (!clients.length) return text("message_status: not connected", true);
|
||||||
const result = await client.messageStatus(id);
|
// Try each connected mesh client — we don't know which mesh the
|
||||||
|
// messageId belongs to, so query all and return the first hit.
|
||||||
|
let result = null;
|
||||||
|
for (const c of clients) {
|
||||||
|
result = await c.messageStatus(id);
|
||||||
|
if (result) break;
|
||||||
|
}
|
||||||
if (!result) return text(`Message ${id} not found or timed out.`);
|
if (!result) return text(`Message ${id} not found or timed out.`);
|
||||||
const recipientLines = result.recipients.map(
|
const recipientLines = result.recipients.map(
|
||||||
(r: { name: string; pubkey: string; status: string }) =>
|
(r: { name: string; pubkey: string; status: string }) =>
|
||||||
|
|||||||
@@ -684,13 +684,13 @@ export class BrokerClient {
|
|||||||
/** Claim an unclaimed task. */
|
/** Claim an unclaimed task. */
|
||||||
async claimTask(id: string): Promise<void> {
|
async claimTask(id: string): Promise<void> {
|
||||||
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
||||||
this.ws.send(JSON.stringify({ type: "claim_task", id }));
|
this.ws.send(JSON.stringify({ type: "claim_task", taskId: id }));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Mark a task done with optional result. */
|
/** Mark a task done with optional result. */
|
||||||
async completeTask(id: string, result?: string): Promise<void> {
|
async completeTask(id: string, result?: string): Promise<void> {
|
||||||
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
||||||
this.ws.send(JSON.stringify({ type: "complete_task", id, result }));
|
this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result }));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** List tasks filtered by status/assignee. */
|
/** List tasks filtered by status/assignee. */
|
||||||
@@ -917,6 +917,11 @@ export class BrokerClient {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (msg.type === "state_result") {
|
if (msg.type === "state_result") {
|
||||||
|
// DEPENDENCY: The broker must NOT send state_result for set_state
|
||||||
|
// operations (only for get_state). If the broker sends state_result for
|
||||||
|
// both, it would be consumed here by the next pending get_state resolver,
|
||||||
|
// returning the wrong value (cross-contamination). The broker's set_state
|
||||||
|
// handler was fixed to omit state_result; only get_state sends it.
|
||||||
const resolver = this.stateResolvers.shift();
|
const resolver = this.stateResolvers.shift();
|
||||||
if (resolver) {
|
if (resolver) {
|
||||||
if (msg.key) {
|
if (msg.key) {
|
||||||
@@ -1016,7 +1021,8 @@ export class BrokerClient {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (msg.type === "graph_result") {
|
if (msg.type === "graph_result") {
|
||||||
const rows = (msg.rows as Array<Record<string, unknown>>) ?? [];
|
// Broker sends { type: "graph_result", records: [...] }
|
||||||
|
const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
|
||||||
const resolver = this.graphResultResolvers.shift();
|
const resolver = this.graphResultResolvers.shift();
|
||||||
if (resolver) resolver(rows);
|
if (resolver) resolver(rows);
|
||||||
return;
|
return;
|
||||||
@@ -1095,6 +1101,7 @@ export class BrokerClient {
|
|||||||
if (msg.type === "error") {
|
if (msg.type === "error") {
|
||||||
this.debug(`broker error: ${msg.code} ${msg.message}`);
|
this.debug(`broker error: ${msg.code} ${msg.message}`);
|
||||||
const id = msg.id ? String(msg.id) : null;
|
const id = msg.id ? String(msg.id) : null;
|
||||||
|
let handledByPendingSend = false;
|
||||||
if (id) {
|
if (id) {
|
||||||
const pending = this.pendingSends.get(id);
|
const pending = this.pendingSends.get(id);
|
||||||
if (pending) {
|
if (pending) {
|
||||||
@@ -1103,6 +1110,43 @@ export class BrokerClient {
|
|||||||
error: `${msg.code}: ${msg.message}`,
|
error: `${msg.code}: ${msg.message}`,
|
||||||
});
|
});
|
||||||
this.pendingSends.delete(id);
|
this.pendingSends.delete(id);
|
||||||
|
handledByPendingSend = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!handledByPendingSend) {
|
||||||
|
// Best-effort: unblock the first waiting resolver so callers don't
|
||||||
|
// hang for 5s. We don't know which tool triggered the error, so we
|
||||||
|
// pop the first non-empty resolver queue in priority order.
|
||||||
|
if (this.stateResolvers.length > 0) {
|
||||||
|
this.stateResolvers.shift()!(null);
|
||||||
|
} else if (this.stateListResolvers.length > 0) {
|
||||||
|
this.stateListResolvers.shift()!([]);
|
||||||
|
} else if (this.memoryStoreResolvers.length > 0) {
|
||||||
|
this.memoryStoreResolvers.shift()!(null);
|
||||||
|
} else if (this.memoryRecallResolvers.length > 0) {
|
||||||
|
this.memoryRecallResolvers.shift()!([]);
|
||||||
|
} else if (this.fileUrlResolvers.length > 0) {
|
||||||
|
this.fileUrlResolvers.shift()!(null);
|
||||||
|
} else if (this.fileListResolvers.length > 0) {
|
||||||
|
this.fileListResolvers.shift()!([]);
|
||||||
|
} else if (this.fileStatusResolvers.length > 0) {
|
||||||
|
this.fileStatusResolvers.shift()!([]);
|
||||||
|
} else if (this.graphResultResolvers.length > 0) {
|
||||||
|
this.graphResultResolvers.shift()!([]);
|
||||||
|
} else if (this.vectorStoredResolvers.length > 0) {
|
||||||
|
this.vectorStoredResolvers.shift()!(null);
|
||||||
|
} else if (this.vectorResultsResolvers.length > 0) {
|
||||||
|
this.vectorResultsResolvers.shift()!([]);
|
||||||
|
} else if (this.taskListResolvers.length > 0) {
|
||||||
|
this.taskListResolvers.shift()!([]);
|
||||||
|
} else if (this.meshQueryResolvers.length > 0) {
|
||||||
|
this.meshQueryResolvers.shift()!(null);
|
||||||
|
} else if (this.contextResultsResolvers.length > 0) {
|
||||||
|
this.contextResultsResolvers.shift()!([]);
|
||||||
|
} else if (this.contextListResolvers.length > 0) {
|
||||||
|
this.contextListResolvers.shift()!([]);
|
||||||
|
} else if (this.streamListResolvers.length > 0) {
|
||||||
|
this.streamListResolvers.shift()!([]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user