diff --git a/.changeset/repair-pglite-corrupt-wal.md b/.changeset/repair-pglite-corrupt-wal.md new file mode 100644 index 000000000..686a2961e --- /dev/null +++ b/.changeset/repair-pglite-corrupt-wal.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite': patch +--- + +Recover NodeFS data directories from corrupt WAL/checkpoint startup failures by resetting WAL in place and retrying startup once, preserving existing data files instead of requiring a fresh database. diff --git a/packages/pglite/src/fs/base.ts b/packages/pglite/src/fs/base.ts index 014cca306..342831a2c 100644 --- a/packages/pglite/src/fs/base.ts +++ b/packages/pglite/src/fs/base.ts @@ -43,6 +43,13 @@ export interface Filesystem { * Close the filesystem. */ closeFs(): Promise + + /** + * Repair a data directory whose WAL is corrupt but whose data files should be + * preserved. Filesystems without direct persistent storage may leave this + * unsupported. + */ + repairWal?(): Promise<{ dataDir: string }> } /** diff --git a/packages/pglite/src/fs/nodefs.ts b/packages/pglite/src/fs/nodefs.ts index 2dbc94d66..fed4b9e4e 100644 --- a/packages/pglite/src/fs/nodefs.ts +++ b/packages/pglite/src/fs/nodefs.ts @@ -4,6 +4,7 @@ import { EmscriptenBuiltinFilesystem } from './base.js' import type { PostgresMod } from '../postgresMod.js' import { PGlite } from '../pglite.js' import { PGDATA } from '../initdb.js' +import { resetWal } from './pgResetWal.js' export class NodeFS extends EmscriptenBuiltinFilesystem { protected rootDir: string @@ -35,4 +36,9 @@ export class NodeFS extends EmscriptenBuiltinFilesystem { async closeFs(): Promise { this.pg!.Module.FS.quit() } + + async repairWal(): Promise<{ dataDir: string }> { + await resetWal(this.rootDir) + return { dataDir: this.rootDir } + } } diff --git a/packages/pglite/src/fs/pgResetWal.ts b/packages/pglite/src/fs/pgResetWal.ts new file mode 100644 index 000000000..a447b4dfb --- /dev/null +++ b/packages/pglite/src/fs/pgResetWal.ts @@ -0,0 +1,240 @@ +import { existsSync } from 'fs' +import { mkdir, open, readdir, readFile, unlink } from 'fs/promises' +import { join } from 'path' + +const PG_CONTROL_FILE_SIZE = 8192 +const PG_CONTROL_VERSION = 1700 +const DB_SHUTDOWNED = 1 +const XLOG_BLCKSZ = 8192 +const MIN_WAL_SEG_SIZE = 1024 * 1024 +const MAX_WAL_SEG_SIZE = 1024 * 1024 * 1024 +const SIZE_OF_XLOG_LONG_PHD = 40 +const SIZE_OF_XLOG_RECORD = 24 +const SIZE_OF_CHECKPOINT = 88 +const XLOG_PAGE_MAGIC = 0xd116 +const XLP_LONG_HEADER = 0x0002 +const XLOG_CHECKPOINT_SHUTDOWN = 0x00 +const XLR_BLOCK_ID_DATA_SHORT = 255 +const RM_XLOG_ID = 0 + +const OFF = { + // PGlite 0.4.x ships PostgreSQL 17; these are ControlFileData offsets for + // that layout. + systemIdentifier: 0, + pgControlVersion: 8, + state: 16, + time: 24, + checkPoint: 32, + checkPointCopy: 40, + checkPointCopyRedo: 40, + checkPointCopyThisTimeLineID: 48, + checkPointCopyTime: 104, + minRecoveryPoint: 136, + minRecoveryPointTLI: 144, + backupStartPoint: 152, + backupEndPoint: 160, + backupEndRequired: 168, + walLevel: 172, + walLogHints: 176, + maxConnections: 180, + maxWorkerProcesses: 184, + maxWalSenders: 188, + maxPreparedXacts: 192, + maxLocksPerXact: 196, + trackCommitTimestamp: 200, + xlogBlcksz: 224, + xlogSegSize: 228, + crc: 288, +} as const + +const crcTable = new Uint32Array(256) +for (let i = 0; i < 256; i++) { + let crc = i + for (let j = 0; j < 8; j++) { + crc = crc & 1 ? (crc >>> 1) ^ 0x82f63b78 : crc >>> 1 + } + crcTable[i] = crc >>> 0 +} + +function crc32c(chunks: Uint8Array[]) { + let crc = 0xffffffff + for (const chunk of chunks) { + for (const byte of chunk) { + crc = (crc >>> 8) ^ crcTable[(crc ^ byte) & 0xff] + } + } + return (crc ^ 0xffffffff) >>> 0 +} + +function readUInt64LE(buf: Buffer, offset: number) { + return buf.readBigUInt64LE(offset) +} + +function writeUInt64LE(buf: Buffer, value: bigint, offset: number) { + buf.writeBigUInt64LE(value, offset) +} + +function parseWalSegNo(fileName: string, walSegSize: number) { + if (!/^[0-9A-F]{24}$/.test(fileName)) return null + const log = BigInt(`0x${fileName.slice(8, 16)}`) + const seg = BigInt(`0x${fileName.slice(16, 24)}`) + return log * (0x100000000n / BigInt(walSegSize)) + seg +} + +function xlogFileName(tli: number, segNo: bigint, walSegSize: number) { + const segmentsPerXlogId = 0x100000000n / BigInt(walSegSize) + const log = segNo / segmentsPerXlogId + const seg = segNo % segmentsPerXlogId + return [ + tli.toString(16).toUpperCase().padStart(8, '0'), + log.toString(16).toUpperCase().padStart(8, '0'), + seg.toString(16).toUpperCase().padStart(8, '0'), + ].join('') +} + +async function unlinkIfExists(path: string) { + await unlink(path).catch((error: NodeJS.ErrnoException) => { + if (error.code !== 'ENOENT') throw error + }) +} + +async function writeFileSynced(path: string, data: Buffer) { + const file = await open(path, 'w') + try { + await file.writeFile(data) + await file.sync() + } finally { + await file.close() + } +} + +export async function resetWal(rootDir: string) { + await unlinkIfExists(join(rootDir, 'postmaster.pid')) + + const pgVersion = (await readFile(join(rootDir, 'PG_VERSION'), 'utf8')).trim() + if (pgVersion !== '17') { + throw new Error(`Cannot reset WAL for unsupported PG_VERSION ${pgVersion}`) + } + + const controlPath = join(rootDir, 'global', 'pg_control') + const control = Buffer.from(await readFile(controlPath)) + if (control.length !== PG_CONTROL_FILE_SIZE) { + throw new Error(`Unexpected pg_control size ${control.length}`) + } + if (control.readUInt32LE(OFF.pgControlVersion) !== PG_CONTROL_VERSION) { + throw new Error('Unsupported pg_control version') + } + + const walSegSize = control.readUInt32LE(OFF.xlogSegSize) + const xlogBlcksz = control.readUInt32LE(OFF.xlogBlcksz) + if ( + walSegSize < MIN_WAL_SEG_SIZE || + walSegSize > MAX_WAL_SEG_SIZE || + (walSegSize & (walSegSize - 1)) !== 0 || + 0x100000000 % walSegSize !== 0 + ) { + throw new Error(`Unsupported WAL segment size ${walSegSize}`) + } + if (xlogBlcksz !== XLOG_BLCKSZ) { + throw new Error(`Unsupported WAL block size ${xlogBlcksz}`) + } + + const tli = control.readUInt32LE(OFF.checkPointCopyThisTimeLineID) + let newSegNo = + readUInt64LE(control, OFF.checkPointCopyRedo) / BigInt(walSegSize) + const walDir = join(rootDir, 'pg_wal') + await mkdir(join(walDir, 'archive_status'), { recursive: true }) + for (const file of await readdir(walDir)) { + const segNo = parseWalSegNo(file, walSegSize) + if (segNo !== null && segNo > newSegNo) { + newSegNo = segNo + } + } + newSegNo += 1n + + const redo = newSegNo * BigInt(walSegSize) + BigInt(SIZE_OF_XLOG_LONG_PHD) + const now = BigInt(Math.floor(Date.now() / 1000)) + + writeUInt64LE(control, redo, OFF.checkPointCopyRedo) + writeUInt64LE(control, now, OFF.checkPointCopyTime) + control.writeInt32LE(DB_SHUTDOWNED, OFF.state) + writeUInt64LE(control, now, OFF.time) + writeUInt64LE(control, redo, OFF.checkPoint) + writeUInt64LE(control, 0n, OFF.minRecoveryPoint) + control.writeUInt32LE(0, OFF.minRecoveryPointTLI) + writeUInt64LE(control, 0n, OFF.backupStartPoint) + writeUInt64LE(control, 0n, OFF.backupEndPoint) + control.writeUInt8(0, OFF.backupEndRequired) + control.writeInt32LE(0, OFF.walLevel) + control.writeUInt8(0, OFF.walLogHints) + control.writeInt32LE(100, OFF.maxConnections) + control.writeInt32LE(8, OFF.maxWorkerProcesses) + control.writeInt32LE(10, OFF.maxWalSenders) + control.writeInt32LE(0, OFF.maxPreparedXacts) + control.writeInt32LE(64, OFF.maxLocksPerXact) + control.writeUInt8(0, OFF.trackCommitTimestamp) + control.writeUInt32LE(crc32c([control.subarray(0, OFF.crc)]), OFF.crc) + + for (const file of await readdir(walDir)) { + if (/^[0-9A-F]{24}(?:\.partial)?$/.test(file)) { + await unlink(join(walDir, file)) + } + } + + const archiveStatusDir = join(walDir, 'archive_status') + if (existsSync(archiveStatusDir)) { + for (const file of await readdir(archiveStatusDir)) { + if (/^[0-9A-F]{24}(?:\.partial)?\.(?:ready|done)$/.test(file)) { + await unlink(join(archiveStatusDir, file)) + } + } + } + const walSummaryDir = join(walDir, 'summaries') + if (existsSync(walSummaryDir)) { + for (const file of await readdir(walSummaryDir)) { + if (/^[0-9A-F]{40}\.summary$/.test(file)) { + await unlink(join(walSummaryDir, file)) + } + } + } + + const wal = Buffer.alloc(walSegSize) + wal.writeUInt16LE(XLOG_PAGE_MAGIC, 0) + wal.writeUInt16LE(XLP_LONG_HEADER, 2) + wal.writeUInt32LE(tli, 4) + writeUInt64LE(wal, redo - BigInt(SIZE_OF_XLOG_LONG_PHD), 8) + wal.writeUInt32LE(0, 16) + writeUInt64LE(wal, readUInt64LE(control, OFF.systemIdentifier), 24) + wal.writeUInt32LE(walSegSize, 32) + wal.writeUInt32LE(XLOG_BLCKSZ, 36) + + const recordOffset = SIZE_OF_XLOG_LONG_PHD + const recordTotalLength = SIZE_OF_XLOG_RECORD + 2 + SIZE_OF_CHECKPOINT + wal.writeUInt32LE(recordTotalLength, recordOffset) + wal.writeUInt32LE(0, recordOffset + 4) + writeUInt64LE(wal, 0n, recordOffset + 8) + wal.writeUInt8(XLOG_CHECKPOINT_SHUTDOWN, recordOffset + 16) + wal.writeUInt8(RM_XLOG_ID, recordOffset + 17) + wal.writeUInt16LE(0, recordOffset + 18) + wal.writeUInt8(XLR_BLOCK_ID_DATA_SHORT, recordOffset + SIZE_OF_XLOG_RECORD) + wal.writeUInt8(SIZE_OF_CHECKPOINT, recordOffset + SIZE_OF_XLOG_RECORD + 1) + control.copy( + wal, + recordOffset + SIZE_OF_XLOG_RECORD + 2, + OFF.checkPointCopy, + OFF.checkPointCopy + SIZE_OF_CHECKPOINT, + ) + + const record = wal.subarray(recordOffset, recordOffset + recordTotalLength) + const recordCrc = crc32c([ + record.subarray(SIZE_OF_XLOG_RECORD), + record.subarray(0, 20), + ]) + wal.writeUInt32LE(recordCrc, recordOffset + 20) + + await writeFileSynced( + join(walDir, xlogFileName(tli, newSegNo, walSegSize)), + wal, + ) + await writeFileSynced(controlPath, control) +} diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts index aa4c4a10a..a4cad88bb 100644 --- a/packages/pglite/src/interface.ts +++ b/packages/pglite/src/interface.ts @@ -85,6 +85,7 @@ export interface DumpDataDirResult { export interface PGliteOptions { noInitDb?: boolean dataDir?: string + dataDirRepair?: 'auto' | 'none' username?: string database?: string fs?: Filesystem @@ -107,6 +108,7 @@ export type PGliteInterface = readonly debug: DebugLevel readonly ready: boolean readonly closed: boolean + readonly repairedDataDir?: string close(): Promise query( diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index 297efd2b8..6388e2aa8 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -36,6 +36,54 @@ import { pglUtils } from '@electric-sql/pglite-utils' const postgresExePath = '/pglite/bin/postgres' const initdbExePath = '/pglite/bin/initdb' +const maxCapturedStartupMessages = 100 + +function captureStartupMessage(messages: string[], text: string) { + for (const line of text.split(/\r?\n/)) { + if (!line) continue + messages.push(line) + if (messages.length > maxCapturedStartupMessages) { + messages.shift() + } + } +} + +function createStartupError(error: unknown, messages: string[]) { + const details = messages.length + ? `\nPostgres startup logs:\n${messages.join('\n')}` + : '' + const startupError = new Error( + `PGlite failed to start Postgres.${details}`, + ) as Error & { cause?: unknown } + startupError.name = 'PGliteStartupError' + startupError.cause = error + return startupError +} + +function shouldRepairWal(options: PGliteOptions, messages: string[]) { + if (options.dataDirRepair === 'none') { + return false + } + const message = messages.join('\n') + return ( + message.includes('invalid checkpoint record') || + message.includes('could not locate a valid checkpoint record') || + message.includes('could not read from WAL segment') + ) +} + +function getProcessExitCode(): typeof process.exitCode | undefined { + if (typeof process === 'undefined') { + return undefined + } + return process.exitCode +} + +function restoreProcessExitCode(exitCode: typeof process.exitCode | undefined) { + if (typeof process !== 'undefined') { + process.exitCode = exitCode ?? 0 + } +} export class PGlite extends BasePGlite @@ -53,6 +101,7 @@ export class PGlite } readonly dataDir?: string + repairedDataDir?: string #ready = false #closing = false @@ -183,7 +232,7 @@ export class PGlite this.#extensions = options.extensions ?? {} // Initialize the database, and store the promise so we can wait for it to be ready - this.waitReady = this.#init(options ?? {}) + this.waitReady = this.#init(options ?? {}, true) } /** @@ -255,13 +304,20 @@ export class PGlite * Initialize the database * @returns A promise that resolves when the database is ready */ - async #init(options: PGliteOptions) { + async #init(options: PGliteOptions, allowWalRepair: boolean) { if (options.fs) { this.fs = options.fs } else { const { dataDir, fsType } = parseDataDir(options.dataDir) this.fs = await loadFs(dataDir, fsType) } + const startupMessages: string[] = [] + let captureStartupMessages = true + const recordStartupMessage = (text: string) => { + if (captureStartupMessages) { + captureStartupMessage(startupMessages, text) + } + } const extensionBundlePromises: Record> = {} const extensionInitFns: Array<() => Promise> = [] @@ -316,9 +372,11 @@ export class PGlite // Provide a stdin that returns EOF to avoid browser prompt stdin: () => null, print: (text: string) => { + recordStartupMessage(text) this.#print(text) }, printErr: (text: string) => { + recordStartupMessage(text) this.#printErr(text) }, instantiateWasm: (imports, successCallback) => { @@ -543,15 +601,35 @@ export class PGlite // Start compiling dynamic extensions present in FS. await loadExtensions(this.mod, (...args) => this.#log(...args)) - this.mod!._pgl_setPGliteActive(1) - this.#startInSingleMode({ - pgDataFolder: PGDATA, - startParams: [ - ...(options.startParams || PGlite.defaultStartParams), - ...(this.debug ? ['-d', this.debug.toString()] : []), - ], - }) - this.#setPGliteActive() + const exitCodeBeforeStart = getProcessExitCode() + try { + this.mod!._pgl_setPGliteActive(1) + this.#startInSingleMode({ + pgDataFolder: PGDATA, + startParams: [ + ...(options.startParams || PGlite.defaultStartParams), + ...(this.debug ? ['-d', this.debug.toString()] : []), + ], + }) + this.#setPGliteActive() + } catch (e) { + restoreProcessExitCode(exitCodeBeforeStart) + const startupError = createStartupError(e, startupMessages) + if (allowWalRepair && shouldRepairWal(options, startupMessages)) { + const repaired = await this.#repairWal() + if (repaired) { + this.repairedDataDir = repaired.dataDir + console.warn( + `PGlite reset corrupted WAL in ${repaired.dataDir} and restarted. Data files were preserved, but uncheckpointed transactions may be lost.`, + ) + await this.#init(options, false) + return + } + } + throw startupError + } finally { + captureStartupMessages = false + } this.#ready = true @@ -569,6 +647,31 @@ export class PGlite } } + async #repairWal() { + if (!this.fs?.repairWal) { + return + } + + try { + await this.fs.closeFs() + } catch { + // The runtime may already be aborting; WAL repair only needs the + // persistent data directory. + } + + try { + this.mod?._emscripten_force_exit(0) + } catch { + // Ignore abort-state cleanup failures; the current module is discarded. + } + + this.mod = undefined + this.#ready = false + this.#running = false + + return this.fs.repairWal() + } + #onRuntimeInitialized(mod: PostgresMod) { // we override system() to intercept any calls that might generate unexpected output this.#system_fn = mod.addFunction((cmd_ptr: number) => { diff --git a/packages/pglite/tests/startup-wal-repair.test.ts b/packages/pglite/tests/startup-wal-repair.test.ts new file mode 100644 index 000000000..04b70ada6 --- /dev/null +++ b/packages/pglite/tests/startup-wal-repair.test.ts @@ -0,0 +1,69 @@ +import { describe, it, expect } from 'vitest' +import { mkdtemp, readdir, rm, truncate } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { testEsmCjsAndDTC } from './test-utils.ts' + +async function createCorruptedDataDir() { + const dataDir = await mkdtemp(join(tmpdir(), 'pglite-startup-wal-repair-')) + + const { PGlite } = await import('../dist/index.js') + const db = await PGlite.create(dataDir) + await db.query('CREATE TABLE keep_me (id int primary key, value text)') + await db.query("INSERT INTO keep_me VALUES (1, 'still here')") + await db.close() + + const walDir = join(dataDir, 'pg_wal') + const walFile = (await readdir(walDir)).find((file) => + /^[0-9A-F]{24}$/.test(file), + ) + expect(walFile).toBeDefined() + await truncate(join(walDir, walFile!), 1024) + + return dataDir +} + +await testEsmCjsAndDTC(async (importType) => { + const { PGlite } = + importType === 'esm' + ? await import('../dist/index.js') + : ((await import( + '../dist/index.cjs' + )) as unknown as typeof import('../dist/index.js')) + + describe('startup WAL repair', () => { + it('resets corrupt WAL in place and preserves existing data', async () => { + const dataDir = await createCorruptedDataDir() + try { + const exitCode = process.exitCode + const recovered = await PGlite.create(dataDir) + expect(recovered.repairedDataDir).toBe(dataDir) + await expect(recovered.query('SELECT * FROM keep_me')).resolves.toEqual( + { + rows: [{ id: 1, value: 'still here' }], + fields: [ + { name: 'id', dataTypeID: 23 }, + { name: 'value', dataTypeID: 25 }, + ], + affectedRows: 0, + }, + ) + await recovered.close() + expect(process.exitCode).toBe(exitCode ?? 0) + } finally { + await rm(dataDir, { recursive: true, force: true }) + } + }) + + it('can disable automatic WAL repair', async () => { + const dataDir = await createCorruptedDataDir() + try { + await expect( + PGlite.create({ dataDir, dataDirRepair: 'none' }), + ).rejects.toThrow(/PGlite failed to start Postgres/) + } finally { + await rm(dataDir, { recursive: true, force: true }) + } + }) + }) +})