diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 714553b..2e828de 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -955,8 +955,13 @@ export async function grantFileKey( // --- Context sharing --- /** - * Upsert a context snapshot for a peer. Each (meshId, presenceId) pair - * has at most one context row — repeated calls update it in place. + * Upsert a context snapshot for a peer. When `memberId` is provided the + * row is keyed on (meshId, memberId) — a stable identifier that survives + * reconnects. This prevents stale rows from accumulating every time a + * session reconnects with a fresh ephemeral presenceId. + * + * Falls back to (meshId, presenceId) lookup when memberId is absent + * (e.g. legacy callers or anonymous connections). */ export async function shareContext( meshId: string, @@ -966,24 +971,27 @@ export async function shareContext( filesRead?: string[], keyFindings?: string[], tags?: string[], + memberId?: string, ): Promise { const now = new Date(); - // Try to find existing context for this presence in this mesh. + + // Build the WHERE clause: prefer stable memberId, fall back to presenceId. + const lookupWhere = memberId + ? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId)) + : and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId)); + const [existing] = await db .select({ id: meshContext.id }) .from(meshContext) - .where( - and( - eq(meshContext.meshId, meshId), - eq(meshContext.presenceId, presenceId), - ), - ) + .where(lookupWhere) .limit(1); if (existing) { await db .update(meshContext) .set({ + // Keep presenceId current so it reflects the latest connection. + presenceId, peerName: peerName ?? null, summary, filesRead: filesRead ?? [], @@ -999,6 +1007,7 @@ export async function shareContext( .insert(meshContext) .values({ meshId, + memberId: memberId ?? null, presenceId, peerName: peerName ?? null, summary, @@ -1248,16 +1257,22 @@ export async function createStream( name: string, createdByName: string, ): Promise { - const existing = await db + // Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race + // when two callers concurrently attempt to create the same stream. + const [inserted] = await db + .insert(meshStream) + .values({ meshId, name, createdByName }) + .onConflictDoNothing() + .returning({ id: meshStream.id }); + + if (inserted) return inserted.id; + + // Row already existed — fetch the id. + const [existing] = await db .select({ id: meshStream.id }) .from(meshStream) .where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name))); - if (existing.length > 0) return existing[0]!.id; - const [row] = await db - .insert(meshStream) - .values({ meshId, name, createdByName }) - .returning({ id: meshStream.id }); - return row!.id; + return existing!.id; } /** diff --git a/packages/db/migrations/0013_context-stable-member-key.sql b/packages/db/migrations/0013_context-stable-member-key.sql new file mode 100644 index 0000000..9370048 --- /dev/null +++ b/packages/db/migrations/0013_context-stable-member-key.sql @@ -0,0 +1,3 @@ +ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint +ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint +CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id"); diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 9583bd7..3a549a7 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -353,24 +353,37 @@ export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({ })); /** - * Per-peer context snapshot. Each peer (presence) has at most one context + * Per-peer context snapshot. Each peer (member) has at most one context * entry per mesh, upserted on each share_context call. Allows peers to * discover what others are working on, which files they've read, and * key findings — without sending a direct message. + * + * `memberId` is the stable upsert key (survives reconnects). `presenceId` + * is kept for backwards-compat but is nullable — new rows should always + * populate `memberId`. The unique index on (meshId, memberId) prevents + * stale rows from accumulating when a session reconnects with a new + * ephemeral presenceId. */ -export const meshContext = meshSchema.table("context", { - id: text().primaryKey().notNull().$defaultFn(generateId), - meshId: text() - .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) - .notNull(), - presenceId: text().references(() => presence.id, { onDelete: "cascade" }), - peerName: text(), - summary: text().notNull(), - filesRead: text().array().default([]), - keyFindings: text().array().default([]), - tags: text().array().default([]), - updatedAt: timestamp().defaultNow().notNull(), -}); +export const meshContext = meshSchema.table( + "context", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }), + presenceId: text().references(() => presence.id, { onDelete: "cascade" }), + peerName: text(), + summary: text().notNull(), + filesRead: text().array().default([]), + keyFindings: text().array().default([]), + tags: text().array().default([]), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [ + uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId), + ], +); /** * Mesh-scoped task board. Peers can create tasks, claim them, and mark