diff --git a/packages/bitcore-node/src/modules/moralis/api/csp.ts b/packages/bitcore-node/src/modules/moralis/api/csp.ts index b949d9030ad..b39e71021f1 100644 --- a/packages/bitcore-node/src/modules/moralis/api/csp.ts +++ b/packages/bitcore-node/src/modules/moralis/api/csp.ts @@ -108,7 +108,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider { // @override async streamBlocks(params: StreamBlocksParams) { - const { chain, network, req, res } = params; + const { chain, network } = params; const { web3 } = await this.getWeb3(network); const chainId = await this.getChainId({ network }); const blockRange = await this.getBlocksRange({ ...params, chainId }); @@ -146,8 +146,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider { } }); - return ExternalApiStream.onStream(stream, req!, res!); - + return stream; } // @override @@ -165,10 +164,10 @@ export class MoralisStateProvider extends BaseEVMStateProvider { // @override async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, network, address } = params; + const { args, network, address } = params; const chainId = await this.getChainId({ network }); - const txStream = await this._streamAddressTransactionsFromMoralis({ + return this._streamAddressTransactionsFromMoralis({ chainId, chain: this.chain, network, @@ -178,11 +177,6 @@ export class MoralisStateProvider extends BaseEVMStateProvider { ...args } }); - // TODO unify `ExternalApiStream.onStream` and `Storage.apiStream` which are effectively doing the same thing - const result = await ExternalApiStream.onStream(txStream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } } // @override diff --git a/packages/bitcore-node/src/modules/multiProvider/api/csp.ts b/packages/bitcore-node/src/modules/multiProvider/api/csp.ts index 598aa3ba8a3..19d649acd47 100644 --- a/packages/bitcore-node/src/modules/multiProvider/api/csp.ts +++ b/packages/bitcore-node/src/modules/multiProvider/api/csp.ts @@ -186,7 +186,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider { // @override — sequential failover with preflight check. // Buffers first item before piping to response; failover only before response bytes are written. async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, network, address } = params; + const { args, network, address } = params; const chainId = await this.getChainId({ network }); const providers = this.getProvidersForNetwork(network); const PREFLIGHT_TIMEOUT_MS = 5000; @@ -261,11 +261,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider { txStream.resume(); } - const result = await ExternalApiStream.onStream(outputStream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } - return; // Stream handled + return outputStream; } catch (error) { if (error instanceof AdapterError && (error as AdapterError).code === AdapterErrorCode.INVALID_REQUEST) throw error; // 400 — no failover provider.health.recordFailure(error as Error); diff --git a/packages/bitcore-node/src/modules/ripple/api/csp.ts b/packages/bitcore-node/src/modules/ripple/api/csp.ts index 65e25bafc80..e795c08cc1d 100644 --- a/packages/bitcore-node/src/modules/ripple/api/csp.ts +++ b/packages/bitcore-node/src/modules/ripple/api/csp.ts @@ -9,7 +9,6 @@ import { CacheStorage } from '../../../models/cache'; import { ICoin } from '../../../models/coin'; import { WalletAddressStorage } from '../../../models/walletAddress'; import { InternalStateProvider } from '../../../providers/chain-state/internal/internal'; -import { Storage } from '../../../services/storage'; import { IBlock } from '../../../types/Block'; import { ChainNetwork } from '../../../types/ChainNetwork'; import { @@ -305,7 +304,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain const transformed = txs.map(tx => this.transformAccountTx(tx, params.network)); this.streamTxs(transformed, readable); readable.push(null); - Storage.stream(readable, params.req!, params.res!); + return readable; } async streamTransactions(params: StreamTransactionsParams) { @@ -316,7 +315,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain const txs = ledger.transactions || []; this.streamTxs(txs, readable); readable.push(null); - Storage.stream(readable, params.req, params.res); + return readable; } async getTransaction(params: StreamTransactionParams) { diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts index 8fdbecbd52b..a01958a5ddf 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts @@ -1,3 +1,4 @@ +import { Readable } from 'stream'; import { CryptoRpc } from '@bitpay-labs/crypto-rpc'; import { Utils, Web3, type Web3Types } from '@bitpay-labs/crypto-wallet-core'; import { @@ -18,7 +19,7 @@ import { SpentHeightIndicators } from '../../../../types/Coin'; import { normalizeChainNetwork, partition, range } from '../../../../utils'; import { StatsUtil } from '../../../../utils/stats'; import { TransformWithEventPipe } from '../../../../utils/streamWithEventPipe'; -import { ExternalApiStream } from '../../external/streams/apiStream'; +import { AdapterError, AdapterErrorCode } from '../../external/adapters/errors'; import { AavePoolAbi } from '../abi/aavePool'; import { AavePoolAbiV2 } from '../abi/aavePoolV2'; import { ERC20Abi } from '../abi/erc20'; @@ -59,6 +60,10 @@ export interface BuildWalletTxsStreamParams { transactionStream: TransformWithEventPipe; populateEffects: PopulateEffectsForAddressTransform; walletAddresses: string[]; + // _buildWalletTransactionsStream pushes teardown callbacks here (e.g. cursor.close). + // streamWalletTransactions runs them when the FINAL piped stream closes/ends, so the + // hook lives on the stream the route actually destroys on disconnect. + cleanups?: Array<() => void>; } @@ -531,18 +536,11 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } async streamAddressTransactions(params: StreamAddressUtxosParams) { - return new Promise(async (resolve, reject) => { - try { - await this._buildAddressTransactionsStream(params); - return resolve(); - } catch (err) { - return reject(err); - } - }); + return this._buildAddressTransactionsStream(params); } async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { - const { req, res, args, chain, network, address } = params; + const { args, chain, network, address } = params; const { limit, /* since,*/ tokenAddress } = args; if (!args.tokenAddress) { @@ -557,23 +555,20 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai // NOTE: commented out since and paging for now b/c they were causing extra long query times on insight. // The case where an address has >1000 txns is an edge case ATM and can be addressed later - Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }, req!, res!); - } else { - try { - const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args); - res!.json(tokenTransfers); - } catch (err: any) { - logger.error('Error streaming address transactions: %o', err.stack || err.message || err); - throw err; - } + return Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }); } + const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args); + // Streams elements one-by-one so the route wraps them via streamJsonArray. + // The response remains a JSON array of the same N transfer objects; only inter-element + // whitespace differs from the prior res.json() output (compact `[..]` vs newline-separated). + return Readable.from(tokenTransfers, { objectMode: true }); } @historical @internal async streamTransactions(params: StreamTransactionsParams) { - const { chain, network, req, res, args } = params; + const { chain, network, args } = params; const { blockHash, blockHeight } = args; if (!chain || !network) { throw new Error('Missing chain or network'); @@ -590,7 +585,7 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } const tip = await this.getLocalTip(params); const tipHeight = tip ? tip.height : 0; - return Storage.apiStreamingFind(EVMTransactionStorage, query, args, req, res, t => { + return Storage.apiStreamingFind(EVMTransactionStorage, query, args, t => { let confirmations = 0; if (t.blockHeight !== undefined && t.blockHeight >= 0) { confirmations = tipHeight - t.blockHeight + 1; @@ -677,47 +672,48 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai } async streamWalletTransactions(params: StreamWalletTransactionsParams) { - return new Promise(async (resolve, reject) => { - const { network, wallet, req, res, args } = params; - const { web3 } = await this.getWeb3(network); - args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined; + const { network, wallet, args } = params; + const { web3 } = await this.getWeb3(network); + args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined; + + let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); + const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address); + if (walletAddresses.length === 0) { + // Status remains 400 via respondWithError; body shape changes from text/plain to + // the JSON {error, message} shape used by every other 4xx path. + throw new AdapterError('walletAddresses', AdapterErrorCode.INVALID_REQUEST, 'No addresses found for wallet'); + } + const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress); + const populateReceipt = new PopulateReceiptTransform(this); + const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses); + + const cleanups: Array<() => void> = []; + const streamParams: BuildWalletTxsStreamParams = { + transactionStream, + populateEffects, + walletAddresses, + cleanups + }; + transactionStream = await this._buildWalletTransactionsStream(params, streamParams); - let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); - const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address); - if (walletAddresses.length === 0) { - res.status(400).send('No addresses found for wallet'); - return resolve(); - } - const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress); - const populateReceipt = new PopulateReceiptTransform(this); - const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses); - - const streamParams: BuildWalletTxsStreamParams = { - transactionStream, - populateEffects, - walletAddresses - }; - transactionStream = await this._buildWalletTransactionsStream(params, streamParams); + if (!args.tokenAddress && wallet._id) { + const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id); + transactionStream = transactionStream.eventPipe(internalTxTransform); + } - if (!args.tokenAddress && wallet._id) { - const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id); - transactionStream = transactionStream.eventPipe(internalTxTransform); - } + transactionStream = transactionStream + .eventPipe(populateReceipt) + .eventPipe(ethTransactionTransform); - transactionStream = transactionStream - .eventPipe(populateReceipt) - .eventPipe(ethTransactionTransform); + // Run upstream teardown callbacks (e.g. cursor.close) when the FINAL stream the route + // pipes from closes or ends. destroy() on this stream does not reliably propagate + // upstream through eventPipe chains, so the cleanup must live here. + const runCleanups = () => { for (const fn of cleanups) { try { fn(); } catch { /* noop */ } } }; + transactionStream.on('close', runCleanups); + transactionStream.on('end', runCleanups); - try { - const result = await ExternalApiStream.onStream(transactionStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err) { - return reject(err); - } - }); + (transactionStream as any).jsonl = true; + return transactionStream; } async _buildWalletTransactionsStream(params: StreamWalletTransactionsParams, streamParams: BuildWalletTxsStreamParams) { @@ -731,22 +727,15 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - // Add cleanup handlers when client disconnects + // Cursor cleanup is registered with the caller and triggered against the final piped + // stream. Hooking it here against the intermediate transform would miss disconnects + // because destroy() does not propagate upstream through eventPipe chains reliably. let cursorClosed = false; - const cleanupCursor = () => { - if (!cursorClosed) { - cursorClosed = true; - try { - cursor.close(); - } catch { - // Cursor might already be closed, ignore - } - } - }; - - const { req, res } = params; - req.on('close', cleanupCursor); - res.on('close', cleanupCursor); + streamParams.cleanups?.push(() => { + if (cursorClosed) return; + cursorClosed = true; + try { cursor.close(); } catch { /* already closed */ } + }); // Pipe cursor to transform stream transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true })); diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts index 42c06c54cea..38dace505a1 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/gnosis.ts @@ -154,7 +154,7 @@ export class GnosisApi { } async streamGnosisWalletTransactions(params: { multisigContractAddress: string } & StreamWalletTransactionsParams) { - const { chain, network, multisigContractAddress, res, args } = params; + const { chain, network, multisigContractAddress, args } = params; const transactionQuery = getCSP(chain, network).getWalletTransactionQuery(params); delete transactionQuery.wallets; delete transactionQuery['wallets.0']; @@ -201,7 +201,6 @@ export class GnosisApi { .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - // Add cleanup handlers when client disconnects let cursorClosed = false; const cleanupCursor = () => { if (!cursorClosed) { @@ -214,10 +213,6 @@ export class GnosisApi { } }; - const { req } = params; - req.on('close', cleanupCursor); - res.on('close', cleanupCursor); - transactionStream = cursor.pipe(populateEffects); // For old db entries if (multisigContractAddress) { @@ -225,10 +220,13 @@ export class GnosisApi { transactionStream = transactionStream.pipe(multisigTransform); } - transactionStream + const finalStream: any = transactionStream .pipe(populateReceipt) - .pipe(ethTransactionTransform) - .pipe(res); + .pipe(ethTransactionTransform); + finalStream.jsonl = true; + finalStream.on('close', cleanupCursor); + finalStream.on('end', cleanupCursor); + return finalStream; } } diff --git a/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts b/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts index 8c4e3b15459..1a8fcdb1e89 100644 --- a/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts +++ b/packages/bitcore-node/src/providers/chain-state/evm/api/routes.ts @@ -4,6 +4,7 @@ import { Router } from 'express'; import config from '../../../../config'; import logger from '../../../../logger'; import { WebhookStorage } from '../../../../models/webhook'; +import { respondWithError, streamJsonArray } from '../../../../routes/apiUtils'; import { Config } from '../../../../services/config'; import { IEVMNetworkConfig } from '../../../../types/Config'; import { castToBool } from '../../../../utils'; @@ -235,22 +236,25 @@ export class EVMRouter { }); }; - private streamGnosisWalletTransactions(router: Router) { + private streamGnosisWalletTransactions(router: Router) { router.get(`/api/${this.chain}/:network/ethmultisig/transactions/:multisigContractAddress`, async (req, res) => { const { network, multisigContractAddress } = req.params; try { - return await Gnosis.streamGnosisWalletTransactions({ + const stream = await Gnosis.streamGnosisWalletTransactions({ chain: this.chain, network, multisigContractAddress, wallet: {} as any, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamGnosisWalletTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Multisig Transactions Error::%o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); }; diff --git a/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts b/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts index 4f3255b9dc4..20dce148351 100644 --- a/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts +++ b/packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts @@ -1,13 +1,8 @@ -import { Readable, Stream, Transform } from 'stream'; +import { Stream, Transform } from 'stream'; import axios from 'axios'; -import { Request, Response } from 'express'; import { ReadableWithEventPipe, TransformWithEventPipe } from '../../../../utils/streamWithEventPipe'; -export interface StreamOpts { - jsonl?: boolean; -} - export class ExternalApiStream extends ReadableWithEventPipe { url: string; headers: any; @@ -73,97 +68,6 @@ export class ExternalApiStream extends ReadableWithEventPipe { } } - // handles events emitted by the streamed response, request from client, and response to client - static onStream(stream: Readable, req: Request, res: Response, opts: StreamOpts = {}): - Promise<{ success: boolean; error?: any }> { - return new Promise<{ success: boolean; error?: any }>((resolve, reject) => { - let closed = false; - let isFirst = true; - - req.on('close', function() { - closed = true; - }); - - res.type('json'); - res.on('close', function() { - closed = true; - }); - - stream.on('error', function(err: any) { - if (!closed) { - closed = true; - if (err.isAxiosError) { - err.log = { - url: err?.config?.url, - statusCode: err?.response?.status, - statusMsg: err?.response?.statusText, - data: err?.response?.data, - }; - } - if (err.log?.data?.message?.includes('not supported')) { - res.write('[]'); - res.end(); - return resolve({ success: false, error: err }); - } - if (!isFirst) { - // Data has already been written to the stream and status 200 headers have already been sent - // We notify and log the error instead of throwing - const errMsg = '{"error": "An error occurred during data stream"}'; - if (opts.jsonl) { - res.write(`${errMsg}`); - } else { - res.write(`,\n${errMsg}\n]`); - } - res.end(); - res.destroy(); - return resolve({ success: false, error: err }); - } else { - // Rejecting here allows downstream to send status 500 - return reject(err); - } - } - return; - }); - stream.on('data', function(data) { - if (!closed) { - // We are assuming jsonl data appended a new line upstream - if (!opts.jsonl) { - if (isFirst) { - res.write('[\n'); - } else { - res.write(',\n'); - } - } - if (isFirst) { - // All cases need isFirst set correctly for proper error handling - isFirst = false; - } - if (typeof data !== 'string') { - data = JSON.stringify(data); - } - res.write(data); - } else { - stream.destroy(); - } - }); - stream.on('end', function() { - if (!closed) { - closed = true; - if (!opts.jsonl) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - } - res.end(); - resolve({ success: true }); - } - }); - }); - } - static mergeStreams(streams: Stream[], destination: Transform): Transform { let activeStreams = streams.length; diff --git a/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts b/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts index 8dec499012d..87461590413 100644 --- a/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts +++ b/packages/bitcore-node/src/providers/chain-state/external/streams/nodeStream.ts @@ -1,5 +1,4 @@ import { Readable, Stream, Transform } from 'stream'; -import { Request, Response } from 'express'; import { ExternalApiStream } from './apiStream'; export class NodeQueryStream extends Readable { @@ -37,11 +36,6 @@ export class NodeQueryStream extends Readable { } } - static onStream(stream: Readable, req: Request, res: Response): - Promise<{ success: boolean; error?: any }> { - return ExternalApiStream.onStream(stream, req, res); - } - static mergeStreams(streams: Stream[], destination: Transform): Transform { return ExternalApiStream.mergeStreams(streams, destination); } diff --git a/packages/bitcore-node/src/providers/chain-state/internal/internal.ts b/packages/bitcore-node/src/providers/chain-state/internal/internal.ts index ba268033c2e..b11975f5602 100644 --- a/packages/bitcore-node/src/providers/chain-state/internal/internal.ts +++ b/packages/bitcore-node/src/providers/chain-state/internal/internal.ts @@ -81,10 +81,10 @@ export class InternalStateProvider implements IChainStateService { } async streamAddressTransactions(params: StreamAddressUtxosParams) { - const { req, res, args } = params; + const { args } = params; const { limit, since } = args; const query = this.getAddressQuery(params); - Storage.apiStreamingFind(CoinStorage, query, { limit, since, paging: '_id' }, req!, res!); + return Storage.apiStreamingFind(CoinStorage, query, { limit, since, paging: '_id' }); } async getBalanceForAddress(params: GetBalanceForAddressParams): Promise { @@ -100,10 +100,9 @@ export class InternalStateProvider implements IChainStateService { return balance; } - streamBlocks(params: StreamBlocksParams) { - const { req, res } = params; + async streamBlocks(params: StreamBlocksParams) { const { query, options } = this.getBlocksQuery(params); - Storage.apiStreamingFind(BitcoinBlockStorage, query, options, req, res); + return Storage.apiStreamingFind(BitcoinBlockStorage, query, options); } async getBlocks(params: GetBlockParams): Promise> { @@ -202,7 +201,7 @@ export class InternalStateProvider implements IChainStateService { } async streamTransactions(params: StreamTransactionsParams) { - const { chain, network, req, res, args } = params; + const { chain, network, args } = params; const { blockHash, blockHeight } = args; if (!chain || !network) { throw new Error('Missing chain or network'); @@ -219,7 +218,7 @@ export class InternalStateProvider implements IChainStateService { } const tip = await this.getLocalTip(params); const tipHeight = tip ? tip.height : 0; - return Storage.apiStreamingFind(TransactionStorage, query, args, req, res, t => { + return Storage.apiStreamingFind(TransactionStorage, query, args, t => { let confirmations = 0; if (t.blockHeight !== undefined && t.blockHeight >= 0) { confirmations = tipHeight - t.blockHeight + 1; @@ -305,9 +304,9 @@ export class InternalStateProvider implements IChainStateService { } streamWalletAddresses(params: StreamWalletAddressesParams) { - const { chain, network, walletId, req, res } = params; + const { chain, network, walletId } = params; const query = { chain, network, wallet: walletId }; - Storage.apiStreamingFind(WalletAddressStorage, query, {}, req, res); + return Storage.apiStreamingFind(WalletAddressStorage, query, {}); } async walletCheck(params: WalletCheckParams) { @@ -384,7 +383,7 @@ export class InternalStateProvider implements IChainStateService { } async streamWalletTransactions(params: StreamWalletTransactionsParams) { - const { chain, network, wallet, res, args } = params; + const { chain, network, wallet, args } = params; const query: any = { chain, network, @@ -430,8 +429,10 @@ export class InternalStateProvider implements IChainStateService { .find(query) .sort({ blockTimeNormalized: 1 }) .addCursorFlag('noCursorTimeout', true); - const listTransactionsStream = new this.WalletStreamTransform(wallet); - transactionStream.pipe(listTransactionsStream).pipe(res); + const listTransactionsStream: any = transactionStream.pipe(new this.WalletStreamTransform(wallet)); + listTransactionsStream.jsonl = true; + listTransactionsStream.on('close', () => { try { transactionStream.close(); } catch { /* noop */ } }); + return listTransactionsStream; } async getWalletBalance(params: GetWalletBalanceParams): Promise { @@ -457,7 +458,7 @@ export class InternalStateProvider implements IChainStateService { } async streamWalletUtxos(params: StreamWalletUtxosParams) { - const { wallet, limit, args = {}, req, res } = params; + const { wallet, limit, args = {} } = params; const query: any = { wallets: wallet._id, 'wallets.0': { $exists: true }, @@ -484,7 +485,7 @@ export class InternalStateProvider implements IChainStateService { return CoinStorage._apiTransform(c) as string; }; - Storage.apiStreamingFind(CoinStorage, query, { limit }, req, res, utxoTransform); + return Storage.apiStreamingFind(CoinStorage, query, { limit }, utxoTransform); } async getFee(params: GetEstimateSmartFeeParams) { diff --git a/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts b/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts index 2cff5652f09..c62def4b05a 100644 --- a/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts +++ b/packages/bitcore-node/src/providers/chain-state/svm/api/csp.ts @@ -183,89 +183,58 @@ export class BaseSVMStateProvider extends InternalStateProvider implements IChai } async streamTransactions(params: StreamTransactionsParams): Promise { - return new Promise(async (resolve, reject) => { - try { - const { chain, network, req, res, args } = params; - let { blockHeight } = args; - const { limit = 50 } = args; + const { chain, network, args } = params; + let { blockHeight } = args; + const { limit = 50 } = args; - if (!chain || !network) { - throw new Error('Missing chain or network'); - } - if (blockHeight !== undefined) { - blockHeight = Number(blockHeight); - } else { - throw new Error('Missing required block height / slot.'); - } + if (!chain || !network) { + throw new Error('Missing chain or network'); + } + if (blockHeight !== undefined) { + blockHeight = Number(blockHeight); + } else { + throw new Error('Missing required block height / slot.'); + } - const { rpc } = await this.getRpc(network); - const block: any = await rpc.getBlock({ height: blockHeight, transactionDetails: 'signatures' }); - if (!block) { - throw new Error('Block not found: ' + blockHeight); - } - const stream = new TransformWithEventPipe({ - objectMode: true, - passThrough: true - }); - let count = 0; - for (const signature of block?.signatures || []) { - if (limit && count >= limit) break; - const transformedTx = await this._getTransformedTx(rpc, network, { signature }); - stream.push(transformedTx); - count++; - } - stream.push(null); - const result = await ExternalApiStream.onStream(stream, req!, res!); - if (!result?.success) { - logger.error('Error mid-stream (streamTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming block transactions: %o', err.stack || err.message || err); - reject(err); - } + const { rpc } = await this.getRpc(network); + const block: any = await rpc.getBlock({ height: blockHeight, transactionDetails: 'signatures' }); + if (!block) { + throw new Error('Block not found: ' + blockHeight); + } + const stream = new TransformWithEventPipe({ + objectMode: true, + passThrough: true }); + let count = 0; + for (const signature of block?.signatures || []) { + if (limit && count >= limit) break; + const transformedTx = await this._getTransformedTx(rpc, network, { signature }); + stream.push(transformedTx); + count++; + } + stream.push(null); + return stream; } async streamAddressTransactions(params: StreamAddressUtxosParams) { - return new Promise(async (resolve, reject) => { - const { req, res } = params; - try { - const addressStream = await this._buildAddressTransactionsStream(params); - const result = await ExternalApiStream.onStream(addressStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err) { - return reject(err); - } - }); + const addressStream: any = await this._buildAddressTransactionsStream(params); + addressStream.jsonl = true; + return addressStream; } async streamWalletTransactions(params: StreamWalletTransactionsParams): Promise { - return new Promise(async (resolve, reject) => { - try { - const { wallet, req, res } = params; - const walletStream = new TransformWithEventPipe({ objectMode: true, passThrough: true }); - const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddress => waddress.address); - const addressStreams: TransformWithEventPipe[] = []; - - for (const address of walletAddresses) { - const addressStream = await this._buildAddressTransactionsStream({ ...params, address }); - addressStreams.push(addressStream); - } - ExternalApiStream.mergeStreams(addressStreams, walletStream); - const result = await ExternalApiStream.onStream(walletStream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming wallet transactions: %o', err.stack || err.message || err); - return reject(err); - } - }); + const { wallet } = params; + const walletStream: any = new TransformWithEventPipe({ objectMode: true, passThrough: true }); + const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddress => waddress.address); + const addressStreams: TransformWithEventPipe[] = []; + + for (const address of walletAddresses) { + const addressStream = await this._buildAddressTransactionsStream({ ...params, address }); + addressStreams.push(addressStream); + } + ExternalApiStream.mergeStreams(addressStreams, walletStream); + walletStream.jsonl = true; + return walletStream; } async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) { @@ -617,48 +586,37 @@ export class BaseSVMStateProvider extends InternalStateProvider implements IChai } async streamBlocks(params: StreamBlocksParams) { - return new Promise(async (resolve, reject) => { - try { - const { chain, network, req, res } = params; - if (!chain || !network) { - throw new Error('Missing chain or network'); - } - const { rpc } = await this.getRpc(network); - const blockRange = await this.getBlocksRange({ ...params }); - const { height } = await rpc.getTip(); - const stream = new TransformWithEventPipe({ - objectMode: true, - passThrough: true - }); - try { - let block; - let nextBlock; - for (const blockNum of blockRange) { - const thisNextBlock = Number(block?.height) === blockNum + 1 ? block : await this._getTransformedBlock(rpc, network, blockNum + 1); - block = Number(nextBlock?.number) === blockNum ? nextBlock : await this._getTransformedBlock(rpc, network, blockNum); - if (!block) { - continue; - } - nextBlock = thisNextBlock; - block.nextBlockHash = nextBlock?.hash; - block.confirmations = Number(BigInt(height) - BigInt(block.height) + 1n); - stream.push(block); - } - } catch (e: any) { - logger.error('Error streaming blocks: %o', e); + const { chain, network } = params; + if (!chain || !network) { + throw new Error('Missing chain or network'); + } + const { rpc } = await this.getRpc(network); + const blockRange = await this.getBlocksRange({ ...params }); + const { height } = await rpc.getTip(); + const stream: any = new TransformWithEventPipe({ + objectMode: true, + passThrough: true + }); + try { + let block; + let nextBlock; + for (const blockNum of blockRange) { + const thisNextBlock = Number(block?.height) === blockNum + 1 ? block : await this._getTransformedBlock(rpc, network, blockNum + 1); + block = Number(nextBlock?.number) === blockNum ? nextBlock : await this._getTransformedBlock(rpc, network, blockNum); + if (!block) { + continue; } - stream.push(null); - const result = await ExternalApiStream.onStream(stream, req!, res!, { jsonl: true }); - if (!result?.success) { - logger.error('Error mid-stream (streamBlocks): %o', result.error?.log || result.error); - } - return resolve(); - } catch (err: any) { - logger.error('Error streaming blocks: %o', err.stack || err.message || err); - reject(err); + nextBlock = thisNextBlock; + block.nextBlockHash = nextBlock?.hash; + block.confirmations = Number(BigInt(height) - BigInt(block.height) + 1n); + stream.push(block); } - }); - + } catch (e: any) { + logger.error('Error streaming blocks: %o', e); + } + stream.push(null); + stream.jsonl = true; + return stream; } async _getTransformedBlock(rpc, network, height ) { diff --git a/packages/bitcore-node/src/routes/api/address.ts b/packages/bitcore-node/src/routes/api/address.ts index be5bdfe8f91..b714e8bc38b 100644 --- a/packages/bitcore-node/src/routes/api/address.ts +++ b/packages/bitcore-node/src/routes/api/address.ts @@ -2,7 +2,7 @@ import express, { Request } from 'express'; import logger from '../../logger'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamAddressUtxosParams } from '../../types/namespaces/ChainStateProvider'; -import { respondWithError } from '../apiUtils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; const router = express.Router({ mergeParams: true }); @@ -14,11 +14,13 @@ async function streamCoins(req: Request, res) { chain, network, address, - req, - res, args: { ...req.query, unspent, limit, since } } as StreamAddressUtxosParams; - await ChainStateProvider.streamAddressTransactions(payload); + const stream = await ChainStateProvider.streamAddressTransactions(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error); + } } catch (err: any) { logger.error('Error streaming coins: %o', err.stack || err.message || err); respondWithError(res, err); diff --git a/packages/bitcore-node/src/routes/api/block.ts b/packages/bitcore-node/src/routes/api/block.ts index dfd0d3b2576..6b1723af4d9 100644 --- a/packages/bitcore-node/src/routes/api/block.ts +++ b/packages/bitcore-node/src/routes/api/block.ts @@ -5,6 +5,7 @@ import { CoinStorage, ICoin } from '../../models/coin'; import { TransactionStorage } from '../../models/transaction'; import { ChainStateProvider } from '../../providers/chain-state'; import { isDateValid } from '../../utils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; import { CacheTimes, Confirmations, SetCache } from '../middleware'; const router = express.Router({ mergeParams: true }); @@ -22,14 +23,17 @@ router.get('/', async function(req: Request, res: Response) { chain, network, sinceBlock, - args: { date, limit, since, direction, paging }, - req, - res + args: { date, limit, since, direction, paging } }; - return ChainStateProvider.streamBlocks(payload); + const stream = await ChainStateProvider.streamBlocks(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamBlocks): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error getting blocks: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); diff --git a/packages/bitcore-node/src/routes/api/tx.ts b/packages/bitcore-node/src/routes/api/tx.ts index c3e35322a53..cfe6f4fb76f 100644 --- a/packages/bitcore-node/src/routes/api/tx.ts +++ b/packages/bitcore-node/src/routes/api/tx.ts @@ -4,7 +4,7 @@ import { ICoin } from '../../models/coin'; import { ITransaction } from '../../models/transaction'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamTransactionsParams } from '../../types/namespaces/ChainStateProvider'; -import { respondWithError } from '../apiUtils'; +import { respondWithError, streamJsonArray } from '../apiUtils'; import { CacheTimes, SetCache } from '../middleware'; const router = Router({ mergeParams: true }); @@ -24,8 +24,6 @@ router.get('/', async function(req: Request, res: Response) { const payload: StreamTransactionsParams = { chain, network, - req, - res, args: { limit, since, direction, paging } }; @@ -35,7 +33,12 @@ router.get('/', async function(req: Request, res: Response) { if (blockHash !== undefined) { payload.args.blockHash = blockHash; } - return await ChainStateProvider.streamTransactions(payload); + const stream = await ChainStateProvider.streamTransactions(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming transactions: %o', err.stack || err.message || err); return respondWithError(res, err); diff --git a/packages/bitcore-node/src/routes/api/wallet.ts b/packages/bitcore-node/src/routes/api/wallet.ts index 4ce02e77c20..d58d260cb45 100644 --- a/packages/bitcore-node/src/routes/api/wallet.ts +++ b/packages/bitcore-node/src/routes/api/wallet.ts @@ -5,6 +5,7 @@ import logger from '../../logger'; import { ChainStateProvider } from '../../providers/chain-state'; import { StreamWalletAddressesParams } from '../../types/namespaces/ChainStateProvider'; import { Auth, AuthenticatedRequest } from '../../utils/auth'; +import { respondWithError, streamJsonArray } from '../apiUtils'; const router = Router({ mergeParams: true }); @@ -68,14 +69,17 @@ router.get('/:pubKey/addresses', Auth.authenticateMiddleware, async (req: Authen chain, network, walletId: wallet!._id!, - limit, - req, - res + limit }; - return await ChainStateProvider.streamWalletAddresses(payload); + const stream = await ChainStateProvider.streamWalletAddresses(payload); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletAddresses): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet addresses: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); @@ -143,17 +147,20 @@ router.post('/:pubKey', Auth.authenticateMiddleware, async (req: AuthenticatedRe router.get('/:pubKey/transactions', Auth.authenticateMiddleware, async (req: AuthenticatedRequest, res: Response) => { try { const { chain, network } = req.params; - return await ChainStateProvider.streamWalletTransactions({ + const stream = await ChainStateProvider.streamWalletTransactions({ chain, network, wallet: req.wallet!, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet txs: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); @@ -194,18 +201,21 @@ router.get('/:pubKey/utxos', Auth.authenticateMiddleware, async (req: Authentica const { chain, network } = req.params; const { limit } = req.query as any; try { - return ChainStateProvider.streamWalletUtxos({ + const stream = await ChainStateProvider.streamWalletUtxos({ chain, network, wallet: req.wallet!, limit, - req, - res, args: req.query }); + const result = await streamJsonArray(stream, req, res); + if (!result.success) { + logger.error('Error mid-stream (streamWalletUtxos): %o', result.error?.log || result.error); + } + return; } catch (err: any) { logger.error('Error streaming wallet utxos: %o', err.stack || err.message || err); - return res.status(500).send(err.message || err); + return respondWithError(res, err); } }); diff --git a/packages/bitcore-node/src/routes/apiUtils.ts b/packages/bitcore-node/src/routes/apiUtils.ts index 5197557e616..151c05481d7 100644 --- a/packages/bitcore-node/src/routes/apiUtils.ts +++ b/packages/bitcore-node/src/routes/apiUtils.ts @@ -1,4 +1,5 @@ -import { Response } from 'express'; +import { Readable } from 'stream'; +import { Request, Response } from 'express'; import { AdapterError, AdapterErrorCode, AllProvidersUnavailableError } from '../providers/chain-state/external/adapters/errors'; export function respondWithError(res: Response, err: any) { @@ -10,3 +11,140 @@ export function respondWithError(res: Response, err: any) { } return res.status(500).send(err.message || err); } + +export interface StreamJsonArrayOpts { + jsonl?: boolean; +} + +export interface StreamJsonArrayResult { + success: boolean; + error?: any; +} + +/** + * Pipe a Readable stream to an Express response as a JSON array (default) or JSONL. + * + * - Pre-data errors reject so the route can send a proper status code + * - Mid-stream errors append an inline error marker and end the response + * - Client/response disconnects destroy the stream (and call .close() if present, e.g. mongo cursor) + */ +export function streamJsonArray( + stream: Readable & { close?: () => void; jsonl?: boolean }, + req: Request, + res: Response, + opts: StreamJsonArrayOpts = {} +): Promise { + // Auto-detect jsonl flag attached to the stream so routes stay chain-agnostic. + const jsonl = opts.jsonl ?? stream.jsonl ?? false; + return new Promise((resolve, reject) => { + let closed = false; + let isFirst = true; + let settled = false; + + // Single-shot guards keep the promise from being resolved/rejected twice when + // a client disconnect races a stream end/error or a stream 'close' event follows destroy(). + const safeResolve = (result: StreamJsonArrayResult) => { if (!settled) { settled = true; resolve(result); } }; + const safeReject = (err: any) => { if (!settled) { settled = true; reject(err); } }; + + const tearDown = () => { + // close() handles mongo cursor streams; destroy() tears down piped Transform chains + // so cursor-cleanup listeners hooked to the Transform's 'close' event fire eagerly on disconnect. + if (typeof stream.close === 'function') { + try { stream.close(); } catch { /* noop */ } + } + if (typeof stream.destroy === 'function' && !stream.destroyed) { + try { stream.destroy(); } catch { /* noop */ } + } + }; + const cleanup = () => { + closed = true; + tearDown(); + }; + const onAbort = () => { + closed = true; + tearDown(); + // Settle the awaiting route handler so it can fall through to its catch/finally + // instead of hanging until the stream eventually emits 'close' (which may not happen + // on a destroyed pipeline if upstream never settles). + safeResolve({ success: false, error: new Error('client disconnected') }); + }; + + req.on('close', onAbort); + // ndjson in jsonl mode so JSON-aware clients (supertest, fetch().json()) don't try to + // parse a stream of newline-delimited objects as a single JSON document. + res.type(jsonl ? 'application/x-ndjson' : 'json'); + res.on('close', onAbort); + + stream.on('error', (err: any) => { + if (closed) { safeResolve({ success: false, error: err }); return; } + if (err?.isAxiosError) { + err.log = { + url: err?.config?.url, + statusCode: err?.response?.status, + statusMsg: err?.response?.statusText, + data: err?.response?.data, + }; + } + if (err?.log?.data?.message?.includes('not supported')) { + closed = true; + res.write('[]'); + res.end(); + return safeResolve({ success: false, error: err }); + } + if (!isFirst) { + // Headers already sent — emit inline error marker, end response, log upstream + closed = true; + const errMsg = '{"error": "An error occurred during data stream"}'; + if (jsonl) { + res.write(`${errMsg}`); + } else { + res.write(`,\n${errMsg}\n]`); + } + res.end(); + cleanup(); + return safeResolve({ success: false, error: err }); + } + // Pre-data — caller can send proper 5xx status + return safeReject(err); + }); + + stream.on('data', (data: any) => { + if (closed) { + cleanup(); + return; + } + if (!jsonl) { + if (isFirst) { + res.write('[\n'); + } else { + res.write(',\n'); + } + } + if (isFirst) { + isFirst = false; + } + if (typeof data !== 'string' && !Buffer.isBuffer(data)) { + data = JSON.stringify(data); + } + res.write(data); + }); + + stream.on('end', () => { + if (closed) return; + closed = true; + if (!jsonl) { + if (isFirst) { + res.write('[]'); + } else { + res.write('\n]'); + } + } + res.end(); + safeResolve({ success: true }); + }); + + // Backstop: if destroy() emits 'close' without a prior 'end' or 'error', settle the promise + // so the route handler doesn't await indefinitely on a torn-down pipeline. + stream.on('close', () => safeResolve({ success: closed, error: closed ? undefined : new Error('stream closed before end') })); + }); +} diff --git a/packages/bitcore-node/src/services/storage.ts b/packages/bitcore-node/src/services/storage.ts index 593b40c776e..c258e6c03c4 100644 --- a/packages/bitcore-node/src/services/storage.ts +++ b/packages/bitcore-node/src/services/storage.ts @@ -1,9 +1,8 @@ import { EventEmitter } from 'events'; import { Readable } from 'stream'; import { ObjectId } from 'bson'; -import { Request, Response } from 'express'; import { ObjectID } from 'mongodb'; -import { Cursor, Db, MongoClient } from 'mongodb'; +import { Db, MongoClient } from 'mongodb'; import { LoggifyClass } from '../decorators/Loggify'; import logger from '../logger'; import '../models'; @@ -113,91 +112,6 @@ export class StorageService { return typecastedValue; } - stream(input: Readable, req: Request, res: Response) { - let closed = false; - req.on('close', function() { - closed = true; - }); - res.on('close', function() { - closed = true; - }); - input.on('error', function(err) { - if (!closed) { - closed = true; - return res.status(500).end(err.message); - } - return; - }); - let isFirst = true; - res.type('json'); - input.on('data', function(data) { - if (!closed) { - if (isFirst) { - res.write('[\n'); - isFirst = false; - } else { - res.write(',\n'); - } - res.write(JSON.stringify(data)); - } - }); - input.on('end', function() { - if (!closed) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - res.end(); - } - }); - } - - apiStream(cursor: Cursor, req: Request, res: Response) { - let closed = false; - req.on('close', function() { - closed = true; - cursor.close(); - }); - res.on('close', function() { - closed = true; - cursor.close(); - }); - cursor.on('error', function(err) { - if (!closed) { - closed = true; - return res.status(500).end(err.message); - } - return; - }); - let isFirst = true; - res.type('json'); - cursor.on('data', function(data) { - if (!closed) { - if (isFirst) { - res.write('[\n'); - isFirst = false; - } else { - res.write(',\n'); - } - res.write(data); - } else { - cursor.close(); - } - }); - cursor.on('end', function() { - if (!closed) { - if (isFirst) { - // there was no data - res.write('[]'); - } else { - res.write('\n]'); - } - res.end(); - } - }); - } getFindOptions(model: TransformableModel, originalOptions: StreamingFindOptions) { const query: any = {}; let since: any = null; @@ -232,10 +146,8 @@ export class StorageService { model: TransformableModel, originalQuery: any, originalOptions: StreamingFindOptions, - req: Request, - res: Response, transform?: (data: T) => string | Buffer - ) { + ): Readable & { close?: () => void } { const { query, options } = this.getFindOptions(model, originalOptions); const finalQuery = Object.assign({}, originalQuery, query); let cursor = model.collection @@ -247,7 +159,7 @@ export class StorageService { if (options.sort) { cursor = cursor.sort(options.sort); } - return this.apiStream(cursor, req, res); + return cursor; } } diff --git a/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts b/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts index cc6d550e62a..e2ac4de6235 100644 --- a/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts +++ b/packages/bitcore-node/src/types/namespaces/ChainStateProvider.ts @@ -61,8 +61,6 @@ export type StreamBlocksParams = ChainNetwork & { blockId?: string; sinceBlock: number | string; args?: Partial<{ startDate: Date; endDate: Date; date: Date } & StreamingFindOptions>; - req: Request; - res: Response; }; export type FeeMode = 'ECONOMICAL' | 'CONSERVATIVE'; @@ -104,14 +102,10 @@ export type GetWalletBalanceAtTimeParams = ChainNetwork & { export type StreamAddressUtxosParams = ChainNetwork & { address: string; - req?: Request; - res?: Response; args: Partial & any>; }; export type StreamTransactionsParams = ChainNetwork & { - req: Request; - res: Response; args: any; }; export type StreamTransactionParams = ChainNetwork & { @@ -119,8 +113,6 @@ export type StreamTransactionParams = ChainNetwork & { }; export type StreamWalletAddressesParams = ChainNetwork & { walletId: ObjectId; - req: Request; - res: Response; limit: number; }; @@ -141,8 +133,6 @@ export type StreamWalletMissingAddressesParams = ChainNetwork & { export type StreamWalletTransactionsParams = ChainNetwork & { wallet: MongoBound; - req: Request; - res: Response; args: StreamWalletTransactionsArgs & any; }; @@ -160,8 +150,6 @@ export type StreamWalletUtxosParams = ChainNetwork & { wallet: MongoBound; limit: number; args: Partial; - req: Request; - res: Response; }; export type isValidParams = ChainNetwork & { diff --git a/packages/bitcore-node/test/integration/ethereum/csp.test.ts b/packages/bitcore-node/test/integration/ethereum/csp.test.ts index 8b234b3e7d6..2f722e9f5c3 100644 --- a/packages/bitcore-node/test/integration/ethereum/csp.test.ts +++ b/packages/bitcore-node/test/integration/ethereum/csp.test.ts @@ -12,6 +12,7 @@ import { ETH } from '../../../src/modules/ethereum/api/csp'; import { EVMBlockStorage } from '../../../src/providers/chain-state/evm/models/block'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; import { IEVMTransactionInProcess } from '../../../src/providers/chain-state/evm/types'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { StreamWalletTransactionsParams } from '../../../src/types/namespaces/ChainStateProvider'; import { ErigonEthBlocks } from '../../data/ETH/erigonDbBlocks'; import { ErigonEthTransactions } from '../../data/ETH/erigonDbTransactions'; @@ -230,7 +231,8 @@ describe('Ethereum API', function() { transform: (_data, _, cb) => cb(null) }) as unknown) as Request; - await ETH.streamAddressTransactions({ chain, network, address, res, req, args: {} }); + const stream = await ETH.streamAddressTransactions({ chain, network, address, args: {} }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -271,7 +273,8 @@ describe('Ethereum API', function() { } }) as unknown) as Request; - await ETH.streamTransactions({ chain, network, res, req, args: { blockHeight: 1 } }); + const stream = await ETH.streamTransactions({ chain, network, args: { blockHeight: 1 } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -316,7 +319,8 @@ describe('Ethereum API', function() { } }) as unknown) as Request; - await ETH.streamTransactions({ chain, network, res, req, args: { blockHash: '12345' } }); + const stream = await ETH.streamTransactions({ chain, network, args: { blockHash: '12345' } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -379,6 +383,18 @@ describe('Ethereum API', function() { await streamDexWalletTransactions(chain, network, wallet, address, web3); }); + + it('closes the wallet-tx cursor when the final stream is destroyed', async () => { + await EVMTransactionStorage.collection.insertMany( + new Array(5).fill({}).map(() => ({ chain, network, blockHeight: 1, gasPrice: 10 * 1e9, data: Buffer.from(''), from: address } as IEVMTransactionInProcess)) + ); + const stream: any = await ETH.streamWalletTransactions({ chain, network, wallet, args: {} } as StreamWalletTransactionsParams); + const cursorCloseSpy = sandbox.spy(); + stream.on('close', cursorCloseSpy); + stream.destroy(); + await new Promise(r => setImmediate(r)); + expect(cursorCloseSpy.called).to.eq(true); + }); }); }); @@ -455,12 +471,11 @@ const streamWalletTransactionsTest = async (chain: string, network: string, incl chain, network, wallet, - req, - res, args: { includeInvalidTxs } } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); @@ -485,7 +500,8 @@ const streamDexWalletTransactions = async (chain, network, wallet, address, web3 } }) as unknown) as Request; - ETH.streamWalletTransactions({ chain, network, wallet, res, req, args: {} }); + ETH.streamWalletTransactions({ chain, network, wallet, args: {} } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)); let total = BigInt(0); let totalRejected = BigInt(0); let totalFee = BigInt(0); diff --git a/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts b/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts index c95960b5621..80916095934 100644 --- a/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts +++ b/packages/bitcore-node/test/integration/ethereum/memory-leaks.test.ts @@ -7,6 +7,7 @@ import { MongoBound } from '../../../src/models/base'; import { IWallet, WalletStorage } from '../../../src/models/wallet'; import { WalletAddressStorage } from '../../../src/models/walletAddress'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; const chain = 'ETH'; @@ -97,13 +98,8 @@ describe('EVM Memory Leak Prevention', function() { const { req, res, reqEmitter } = createMockReqRes(); const streamPromise = ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }); + chain, network, wallet, args: {} + }).then(stream => streamJsonArray(stream, req, res)); // Wait for stream to start await new Promise(resolve => setTimeout(resolve, 100)); @@ -164,13 +160,8 @@ describe('EVM Memory Leak Prevention', function() { const { req, res, reqEmitter } = createMockReqRes(); const streamPromise = ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }); + chain, network, wallet, args: {} + }).then(stream => streamJsonArray(stream, req, res)); await new Promise(resolve => setTimeout(resolve, 50)); reqEmitter.emit('close'); @@ -232,14 +223,9 @@ describe('EVM Memory Leak Prevention', function() { resEmitter.on('finish', resolve); resEmitter.on('error', reject); - ETH.streamWalletTransactions({ - chain, - network, - wallet, - req, - res, - args: {} - }).catch(reject); + ETH.streamWalletTransactions({ chain, network, wallet, args: {} }) + .then(stream => streamJsonArray(stream, req, res)) + .catch(reject); }); // Verify that we received some transactions (stream worked) diff --git a/packages/bitcore-node/test/integration/matic/csp.test.ts b/packages/bitcore-node/test/integration/matic/csp.test.ts index 20658c4da86..1fedbaecdc9 100644 --- a/packages/bitcore-node/test/integration/matic/csp.test.ts +++ b/packages/bitcore-node/test/integration/matic/csp.test.ts @@ -12,6 +12,7 @@ import { MATIC } from '../../../src/modules/matic/api/csp'; import { IEVMTransactionInProcess } from '../../../src/providers/chain-state/evm//types'; import { EVMBlockStorage } from '../../../src/providers/chain-state/evm/models/block'; import { EVMTransactionStorage } from '../../../src/providers/chain-state/evm/models/transaction'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { StreamWalletTransactionsParams } from '../../../src/types/namespaces/ChainStateProvider'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; @@ -195,7 +196,8 @@ describe('Polygon/MATIC API', function() { transform: (_data, _, cb) => cb(null) }) as unknown) as Request; - await MATIC.streamAddressTransactions({ chain, network, address, res, req, args: {} }); + const stream = await MATIC.streamAddressTransactions({ chain, network, address, args: {} }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -236,7 +238,8 @@ describe('Polygon/MATIC API', function() { } }) as unknown) as Request; - await MATIC.streamTransactions({ chain, network, res, req, args: { blockHeight: 1 } }); + const stream = await MATIC.streamTransactions({ chain, network, args: { blockHeight: 1 } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -281,7 +284,8 @@ describe('Polygon/MATIC API', function() { } }) as unknown) as Request; - await MATIC.streamTransactions({ chain, network, res, req, args: { blockHash: '12345' } }); + const stream = await MATIC.streamTransactions({ chain, network, args: { blockHash: '12345' } }); + await streamJsonArray(stream as any, req, res); let counter = 0; await new Promise(r => { res @@ -413,12 +417,11 @@ const streamWalletTransactionsTest = async (chain: string, network: string, incl chain, network, wallet, - req, - res, args: { includeInvalidTxs } } as StreamWalletTransactionsParams) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); diff --git a/packages/bitcore-node/test/integration/solana/csp.test.ts b/packages/bitcore-node/test/integration/solana/csp.test.ts index 0e2ec2c214c..a85319ccc59 100644 --- a/packages/bitcore-node/test/integration/solana/csp.test.ts +++ b/packages/bitcore-node/test/integration/solana/csp.test.ts @@ -9,6 +9,7 @@ import { IWallet, WalletStorage } from '../../../src/models/wallet'; import { WalletAddressStorage } from '../../../src/models/walletAddress'; import { SOL } from '../../../src/modules/solana/api/csp'; import { SVMRouter } from '../../../src/providers/chain-state/svm/api/routes'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { intAfterHelper, intBeforeHelper } from '../../helpers/integration'; describe('Solana API', function() { @@ -210,10 +211,9 @@ describe('Solana API', function() { chain, network, wallet, - req, - res, args: {} - }) + } as any) + .then((stream: any) => streamJsonArray(stream, req, res)) .catch(e => r(e)); }); diff --git a/packages/bitcore-node/test/unit/services/storage.test.ts b/packages/bitcore-node/test/unit/services/storage.test.ts index 3bb6e39f8cb..fc4d2e6e0db 100644 --- a/packages/bitcore-node/test/unit/services/storage.test.ts +++ b/packages/bitcore-node/test/unit/services/storage.test.ts @@ -1,4 +1,7 @@ import { expect } from 'chai'; +import { EventEmitter } from 'events'; +import { Readable } from 'stream'; +import { streamJsonArray } from '../../../src/routes/apiUtils'; import { unitAfterHelper, unitBeforeHelper } from '../../helpers/unit'; describe('Storage Service', function() { @@ -9,3 +12,80 @@ describe('Storage Service', function() { expect(true).to.equal(true); }); }); + +describe('streamJsonArray', function() { + // Minimal req/res stand-ins: req only needs 'close', res captures writes and exposes 'close'. + function fakes() { + const req = new EventEmitter() as any; + const writes: string[] = []; + const res = Object.assign(new EventEmitter(), { + type: () => res, + write: (chunk: any) => { writes.push(typeof chunk === 'string' ? chunk : chunk.toString()); return true; }, + end: () => { (res as any).ended = true; }, + }) as any; + return { req, res, writes }; + } + + it('frames objects as a JSON array', async () => { + const { req, res, writes } = fakes(); + const result = await streamJsonArray(Readable.from([{ a: 1 }, { a: 2 }], { objectMode: true }), req, res); + expect(result.success).to.equal(true); + expect(writes.join('')).to.equal('[\n{"a":1},\n{"a":2}\n]'); + }); + + it('writes [] for empty stream', async () => { + const { req, res, writes } = fakes(); + await streamJsonArray(Readable.from([], { objectMode: true }), req, res); + expect(writes.join('')).to.equal('[]'); + }); + + it('honors stream.jsonl flag (no array framing)', async () => { + const { req, res, writes } = fakes(); + const stream: any = Readable.from(['{"a":1}\n', '{"a":2}\n'], { objectMode: true }); + stream.jsonl = true; + await streamJsonArray(stream, req, res); + expect(writes.join('')).to.equal('{"a":1}\n{"a":2}\n'); + }); + + it('appends inline error marker on mid-stream error', async () => { + const { req, res, writes } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => { + stream.push({ a: 1 }); + setImmediate(() => stream.emit('error', new Error('boom'))); + }); + const result = await streamJsonArray(stream, req, res); + expect(result.success).to.equal(false); + expect(writes.join('')).to.contain('"error"'); + expect(writes.join('')).to.match(/,\n\{"error".*\}\n\]$/); + }); + + it('rejects pre-data errors so the route can send a 5xx', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => stream.emit('error', new Error('upstream down'))); + let caught: any; + await streamJsonArray(stream, req, res).catch(e => caught = e); + expect(caught).to.be.instanceOf(Error); + expect(caught.message).to.equal('upstream down'); + }); + + it('settles the promise on client disconnect', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }); + setImmediate(() => req.emit('close')); + const result = await streamJsonArray(stream, req, res); + expect(result.success).to.equal(false); + expect(result.error?.message).to.contain('disconnected'); + }); + + it('calls .close() on cursor-style streams when the client disconnects', async () => { + const { req, res } = fakes(); + const stream = new Readable({ objectMode: true, read() {} }) as any; + let closed = false; + stream.close = () => { closed = true; }; + setImmediate(() => req.emit('close')); + await streamJsonArray(stream, req, res); + expect(closed).to.equal(true); + }); +});