feat(broker): encrypt env vars at rest, restore on reboot
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

- broker-crypto.ts: AES-256-GCM encrypt/decrypt with BROKER_ENCRYPTION_KEY
- mcp_deploy stores env as _encryptedEnv in mesh.service.config (no plaintext in DB)
- boot restore: decrypts _encryptedEnv and re-spawns services via service-manager
- auto-generates ephemeral key if BROKER_ENCRYPTION_KEY not set (logs warning)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-08 12:25:48 +01:00
parent 75ca892ea7
commit 070a3b7422
3 changed files with 122 additions and 2 deletions

View File

@@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws";
import { and, eq, isNull, sql } from "drizzle-orm";
import { env } from "./env";
import { db } from "./db";
import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
import { mesh, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
import {
claimTask,
completeTask,
@@ -79,6 +79,7 @@ import {
getService,
listDbMeshServices,
deleteService,
getRunningServices,
} from "./broker";
import * as serviceManager from "./service-manager";
import { ensureBucket, meshBucketName, minioClient } from "./minio";
@@ -3178,12 +3179,19 @@ function handleConnection(ws: WebSocket): void {
sendError(ws, "limit", `max ${env.MAX_SERVICES_PER_MESH} services per mesh`, undefined, md._reqId);
break;
}
// Encrypt env vars at rest (broker-side AES-256-GCM)
const deployConfig = { ...(md.config ?? {}) };
if (deployConfig.env && Object.keys(deployConfig.env).length > 0) {
const { encryptForStorage } = await import("./broker-crypto");
deployConfig._encryptedEnv = encryptForStorage(JSON.stringify(deployConfig.env));
delete deployConfig.env; // don't store plaintext in DB
}
await upsertService(conn.meshId, md.server_name, {
type: "mcp", sourceType: md.source.type, description: `MCP server: ${md.server_name}`,
sourceFileId: md.source.type === "zip" ? md.source.file_id : undefined,
sourceGitUrl: md.source.type === "git" ? md.source.url : undefined,
sourceGitBranch: md.source.type === "git" ? md.source.branch : undefined,
runtime: md.config?.runtime, status: "building", config: md.config ?? {},
runtime: md.config?.runtime, status: "building", config: deployConfig,
scope: md.scope ?? "peer", deployedBy: conn.memberId, deployedByName: conn.displayName,
});
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "building", _reqId: md._reqId } as any);
@@ -3564,6 +3572,49 @@ function main(): void {
startDbHealth();
serviceManager.startHealthChecks();
// Restore managed services that were running before broker restart
(async () => {
try {
const { decryptFromStorage } = await import("./broker-crypto");
// Get all meshes with running services
const allMeshes = await db.select({ id: mesh.id }).from(mesh);
for (const m of allMeshes) {
const running = await getRunningServices(m.id);
if (running.length === 0) continue;
log.info("restoring services for mesh", { mesh_id: m.id, count: running.length });
for (const svc of running) {
try {
const config = (svc.config as Record<string, unknown>) ?? {};
// Decrypt env vars if stored encrypted
let resolvedEnv: Record<string, string> | undefined;
if (config._encryptedEnv) {
const decrypted = decryptFromStorage(config._encryptedEnv as string);
if (decrypted) {
resolvedEnv = JSON.parse(decrypted);
} else {
log.warn("failed to decrypt env for service", { service: svc.name });
}
}
const sourcePath = `${env.CLAUDEMESH_SERVICES_DIR}/${m.id}/${svc.name}/source`;
await serviceManager.deploy({
meshId: m.id,
name: svc.name,
sourcePath,
config: { runtime: svc.runtime as any, ...(config.memory_mb ? { memory_mb: config.memory_mb as number } : {}) },
resolvedEnv,
});
log.info("service restored", { service: svc.name, mesh_id: m.id });
} catch (e) {
log.error("service restore failed", { service: svc.name, error: e instanceof Error ? e.message : String(e) });
await updateServiceStatus(m.id, svc.name, "failed");
}
}
}
} catch (e) {
log.error("service restore error", { error: e instanceof Error ? e.message : String(e) });
}
})();
// Ensure audit log table exists and load hash chain state
ensureAuditLogTable()
.then(() => loadLastHashes())