fix(broker): shareContext stable upsert key + createStream atomic upsert
- shareContext: adds optional memberId param; when provided, upserts on (meshId, memberId) instead of (meshId, presenceId) — prevents stale context rows accumulating on every reconnect. Falls back to presenceId for legacy/anonymous connections. Also refreshes presenceId on update so it stays current. - schema: adds member_id column + unique index context_mesh_member_idx on mesh.context table; new migration 0013_context-stable-member-key.sql. - index.ts call site updated to pass conn.memberId as the stable key. - createStream: replaces SELECT-then-INSERT TOCTOU race with atomic INSERT ... ON CONFLICT DO NOTHING RETURNING, followed by SELECT on miss. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -955,8 +955,13 @@ export async function grantFileKey(
|
|||||||
// --- Context sharing ---
|
// --- Context sharing ---
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair
|
* Upsert a context snapshot for a peer. When `memberId` is provided the
|
||||||
* has at most one context row — repeated calls update it in place.
|
* row is keyed on (meshId, memberId) — a stable identifier that survives
|
||||||
|
* reconnects. This prevents stale rows from accumulating every time a
|
||||||
|
* session reconnects with a fresh ephemeral presenceId.
|
||||||
|
*
|
||||||
|
* Falls back to (meshId, presenceId) lookup when memberId is absent
|
||||||
|
* (e.g. legacy callers or anonymous connections).
|
||||||
*/
|
*/
|
||||||
export async function shareContext(
|
export async function shareContext(
|
||||||
meshId: string,
|
meshId: string,
|
||||||
@@ -966,24 +971,27 @@ export async function shareContext(
|
|||||||
filesRead?: string[],
|
filesRead?: string[],
|
||||||
keyFindings?: string[],
|
keyFindings?: string[],
|
||||||
tags?: string[],
|
tags?: string[],
|
||||||
|
memberId?: string,
|
||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
// Try to find existing context for this presence in this mesh.
|
|
||||||
|
// Build the WHERE clause: prefer stable memberId, fall back to presenceId.
|
||||||
|
const lookupWhere = memberId
|
||||||
|
? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId))
|
||||||
|
: and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId));
|
||||||
|
|
||||||
const [existing] = await db
|
const [existing] = await db
|
||||||
.select({ id: meshContext.id })
|
.select({ id: meshContext.id })
|
||||||
.from(meshContext)
|
.from(meshContext)
|
||||||
.where(
|
.where(lookupWhere)
|
||||||
and(
|
|
||||||
eq(meshContext.meshId, meshId),
|
|
||||||
eq(meshContext.presenceId, presenceId),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.limit(1);
|
.limit(1);
|
||||||
|
|
||||||
if (existing) {
|
if (existing) {
|
||||||
await db
|
await db
|
||||||
.update(meshContext)
|
.update(meshContext)
|
||||||
.set({
|
.set({
|
||||||
|
// Keep presenceId current so it reflects the latest connection.
|
||||||
|
presenceId,
|
||||||
peerName: peerName ?? null,
|
peerName: peerName ?? null,
|
||||||
summary,
|
summary,
|
||||||
filesRead: filesRead ?? [],
|
filesRead: filesRead ?? [],
|
||||||
@@ -999,6 +1007,7 @@ export async function shareContext(
|
|||||||
.insert(meshContext)
|
.insert(meshContext)
|
||||||
.values({
|
.values({
|
||||||
meshId,
|
meshId,
|
||||||
|
memberId: memberId ?? null,
|
||||||
presenceId,
|
presenceId,
|
||||||
peerName: peerName ?? null,
|
peerName: peerName ?? null,
|
||||||
summary,
|
summary,
|
||||||
@@ -1248,16 +1257,22 @@ export async function createStream(
|
|||||||
name: string,
|
name: string,
|
||||||
createdByName: string,
|
createdByName: string,
|
||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
const existing = await db
|
// Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race
|
||||||
|
// when two callers concurrently attempt to create the same stream.
|
||||||
|
const [inserted] = await db
|
||||||
|
.insert(meshStream)
|
||||||
|
.values({ meshId, name, createdByName })
|
||||||
|
.onConflictDoNothing()
|
||||||
|
.returning({ id: meshStream.id });
|
||||||
|
|
||||||
|
if (inserted) return inserted.id;
|
||||||
|
|
||||||
|
// Row already existed — fetch the id.
|
||||||
|
const [existing] = await db
|
||||||
.select({ id: meshStream.id })
|
.select({ id: meshStream.id })
|
||||||
.from(meshStream)
|
.from(meshStream)
|
||||||
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
|
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
|
||||||
if (existing.length > 0) return existing[0]!.id;
|
return existing!.id;
|
||||||
const [row] = await db
|
|
||||||
.insert(meshStream)
|
|
||||||
.values({ meshId, name, createdByName })
|
|
||||||
.returning({ id: meshStream.id });
|
|
||||||
return row!.id;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,3 @@
|
|||||||
|
ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint
|
||||||
|
ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint
|
||||||
|
CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id");
|
||||||
@@ -353,24 +353,37 @@ export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Per-peer context snapshot. Each peer (presence) has at most one context
|
* Per-peer context snapshot. Each peer (member) has at most one context
|
||||||
* entry per mesh, upserted on each share_context call. Allows peers to
|
* entry per mesh, upserted on each share_context call. Allows peers to
|
||||||
* discover what others are working on, which files they've read, and
|
* discover what others are working on, which files they've read, and
|
||||||
* key findings — without sending a direct message.
|
* key findings — without sending a direct message.
|
||||||
|
*
|
||||||
|
* `memberId` is the stable upsert key (survives reconnects). `presenceId`
|
||||||
|
* is kept for backwards-compat but is nullable — new rows should always
|
||||||
|
* populate `memberId`. The unique index on (meshId, memberId) prevents
|
||||||
|
* stale rows from accumulating when a session reconnects with a new
|
||||||
|
* ephemeral presenceId.
|
||||||
*/
|
*/
|
||||||
export const meshContext = meshSchema.table("context", {
|
export const meshContext = meshSchema.table(
|
||||||
id: text().primaryKey().notNull().$defaultFn(generateId),
|
"context",
|
||||||
meshId: text()
|
{
|
||||||
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
|
id: text().primaryKey().notNull().$defaultFn(generateId),
|
||||||
.notNull(),
|
meshId: text()
|
||||||
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
|
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
|
||||||
peerName: text(),
|
.notNull(),
|
||||||
summary: text().notNull(),
|
memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }),
|
||||||
filesRead: text().array().default([]),
|
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
|
||||||
keyFindings: text().array().default([]),
|
peerName: text(),
|
||||||
tags: text().array().default([]),
|
summary: text().notNull(),
|
||||||
updatedAt: timestamp().defaultNow().notNull(),
|
filesRead: text().array().default([]),
|
||||||
});
|
keyFindings: text().array().default([]),
|
||||||
|
tags: text().array().default([]),
|
||||||
|
updatedAt: timestamp().defaultNow().notNull(),
|
||||||
|
},
|
||||||
|
(table) => [
|
||||||
|
uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId),
|
||||||
|
],
|
||||||
|
);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mesh-scoped task board. Peers can create tasks, claim them, and mark
|
* Mesh-scoped task board. Peers can create tasks, claim them, and mark
|
||||||
|
|||||||
Reference in New Issue
Block a user