Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions packages/bitcore-node/src/modules/moralis/api/csp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider {

// @override
async streamBlocks(params: StreamBlocksParams) {
const { chain, network, req, res } = params;
const { chain, network } = params;
const { web3 } = await this.getWeb3(network);
const chainId = await this.getChainId({ network });
const blockRange = await this.getBlocksRange({ ...params, chainId });
Expand Down Expand Up @@ -146,8 +146,7 @@ export class MoralisStateProvider extends BaseEVMStateProvider {
}
});

return ExternalApiStream.onStream(stream, req!, res!);

return stream;
}

// @override
Expand All @@ -165,10 +164,10 @@ export class MoralisStateProvider extends BaseEVMStateProvider {

// @override
async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) {
const { req, res, args, network, address } = params;
const { args, network, address } = params;

const chainId = await this.getChainId({ network });
const txStream = await this._streamAddressTransactionsFromMoralis({
return this._streamAddressTransactionsFromMoralis({
chainId,
chain: this.chain,
network,
Expand All @@ -178,11 +177,6 @@ export class MoralisStateProvider extends BaseEVMStateProvider {
...args
}
});
// TODO unify `ExternalApiStream.onStream` and `Storage.apiStream` which are effectively doing the same thing
const result = await ExternalApiStream.onStream(txStream, req!, res!);
if (!result?.success) {
logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error);
}
}

// @override
Expand Down
8 changes: 2 additions & 6 deletions packages/bitcore-node/src/modules/multiProvider/api/csp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider {
// @override — sequential failover with preflight check.
// Buffers first item before piping to response; failover only before response bytes are written.
async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) {
const { req, res, args, network, address } = params;
const { args, network, address } = params;
const chainId = await this.getChainId({ network });
const providers = this.getProvidersForNetwork(network);
const PREFLIGHT_TIMEOUT_MS = 5000;
Expand Down Expand Up @@ -261,11 +261,7 @@ export class MultiProviderEVMStateProvider extends BaseEVMStateProvider {
txStream.resume();
}

const result = await ExternalApiStream.onStream(outputStream, req!, res!);
if (!result?.success) {
logger.error('Error mid-stream (streamAddressTransactions): %o', result.error?.log || result.error);
}
return; // Stream handled
return outputStream;
} catch (error) {
if (error instanceof AdapterError && (error as AdapterError).code === AdapterErrorCode.INVALID_REQUEST) throw error; // 400 — no failover
provider.health.recordFailure(error as Error);
Expand Down
5 changes: 2 additions & 3 deletions packages/bitcore-node/src/modules/ripple/api/csp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { CacheStorage } from '../../../models/cache';
import { ICoin } from '../../../models/coin';
import { WalletAddressStorage } from '../../../models/walletAddress';
import { InternalStateProvider } from '../../../providers/chain-state/internal/internal';
import { Storage } from '../../../services/storage';
import { IBlock } from '../../../types/Block';
import { ChainNetwork } from '../../../types/ChainNetwork';
import {
Expand Down Expand Up @@ -305,7 +304,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain
const transformed = txs.map(tx => this.transformAccountTx(tx, params.network));
this.streamTxs(transformed, readable);
readable.push(null);
Storage.stream(readable, params.req!, params.res!);
return readable;
}

async streamTransactions(params: StreamTransactionsParams) {
Expand All @@ -316,7 +315,7 @@ export class RippleStateProvider extends InternalStateProvider implements IChain
const txs = ledger.transactions || [];
this.streamTxs(txs, readable);
readable.push(null);
Storage.stream(readable, params.req, params.res);
return readable;
}

async getTransaction(params: StreamTransactionParams) {
Expand Down
135 changes: 62 additions & 73 deletions packages/bitcore-node/src/providers/chain-state/evm/api/csp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Readable } from 'stream';
import { CryptoRpc } from '@bitpay-labs/crypto-rpc';
import { Utils, Web3, type Web3Types } from '@bitpay-labs/crypto-wallet-core';
import {
Expand All @@ -18,7 +19,7 @@ import { SpentHeightIndicators } from '../../../../types/Coin';
import { normalizeChainNetwork, partition, range } from '../../../../utils';
import { StatsUtil } from '../../../../utils/stats';
import { TransformWithEventPipe } from '../../../../utils/streamWithEventPipe';
import { ExternalApiStream } from '../../external/streams/apiStream';
import { AdapterError, AdapterErrorCode } from '../../external/adapters/errors';
import { AavePoolAbi } from '../abi/aavePool';
import { AavePoolAbiV2 } from '../abi/aavePoolV2';
import { ERC20Abi } from '../abi/erc20';
Expand Down Expand Up @@ -59,6 +60,10 @@ export interface BuildWalletTxsStreamParams {
transactionStream: TransformWithEventPipe;
populateEffects: PopulateEffectsForAddressTransform;
walletAddresses: string[];
// _buildWalletTransactionsStream pushes teardown callbacks here (e.g. cursor.close).
// streamWalletTransactions runs them when the FINAL piped stream closes/ends, so the
// hook lives on the stream the route actually destroys on disconnect.
cleanups?: Array<() => void>;
}


Expand Down Expand Up @@ -531,18 +536,11 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
}

async streamAddressTransactions(params: StreamAddressUtxosParams) {
return new Promise<void>(async (resolve, reject) => {
try {
await this._buildAddressTransactionsStream(params);
return resolve();
} catch (err) {
return reject(err);
}
});
return this._buildAddressTransactionsStream(params);
}

async _buildAddressTransactionsStream(params: StreamAddressUtxosParams) {
const { req, res, args, chain, network, address } = params;
const { args, chain, network, address } = params;
const { limit, /* since,*/ tokenAddress } = args;

if (!args.tokenAddress) {
Expand All @@ -557,23 +555,20 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai

// NOTE: commented out since and paging for now b/c they were causing extra long query times on insight.
// The case where an address has >1000 txns is an edge case ATM and can be addressed later
Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ }, req!, res!);
} else {
try {
const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args);
res!.json(tokenTransfers);
} catch (err: any) {
logger.error('Error streaming address transactions: %o', err.stack || err.message || err);
throw err;
}
return Storage.apiStreamingFind(EVMTransactionStorage, query, { limit /* since, paging: '_id'*/ });
}
const tokenTransfers = await this.getErc20Transfers(network, address, tokenAddress, args);
// Streams elements one-by-one so the route wraps them via streamJsonArray.
// The response remains a JSON array of the same N transfer objects; only inter-element
// whitespace differs from the prior res.json() output (compact `[..]` vs newline-separated).
return Readable.from(tokenTransfers, { objectMode: true });
}


