Compare commits
3 Commits
5bcc1fe323
...
e26a36e543
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e26a36e543 | ||
|
|
60c74d9463 | ||
|
|
6fba9bd4eb |
@@ -955,8 +955,13 @@ export async function grantFileKey(
|
||||
// --- Context sharing ---
|
||||
|
||||
/**
|
||||
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair
|
||||
* has at most one context row — repeated calls update it in place.
|
||||
* Upsert a context snapshot for a peer. When `memberId` is provided the
|
||||
* row is keyed on (meshId, memberId) — a stable identifier that survives
|
||||
* reconnects. This prevents stale rows from accumulating every time a
|
||||
* session reconnects with a fresh ephemeral presenceId.
|
||||
*
|
||||
* Falls back to (meshId, presenceId) lookup when memberId is absent
|
||||
* (e.g. legacy callers or anonymous connections).
|
||||
*/
|
||||
export async function shareContext(
|
||||
meshId: string,
|
||||
@@ -966,24 +971,27 @@ export async function shareContext(
|
||||
filesRead?: string[],
|
||||
keyFindings?: string[],
|
||||
tags?: string[],
|
||||
memberId?: string,
|
||||
): Promise<string> {
|
||||
const now = new Date();
|
||||
// Try to find existing context for this presence in this mesh.
|
||||
|
||||
// Build the WHERE clause: prefer stable memberId, fall back to presenceId.
|
||||
const lookupWhere = memberId
|
||||
? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId))
|
||||
: and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId));
|
||||
|
||||
const [existing] = await db
|
||||
.select({ id: meshContext.id })
|
||||
.from(meshContext)
|
||||
.where(
|
||||
and(
|
||||
eq(meshContext.meshId, meshId),
|
||||
eq(meshContext.presenceId, presenceId),
|
||||
),
|
||||
)
|
||||
.where(lookupWhere)
|
||||
.limit(1);
|
||||
|
||||
if (existing) {
|
||||
await db
|
||||
.update(meshContext)
|
||||
.set({
|
||||
// Keep presenceId current so it reflects the latest connection.
|
||||
presenceId,
|
||||
peerName: peerName ?? null,
|
||||
summary,
|
||||
filesRead: filesRead ?? [],
|
||||
@@ -999,6 +1007,7 @@ export async function shareContext(
|
||||
.insert(meshContext)
|
||||
.values({
|
||||
meshId,
|
||||
memberId: memberId ?? null,
|
||||
presenceId,
|
||||
peerName: peerName ?? null,
|
||||
summary,
|
||||
@@ -1248,16 +1257,22 @@ export async function createStream(
|
||||
name: string,
|
||||
createdByName: string,
|
||||
): Promise<string> {
|
||||
const existing = await db
|
||||
// Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race
|
||||
// when two callers concurrently attempt to create the same stream.
|
||||
const [inserted] = await db
|
||||
.insert(meshStream)
|
||||
.values({ meshId, name, createdByName })
|
||||
.onConflictDoNothing()
|
||||
.returning({ id: meshStream.id });
|
||||
|
||||
if (inserted) return inserted.id;
|
||||
|
||||
// Row already existed — fetch the id.
|
||||
const [existing] = await db
|
||||
.select({ id: meshStream.id })
|
||||
.from(meshStream)
|
||||
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
|
||||
if (existing.length > 0) return existing[0]!.id;
|
||||
const [row] = await db
|
||||
.insert(meshStream)
|
||||
.values({ meshId, name, createdByName })
|
||||
.returning({ id: meshStream.id });
|
||||
return row!.id;
|
||||
return existing!.id;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -844,14 +844,8 @@ function handleConnection(ws: WebSocket): void {
|
||||
updatedBy: stateRow.updatedBy,
|
||||
});
|
||||
}
|
||||
// Send confirmation back to sender as state_result.
|
||||
sendToPeer(presenceId, {
|
||||
type: "state_result",
|
||||
key: stateRow.key,
|
||||
value: stateRow.value,
|
||||
updatedBy: stateRow.updatedBy,
|
||||
updatedAt: stateRow.updatedAt.toISOString(),
|
||||
});
|
||||
// Fire-and-forget: no state_result sent back to sender.
|
||||
// The client (server.ts) returns success immediately without waiting.
|
||||
log.info("ws set_state", {
|
||||
presence_id: presenceId,
|
||||
key: ss.key,
|
||||
@@ -1171,6 +1165,7 @@ function handleConnection(ws: WebSocket): void {
|
||||
sc.filesRead,
|
||||
sc.keyFindings,
|
||||
sc.tags,
|
||||
conn.memberId,
|
||||
);
|
||||
sendToPeer(presenceId, {
|
||||
type: "context_shared",
|
||||
@@ -1381,6 +1376,7 @@ function handleConnection(ws: WebSocket): void {
|
||||
if (!streamSubscriptions.has(key))
|
||||
streamSubscriptions.set(key, new Set());
|
||||
streamSubscriptions.get(key)!.add(presenceId);
|
||||
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream });
|
||||
log.info("ws subscribe", {
|
||||
presence_id: presenceId,
|
||||
stream: sub.stream,
|
||||
@@ -1453,40 +1449,42 @@ function handleConnection(ws: WebSocket): void {
|
||||
case "vector_store": {
|
||||
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
|
||||
const collName = meshCollectionName(conn.meshId, vs.collection);
|
||||
await ensureCollection(collName);
|
||||
const { generateId } = await import("@turbostarter/shared/utils");
|
||||
const pointId = generateId();
|
||||
// Store text + metadata as payload. Use a zero vector as placeholder
|
||||
// — real embeddings should be computed client-side and sent directly
|
||||
// to Qdrant in a future version.
|
||||
const zeroVector = new Array(1536).fill(0) as number[];
|
||||
await qdrant.upsert(collName, {
|
||||
wait: true,
|
||||
points: [
|
||||
{
|
||||
id: pointId,
|
||||
vector: zeroVector,
|
||||
payload: {
|
||||
text: vs.text,
|
||||
mesh_id: conn.meshId,
|
||||
stored_by: conn.memberPubkey,
|
||||
stored_at: new Date().toISOString(),
|
||||
...(vs.metadata ?? {}),
|
||||
try {
|
||||
await ensureCollection(collName);
|
||||
const { generateId } = await import("@turbostarter/shared/utils");
|
||||
const pointId = generateId();
|
||||
// Store text + metadata as payload. Use a zero vector as placeholder
|
||||
// — real embeddings should be computed client-side and sent directly
|
||||
// to Qdrant in a future version.
|
||||
const zeroVector = new Array(1536).fill(0) as number[];
|
||||
await qdrant.upsert(collName, {
|
||||
wait: true,
|
||||
points: [
|
||||
{
|
||||
id: pointId,
|
||||
vector: zeroVector,
|
||||
payload: {
|
||||
text: vs.text,
|
||||
mesh_id: conn.meshId,
|
||||
stored_by: conn.memberPubkey,
|
||||
stored_at: new Date().toISOString(),
|
||||
...(vs.metadata ?? {}),
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
});
|
||||
sendToPeer(presenceId, {
|
||||
type: "ack" as const,
|
||||
id: pointId,
|
||||
messageId: pointId,
|
||||
queued: false,
|
||||
});
|
||||
log.info("ws vector_store", {
|
||||
presence_id: presenceId,
|
||||
collection: vs.collection,
|
||||
point_id: pointId,
|
||||
});
|
||||
],
|
||||
});
|
||||
sendToPeer(presenceId, {
|
||||
type: "vector_stored",
|
||||
id: pointId,
|
||||
});
|
||||
log.info("ws vector_store", {
|
||||
presence_id: presenceId,
|
||||
collection: vs.collection,
|
||||
point_id: pointId,
|
||||
});
|
||||
} catch (e) {
|
||||
sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "vector_search": {
|
||||
@@ -1730,7 +1728,6 @@ function handleConnection(ws: WebSocket): void {
|
||||
]);
|
||||
const allGroups = new Set<string>();
|
||||
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
|
||||
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
|
||||
const peerConn = connections.get(presenceId);
|
||||
// Find own display name: match sessionPubkey from the peer list
|
||||
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;
|
||||
|
||||
@@ -295,6 +295,12 @@ export interface WSMeshSchemaMessage {
|
||||
|
||||
// --- Vector/Graph response messages ---
|
||||
|
||||
/** Broker → client: confirmation that a vector point was stored. */
|
||||
export interface WSVectorStoredMessage {
|
||||
type: "vector_stored";
|
||||
id: string;
|
||||
}
|
||||
|
||||
/** Broker → client: vector search results. */
|
||||
export interface WSVectorResultsMessage {
|
||||
type: "vector_results";
|
||||
@@ -606,6 +612,12 @@ export interface WSStreamDataMessage {
|
||||
publishedBy: string;
|
||||
}
|
||||
|
||||
/** Broker → client: confirmation that a stream subscription was registered. */
|
||||
export interface WSSubscribedMessage {
|
||||
type: "subscribed";
|
||||
stream: string;
|
||||
}
|
||||
|
||||
/** Broker → client: response to list_streams. */
|
||||
export interface WSStreamListMessage {
|
||||
type: "stream_list";
|
||||
@@ -689,6 +701,7 @@ export type WSServerMessage =
|
||||
| WSContextListMessage
|
||||
| WSTaskCreatedMessage
|
||||
| WSTaskListMessage
|
||||
| WSVectorStoredMessage
|
||||
| WSVectorResultsMessage
|
||||
| WSCollectionListMessage
|
||||
| WSGraphResultMessage
|
||||
@@ -696,6 +709,7 @@ export type WSServerMessage =
|
||||
| WSMeshSchemaResultMessage
|
||||
| WSStreamCreatedMessage
|
||||
| WSStreamDataMessage
|
||||
| WSSubscribedMessage
|
||||
| WSStreamListMessage
|
||||
| WSMeshInfoResultMessage
|
||||
| WSErrorMessage;
|
||||
|
||||
@@ -327,9 +327,15 @@ Your message mode is "${messageMode}".
|
||||
case "message_status": {
|
||||
const { id } = (args ?? {}) as { id?: string };
|
||||
if (!id) return text("message_status: `id` required", true);
|
||||
const client = allClients()[0];
|
||||
if (!client) return text("message_status: not connected", true);
|
||||
const result = await client.messageStatus(id);
|
||||
const clients = allClients();
|
||||
if (!clients.length) return text("message_status: not connected", true);
|
||||
// Try each connected mesh client — we don't know which mesh the
|
||||
// messageId belongs to, so query all and return the first hit.
|
||||
let result = null;
|
||||
for (const c of clients) {
|
||||
result = await c.messageStatus(id);
|
||||
if (result) break;
|
||||
}
|
||||
if (!result) return text(`Message ${id} not found or timed out.`);
|
||||
const recipientLines = result.recipients.map(
|
||||
(r: { name: string; pubkey: string; status: string }) =>
|
||||
|
||||
@@ -684,13 +684,13 @@ export class BrokerClient {
|
||||
/** Claim an unclaimed task. */
|
||||
async claimTask(id: string): Promise<void> {
|
||||
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
||||
this.ws.send(JSON.stringify({ type: "claim_task", id }));
|
||||
this.ws.send(JSON.stringify({ type: "claim_task", taskId: id }));
|
||||
}
|
||||
|
||||
/** Mark a task done with optional result. */
|
||||
async completeTask(id: string, result?: string): Promise<void> {
|
||||
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
|
||||
this.ws.send(JSON.stringify({ type: "complete_task", id, result }));
|
||||
this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result }));
|
||||
}
|
||||
|
||||
/** List tasks filtered by status/assignee. */
|
||||
@@ -917,6 +917,11 @@ export class BrokerClient {
|
||||
return;
|
||||
}
|
||||
if (msg.type === "state_result") {
|
||||
// DEPENDENCY: The broker must NOT send state_result for set_state
|
||||
// operations (only for get_state). If the broker sends state_result for
|
||||
// both, it would be consumed here by the next pending get_state resolver,
|
||||
// returning the wrong value (cross-contamination). The broker's set_state
|
||||
// handler was fixed to omit state_result; only get_state sends it.
|
||||
const resolver = this.stateResolvers.shift();
|
||||
if (resolver) {
|
||||
if (msg.key) {
|
||||
@@ -1016,7 +1021,8 @@ export class BrokerClient {
|
||||
return;
|
||||
}
|
||||
if (msg.type === "graph_result") {
|
||||
const rows = (msg.rows as Array<Record<string, unknown>>) ?? [];
|
||||
// Broker sends { type: "graph_result", records: [...] }
|
||||
const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
|
||||
const resolver = this.graphResultResolvers.shift();
|
||||
if (resolver) resolver(rows);
|
||||
return;
|
||||
@@ -1095,6 +1101,7 @@ export class BrokerClient {
|
||||
if (msg.type === "error") {
|
||||
this.debug(`broker error: ${msg.code} ${msg.message}`);
|
||||
const id = msg.id ? String(msg.id) : null;
|
||||
let handledByPendingSend = false;
|
||||
if (id) {
|
||||
const pending = this.pendingSends.get(id);
|
||||
if (pending) {
|
||||
@@ -1103,6 +1110,43 @@ export class BrokerClient {
|
||||
error: `${msg.code}: ${msg.message}`,
|
||||
});
|
||||
this.pendingSends.delete(id);
|
||||
handledByPendingSend = true;
|
||||
}
|
||||
}
|
||||
if (!handledByPendingSend) {
|
||||
// Best-effort: unblock the first waiting resolver so callers don't
|
||||
// hang for 5s. We don't know which tool triggered the error, so we
|
||||
// pop the first non-empty resolver queue in priority order.
|
||||
if (this.stateResolvers.length > 0) {
|
||||
this.stateResolvers.shift()!(null);
|
||||
} else if (this.stateListResolvers.length > 0) {
|
||||
this.stateListResolvers.shift()!([]);
|
||||
} else if (this.memoryStoreResolvers.length > 0) {
|
||||
this.memoryStoreResolvers.shift()!(null);
|
||||
} else if (this.memoryRecallResolvers.length > 0) {
|
||||
this.memoryRecallResolvers.shift()!([]);
|
||||
} else if (this.fileUrlResolvers.length > 0) {
|
||||
this.fileUrlResolvers.shift()!(null);
|
||||
} else if (this.fileListResolvers.length > 0) {
|
||||
this.fileListResolvers.shift()!([]);
|
||||
} else if (this.fileStatusResolvers.length > 0) {
|
||||
this.fileStatusResolvers.shift()!([]);
|
||||
} else if (this.graphResultResolvers.length > 0) {
|
||||
this.graphResultResolvers.shift()!([]);
|
||||
} else if (this.vectorStoredResolvers.length > 0) {
|
||||
this.vectorStoredResolvers.shift()!(null);
|
||||
} else if (this.vectorResultsResolvers.length > 0) {
|
||||
this.vectorResultsResolvers.shift()!([]);
|
||||
} else if (this.taskListResolvers.length > 0) {
|
||||
this.taskListResolvers.shift()!([]);
|
||||
} else if (this.meshQueryResolvers.length > 0) {
|
||||
this.meshQueryResolvers.shift()!(null);
|
||||
} else if (this.contextResultsResolvers.length > 0) {
|
||||
this.contextResultsResolvers.shift()!([]);
|
||||
} else if (this.contextListResolvers.length > 0) {
|
||||
this.contextListResolvers.shift()!([]);
|
||||
} else if (this.streamListResolvers.length > 0) {
|
||||
this.streamListResolvers.shift()!([]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint
|
||||
ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint
|
||||
CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id");
|
||||
@@ -353,24 +353,37 @@ export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({
|
||||
}));
|
||||
|
||||
/**
|
||||
* Per-peer context snapshot. Each peer (presence) has at most one context
|
||||
* Per-peer context snapshot. Each peer (member) has at most one context
|
||||
* entry per mesh, upserted on each share_context call. Allows peers to
|
||||
* discover what others are working on, which files they've read, and
|
||||
* key findings — without sending a direct message.
|
||||
*
|
||||
* `memberId` is the stable upsert key (survives reconnects). `presenceId`
|
||||
* is kept for backwards-compat but is nullable — new rows should always
|
||||
* populate `memberId`. The unique index on (meshId, memberId) prevents
|
||||
* stale rows from accumulating when a session reconnects with a new
|
||||
* ephemeral presenceId.
|
||||
*/
|
||||
export const meshContext = meshSchema.table("context", {
|
||||
id: text().primaryKey().notNull().$defaultFn(generateId),
|
||||
meshId: text()
|
||||
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
|
||||
.notNull(),
|
||||
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
|
||||
peerName: text(),
|
||||
summary: text().notNull(),
|
||||
filesRead: text().array().default([]),
|
||||
keyFindings: text().array().default([]),
|
||||
tags: text().array().default([]),
|
||||
updatedAt: timestamp().defaultNow().notNull(),
|
||||
});
|
||||
export const meshContext = meshSchema.table(
|
||||
"context",
|
||||
{
|
||||
id: text().primaryKey().notNull().$defaultFn(generateId),
|
||||
meshId: text()
|
||||
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
|
||||
.notNull(),
|
||||
memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }),
|
||||
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
|
||||
peerName: text(),
|
||||
summary: text().notNull(),
|
||||
filesRead: text().array().default([]),
|
||||
keyFindings: text().array().default([]),
|
||||
tags: text().array().default([]),
|
||||
updatedAt: timestamp().defaultNow().notNull(),
|
||||
},
|
||||
(table) => [
|
||||
uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId),
|
||||
],
|
||||
);
|
||||
|
||||
/**
|
||||
* Mesh-scoped task board. Peers can create tasks, claim them, and mark
|
||||
|
||||
Reference in New Issue
Block a user