feat(broker): filename-tracked migration runner replaces drizzle's migrator
drizzle's _journal.json drifted to idx=11 while the file system had 25 .sql files; the prod drizzle.__drizzle_migrations table was further behind with 3 rows. The runtime migrator silently skipped anything outside the journal, so every new schema change required psql -f by hand. The new runner tracks applied files in mesh.__cmh_migrations (filename PK + sha256 + applied_at). On startup it bootstraps the tracking table inline, lists migrations/*.sql lexicographically, filters out already-applied files, and runs the rest in that order, one transaction per file, under the existing pg_advisory_lock. SHA mismatches on already-applied files emit a warning but don't fail (cosmetic edits are common); production drift detection lives elsewhere. Bootstrap script at apps/broker/scripts/bootstrap-cmh-migrations.ts computes file hashes and seeds the tracking table — already run against prod with all 25 current files registered as applied. Future deploys pick up only truly new migrations. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,30 +1,57 @@
|
||||
/**
|
||||
* Runtime migrations on broker startup.
|
||||
*
|
||||
* Runs pending drizzle migrations against DATABASE_URL before the broker
|
||||
* listens. Uses pg_try_advisory_lock with retry+timeout so a stuck old
|
||||
* instance can't block new deploys indefinitely (the original
|
||||
* pg_advisory_lock version matched the "stuck 12h" symptom perfectly —
|
||||
* an old container held the lock and the new deploy waited forever).
|
||||
* Replaced drizzle's migrator with a filename-tracked runner because
|
||||
* drizzle's _journal.json drifted on the filesystem (last entry was
|
||||
* idx=11; idx 12-24 were never recorded), and the prod
|
||||
* drizzle.__drizzle_migrations table was even further behind (3 rows
|
||||
* for 25 files). The runtime migrator silently skipped anything
|
||||
* outside the journal, so every new schema change required `psql -f`
|
||||
* by hand.
|
||||
*
|
||||
* If migrations fail OR the lock can't be acquired within the timeout,
|
||||
* the process exits non-zero so the orchestrator (Coolify healthcheck)
|
||||
* sees the container as broken and doesn't route traffic to it.
|
||||
* The new runner tracks applied files in `mesh.__cmh_migrations`
|
||||
* (filename + sha256 + applied_at). On startup:
|
||||
* 1. Acquire advisory lock (unchanged)
|
||||
* 2. CREATE TABLE IF NOT EXISTS for the tracking table
|
||||
* 3. Read applied filenames from the table
|
||||
* 4. List `migrations/*.sql` lexicographically; filter out applied
|
||||
* 5. For each unapplied: BEGIN; execute file; INSERT row; COMMIT
|
||||
* 6. For each applied: optionally verify sha matches; warn (don't
|
||||
* fail) on mismatch — devs reformat migrations sometimes
|
||||
*
|
||||
* Bootstrap: run `apps/broker/scripts/bootstrap-cmh-migrations.ts`
|
||||
* against an existing prod DB to seed the tracking table with the
|
||||
* currently-applied set. Without that, the runner would try to
|
||||
* re-apply 0000-0024 and fail on duplicate-table errors.
|
||||
*
|
||||
* Failure modes (all exit non-zero so Coolify healthcheck fails closed):
|
||||
* - DATABASE_URL missing
|
||||
* - lock acquisition timeout
|
||||
* - migration SQL error mid-application
|
||||
*/
|
||||
|
||||
import { drizzle } from "drizzle-orm/postgres-js";
|
||||
import { migrate } from "drizzle-orm/postgres-js/migrator";
|
||||
import postgres from "postgres";
|
||||
import { join } from "node:path";
|
||||
import { existsSync, readdirSync } from "node:fs";
|
||||
import { existsSync, readdirSync, readFileSync } from "node:fs";
|
||||
import { createHash } from "node:crypto";
|
||||
|
||||
/**
 * Key of the Postgres advisory lock that serializes migration runs.
 *
 * Stable magic constant (original note: "cmsh" ascii). The same key is
 * used to release the lock, so it must not change between deploys.
 * NOTE(review): the digits do not correspond to the ASCII codes of "cmsh"
 * (99 109 115 104 / 0x636d7368) — treat the value as opaque.
 */
const LOCK_ID = 74737_73831;

/** Max total time to wait for the advisory lock before giving up. */
const LOCK_ACQUIRE_TIMEOUT_MS = 60_000;

/** Poll interval when lock is held by another instance. */
const LOCK_RETRY_INTERVAL_MS = 2_000;
|
||||
|
||||
/**
 * DDL for the migration tracking table.
 *
 * Creates the `mesh` schema and `mesh.__cmh_migrations` when they do not
 * exist yet; both statements use IF NOT EXISTS, so the script can be
 * executed on every startup.
 *
 * Columns:
 *  - filename   — migration file name, primary key
 *  - sha256     — hash of the file content recorded with the row
 *  - applied_at — insertion time, defaults to NOW()
 */
