feat(db): runtime presence + message queue + pending_status tables for broker

- mesh.presence: live WS connection tracking (memberId, sessionId, pid,
  cwd, status, statusSource, statusUpdatedAt, connectedAt, lastPingAt,
  disconnectedAt). Persisted so broker can resume after restart.
- mesh.message_queue: E2E-encrypted envelopes awaiting delivery (meshId,
  senderMemberId, targetSpec, priority, nonce, ciphertext, expiresAt).
  Broker routes ciphertext only — crypto happens client-side.
- mesh.pending_status: first-turn race catcher keyed by (pid, cwd). No
  FK to member (member doesn't exist yet when hook fires pre-register).
- Enums: presence_status, presence_status_source, message_priority.
- Relations: mesh→messageQueue, member→presences+sentMessages,
  presence→member, messageQueue→mesh+sender.
- Cascade chain: mesh → member → presence + messageQueue (no orphans).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-04 21:28:24 +01:00
parent d5d0e6fdbb
commit e758205eb8

View File

@@ -35,6 +35,24 @@ export const meshTierEnum = schema.enum("tier", [
export const meshRoleEnum = schema.enum("role", ["admin", "member"]); export const meshRoleEnum = schema.enum("role", ["admin", "member"]);
export const presenceStatusEnum = schema.enum("presence_status", [
"idle",
"working",
"dnd",
]);
export const presenceStatusSourceEnum = schema.enum("presence_status_source", [
"hook",
"manual",
"jsonl",
]);
export const messagePriorityEnum = schema.enum("message_priority", [
"now",
"next",
"low",
]);
/** /**
* A mesh is a peer group of Claude Code sessions that can talk to each * A mesh is a peer group of Claude Code sessions that can talk to each
* other via the broker. Ownership is tied to a user; transport/tier * other via the broker. Ownership is tied to a user; transport/tier
@@ -113,6 +131,72 @@ export const auditLog = schema.table("audit_log", {
createdAt: timestamp().defaultNow().notNull(), createdAt: timestamp().defaultNow().notNull(),
}); });
/**
* Live WebSocket connection tracking for a member. One presence row per
* active Claude Code session: created on connect, updated on every
* heartbeat/hook signal, closed out (disconnectedAt set) on disconnect.
* Persisted so the broker can resume state after a restart.
*/
export const presence = schema.table("presence", {
id: text().primaryKey().notNull().$defaultFn(generateId),
memberId: text()
.references(() => member.id, { onDelete: "cascade", onUpdate: "cascade" })
.notNull(),
sessionId: text().notNull(),
pid: integer().notNull(),
cwd: text().notNull(),
status: presenceStatusEnum().notNull().default("idle"),
statusSource: presenceStatusSourceEnum().notNull().default("jsonl"),
statusUpdatedAt: timestamp().defaultNow().notNull(),
connectedAt: timestamp().defaultNow().notNull(),
lastPingAt: timestamp().defaultNow().notNull(),
disconnectedAt: timestamp(),
});
/**
* In-flight E2E-encrypted message envelopes awaiting delivery.
* The broker only ever sees ciphertext + routing metadata — the
* nonce+ciphertext pair is sealed with libsodium client-side.
*
* `targetSpec` is free-form text and can address: a specific member
* pubkey (direct message), a channel (`#general`), a tag (`tag:admins`),
* or a broadcast (`*`). Resolution happens in broker logic, not SQL.
*/
export const messageQueue = schema.table("message_queue", {
id: text().primaryKey().notNull().$defaultFn(generateId),
meshId: text()
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
.notNull(),
senderMemberId: text()
.references(() => member.id, { onDelete: "cascade", onUpdate: "cascade" })
.notNull(),
targetSpec: text().notNull(),
priority: messagePriorityEnum().notNull().default("next"),
nonce: text().notNull(),
ciphertext: text().notNull(),
createdAt: timestamp().defaultNow().notNull(),
deliveredAt: timestamp(),
expiresAt: timestamp(),
});
/**
* First-turn race handler: hook signals that fire BEFORE the peer has
* a registered mesh.member row get stashed here keyed by (pid, cwd),
* then applied to the member's presence on register. Swept after TTL.
*
* Intentionally NOT linked to member/mesh via FK — the whole point is
* that no member row exists yet when the hook fires.
*/
export const pendingStatus = schema.table("pending_status", {
id: text().primaryKey().notNull().$defaultFn(generateId),
pid: integer().notNull(),
cwd: text().notNull(),
status: text().notNull(),
statusSource: text().notNull(),
createdAt: timestamp().defaultNow().notNull(),
appliedAt: timestamp(),
});
export const meshRelations = relations(mesh, ({ one, many }) => ({ export const meshRelations = relations(mesh, ({ one, many }) => ({
owner: one(user, { owner: one(user, {
fields: [mesh.ownerUserId], fields: [mesh.ownerUserId],
@@ -121,9 +205,10 @@ export const meshRelations = relations(mesh, ({ one, many }) => ({
members: many(member), members: many(member),
invites: many(invite), invites: many(invite),
auditLogs: many(auditLog), auditLogs: many(auditLog),
messageQueue: many(messageQueue),
})); }));
export const memberRelations = relations(member, ({ one }) => ({ export const memberRelations = relations(member, ({ one, many }) => ({
mesh: one(mesh, { mesh: one(mesh, {
fields: [member.meshId], fields: [member.meshId],
references: [mesh.id], references: [mesh.id],
@@ -132,6 +217,26 @@ export const memberRelations = relations(member, ({ one }) => ({
fields: [member.userId], fields: [member.userId],
references: [user.id], references: [user.id],
}), }),
presences: many(presence),
sentMessages: many(messageQueue),
}));
export const presenceRelations = relations(presence, ({ one }) => ({
member: one(member, {
fields: [presence.memberId],
references: [member.id],
}),
}));
export const messageQueueRelations = relations(messageQueue, ({ one }) => ({
mesh: one(mesh, {
fields: [messageQueue.meshId],
references: [mesh.id],
}),
sender: one(member, {
fields: [messageQueue.senderMemberId],
references: [member.id],
}),
})); }));
export const inviteRelations = relations(invite, ({ one }) => ({ export const inviteRelations = relations(invite, ({ one }) => ({
@@ -160,6 +265,12 @@ export const selectInviteSchema = createSelectSchema(invite);
export const insertInviteSchema = createInsertSchema(invite); export const insertInviteSchema = createInsertSchema(invite);
export const selectAuditLogSchema = createSelectSchema(auditLog); export const selectAuditLogSchema = createSelectSchema(auditLog);
export const insertAuditLogSchema = createInsertSchema(auditLog); export const insertAuditLogSchema = createInsertSchema(auditLog);
export const selectPresenceSchema = createSelectSchema(presence);
export const insertPresenceSchema = createInsertSchema(presence);
export const selectMessageQueueSchema = createSelectSchema(messageQueue);
export const insertMessageQueueSchema = createInsertSchema(messageQueue);
export const selectPendingStatusSchema = createSelectSchema(pendingStatus);
export const insertPendingStatusSchema = createInsertSchema(pendingStatus);
export type SelectMesh = typeof mesh.$inferSelect; export type SelectMesh = typeof mesh.$inferSelect;
export type InsertMesh = typeof mesh.$inferInsert; export type InsertMesh = typeof mesh.$inferInsert;
@@ -169,3 +280,9 @@ export type SelectInvite = typeof invite.$inferSelect;
export type InsertInvite = typeof invite.$inferInsert; export type InsertInvite = typeof invite.$inferInsert;
export type SelectAuditLog = typeof auditLog.$inferSelect; export type SelectAuditLog = typeof auditLog.$inferSelect;
export type InsertAuditLog = typeof auditLog.$inferInsert; export type InsertAuditLog = typeof auditLog.$inferInsert;
export type SelectPresence = typeof presence.$inferSelect;
export type InsertPresence = typeof presence.$inferInsert;
export type SelectMessageQueue = typeof messageQueue.$inferSelect;
export type InsertMessageQueue = typeof messageQueue.$inferInsert;
export type SelectPendingStatus = typeof pendingStatus.$inferSelect;
export type InsertPendingStatus = typeof pendingStatus.$inferInsert;