Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 134 additions & 46 deletions packages/pglite/src/base.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { query as queryTemplate } from './templating.js'
import { parseDescribeStatementResults, parseResults } from './parse.js'
import { parseDescribeStatementResults, parseResult } from './parse.js'
import {
type Serializer,
type Parser,
Expand Down Expand Up @@ -62,6 +62,7 @@ export abstract class BasePGlite
abstract execProtocolStream(
message: Uint8Array,
{ syncToFs, onNotice }: ExecProtocolOptions,
onBackendMessage?: (result: BackendMessage) => void,
): Promise<BackendMessage[]>

/**
Expand Down Expand Up @@ -165,11 +166,16 @@ export abstract class BasePGlite
async #execProtocolNoSync(
message: Uint8Array,
options: ExecProtocolOptions = {},
onBackendMessage?: (result: BackendMessage) => void,
): Promise<BackendMessage[]> {
const results = await this.execProtocolStream(message, {
...options,
syncToFs: false,
})
const results = await this.execProtocolStream(
message,
{
...options,
syncToFs: false,
},
onBackendMessage,
)

return results
}
Expand All @@ -193,13 +199,14 @@ export abstract class BasePGlite
query: string,
params?: any[],
options?: QueryOptions,
onResult?: (results: Results<T>) => void,
): Promise<Results<T>> {
await this._checkReady()
// We wrap the public query method in the transaction mutex to ensure that
// only one query can be executed at a time and not concurrently with a
// transaction.
return await this._runExclusiveTransaction(async () => {
return await this.#runQuery<T>(query, params, options)
return await this.#runQuery<T>(query, params, options, onResult)
})
}

Expand Down Expand Up @@ -255,18 +262,28 @@ export abstract class BasePGlite
query: string,
params: any[] = [],
options?: QueryOptions,
onResult?: (result: Results<T>) => void,
): Promise<Results<T>> {
return await this._runExclusiveQuery(async () => {
// We need to parse, bind and execute a query with parameters
this.#log('runQuery', query, params, options)
await this._handleBlob(options?.blob)

let results = []
const resultSet: Results<T> = {
rows: [],
fields: [],
affectedRows: 0,
blob: undefined,
}

const onBackendMessage = (bm: BackendMessage) => {
this.#parseBackendMessage(bm, options, resultSet, onResult)
}
try {
const parseResults = await this.#execProtocolNoSync(
await this.#execProtocolNoSync(
serializeProtocol.parse({ text: query, types: options?.paramTypes }),
options,
onBackendMessage,
)

const dataTypeIDs = parseDescribeStatementResults(
Expand All @@ -289,47 +306,110 @@ export abstract class BasePGlite
}
})

results = [
...parseResults,
...(await this.#execProtocolNoSync(
serializeProtocol.bind({
values,
}),
options,
)),
...(await this.#execProtocolNoSync(
serializeProtocol.describe({ type: 'P' }),
options,
)),
...(await this.#execProtocolNoSync(
serializeProtocol.execute({}),
options,
)),
]
await this.#execProtocolNoSync(
serializeProtocol.bind({
values,
}),
options,
onBackendMessage,
)
await this.#execProtocolNoSync(
serializeProtocol.describe({ type: 'P' }),
options,
onBackendMessage,
)
await this.#execProtocolNoSync(
serializeProtocol.execute({}),
options,
onBackendMessage,
)
} catch (e) {
if (e instanceof DatabaseError) {
const pgError = makePGliteError({ e, options, params, query })
throw pgError
}
throw e
} finally {
results.push(
...(await this.#execProtocolNoSync(
serializeProtocol.sync(),
options,
)),
await this.#execProtocolNoSync(
serializeProtocol.sync(),
options,
onBackendMessage,
)
}

await this._cleanupBlob()
if (!this.#inTransaction) {
await this.syncToFs()
}
const blob = await this._getWrittenBlob()
return parseResults(results, this.parsers, options, blob)[0] as Results<T>
// const blob = await this._getWrittenBlob()

// if (onResult && commandComplete) {
// const result = parseResults([commandComplete], this.parsers, options, blob)[0] as Results<T>
// onResult(result)
// }
return resultSet
})
}

#parseBackendMessage<T>(
bm: BackendMessage,
options: QueryOptions | undefined,
resultSet: Results<T>,
onResult: ((result: Results<T>) => void) | undefined,
) {
const result = parseResult(
bm,
this.parsers,
options,
undefined,
resultSet.fields,
) as Results<T>

if (onResult) {
switch (bm.name) {
case 'dataRow': {
onResult(result)
break
}
case 'commandComplete': {
const blob = (this as any)._getWrittenBlobSync()
if (blob) resultSet.blob = blob
onResult(result)
break
}
default: {
// ignore other messages
break
}
}
} else {
switch (bm.name) {
case 'rowDescription': {
if (result.fields.length) {
resultSet.fields.push(...result.fields)
}
break
}
case 'dataRow': {
resultSet.rows.push(...result.rows)
break
}
case 'commandComplete': {
const blob = (this as any)._getWrittenBlobSync()
if (blob) {
resultSet.blob = blob
}
resultSet.affectedRows = result.affectedRows ?? 0
break
}
default: {
// ignore other messages
break
}
}
}
}

/**
* Internal method to execute a query
* Not protected by the transaction mutex, so it can be used inside a transaction
Expand All @@ -340,16 +420,30 @@ export abstract class BasePGlite
async #runExec(
query: string,
options?: QueryOptions,
onResult?: (result: Results) => void,
): Promise<Array<Results>> {
return await this._runExclusiveQuery(async () => {
// No params so we can just send the query
this.#log('runExec', query, options)
await this._handleBlob(options?.blob)
let results = []
const results = new Array<Results>()
let resultSet: Results = { rows: [], fields: [], affectedRows: 0 }

const onBackendMessage = (bm: BackendMessage) => {
this.#parseBackendMessage(bm, options, resultSet, onResult)
if (!onResult) {
if (bm.name === 'commandComplete') {
results.push(resultSet)
resultSet = { rows: [], fields: [], affectedRows: 0 }
}
}
}

try {
results = await this.#execProtocolNoSync(
await this.#execProtocolNoSync(
serializeProtocol.query(query),
options,
onBackendMessage,
)
} catch (e) {
if (e instanceof DatabaseError) {
Expand All @@ -363,24 +457,18 @@ export abstract class BasePGlite
}
throw e
} finally {
results.push(
...(await this.#execProtocolNoSync(
serializeProtocol.sync(),
options,
)),
await this.#execProtocolNoSync(
serializeProtocol.sync(),
options,
onBackendMessage,
)
}
this._cleanupBlob()
if (!this.#inTransaction) {
await this.syncToFs()
}
const blob = await this._getWrittenBlob()
return parseResults(
results,
this.parsers,
options,
blob,
) as Array<Results>

return results
})
}

Expand Down
Loading
Loading