const TRACKING_TABLE_DDL = `
  CREATE SCHEMA IF NOT EXISTS mesh;
  CREATE TABLE IF NOT EXISTS mesh.__cmh_migrations (
    filename TEXT PRIMARY KEY,
    sha256 TEXT NOT NULL,
    applied_at TIMESTAMP NOT NULL DEFAULT NOW()
  );
`;
|
||||
|
||||
/**
 * Computes the SHA-256 digest of a string.
 *
 * @param content - Text to hash. Encoded as UTF-8, which is also what
 *   Node applies to string input when no encoding is given.
 * @returns The digest as a 64-character lowercase hexadecimal string.
 */
function sha256Hex(content: string): string {
  return createHash("sha256").update(content, "utf8").digest("hex");
}
|
||||
|
||||
export async function runMigrationsOnStartup(): Promise<void> {
|
||||
const url = process.env.DATABASE_URL;
|
||||
if (!url) {
|
||||
@@ -43,20 +70,17 @@ export async function runMigrationsOnStartup(): Promise<void> {
|
||||
console.error("[migrate] migrations folder not found — skipping. Searched:", candidates);
|
||||
return;
|
||||
}
|
||||
const count = readdirSync(migrationsFolder).filter((f) => f.endsWith(".sql")).length;
|
||||
console.log(`[migrate] ${count} migration files at ${migrationsFolder}`);
|
||||
|
||||
const sql = postgres(url, {
|
||||
max: 1,
|
||||
onnotice: () => { /* quiet */ },
|
||||
});
|
||||
const allFiles = readdirSync(migrationsFolder)
|
||||
.filter((f) => f.endsWith(".sql"))
|
||||
.sort(); // lexicographic = numeric for 0000_*..9999_*
|
||||
console.log(`[migrate] ${allFiles.length} migration files at ${migrationsFolder}`);
|
||||
|
||||
const sql = postgres(url, { max: 1, onnotice: () => {} });
|
||||
|
||||
try {
|
||||
// SET doesn't accept parameterized values ($1) — use unsafe() for
|
||||
// the literal. The value is a hardcoded constant, not user input.
|
||||
await sql.unsafe(`SET lock_timeout = '${LOCK_ACQUIRE_TIMEOUT_MS}ms'`);
|
||||
|
||||
// Try to grab the advisory lock; poll if someone else holds it.
|
||||
const deadline = Date.now() + LOCK_ACQUIRE_TIMEOUT_MS;
|
||||
let locked = false;
|
||||
while (Date.now() < deadline) {
|
||||
@@ -76,10 +100,58 @@ export async function runMigrationsOnStartup(): Promise<void> {
|
||||
}
|
||||
|
||||
try {
|
||||
const db = drizzle(sql);
|
||||
const start = Date.now();
|
||||
await migrate(db, { migrationsFolder });
|
||||
console.log(`[migrate] ok (${Date.now() - start}ms)`);
|
||||
// Bootstrap the tracking table itself. Idempotent.
|
||||
await sql.unsafe(TRACKING_TABLE_DDL);
|
||||
|
||||
const applied = await sql<{ filename: string; sha256: string }[]>`
|
||||
SELECT filename, sha256 FROM mesh.__cmh_migrations
|
||||
`;
|
||||
const appliedMap = new Map(applied.map((r) => [r.filename, r.sha256]));
|
||||
|
||||
const pending: Array<{ filename: string; sha: string; content: string }> = [];
|
||||
for (const filename of allFiles) {
|
||||
const path = join(migrationsFolder, filename);
|
||||
const content = readFileSync(path, "utf8");
|
||||
const sha = sha256Hex(content);
|
||||
const knownSha = appliedMap.get(filename);
|
||||
if (!knownSha) {
|
||||
pending.push({ filename, sha, content });
|
||||
} else if (knownSha !== sha) {
|
||||
// File content changed after application. Don't re-run; warn.
|
||||
// Hard-fail would block legit cosmetic edits (whitespace,
|
||||
// comments). Production drift detection lives elsewhere.
|
||||
console.warn(
|
||||
`[migrate] sha mismatch for ${filename} — file modified post-apply (was ${knownSha.slice(0, 12)}…, now ${sha.slice(0, 12)}…)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (pending.length === 0) {
|
||||
console.log(`[migrate] up to date · ${applied.length} applied`);
|
||||
} else {
|
||||
console.log(`[migrate] applying ${pending.length} pending: ${pending.map((p) => p.filename).join(", ")}`);
|
||||
for (const m of pending) {
|
||||
const start = Date.now();
|
||||
try {
|
||||
await sql.begin(async (tx) => {
|
||||
// drizzle migrations use `--> statement-breakpoint` to
|
||||
// separate statements; postgres-js can run a multi-stmt
|
||||
// script via .unsafe(), but transactional rollback wraps
|
||||
// everything as one unit which is what we want.
|
||||
await tx.unsafe(m.content);
|
||||
await tx`
|
||||
INSERT INTO mesh.__cmh_migrations (filename, sha256)
|
||||
VALUES (${m.filename}, ${m.sha})
|
||||
`;
|
||||
});
|
||||
console.log(`[migrate] ✓ ${m.filename} (${Date.now() - start}ms)`);
|
||||
} catch (e) {
|
||||
console.error(`[migrate] ✗ ${m.filename}:`, e instanceof Error ? e.message : e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
console.log(`[migrate] ok`);
|
||||
}
|
||||
} finally {
|
||||
await sql`SELECT pg_advisory_unlock(${LOCK_ID})`;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user