diff --git a/packages/pglite/src/base.ts b/packages/pglite/src/base.ts index e18efb911..16ede0c11 100644 --- a/packages/pglite/src/base.ts +++ b/packages/pglite/src/base.ts @@ -1,5 +1,5 @@ import { query as queryTemplate } from './templating.js' -import { parseDescribeStatementResults, parseResults } from './parse.js' +import { parseDescribeStatementResults, parseResult } from './parse.js' import { type Serializer, type Parser, @@ -62,6 +62,7 @@ export abstract class BasePGlite abstract execProtocolStream( message: Uint8Array, { syncToFs, onNotice }: ExecProtocolOptions, + onBackendMessage?: (result: BackendMessage) => void, ): Promise /** @@ -165,11 +166,16 @@ export abstract class BasePGlite async #execProtocolNoSync( message: Uint8Array, options: ExecProtocolOptions = {}, + onBackendMessage?: (result: BackendMessage) => void, ): Promise { - const results = await this.execProtocolStream(message, { - ...options, - syncToFs: false, - }) + const results = await this.execProtocolStream( + message, + { + ...options, + syncToFs: false, + }, + onBackendMessage, + ) return results } @@ -193,13 +199,14 @@ export abstract class BasePGlite query: string, params?: any[], options?: QueryOptions, + onResult?: (results: Results) => void, ): Promise> { await this._checkReady() // We wrap the public query method in the transaction mutex to ensure that // only one query can be executed at a time and not concurrently with a // transaction. return await this._runExclusiveTransaction(async () => { - return await this.#runQuery(query, params, options) + return await this.#runQuery(query, params, options, onResult) }) } @@ -255,18 +262,28 @@ export abstract class BasePGlite query: string, params: any[] = [], options?: QueryOptions, + onResult?: (result: Results) => void, ): Promise> { return await this._runExclusiveQuery(async () => { // We need to parse, bind and execute a query with parameters this.#log('runQuery', query, params, options) await this._handleBlob(options?.blob) - let results = [] + const resultSet: Results = { + rows: [], + fields: [], + affectedRows: 0, + blob: undefined, + } + const onBackendMessage = (bm: BackendMessage) => { + this.#parseBackendMessage(bm, options, resultSet, onResult) + } try { - const parseResults = await this.#execProtocolNoSync( + await this.#execProtocolNoSync( serializeProtocol.parse({ text: query, types: options?.paramTypes }), options, + onBackendMessage, ) const dataTypeIDs = parseDescribeStatementResults( @@ -289,23 +306,23 @@ export abstract class BasePGlite } }) - results = [ - ...parseResults, - ...(await this.#execProtocolNoSync( - serializeProtocol.bind({ - values, - }), - options, - )), - ...(await this.#execProtocolNoSync( - serializeProtocol.describe({ type: 'P' }), - options, - )), - ...(await this.#execProtocolNoSync( - serializeProtocol.execute({}), - options, - )), - ] + await this.#execProtocolNoSync( + serializeProtocol.bind({ + values, + }), + options, + onBackendMessage, + ) + await this.#execProtocolNoSync( + serializeProtocol.describe({ type: 'P' }), + options, + onBackendMessage, + ) + await this.#execProtocolNoSync( + serializeProtocol.execute({}), + options, + onBackendMessage, + ) } catch (e) { if (e instanceof DatabaseError) { const pgError = makePGliteError({ e, options, params, query }) @@ -313,11 +330,10 @@ export abstract class BasePGlite } throw e } finally { - results.push( - ...(await this.#execProtocolNoSync( - serializeProtocol.sync(), - options, - )), + await this.#execProtocolNoSync( + serializeProtocol.sync(), + options, + onBackendMessage, ) } @@ -325,11 +341,75 @@ export abstract class BasePGlite if (!this.#inTransaction) { await this.syncToFs() } - const blob = await this._getWrittenBlob() - return parseResults(results, this.parsers, options, blob)[0] as Results + // const blob = await this._getWrittenBlob() + + // if (onResult && commandComplete) { + // const result = parseResults([commandComplete], this.parsers, options, blob)[0] as Results + // onResult(result) + // } + return resultSet }) } + #parseBackendMessage( + bm: BackendMessage, + options: QueryOptions | undefined, + resultSet: Results, + onResult: ((result: Results) => void) | undefined, + ) { + const result = parseResult( + bm, + this.parsers, + options, + undefined, + resultSet.fields, + ) as Results + + if (onResult) { + switch (bm.name) { + case 'dataRow': { + onResult(result) + break + } + case 'commandComplete': { + const blob = (this as any)._getWrittenBlobSync() + if (blob) resultSet.blob = blob + onResult(result) + break + } + default: { + // ignore other messages + break + } + } + } else { + switch (bm.name) { + case 'rowDescription': { + if (result.fields.length) { + resultSet.fields.push(...result.fields) + } + break + } + case 'dataRow': { + resultSet.rows.push(...result.rows) + break + } + case 'commandComplete': { + const blob = (this as any)._getWrittenBlobSync() + if (blob) { + resultSet.blob = blob + } + resultSet.affectedRows = result.affectedRows ?? 0 + break + } + default: { + // ignore other messages + break + } + } + } + } + /** * Internal method to execute a query * Not protected by the transaction mutex, so it can be used inside a transaction @@ -340,16 +420,30 @@ export abstract class BasePGlite async #runExec( query: string, options?: QueryOptions, + onResult?: (result: Results) => void, ): Promise> { return await this._runExclusiveQuery(async () => { // No params so we can just send the query this.#log('runExec', query, options) await this._handleBlob(options?.blob) - let results = [] + const results = new Array() + let resultSet: Results = { rows: [], fields: [], affectedRows: 0 } + + const onBackendMessage = (bm: BackendMessage) => { + this.#parseBackendMessage(bm, options, resultSet, onResult) + if (!onResult) { + if (bm.name === 'commandComplete') { + results.push(resultSet) + resultSet = { rows: [], fields: [], affectedRows: 0 } + } + } + } + try { - results = await this.#execProtocolNoSync( + await this.#execProtocolNoSync( serializeProtocol.query(query), options, + onBackendMessage, ) } catch (e) { if (e instanceof DatabaseError) { @@ -363,24 +457,18 @@ export abstract class BasePGlite } throw e } finally { - results.push( - ...(await this.#execProtocolNoSync( - serializeProtocol.sync(), - options, - )), + await this.#execProtocolNoSync( + serializeProtocol.sync(), + options, + onBackendMessage, ) } this._cleanupBlob() if (!this.#inTransaction) { await this.syncToFs() } - const blob = await this._getWrittenBlob() - return parseResults( - results, - this.parsers, - options, - blob, - ) as Array + + return results }) } diff --git a/packages/pglite/src/parse.ts b/packages/pglite/src/parse.ts index 06e4490d7..616f9d580 100644 --- a/packages/pglite/src/parse.ts +++ b/packages/pglite/src/parse.ts @@ -12,11 +12,12 @@ import { parseType, type Parser } from './types.js' * This function is used to parse the results of either a simple or extended query. * https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-SIMPLE-QUERY */ -export function parseResults( +export function parseResults( messages: Array, defaultParsers: Record, options?: QueryOptions, blob?: Blob, + onResult?: (result: Results) => void, ): Array { const resultSets: Results[] = [] let currentResultSet: Results = { rows: [], fields: [] } @@ -44,18 +45,21 @@ export function parseResults( ) } else { // rowMode === "object" - currentResultSet.rows.push( - Object.fromEntries( - msg.fields.map((field, i) => [ - currentResultSet!.fields[i].name, - parseType( - field, - currentResultSet!.fields[i].dataTypeID, - parsers, - ), - ]), - ), + const result = Object.fromEntries( + msg.fields.map((field, i) => [ + currentResultSet!.fields[i].name, + parseType(field, currentResultSet!.fields[i].dataTypeID, parsers), + ]), ) + if (onResult) { + const res: Results = { + rows: result.rows, + fields: currentResultSet.fields, + } + onResult(res as Results) + } else { + currentResultSet.rows.push(result) + } } break } @@ -79,13 +83,79 @@ export function parseResults( resultSets.push({ affectedRows: 0, rows: [], - fields: [], + fields: currentResultSet.fields ?? [], }) } return resultSets } +export function parseResult( + message: BackendMessage, + defaultParsers: Record, + options?: QueryOptions, + blob?: Blob, + fields?: { name: string; dataTypeID: number }[], +): Results { + // const resultSets: Results[] = [] + let resultSet: Results = { rows: [], fields: fields ?? [] } + const parsers = { ...defaultParsers, ...options?.parsers } + + switch (message.name) { + case 'rowDescription': { + const msg = message as RowDescriptionMessage + resultSet.fields = msg.fields.map((field) => ({ + name: field.name, + dataTypeID: field.dataTypeID, + })) + break + } + case 'dataRow': { + if (!resultSet) break + const msg = message as DataRowMessage + if (options?.rowMode === 'array') { + resultSet.rows.push( + msg.fields.map((field, i) => + parseType(field, resultSet!.fields[i].dataTypeID, parsers), + ), + ) + } else { + // rowMode === "object" + const result = Object.fromEntries( + msg.fields.map((field, i) => [ + resultSet!.fields[i].name, + parseType(field, resultSet!.fields[i].dataTypeID, parsers), + ]), + ) + resultSet.rows.push(result) + } + break + } + case 'commandComplete': { + const msg = message as CommandCompleteMessage + const affectedRows = retrieveRowCount(msg) + + resultSet = { + ...resultSet, + affectedRows, + ...(blob ? { blob } : {}), + } + } + } + + return resultSet + + // if (resultSets.length === 0) { + // resultSets.push({ + // affectedRows: 0, + // rows: [], + // fields: currentResultSet.fields ?? [], + // }) + // } + + // return resultSets +} + function retrieveRowCount(msg: CommandCompleteMessage): number { const parts = msg.text.split(' ') switch (parts[0]) { diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index 1814b9224..789dff974 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -47,6 +47,7 @@ export class PGlite // we handle Postgres' main longjmp manually, by intercepting it and exiting with this error code // keep in sync with pglitec.c->POSTGRES_MAIN_LONGJMP private readonly POSTGRES_MAIN_LONGJMP = 100 + #currentOnBackendMessage: ((result: any) => void) | undefined get ENV(): any { return this.mod?.ENV @@ -613,7 +614,11 @@ export class PGlite this.#protocolParser.parse(bytes, (msg) => { const parsedMsg = this.#parse(msg) if (parsedMsg) { - this.#currentResults.push(parsedMsg) + if (this.#currentOnBackendMessage) { + this.#currentOnBackendMessage(parsedMsg) + } else { + this.#currentResults.push(parsedMsg) + } } }) if (this.#keepRawResponse) { @@ -772,6 +777,14 @@ export class PGlite * @returns The written blob */ async _getWrittenBlob(): Promise { + return this._getWrittenBlobSync() + } + + /** + * Get the written blob from the current query + * @returns The written blob + */ + _getWrittenBlobSync(): Blob | undefined { if (!this.#queryWriteChunks) { return undefined } @@ -957,9 +970,11 @@ export class PGlite async execProtocolStream( message: Uint8Array, { syncToFs, throwOnError = true, onNotice }: ExecProtocolOptions = {}, + onBackendMessage?: (msg: BackendMessage) => void, ): Promise { this.#currentThrowOnError = throwOnError this.#currentOnNotice = onNotice + this.#currentOnBackendMessage = onBackendMessage this.#currentResults = [] this.#currentDatabaseError = null @@ -972,6 +987,7 @@ export class PGlite const databaseError = this.#currentDatabaseError this.#currentThrowOnError = false this.#currentOnNotice = undefined + this.#currentOnBackendMessage = undefined this.#currentDatabaseError = null const result = this.#currentResults this.#currentResults = [] diff --git a/packages/pglite/tests/basic.test.ts b/packages/pglite/tests/basic.test.ts index db88ebe38..3d6db3a3a 100644 --- a/packages/pglite/tests/basic.test.ts +++ b/packages/pglite/tests/basic.test.ts @@ -33,7 +33,7 @@ await testEsmCjsAndDTC(async (importType) => { fields: [], }, { - affectedRows: 2, + affectedRows: 1, rows: [], fields: [], }, @@ -43,7 +43,7 @@ await testEsmCjsAndDTC(async (importType) => { { name: 'id', dataTypeID: 23 }, { name: 'name', dataTypeID: 25 }, ], - affectedRows: 2, + affectedRows: 0, }, ]) diff --git a/packages/pglite/tests/largeobjects.test.js b/packages/pglite/tests/largeobjects.test.js index b469d37fa..83162d5bc 100644 --- a/packages/pglite/tests/largeobjects.test.js +++ b/packages/pglite/tests/largeobjects.test.js @@ -75,3 +75,52 @@ describe.skipIf(!process.env.PGLITE_TEST_LOTS_OF_DATA)('lots of data', () => { } }) }) + +describe.skipIf(!process.env.PGLITE_TEST_LOTS_OF_DATA)('on result', () => { + it('on result should work', async () => { + const db = await PGlite.create() + await db.exec( + `CREATE TABLE IF NOT EXISTS test (uuid1 TEXT, uuid2 TEXT, + uuid3 TEXT, uuid4 TEXT, uuid5 TEXT, uuid6 TEXT, uuid7 TEXT, uuid8 TEXT, uuid9 TEXT, uuid10 TEXT, + uuid11 TEXT, uuid12 TEXT, uuid13 TEXT, uuid14 TEXT, uuid15 TEXT, uuid16 TEXT, uuid17 TEXT, uuid18 TEXT, uuid19 TEXT, uuid20 TEXT)`, + ) + let i = 0 + const uuid = '3add1088-51ce-42fb-9955-484e4d9b2716' + while (i < 1_000) { + ++i + if (i % 10 === 0) console.log(`Already run ${i} times`) + await db.query( + `INSERT INTO test (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7, uuid8, uuid9, uuid10, + uuid11, uuid12, uuid13, uuid14, uuid15, uuid16, uuid17, uuid18, uuid19, uuid20) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)`, + [ + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + uuid, + ], + ) + } + i = 0 + const results = await db.query('SELECT * FROM test', [], undefined, (r) => { + console.log(`Result ${++i}`, JSON.stringify(r)) + }) + console.log('This should be empty', results) + }) +})