3 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
e26a36e543 fix(broker): vector_stored type, set_state no-resp, subscribe ack
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- vector_store sends {type:"vector_stored",id}; wrapped in try/catch
- set_state no longer sends state_result (fire-and-forget)
- subscribe sends {type:"subscribed",stream} confirmation
- remove broken myPresence lookup in mesh_info
- add WSVectorStoredMessage + WSSubscribedMessage to types union

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:08:06 +01:00
Alejandro Gutiérrez
60c74d9463 fix(broker): shareContext stable upsert key + createStream atomic upsert
- shareContext: adds optional memberId param; when provided, upserts on
  (meshId, memberId) instead of (meshId, presenceId) — prevents stale
  context rows accumulating on every reconnect. Falls back to presenceId
  for legacy/anonymous connections. Also refreshes presenceId on update
  so it stays current.
- schema: adds member_id column + unique index context_mesh_member_idx
  on mesh.context table; new migration 0013_context-stable-member-key.sql.
- index.ts call site updated to pass conn.memberId as the stable key.
- createStream: replaces SELECT-then-INSERT TOCTOU race with atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING, followed by SELECT on miss.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:58 +01:00
Alejandro Gutiérrez
6fba9bd4eb feat(cli): fix field mismatches + error propagation
- claim_task/complete_task: send taskId not id
- graph_result: read msg.records not msg.rows
- message_status: try all mesh clients, not only first
- broker: omit state_result for set_state (fixes get_state cross-contamination)
- error handler: unblock first pending resolver on unmatched broker errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:25 +01:00
7 changed files with 170 additions and 78 deletions

View File