@historical
@internal
async streamTransactions(params: StreamTransactionsParams) {
const { chain, network, req, res, args } = params;
const { chain, network, args } = params;
const { blockHash, blockHeight } = args;
if (!chain || !network) {
throw new Error('Missing chain or network');
Expand All @@ -590,7 +585,7 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
}
const tip = await this.getLocalTip(params);
const tipHeight = tip ? tip.height : 0;
return Storage.apiStreamingFind(EVMTransactionStorage, query, args, req, res, t => {
return Storage.apiStreamingFind(EVMTransactionStorage, query, args, t => {
let confirmations = 0;
if (t.blockHeight !== undefined && t.blockHeight >= 0) {
confirmations = tipHeight - t.blockHeight + 1;
Expand Down Expand Up @@ -677,47 +672,48 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
}

async streamWalletTransactions(params: StreamWalletTransactionsParams) {
return new Promise<void>(async (resolve, reject) => {
const { network, wallet, req, res, args } = params;
const { web3 } = await this.getWeb3(network);
args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined;
const { network, wallet, args } = params;
const { web3 } = await this.getWeb3(network);
args.tokenAddress = args.tokenAddress ? web3.utils.toChecksumAddress(args.tokenAddress) : undefined;

let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true });
const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address);
if (walletAddresses.length === 0) {
// Status remains 400 via respondWithError; body shape changes from text/plain to
// the JSON {error, message} shape used by every other 4xx path.
throw new AdapterError('walletAddresses', AdapterErrorCode.INVALID_REQUEST, 'No addresses found for wallet');
}
const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress);
const populateReceipt = new PopulateReceiptTransform(this);
const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses);

const cleanups: Array<() => void> = [];
const streamParams: BuildWalletTxsStreamParams = {
transactionStream,
populateEffects,
walletAddresses,
cleanups
};
transactionStream = await this._buildWalletTransactionsStream(params, streamParams);

let transactionStream = new TransformWithEventPipe({ objectMode: true, passThrough: true });
const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(waddres => waddres.address);
if (walletAddresses.length === 0) {
res.status(400).send('No addresses found for wallet');
return resolve();
}
const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses, args.tokenAddress);
const populateReceipt = new PopulateReceiptTransform(this);
const populateEffects = new PopulateEffectsForAddressTransform(this, walletAddresses);

const streamParams: BuildWalletTxsStreamParams = {
transactionStream,
populateEffects,
walletAddresses
};
transactionStream = await this._buildWalletTransactionsStream(params, streamParams);
if (!args.tokenAddress && wallet._id) {
const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id);
transactionStream = transactionStream.eventPipe(internalTxTransform);
}

if (!args.tokenAddress && wallet._id) {
const internalTxTransform = new InternalTxRelatedFilterTransform(web3, wallet._id);
transactionStream = transactionStream.eventPipe(internalTxTransform);
}
transactionStream = transactionStream
.eventPipe(populateReceipt)
.eventPipe(ethTransactionTransform);

transactionStream = transactionStream
.eventPipe(populateReceipt)
.eventPipe(ethTransactionTransform);
// Run upstream teardown callbacks (e.g. cursor.close) when the FINAL stream the route
// pipes from closes or ends. destroy() on this stream does not reliably propagate
// upstream through eventPipe chains, so the cleanup must live here.
const runCleanups = () => { for (const fn of cleanups) { try { fn(); } catch { /* noop */ } } };
transactionStream.on('close', runCleanups);
transactionStream.on('end', runCleanups);

try {
const result = await ExternalApiStream.onStream(transactionStream, req!, res!, { jsonl: true });
if (!result?.success) {
logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error);
}
return resolve();
} catch (err) {
return reject(err);
}
});
(transactionStream as any).jsonl = true;
return transactionStream;
}