@@ -955,8 +955,13 @@ export async function grantFileKey(
// --- Context sharing --- // --- Context sharing ---
/** /**
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair * Upsert a context snapshot for a peer. When `memberId` is provided the
* has at most one context row — repeated calls update it in place. * row is keyed on (meshId, memberId) — a stable identifier that survives
* reconnects. This prevents stale rows from accumulating every time a
* session reconnects with a fresh ephemeral presenceId.
*
* Falls back to (meshId, presenceId) lookup when memberId is absent
* (e.g. legacy callers or anonymous connections).
*/ */
export async function shareContext( export async function shareContext(
meshId: string, meshId: string,
@@ -966,24 +971,27 @@ export async function shareContext(
filesRead?: string[], filesRead?: string[],
keyFindings?: string[], keyFindings?: string[],
tags?: string[], tags?: string[],
memberId?: string,
): Promise<string> { ): Promise<string> {
const now = new Date(); const now = new Date();
// Try to find existing context for this presence in this mesh.
// Build the WHERE clause: prefer stable memberId, fall back to presenceId.
const lookupWhere = memberId
? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId))
: and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId));
const [existing] = await db const [existing] = await db
.select({ id: meshContext.id }) .select({ id: meshContext.id })
.from(meshContext) .from(meshContext)
.where( .where(lookupWhere)
and(
eq(meshContext.meshId, meshId),
eq(meshContext.presenceId, presenceId),
),
)
.limit(1); .limit(1);
if (existing) { if (existing) {
await db await db
.update(meshContext) .update(meshContext)
.set({ .set({
// Keep presenceId current so it reflects the latest connection.
presenceId,
peerName: peerName ?? null, peerName: peerName ?? null,
summary, summary,
filesRead: filesRead ?? [], filesRead: filesRead ?? [],
@@ -999,6 +1007,7 @@ export async function shareContext(
.insert(meshContext) .insert(meshContext)
.values({ .values({
meshId, meshId,
memberId: memberId ?? null,
presenceId, presenceId,
peerName: peerName ?? null, peerName: peerName ?? null,
summary, summary,
@@ -1248,16 +1257,22 @@ export async function createStream(
name: string, name: string,
createdByName: string, createdByName: string,
): Promise<string> { ): Promise<string> {
const existing = await db // Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race
// when two callers concurrently attempt to create the same stream.
const [inserted] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.onConflictDoNothing()
.returning({ id: meshStream.id });
if (inserted) return inserted.id;
// Row already existed — fetch the id.
const [existing] = await db
.select({ id: meshStream.id }) .select({ id: meshStream.id })
.from(meshStream) .from(meshStream)
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name))); .where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
if (existing.length > 0) return existing[0]!.id; return existing!.id;
const [row] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.returning({ id: meshStream.id });
return row!.id;
} }
/** /**

View File

@@ -844,14 +844,8 @@ function handleConnection(ws: WebSocket): void {
updatedBy: stateRow.updatedBy, updatedBy: stateRow.updatedBy,
}); });
} }
// Send confirmation back to sender as state_result. // Fire-and-forget: no state_result sent back to sender.
sendToPeer(presenceId, { // The client (server.ts) returns success immediately without waiting.
type: "state_result",
key: stateRow.key,
value: stateRow.value,
updatedBy: stateRow.updatedBy,
updatedAt: stateRow.updatedAt.toISOString(),
});
log.info("ws set_state", { log.info("ws set_state", {
presence_id: presenceId, presence_id: presenceId,
key: ss.key, key: ss.key,
@@ -1171,6 +1165,7 @@ function handleConnection(ws: WebSocket): void {
sc.filesRead, sc.filesRead,
sc.keyFindings, sc.keyFindings,
sc.tags, sc.tags,
conn.memberId,
); );
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "context_shared", type: "context_shared",
@@ -1381,6 +1376,7 @@ function handleConnection(ws: WebSocket): void {
if (!streamSubscriptions.has(key)) if (!streamSubscriptions.has(key))
streamSubscriptions.set(key, new Set()); streamSubscriptions.set(key, new Set());
streamSubscriptions.get(key)!.add(presenceId); streamSubscriptions.get(key)!.add(presenceId);
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream });
log.info("ws subscribe", { log.info("ws subscribe", {
presence_id: presenceId, presence_id: presenceId,
stream: sub.stream, stream: sub.stream,
@@ -1453,40 +1449,42 @@ function handleConnection(ws: WebSocket): void {
case "vector_store": { case "vector_store": {
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>; const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
const collName = meshCollectionName(conn.meshId, vs.collection); const collName = meshCollectionName(conn.meshId, vs.collection);
await ensureCollection(collName); try {
const { generateId } = await import("@turbostarter/shared/utils"); await ensureCollection(collName);
const pointId = generateId(); const { generateId } = await import("@turbostarter/shared/utils");
// Store text + metadata as payload. Use a zero vector as placeholder const pointId = generateId();
// — real embeddings should be computed client-side and sent directly // Store text + metadata as payload. Use a zero vector as placeholder
// to Qdrant in a future version. // — real embeddings should be computed client-side and sent directly
const zeroVector = new Array(1536).fill(0) as number[]; // to Qdrant in a future version.
await qdrant.upsert(collName, { const zeroVector = new Array(1536).fill(0) as number[];
wait: true, await qdrant.upsert(collName, {
points: [ wait: true,
{ points: [
id: pointId, {
vector: zeroVector, id: pointId,
payload: { vector: zeroVector,
text: vs.text, payload: {
mesh_id: conn.meshId, text: vs.text,
stored_by: conn.memberPubkey, mesh_id: conn.meshId,
stored_at: new Date().toISOString(), stored_by: conn.memberPubkey,
...(vs.metadata ?? {}), stored_at: new Date().toISOString(),
...(vs.metadata ?? {}),
},
}, },
}, ],
], });
}); sendToPeer(presenceId, {
sendToPeer(presenceId, { type: "vector_stored",
type: "ack" as const, id: pointId,
id: pointId, });
messageId: pointId, log.info("ws vector_store", {
queued: false, presence_id: presenceId,
}); collection: vs.collection,
log.info("ws vector_store", { point_id: pointId,
presence_id: presenceId, });
collection: vs.collection, } catch (e) {
point_id: pointId, sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e));
}); }
break; break;
} }
case "vector_search": { case "vector_search": {
@@ -1730,7 +1728,6 @@ function handleConnection(ws: WebSocket): void {
]); ]);
const allGroups = new Set<string>(); const allGroups = new Set<string>();
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`); for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
const peerConn = connections.get(presenceId); const peerConn = connections.get(presenceId);
// Find own display name: match sessionPubkey from the peer list // Find own display name: match sessionPubkey from the peer list
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey; const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;

View File

@@ -295,6 +295,12 @@ export interface WSMeshSchemaMessage {
// --- Vector/Graph response messages --- // --- Vector/Graph response messages ---
/** Broker → client: confirmation that a vector point was stored. */
export interface WSVectorStoredMessage {
type: "vector_stored";
id: string;
}
/** Broker → client: vector search results. */ /** Broker → client: vector search results. */
export interface WSVectorResultsMessage { export interface WSVectorResultsMessage {
type: "vector_results"; type: "vector_results";
@@ -606,6 +612,12 @@ export interface WSStreamDataMessage {
publishedBy: string; publishedBy: string;
} }
/** Broker → client: confirmation that a stream subscription was registered. */
export interface WSSubscribedMessage {
type: "subscribed";
stream: string;
}
/** Broker → client: response to list_streams. */ /** Broker → client: response to list_streams. */
export interface WSStreamListMessage { export interface WSStreamListMessage {
type: "stream_list"; type: "stream_list";
@@ -689,6 +701,7 @@ export type WSServerMessage =
| WSContextListMessage | WSContextListMessage
| WSTaskCreatedMessage | WSTaskCreatedMessage
| WSTaskListMessage | WSTaskListMessage
| WSVectorStoredMessage
| WSVectorResultsMessage | WSVectorResultsMessage
| WSCollectionListMessage | WSCollectionListMessage
| WSGraphResultMessage | WSGraphResultMessage
@@ -696,6 +709,7 @@ export type WSServerMessage =
| WSMeshSchemaResultMessage | WSMeshSchemaResultMessage
| WSStreamCreatedMessage | WSStreamCreatedMessage
| WSStreamDataMessage | WSStreamDataMessage
| WSSubscribedMessage
| WSStreamListMessage | WSStreamListMessage
| WSMeshInfoResultMessage | WSMeshInfoResultMessage
| WSErrorMessage; | WSErrorMessage;

View File

@@ -327,9 +327,15 @@ Your message mode is "${messageMode}".
case "message_status": { case "message_status": {
const { id } = (args ?? {}) as { id?: string }; const { id } = (args ?? {}) as { id?: string };
if (!id) return text("message_status: `id` required", true); if (!id) return text("message_status: `id` required", true);
const client = allClients()[0]; const clients = allClients();
if (!client) return text("message_status: not connected", true); if (!clients.length) return text("message_status: not connected", true);
const result = await client.messageStatus(id); // Try each connected mesh client — we don't know which mesh the
// messageId belongs to, so query all and return the first hit.
let result = null;
for (const c of clients) {
result = await c.messageStatus(id);
if (result) break;
}
if (!result) return text(`Message ${id} not found or timed out.`); if (!result) return text(`Message ${id} not found or timed out.`);
const recipientLines = result.recipients.map( const recipientLines = result.recipients.map(
(r: { name: string; pubkey: string; status: string }) => (r: { name: string; pubkey: string; status: string }) =>

View File

@@ -684,13 +684,13 @@ export class BrokerClient {
/** Claim an unclaimed task. */ /** Claim an unclaimed task. */
async claimTask(id: string): Promise<void> { async claimTask(id: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "claim_task", id })); this.ws.send(JSON.stringify({ type: "claim_task", taskId: id }));
} }
/** Mark a task done with optional result. */ /** Mark a task done with optional result. */
async completeTask(id: string, result?: string): Promise<void> { async completeTask(id: string, result?: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "complete_task", id, result })); this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result }));
} }
/** List tasks filtered by status/assignee. */ /** List tasks filtered by status/assignee. */
@@ -917,6 +917,11 @@ export class BrokerClient {
return; return;
} }
if (msg.type === "state_result") { if (msg.type === "state_result") {
// DEPENDENCY: The broker must NOT send state_result for set_state
// operations (only for get_state). If the broker sends state_result for
// both, it would be consumed here by the next pending get_state resolver,
// returning the wrong value (cross-contamination). The broker's set_state
// handler was fixed to omit state_result; only get_state sends it.
const resolver = this.stateResolvers.shift(); const resolver = this.stateResolvers.shift();
if (resolver) { if (resolver) {
if (msg.key) { if (msg.key) {
@@ -1016,7 +1021,8 @@ export class BrokerClient {
return; return;
} }
if (msg.type === "graph_result") { if (msg.type === "graph_result") {
const rows = (msg.rows as Array<Record<string, unknown>>) ?? []; // Broker sends { type: "graph_result", records: [...] }
const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
const resolver = this.graphResultResolvers.shift(); const resolver = this.graphResultResolvers.shift();
if (resolver) resolver(rows); if (resolver) resolver(rows);
return; return;
@@ -1095,6 +1101,7 @@ export class BrokerClient {
if (msg.type === "error") { if (msg.type === "error") {
this.debug(`broker error: ${msg.code} ${msg.message}`); this.debug(`broker error: ${msg.code} ${msg.message}`);
const id = msg.id ? String(msg.id) : null; const id = msg.id ? String(msg.id) : null;
let handledByPendingSend = false;
if (id) { if (id) {
const pending = this.pendingSends.get(id); const pending = this.pendingSends.get(id);
if (pending) { if (pending) {
@@ -1103,6 +1110,43 @@ export class BrokerClient {
error: `${msg.code}: ${msg.message}`, error: `${msg.code}: ${msg.message}`,
}); });
this.pendingSends.delete(id); this.pendingSends.delete(id);
handledByPendingSend = true;
}
}
if (!handledByPendingSend) {
// Best-effort: unblock the first waiting resolver so callers don't
// hang for 5s. We don't know which tool triggered the error, so we
// pop the first non-empty resolver queue in priority order.
if (this.stateResolvers.length > 0) {
this.stateResolvers.shift()!(null);
} else if (this.stateListResolvers.length > 0) {
this.stateListResolvers.shift()!([]);
} else if (this.memoryStoreResolvers.length > 0) {
this.memoryStoreResolvers.shift()!(null);
} else if (this.memoryRecallResolvers.length > 0) {
this.memoryRecallResolvers.shift()!([]);
} else if (this.fileUrlResolvers.length > 0) {
this.fileUrlResolvers.shift()!(null);
} else if (this.fileListResolvers.length > 0) {
this.fileListResolvers.shift()!([]);
} else if (this.fileStatusResolvers.length > 0) {
this.fileStatusResolvers.shift()!([]);
} else if (this.graphResultResolvers.length > 0) {
this.graphResultResolvers.shift()!([]);
} else if (this.vectorStoredResolvers.length > 0) {
this.vectorStoredResolvers.shift()!(null);
} else if (this.vectorResultsResolvers.length > 0) {
this.vectorResultsResolvers.shift()!([]);
} else if (this.taskListResolvers.length > 0) {
this.taskListResolvers.shift()!([]);
} else if (this.meshQueryResolvers.length > 0) {
this.meshQueryResolvers.shift()!(null);
} else if (this.contextResultsResolvers.length > 0) {
this.contextResultsResolvers.shift()!([]);
} else if (this.contextListResolvers.length > 0) {
this.contextListResolvers.shift()!([]);
} else if (this.streamListResolvers.length > 0) {
this.streamListResolvers.shift()!([]);
} }
} }
} }

View File

@@ -0,0 +1,3 @@
ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint
ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint
CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id");

View File

@@ -353,24 +353,37 @@ export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({
})); }));
/** /**
* Per-peer context snapshot. Each peer (presence) has at most one context * Per-peer context snapshot. Each peer (member) has at most one context
* entry per mesh, upserted on each share_context call. Allows peers to * entry per mesh, upserted on each share_context call. Allows peers to
* discover what others are working on, which files they've read, and * discover what others are working on, which files they've read, and
* key findings — without sending a direct message. * key findings — without sending a direct message.
*
* `memberId` is the stable upsert key (survives reconnects). `presenceId`
* is kept for backwards-compat but is nullable — new rows should always
* populate `memberId`. The unique index on (meshId, memberId) prevents
* stale rows from accumulating when a session reconnects with a new
* ephemeral presenceId.
*/ */
export const meshContext = meshSchema.table("context", { export const meshContext = meshSchema.table(
id: text().primaryKey().notNull().$defaultFn(generateId), "context",
meshId: text() {
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) id: text().primaryKey().notNull().$defaultFn(generateId),
.notNull(), meshId: text()
presenceId: text().references(() => presence.id, { onDelete: "cascade" }), .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
peerName: text(), .notNull(),
summary: text().notNull(), memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }),
filesRead: text().array().default([]), presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
keyFindings: text().array().default([]), peerName: text(),
tags: text().array().default([]), summary: text().notNull(),
updatedAt: timestamp().defaultNow().notNull(), filesRead: text().array().default([]),
}); keyFindings: text().array().default([]),
tags: text().array().default([]),
updatedAt: timestamp().defaultNow().notNull(),
},
(table) => [
uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId),
],
);
/** /**
* Mesh-scoped task board. Peers can create tasks, claim them, and mark * Mesh-scoped task board. Peers can create tasks, claim them, and mark