async _buildWalletTransactionsStream(params: StreamWalletTransactionsParams, streamParams: BuildWalletTxsStreamParams) {
Expand All @@ -731,22 +727,15 @@ export class BaseEVMStateProvider extends InternalStateProvider implements IChai
.sort({ blockTimeNormalized: 1 })
.addCursorFlag('noCursorTimeout', true);

// Add cleanup handlers when client disconnects
// Cursor cleanup is registered with the caller and triggered against the final piped
// stream. Hooking it here against the intermediate transform would miss disconnects
// because destroy() does not propagate upstream through eventPipe chains reliably.
let cursorClosed = false;
const cleanupCursor = () => {
if (!cursorClosed) {
cursorClosed = true;
try {
cursor.close();
} catch {
// Cursor might already be closed, ignore
}
}
};

const { req, res } = params;
req.on('close', cleanupCursor);
res.on('close', cleanupCursor);
streamParams.cleanups?.push(() => {
if (cursorClosed) return;
cursorClosed = true;
try { cursor.close(); } catch { /* already closed */ }
});

// Pipe cursor to transform stream
transactionStream = cursor.pipe(new TransformWithEventPipe({ objectMode: true, passThrough: true }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export class GnosisApi {
}

async streamGnosisWalletTransactions(params: { multisigContractAddress: string } & StreamWalletTransactionsParams) {
const { chain, network, multisigContractAddress, res, args } = params;
const { chain, network, multisigContractAddress, args } = params;
const transactionQuery = getCSP(chain, network).getWalletTransactionQuery(params);
delete transactionQuery.wallets;
delete transactionQuery['wallets.0'];
Expand Down Expand Up @@ -201,7 +201,6 @@ export class GnosisApi {
.sort({ blockTimeNormalized: 1 })
.addCursorFlag('noCursorTimeout', true);

// Add cleanup handlers when client disconnects
let cursorClosed = false;
const cleanupCursor = () => {
if (!cursorClosed) {
Expand All @@ -214,21 +213,20 @@ export class GnosisApi {
}
};

const { req } = params;
req.on('close', cleanupCursor);
res.on('close', cleanupCursor);

transactionStream = cursor.pipe(populateEffects); // For old db entries

if (multisigContractAddress) {
const multisigTransform = new MultisigRelatedFilterTransform(multisigContractAddress, args.tokenAddress);
transactionStream = transactionStream.pipe(multisigTransform);
}

transactionStream
const finalStream: any = transactionStream
.pipe(populateReceipt)
.pipe(ethTransactionTransform)
.pipe(res);
.pipe(ethTransactionTransform);
finalStream.jsonl = true;
finalStream.on('close', cleanupCursor);
finalStream.on('end', cleanupCursor);
return finalStream;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Router } from 'express';
import config from '../../../../config';
import logger from '../../../../logger';
import { WebhookStorage } from '../../../../models/webhook';
import { respondWithError, streamJsonArray } from '../../../../routes/apiUtils';
import { Config } from '../../../../services/config';
import { IEVMNetworkConfig } from '../../../../types/Config';
import { castToBool } from '../../../../utils';
Expand Down Expand Up @@ -235,22 +236,25 @@ export class EVMRouter {
});
};

private streamGnosisWalletTransactions(router: Router) {
private streamGnosisWalletTransactions(router: Router) {
router.get(`/api/${this.chain}/:network/ethmultisig/transactions/:multisigContractAddress`, async (req, res) => {
const { network, multisigContractAddress } = req.params;
try {
return await Gnosis.streamGnosisWalletTransactions({
const stream = await Gnosis.streamGnosisWalletTransactions({
chain: this.chain,
network,
multisigContractAddress,
wallet: {} as any,
req,
res,
args: req.query
});
const result = await streamJsonArray(stream, req, res);
if (!result.success) {
logger.error('Error mid-stream (streamGnosisWalletTransactions): %o', result.error?.log || result.error);
}
return;
} catch (err: any) {
logger.error('Multisig Transactions Error::%o', err.stack || err.message || err);
return res.status(500).send(err.message || err);
return respondWithError(res, err);
}
});
};
Expand Down
Loading