diff --git a/go.mod b/go.mod index 2b35af710f3..5381fc36b7a 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ethereum/go-ethereum v1.16.9 github.com/ethersphere/batch-archive v0.0.7 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.4 + github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.19.0 diff --git a/go.sum b/go.sum index 4bebb87cec2..c9711f1d345 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,8 @@ github.com/ethersphere/batch-archive v0.0.7 h1:vb616eZIU5znxYiUSIBrPBD3/T4scEBNb github.com/ethersphere/batch-archive v0.0.7/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= -github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index babd816dd06..d2112105363 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -757,7 +757,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs, *redistribution.ClaimOpts) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) diff --git a/pkg/postage/postagecontract/contract.go b/pkg/postage/postagecontract/contract.go index 349ebd01f9a..11f04e99613 100644 --- a/pkg/postage/postagecontract/contract.go +++ b/pkg/postage/postagecontract/contract.go @@ -47,6 +47,7 @@ type Interface interface { DiluteBatch(ctx context.Context, batchID []byte, newDepth uint8) (common.Hash, error) Paused(ctx context.Context) (bool, error) MinimumValidityBlocks(ctx context.Context) (uint64, error) + ExpectedReward(ctx context.Context) (*big.Int, error) PostageBatchExpirer } @@ -339,6 +340,15 @@ func (c *postageContract) getProperty(ctx context.Context, propertyName string, return nil } +// ExpectedReward returns the current redistribution pot (totalPot) from the postage stamp contract. +func (c *postageContract) ExpectedReward(ctx context.Context) (*big.Int, error) { + pot := new(big.Int) + if err := c.getProperty(ctx, "totalPot", pot); err != nil { + return nil, fmt.Errorf("totalPot: %w", err) + } + return pot, nil +} + func (c *postageContract) getMinInitialBalance(ctx context.Context) (uint64, error) { var lastPrice uint64 err := c.getProperty(ctx, "lastPrice", &lastPrice) @@ -569,6 +579,10 @@ func (m *noOpPostageContract) ExpireBatches(context.Context) error { return ErrChainDisabled } +func (m *noOpPostageContract) ExpectedReward(context.Context) (*big.Int, error) { + return nil, ErrChainDisabled +} + func LookupERC20Address(ctx context.Context, transactionService transaction.Service, postageStampContractAddress common.Address, postageStampContractABI abi.ABI, chainEnabled bool) (common.Address, error) { if !chainEnabled { return common.Address{}, nil diff --git a/pkg/postage/postagecontract/contract_test.go b/pkg/postage/postagecontract/contract_test.go index 6592a7f23a7..66d56f2c455 100644 --- a/pkg/postage/postagecontract/contract_test.go +++ b/pkg/postage/postagecontract/contract_test.go @@ -74,7 +74,7 @@ func TestCreateBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: return txHashApprove, &types.Receipt{Status: 1}, nil @@ -308,7 +308,7 @@ func TestTopUpBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { switch *request.To { case bzzTokenAddress: return txHashApprove, &types.Receipt{Status: 1}, nil @@ -468,7 +468,7 @@ func TestDiluteBatch(t *testing.T) { postageStampContractABI, bzzTokenAddress, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == postageStampAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { return txHashApprove, &types.Receipt{Status: 1}, nil @@ -630,7 +630,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { return common.Hash{}, &types.Receipt{Status: 1}, nil }), ), @@ -768,7 +768,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == postageContractAddress { if bytes.Equal(expectedCallDataForExpireLimitedBatches[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("some error") @@ -891,7 +891,7 @@ func TestBatchExpirer(t *testing.T) { } } return nil, errors.New("unexpected call") - }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + }), transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { return common.Hash{}, &types.Receipt{Status: 0}, transaction.ErrTransactionReverted }), ), diff --git a/pkg/postage/postagecontract/mock/contract.go b/pkg/postage/postagecontract/mock/contract.go index c8d1879f9eb..65e8055743f 100644 --- a/pkg/postage/postagecontract/mock/contract.go +++ b/pkg/postage/postagecontract/mock/contract.go @@ -19,6 +19,7 @@ type contractMock struct { expireBatches func(ctx context.Context) error paused func(ctx context.Context) (bool, error) minimumValidityBlocks func(ctx context.Context) (uint64, error) + expectedReward func(ctx context.Context) (*big.Int, error) } func (c *contractMock) CreateBatch(ctx context.Context, initialBalance *big.Int, depth uint8, immutable bool, label string) (common.Hash, []byte, error) { @@ -45,6 +46,13 @@ func (c *contractMock) MinimumValidityBlocks(ctx context.Context) (uint64, error return c.minimumValidityBlocks(ctx) } +func (c *contractMock) ExpectedReward(ctx context.Context) (*big.Int, error) { + if c.expectedReward != nil { + return c.expectedReward(ctx) + } + return big.NewInt(1_000_000), nil +} + // Option is an option passed to New type Option func(*contractMock) @@ -94,3 +102,9 @@ func WithMinimumValidityBlocksFunc(f func(ctx context.Context) (uint64, error)) mock.minimumValidityBlocks = f } } + +func WithExpectedRewardFunc(f func(ctx context.Context) (*big.Int, error)) Option { + return func(m *contractMock) { + m.expectedReward = f + } +} diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 5142e97836e..bb9341851f3 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -41,6 +41,10 @@ const ( // average tx gas used by transactions issued from agent avgTxGas = 250_000 + + // forceClaimBlocksBeforeEnd is how many blocks before round end claim may + // bypass max-tx-cost when economics justify it (see redistribution.ClaimOpts). + forceClaimBlocksBeforeEnd = 10 ) type ChainBackend interface { @@ -59,8 +63,9 @@ type Agent struct { metrics metrics backend ChainBackend blocksPerRound uint64 + blockTime time.Duration contract redistribution.Contract - batchExpirer postagecontract.PostageBatchExpirer + postageContract postagecontract.Interface redistributionStatuser staking.RedistributionStatuser store storer.Reserve fullSyncedFunc func() bool @@ -78,7 +83,7 @@ func New(overlay swarm.Address, ethAddress common.Address, backend ChainBackend, contract redistribution.Contract, - batchExpirer postagecontract.PostageBatchExpirer, + postageContract postagecontract.Interface, redistributionStatuser staking.RedistributionStatuser, store storer.Reserve, fullSyncedFunc func() bool, @@ -98,10 +103,11 @@ func New(overlay swarm.Address, backend: backend, logger: logger.WithName(loggerName).Register(), contract: contract, - batchExpirer: batchExpirer, + postageContract: postageContract, store: store, fullSyncedFunc: fullSyncedFunc, blocksPerRound: blocksPerRound, + blockTime: blockTime, quit: make(chan struct{}), redistributionStatuser: redistributionStatuser, health: health, @@ -116,7 +122,7 @@ func New(overlay swarm.Address, a.state = state a.wg.Add(1) - go a.start(blockTime, a.blocksPerRound, blocksPerPhase) + go a.start(a.blockTime, a.blocksPerRound, blocksPerPhase) return a, nil } @@ -311,7 +317,7 @@ func (a *Agent) handleReveal(ctx context.Context, round uint64) error { a.metrics.ErrReveal.Inc() return err } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) a.state.SetHasRevealed(round) @@ -344,7 +350,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { // In case when there are too many expired batches, Claim trx could runs out of gas. // To prevent this, node should first expire batches before Claiming a reward. - err = a.batchExpirer.ExpireBatches(ctx) + err = a.postageContract.ExpireBatches(ctx) if err != nil { a.logger.Info("expire batches failed", "err", err) // Even when error happens, proceed with claim handler @@ -353,7 +359,7 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { errBalance := a.state.SetBalance(ctx) if errBalance != nil { - a.logger.Info("could not set balance", "err", err) + a.logger.Info("could not set balance", "err", errBalance) } sampleData, exists := a.state.SampleData(round - 1) @@ -371,8 +377,33 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { return fmt.Errorf("making inclusion proofs: %w", err) } - txHash, err := a.contract.Claim(ctx, proofs) + claimCtx := ctx + phaseEndBlock := (round+1)*a.blocksPerRound - 1 + if rem := int64(phaseEndBlock) - int64(a.state.currentBlock()); rem > 0 { + var cancel context.CancelFunc + claimCtx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Duration(rem)*a.blockTime)) + defer cancel() + } + + reward, err := a.postageContract.ExpectedReward(ctx) + if err != nil { + a.logger.Warning("could not estimate claim reward, override max_tx_cost option will be disabled", "error", err) + } + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: (round+1)*a.blocksPerRound - forceClaimBlocksBeforeEnd, + CurrentBlockFn: func() uint64 { return a.state.currentBlock() }, + ExpectedReward: reward, + RoundFees: a.state.RoundFees(round), + } + + txHash, err := a.contract.Claim(claimCtx, proofs, opts) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + a.logger.Info("claim aborted by context", "round", round, "err", err) + a.metrics.SkippedExpensivePhase.Inc() + return nil + } a.metrics.ErrClaim.Inc() return fmt.Errorf("claiming win: %w", err) } @@ -382,11 +413,11 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { if errBalance == nil { errReward := a.state.CalculateWinnerReward(ctx) if errReward != nil { - a.logger.Info("calculate winner reward", "err", err) + a.logger.Info("calculate winner reward", "err", errReward) } } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) return nil } @@ -539,7 +570,7 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err a.metrics.ErrCommit.Inc() return err } - a.state.AddFee(ctx, txHash) + a.state.AddRoundFee(ctx, round, txHash) a.state.SetCommitKey(round, key) diff --git a/pkg/storageincentives/agent_test.go b/pkg/storageincentives/agent_test.go index 6449ede9059..cbc28d8a813 100644 --- a/pkg/storageincentives/agent_test.go +++ b/pkg/storageincentives/agent_test.go @@ -307,7 +307,7 @@ func (m *mockContract) IsWinner(context.Context) (bool, error) { return false, nil } -func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs) (common.Hash, error) { +func (m *mockContract) Claim(context.Context, redistribution.ChunkInclusionProofs, *redistribution.ClaimOpts) (common.Hash, error) { m.mtx.Lock() defer m.mtx.Unlock() m.callsList = append(m.callsList, claimCall) diff --git a/pkg/storageincentives/metrics.go b/pkg/storageincentives/metrics.go index b376d9d20b2..3db40b39caa 100644 --- a/pkg/storageincentives/metrics.go +++ b/pkg/storageincentives/metrics.go @@ -31,6 +31,9 @@ type metrics struct { ErrClaim prometheus.Counter ErrWinner prometheus.Counter ErrCheckIsPlaying prometheus.Counter + + // cost control metrics + SkippedExpensivePhase prometheus.Counter } func newMetrics() metrics { @@ -137,6 +140,12 @@ func newMetrics() metrics { Name: "is_playing_errors", Help: "total neighborhood selected errors while processing", }), + SkippedExpensivePhase: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "skipped_expensive_phase", + Help: "Count of phases skipped because estimated tx cost exceeded configured limit.", + }), } } diff --git a/pkg/storageincentives/redistribution/redistribution.go b/pkg/storageincentives/redistribution/redistribution.go index 98cf737d3fe..81a48ecd4a4 100644 --- a/pkg/storageincentives/redistribution/redistribution.go +++ b/pkg/storageincentives/redistribution/redistribution.go @@ -8,26 +8,40 @@ import ( "context" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/log" - "github.com/ethersphere/bee/v2/pkg/sctx" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/transaction" ) const ( - loggerName = "redistributionContract" - // BoostTipPercent is used where the node sends transactions without retry. + loggerName = "redistributionContract" BoostTipPercent = 50 + + minEstimatedGasLimit = 250_000 + + // redistributionGameTransactionsRetryDelay caps the retry delay for redistribution game txs. + redistributionGameTransactionsRetryDelay = 35 * time.Second ) +// ClaimOpts configures optional claim behaviour: after OverrideAfterBlock (absolute +// chain block number), if ExpectedReward covers upper-bound claim cost plus +// RoundFees, the max-tx-cost limit is bypassed for that send attempt. +type ClaimOpts struct { + OverrideAfterBlock uint64 + CurrentBlockFn func() uint64 + ExpectedReward *big.Int + RoundFees *big.Int +} + type Contract interface { ReserveSalt(context.Context) ([]byte, error) IsPlaying(context.Context, uint8) (bool, error) IsWinner(context.Context) (bool, error) - Claim(context.Context, ChunkInclusionProofs) (common.Hash, error) + Claim(context.Context, ChunkInclusionProofs, *ClaimOpts) (common.Hash, error) Commit(context.Context, []byte, uint64) (common.Hash, error) Reveal(context.Context, uint8, []byte, []byte) (common.Hash, error) } @@ -40,6 +54,16 @@ type contract struct { incentivesContractAddress common.Address incentivesContractABI abi.ABI gasLimit uint64 + retryDelayRewrite func(time.Duration) time.Duration +} + +type Option func(*contract) + +// WithRetryDelayRewrite sets a function that rewrites the configured SendWithRetry delay. +func WithRetryDelayRewrite(fn func(time.Duration) time.Duration) Option { + return func(c *contract) { + c.retryDelayRewrite = fn + } } func New( @@ -50,8 +74,9 @@ func New( incentivesContractAddress common.Address, incentivesContractABI abi.ABI, gasLimit uint64, + opts ...Option, ) Contract { - return &contract{ + c := &contract{ overlay: overlay, owner: owner, logger: logger.WithName(loggerName).Register(), @@ -60,6 +85,14 @@ func New( incentivesContractABI: incentivesContractABI, gasLimit: gasLimit, } + for _, opt := range opts { + opt(c) + } + + if c.retryDelayRewrite == nil { + c.retryDelayRewrite = capRetryDelay + } + return c } // IsPlaying checks if the overlay is participating in the upcoming round. @@ -101,8 +134,12 @@ func (c *contract) IsWinner(ctx context.Context) (isWinner bool, err error) { return results[0].(bool), nil } -// Claim sends a transaction to blockchain if a win is claimed. -func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (common.Hash, error) { +// Claim sends a transaction to blockchain if a win is claimed. When opts is +// non-nil and the configured max-tx-price would block the broadcast, +// canOverrideClaim is consulted: if the override block threshold has passed +// and ExpectedReward covers the estimated cost plus previous round fees, +// the price cap is bypassed. +func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs, opts *ClaimOpts) (txHash common.Hash, err error) { callData, err := c.incentivesContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) if err != nil { return common.Hash{}, err @@ -110,20 +147,61 @@ func (c *contract) Claim(ctx context.Context, proofs ChunkInclusionProofs) (comm request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "claim win transaction", } - txHash, err := c.sendAndWait(ctx, request) + + retryOpts := []transaction.RetryOption{ + transaction.WithIgnoreMaxPrice(func(gasFeeCap *big.Int) bool { + return c.canOverrideClaim(opts, gasFeeCap) + }), + } + + txHash, err = c.sendAndWait(ctx, request, retryOpts...) if err != nil { return txHash, fmt.Errorf("claim: %w", err) } - return txHash, nil } +// canOverrideClaim decides whether the claim transaction should bypass the +// max-tx-price cap. gasFeeCap is the actual max fee per gas (wei) that the +// retry loop wants to use — it is provided by suggestGasFeeGasTipCapWithHistory +// so there is no redundant estimation. +func (c *contract) canOverrideClaim(opts *ClaimOpts, gasFeeCap *big.Int) bool { + if opts == nil || opts.OverrideAfterBlock == 0 || opts.CurrentBlockFn == nil || opts.RoundFees == nil { + return false + } + + if opts.CurrentBlockFn() < opts.OverrideAfterBlock { + return false + } + + if opts.ExpectedReward == nil || opts.ExpectedReward.Sign() <= 0 { + return false + } + + gasUnits := c.gasLimit + if gasUnits <= 0 { + gasUnits = minEstimatedGasLimit + } + + txCost := new(big.Int).Mul(gasFeeCap, big.NewInt(int64(gasUnits))) + totalSpent := new(big.Int).Add(txCost, opts.RoundFees) + if opts.ExpectedReward.Cmp(totalSpent) < 0 { + c.logger.Info("claim override: reward does not cover cost", + "tx_cost", txCost, + "round_fees", opts.RoundFees, + "total_spent", totalSpent, + "expected_reward", opts.ExpectedReward, + ) + return false + } + return true +} + // Commit submits the obfusHash hash by sending a transaction to the blockchain. func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) (common.Hash, error) { callData, err := c.incentivesContractABI.Pack("commit", common.BytesToHash(obfusHash), round) @@ -133,9 +211,8 @@ func (c *contract) Commit(ctx context.Context, obfusHash []byte, round uint64) ( request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "commit transaction", } @@ -156,9 +233,8 @@ func (c *contract) Reveal(ctx context.Context, storageDepth uint8, reserveCommit request := &transaction.TxRequest{ To: &c.incentivesContractAddress, Data: callData, - GasPrice: sctx.GetGasPrice(ctx), - GasLimit: max(sctx.GetGasLimit(ctx), c.gasLimit), - MinEstimatedGasLimit: 500_000, + GasLimit: c.gasLimit, + MinEstimatedGasLimit: minEstimatedGasLimit, Value: big.NewInt(0), Description: "reveal transaction", } @@ -190,7 +266,11 @@ func (c *contract) ReserveSalt(ctx context.Context) ([]byte, error) { return salt[:], nil } -func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, err error) { +func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (txHash common.Hash, err error) { + if c.retryDelayRewrite != nil { + opts = append(opts, transaction.WithRetryDelay(c.retryDelayRewrite)) + } + defer func() { err = c.txService.UnwrapABIError( ctx, @@ -200,14 +280,8 @@ func (c *contract) sendAndWait(ctx context.Context, request *transaction.TxReque ) }() - txHash, receipt, err := c.txService.SendWithRetry(ctx, request) - if err != nil { - return txHash, err - } - if receipt == nil { - return txHash, fmt.Errorf("missing receipt after send with retry") - } - return txHash, nil + txHash, _, err = c.txService.SendWithRetry(ctx, request, opts...) + return txHash, err } // callTx simulates a transaction based on tx request. @@ -221,3 +295,12 @@ func (c *contract) callTx(ctx context.Context, callData []byte) ([]byte, error) } return result, nil } + +// capRetryDelay limits the retry delay for redistribution game transactions +// to redistributionGameTransactionsRetryDelay. +func capRetryDelay(d time.Duration) time.Duration { + if d > redistributionGameTransactionsRetryDelay { + return redistributionGameTransactionsRetryDelay + } + return d +} diff --git a/pkg/storageincentives/redistribution/redistribution_test.go b/pkg/storageincentives/redistribution/redistribution_test.go index 85ec99b7b19..fe604068723 100644 --- a/pkg/storageincentives/redistribution/redistribution_test.go +++ b/pkg/storageincentives/redistribution/redistribution_test.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/big" + "sync/atomic" "testing" "github.com/ethereum/go-ethereum/common" @@ -24,9 +25,12 @@ import ( transactionMock "github.com/ethersphere/bee/v2/pkg/transaction/mock" "github.com/ethersphere/bee/v2/pkg/util/abiutil" "github.com/ethersphere/bee/v2/pkg/util/testutil" + "github.com/stretchr/testify/assert" ) -var redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +var ( + redistributionContractABI = abiutil.MustParseABI(chaincfg.Testnet.RedistributionABI) +) func randChunkInclusionProof(t *testing.T) redistribution.ChunkInclusionProof { t.Helper() @@ -203,7 +207,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -218,7 +222,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Claim(ctx, proofs) + _, err = contract.Claim(ctx, proofs, nil) if err != nil { t.Fatal(err) } @@ -237,7 +241,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -252,7 +256,7 @@ func TestRedistribution(t *testing.T) { 0, ) - _, err = contract.Claim(ctx, proofs) + _, err = contract.Claim(ctx, proofs, nil) if !errors.Is(err, transaction.ErrTransactionReverted) { t.Fatal(err) } @@ -272,7 +276,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -309,7 +313,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData[:32], request.Data[:32]) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -401,7 +405,7 @@ func TestRedistribution(t *testing.T) { owner, log.Noop, transactionMock.New( - transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if *request.To == redistributionContractAddress { if !bytes.Equal(expectedCallData, request.Data) { return common.Hash{}, nil, fmt.Errorf("got wrong call data. wanted %x, got %x", expectedCallData, request.Data) @@ -422,3 +426,151 @@ func TestRedistribution(t *testing.T) { } }) } + +func TestCommit_CriticalErrorFails(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + testobfus := common.Hex2Bytes("hash") + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, _ *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + return common.Hash{}, nil, transaction.ErrTransactionReverted + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + _, err := c.Commit(ctx, testobfus, 0) + assert.Error(t, err) + assert.ErrorIs(t, err, transaction.ErrTransactionReverted) +} + +func TestCommit_withoutGasFeeCapOnRequest(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + testobfus := common.Hex2Bytes("hash") + expectedHash := common.HexToHash("bbbb") + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, request *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + assert.Nil(t, request.GasFeeCap) + assert.Nil(t, request.GasPrice) + return expectedHash, &types.Receipt{Status: 1}, nil + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + h, err := c.Commit(ctx, testobfus, 0) + assert.NoError(t, err) + assert.Equal(t, expectedHash, h) +} + +func TestClaim_sendsWithRetryOptions(t *testing.T) { + t.Parallel() + + ctx := context.Background() + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + proofs := randChunkInclusionProofs(t) + expectedHash := common.HexToHash("cafe") + + var sendCalls atomic.Int32 + var retryOptsLen int + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(_ context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + sendCalls.Add(1) + retryOptsLen = len(opts) + callData, err := redistributionContractABI.Pack("claim", proofs.A, proofs.B, proofs.C) + assert.NoError(t, err) + assert.Equal(t, callData, request.Data) + return expectedHash, &types.Receipt{Status: 1}, nil + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: 100, + CurrentBlockFn: func() uint64 { return 110 }, + ExpectedReward: new(big.Int).Mul(big.NewInt(50), big.NewInt(1_000_000)), + RoundFees: big.NewInt(100_000), + } + + h, err := c.Claim(ctx, proofs, opts) + assert.NoError(t, err) + assert.Equal(t, expectedHash, h) + assert.EqualValues(t, 1, sendCalls.Load()) + assert.Equal(t, 2, retryOptsLen, "Claim must pass WithIgnoreMaxPrice and WithRetryDelay retry options") +} + +func TestClaim_contextCanceled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + owner := common.HexToAddress("abcd") + overlay := swarm.NewAddress(common.HexToHash("cbd").Bytes()) + redistributionContractAddress := common.HexToAddress("ffff") + proofs := randChunkInclusionProofs(t) + + txSvc := transactionMock.New( + transactionMock.WithSendWithRetryFunc(func(ctx context.Context, _ *transaction.TxRequest, _ ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { + return common.Hash{}, nil, ctx.Err() + }), + ) + + c := redistribution.New( + overlay, + owner, + log.Noop, + txSvc, + redistributionContractAddress, + redistributionContractABI, + 0, + ) + + opts := &redistribution.ClaimOpts{ + OverrideAfterBlock: 100, + CurrentBlockFn: func() uint64 { return 200 }, + ExpectedReward: big.NewInt(1000), + RoundFees: big.NewInt(1), + } + + _, err := c.Claim(ctx, proofs, opts) + assert.ErrorIs(t, err, context.Canceled) +} diff --git a/pkg/storageincentives/redistributionstate.go b/pkg/storageincentives/redistributionstate.go index 9929a9e7831..adbcbdcbcc3 100644 --- a/pkg/storageincentives/redistributionstate.go +++ b/pkg/storageincentives/redistributionstate.go @@ -61,6 +61,7 @@ type RoundData struct { CommitKey []byte SampleData *SampleData HasRevealed bool + RoundFees *big.Int } type SampleData struct { @@ -184,8 +185,8 @@ func (r *RedistributionState) SetLastSelectedRound(round uint64) { r.save() } -// AddFee sets the internal node status -func (r *RedistributionState) AddFee(ctx context.Context, txHash common.Hash) { +// AddRoundFee tracks fees spent in a specific round. +func (r *RedistributionState) AddRoundFee(ctx context.Context, round uint64, txHash common.Hash) { fee, err := r.txService.TransactionFee(ctx, txHash) if err != nil { return @@ -194,10 +195,28 @@ func (r *RedistributionState) AddFee(ctx context.Context, txHash common.Hash) { r.mtx.Lock() defer r.mtx.Unlock() + rd := r.status.RoundData[round] + if rd.RoundFees == nil { + rd.RoundFees = new(big.Int) + } + rd.RoundFees.Add(rd.RoundFees, fee) + r.status.RoundData[round] = rd r.status.Fees.Add(r.status.Fees, fee) r.save() } +// RoundFees returns the total fees spent in a given round. +func (r *RedistributionState) RoundFees(round uint64) *big.Int { + r.mtx.Lock() + defer r.mtx.Unlock() + + rd, ok := r.status.RoundData[round] + if !ok || rd.RoundFees == nil { + return new(big.Int) + } + return new(big.Int).Set(rd.RoundFees) +} + // CalculateWinnerReward calculates the reward for the winner func (r *RedistributionState) CalculateWinnerReward(ctx context.Context) error { currentBalance, err := r.erc20Service.BalanceOf(ctx, r.ethAddress) diff --git a/pkg/storageincentives/redistributionstate_test.go b/pkg/storageincentives/redistributionstate_test.go index 9e2930642a4..318aca7e264 100644 --- a/pkg/storageincentives/redistributionstate_test.go +++ b/pkg/storageincentives/redistributionstate_test.go @@ -295,9 +295,10 @@ func TestReward(t *testing.T) { } } -// TestFee check if fees increments when called multiple times -func TestFee(t *testing.T) { +// TestRoundFee check if fees increments when called multiple times +func TestRoundFee(t *testing.T) { t.Parallel() + const round = 1 firstFee := big.NewInt(10) state := createRedistribution(t, nil, []transactionmock.Option{ transactionmock.WithTransactionFeeFunc(func(ctx context.Context, txHash common.Hash) (*big.Int, error) { @@ -305,7 +306,7 @@ func TestFee(t *testing.T) { }), }) ctx := context.Background() - state.AddFee(ctx, common.Hash{}) + state.AddRoundFee(ctx, round, common.Hash{}) gotFirstResult, err := state.Status() if err != nil { t.Fatal("failed to get status") @@ -320,7 +321,7 @@ func TestFee(t *testing.T) { }), }...) - state.AddFee(ctx, common.Hash{}) + state.AddRoundFee(ctx, round, common.Hash{}) gotSecondResult, err := state.Status() if err != nil { t.Fatal("failed to get status") diff --git a/pkg/transaction/backendmock/backend.go b/pkg/transaction/backendmock/backend.go index 045f9c5baab..c939edd0ee5 100644 --- a/pkg/transaction/backendmock/backend.go +++ b/pkg/transaction/backendmock/backend.go @@ -18,20 +18,20 @@ import ( var ErrNotImplemented = errors.New("not implemented") type backendMock struct { - callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) - sendTransaction func(ctx context.Context, tx *types.Transaction) error - suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) + callContract func(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + sendTransaction func(ctx context.Context, tx *types.Transaction) error + suggestedFeeAndTip func(ctx context.Context, gasPrice *big.Int, boostPercent int) (*big.Int, *big.Int, error) suggestedFeeAndTipsFromHistory func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) - suggestGasTipCap func(ctx context.Context) (*big.Int, error) - estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) - transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) - transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) - blockNumber func(ctx context.Context) (uint64, error) - headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) - balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) - nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) - codeAt func(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) + suggestGasTipCap func(ctx context.Context) (*big.Int, error) + estimateGas func(ctx context.Context, msg ethereum.CallMsg) (gas uint64, err error) + transactionReceipt func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + pendingNonceAt func(ctx context.Context, account common.Address) (uint64, error) + transactionByHash func(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) + blockNumber func(ctx context.Context) (uint64, error) + headerByNumber func(ctx context.Context, number *big.Int) (*types.Header, error) + balanceAt func(ctx context.Context, address common.Address, block *big.Int) (*big.Int, error) + nonceAt func(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + codeAt func(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) feeHistory func(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) } diff --git a/pkg/transaction/export_test.go b/pkg/transaction/export_test.go index b80ff2ff612..f150524ba93 100644 --- a/pkg/transaction/export_test.go +++ b/pkg/transaction/export_test.go @@ -19,12 +19,8 @@ var ( ) const ( - FeeTierLow = feeTierLow - FeeTierMarket = feeTierMarket - FeeTierAggressive = feeTierAggressive - - MempoolBumpPercent = mempoolBumpPercent - DefaultAttemptsPerTier = defaultAttemptsPerTier + FeeTierLow = feeTierLow + FeeTierMarket = feeTierMarket ) // SuggestGasFeeForTier exposes suggestGasFeeForTier for tests. @@ -34,11 +30,14 @@ func SuggestGasFeeForTier( ctx context.Context, tier int, previousTip *big.Int, + previousBaseFee *big.Int, + overrides *RetryOverrides, ) (gasFeeCap, gasTipCap *big.Int, err error) { svc := &transactionService{ logger: log.Noop, backend: backend, maxTxPrice: maxTxPrice, } - return svc.suggestGasFeeForTier(ctx, feeTier(tier), previousTip) + cap, tip, _, err := svc.suggestGasFeeForTier(ctx, feeTier(tier), previousTip, previousBaseFee, overrides) + return cap, tip, err } diff --git a/pkg/transaction/mock/transaction.go b/pkg/transaction/mock/transaction.go index 66ad081fbe4..8491ca0fa49 100644 --- a/pkg/transaction/mock/transaction.go +++ b/pkg/transaction/mock/transaction.go @@ -18,8 +18,9 @@ import ( ) type transactionServiceMock struct { + estimateTxCost func(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) send func(ctx context.Context, request *transaction.TxRequest, boost int) (txHash common.Hash, err error) - sendWithRetry func(ctx context.Context, request *transaction.TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) + sendWithRetry func(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) waitForReceipt func(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) watchSentTransaction func(txHash common.Hash) (chan types.Receipt, chan error, error) call func(ctx context.Context, request *transaction.TxRequest) (result []byte, err error) @@ -30,13 +31,20 @@ type transactionServiceMock struct { transactionFee func(ctx context.Context, txHash common.Hash) (*big.Int, error) } -func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest) (common.Hash, *types.Receipt, error) { +func (m *transactionServiceMock) SendWithRetry(ctx context.Context, request *transaction.TxRequest, opts ...transaction.RetryOption) (common.Hash, *types.Receipt, error) { if m.sendWithRetry != nil { - return m.sendWithRetry(ctx, request) + return m.sendWithRetry(ctx, request, opts...) } return common.Hash{}, nil, errors.New("not implemented") } +func (m *transactionServiceMock) EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (*big.Int, *big.Int, error) { + if m.estimateTxCost != nil { + return m.estimateTxCost(ctx, gasUnits, tip) + } + return big.NewInt(0), big.NewInt(0), nil +} + func (m *transactionServiceMock) Send(ctx context.Context, request *transaction.TxRequest, boostPercent int) (txHash common.Hash, err error) { if m.send != nil { return m.send(ctx, request, boostPercent) @@ -118,12 +126,18 @@ type optionFunc func(*transactionServiceMock) func (f optionFunc) apply(r *transactionServiceMock) { f(r) } -func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest) (common.Hash, *types.Receipt, error)) Option { +func WithSendWithRetryFunc(f func(context.Context, *transaction.TxRequest, ...transaction.RetryOption) (common.Hash, *types.Receipt, error)) Option { return optionFunc(func(s *transactionServiceMock) { s.sendWithRetry = f }) } +func WithEstimateTxCostFunc(f func(context.Context, int64, int) (*big.Int, *big.Int, error)) Option { + return optionFunc(func(s *transactionServiceMock) { + s.estimateTxCost = f + }) +} + func WithSendFunc(f func(context.Context, *transaction.TxRequest, int) (txHash common.Hash, err error)) Option { return optionFunc(func(s *transactionServiceMock) { s.send = f diff --git a/pkg/transaction/send_tx_with_retry.go b/pkg/transaction/send_tx_with_retry.go index 40c7e000abb..3b249a10e1e 100644 --- a/pkg/transaction/send_tx_with_retry.go +++ b/pkg/transaction/send_tx_with_retry.go @@ -27,6 +27,45 @@ const mempoolBumpPercent = 15 // before escalating to the next tier. const defaultAttemptsPerTier = 2 +// RetryOverrides controls per-call behaviour overrides for SendWithRetry. +// Fields are optional; nil means "use default behaviour". +type RetryOverrides struct { + // IgnoreMaxPrice is called when maxTxPrice would block a broadcast. + // It receives the gasFeeCap (max fee per gas, wei) that would be used + // for this attempt. If it returns true, the price cap is bypassed. + IgnoreMaxPrice func(gasFeeCap *big.Int) bool + + // RetryDelay, if set, rewrites the configured delay between attempts. + RetryDelay func(time.Duration) time.Duration +} + +// RetryOption configures per-call overrides for SendWithRetry. +type RetryOption func(*RetryOverrides) + +// WithIgnoreMaxPrice returns a RetryOption that installs a predicate called +// whenever the configured maxTxPrice would block a broadcast. The predicate +// receives the gasFeeCap (max fee per gas, wei) that would be used. When fn +// returns true the price cap is bypassed for that attempt. +func WithIgnoreMaxPrice(fn func(gasFeeCap *big.Int) bool) RetryOption { + return func(o *RetryOverrides) { o.IgnoreMaxPrice = fn } +} + +// WithRetryDelay returns a RetryOption that rewrites the configured retry delay. +func WithRetryDelay(fn func(time.Duration) time.Duration) RetryOption { + return func(o *RetryOverrides) { o.RetryDelay = fn } +} + +func applyRetryOptions(opts []RetryOption) *RetryOverrides { + if len(opts) == 0 { + return nil + } + var o RetryOverrides + for _, fn := range opts { + fn(&o) + } + return &o +} + // TransactionRetryState is persisted so transactions with retry can resume after a node restart. type TransactionRetryState struct { Nonce uint64 `json:"nonce"` @@ -47,41 +86,42 @@ func retryStateKey(nonce uint64) string { // SendWithRetry sends an EIP-1559 transaction using fee-history tiers with automatic // escalation. Each tier gets attemptsPerTier broadcast rounds with fresh eth_feeHistory // data. A +15% mempool bump floor is applied to ensure replacement transactions are accepted. -func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) { +// Optional RetryOption values can override per-call retry behaviour (e.g. bypass price cap). +func (t *transactionService) SendWithRetry(ctx context.Context, request *TxRequest, opts ...RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) { if request.GasPrice != nil { err = errors.New("send txs with retry requires automatic gas pricing") t.recordRetryComplete(1, err) return common.Hash{}, nil, err } - return t.retry(ctx, "", request) + return t.retry(ctx, "", request, applyRetryOptions(opts)) } -// applyMempoolBump returns tip bumped by mempoolBumpPercent. -func applyMempoolBump(tip *big.Int) *big.Int { +// applyMempoolBump returns value bumped by mempoolBumpPercent. +func applyMempoolBump(value *big.Int) *big.Int { return new(big.Int).Div( - new(big.Int).Mul(new(big.Int).Set(tip), big.NewInt(int64(100+mempoolBumpPercent))), + new(big.Int).Mul(new(big.Int).Set(value), big.NewInt(int64(100+mempoolBumpPercent))), big.NewInt(100), ) } // suggestGasFeeForTier fetches fresh fee history, picks the tip for the given tier, // applies the mempool bump floor relative to previousTip, and computes gasFeeCap. -func (t *transactionService) suggestGasFeeForTier(ctx context.Context, tier feeTier, previousTip *big.Int) (gasFeeCap, gasTipCap *big.Int, err error) { +func (t *transactionService) suggestGasFeeForTier(ctx context.Context, tier feeTier, previousTip, previousBaseFee *big.Int, overrides *RetryOverrides) (gasFeeCap, gasTipCap, effectiveBaseFee *big.Int, err error) { header, err := t.backend.HeaderByNumber(ctx, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } if header == nil || header.BaseFee == nil { - return nil, nil, fmt.Errorf("latest block header or base fee unavailable") + return nil, nil, nil, fmt.Errorf("latest block header or base fee unavailable") } // get fee history fh, err := t.backend.SuggestedFeeAndTipsFromHistory(ctx, nil) if err != nil { - return nil, nil, fmt.Errorf("fee history: %w", err) + return nil, nil, nil, fmt.Errorf("fee history: %w", err) } if fh == nil { - return nil, nil, errors.New("fee history: empty response") + return nil, nil, nil, errors.New("fee history: empty response") } tip := tierTip(tier, fh) @@ -94,32 +134,49 @@ func (t *transactionService) suggestGasFeeForTier(ctx context.Context, tier feeT } } - gasFeeCap = new(big.Int).Mul(header.BaseFee, big.NewInt(2)) + // during retries base_fee can sink and same_tier_tip+15% alone wouldn't be enough to replace transaction + effectiveBaseFee = new(big.Int).Set(header.BaseFee) + if previousBaseFee != nil && previousBaseFee.Sign() > 0 && effectiveBaseFee.Cmp(previousBaseFee) < 0 { + effectiveBaseFee.Set(applyMempoolBump(previousBaseFee)) + } + + gasFeeCap = new(big.Int).Mul(effectiveBaseFee, big.NewInt(2)) gasFeeCapWithTip := new(big.Int).Add(new(big.Int).Set(gasFeeCap), tip) t.logger.Debug("suggest gas fees for retry", "tier", tier.String(), "base_fee", header.BaseFee, + "previous_base_fee", previousBaseFee, + "effective_base_fee", effectiveBaseFee, "previous_tip", previousTip, "selected_tip", tip, "gas_fee_cap", gasFeeCapWithTip, "max_tx_price", t.maxTxPrice) + canOverride := func(feeCap *big.Int) bool { + return overrides != nil && overrides.IgnoreMaxPrice != nil && overrides.IgnoreMaxPrice(feeCap) + } + if t.maxTxPrice != nil && gasFeeCapWithTip.Cmp(t.maxTxPrice) > 0 { - return nil, nil, fmt.Errorf("%w: max_fee_per_gas %s exceeds limit %s", ErrTxMaxPriceExceeded, gasFeeCapWithTip, t.maxTxPrice) + if !canOverride(gasFeeCapWithTip) { + return nil, nil, nil, fmt.Errorf("%w: max_fee_per_gas %s exceeds limit %s", ErrTxMaxPriceExceeded, gasFeeCapWithTip, t.maxTxPrice) + } + + t.logger.Info("max price override: bypassing limit", "escalated_gas_fee_cap", gasFeeCapWithTip, "max_tx_price", t.maxTxPrice) } - return gasFeeCapWithTip, tip, nil + + return gasFeeCapWithTip, tip, effectiveBaseFee, nil } -func (t *transactionService) prepareTransactionForTier(ctx context.Context, request *TxRequest, nonce uint64, tier feeTier, previousTip *big.Int) (*types.Transaction, error) { +func (t *transactionService) prepareTransactionForTier(ctx context.Context, request *TxRequest, nonce uint64, tier feeTier, previousTip, previousBaseFee *big.Int, overrides *RetryOverrides) (*types.Transaction, *big.Int, error) { gasLimit, err := t.estimateGasLimit(ctx, request) if err != nil { - return nil, err + return nil, nil, err } - gasFeeCap, gasTipCap, err := t.suggestGasFeeForTier(ctx, tier, previousTip) + gasFeeCap, gasTipCap, effectiveBaseFee, err := t.suggestGasFeeForTier(ctx, tier, previousTip, previousBaseFee, overrides) if err != nil { - return nil, err + return nil, nil, err } tx := types.NewTx(&types.DynamicFeeTx{ @@ -132,13 +189,13 @@ func (t *transactionService) prepareTransactionForTier(ctx context.Context, requ GasTipCap: gasTipCap, Data: request.Data, }) - return tx, nil + return tx, effectiveBaseFee, nil } // broadcastTx prepares, signs, and sends a transaction. // When fixedNonce is nil a new nonce is allocated (first attempt); // otherwise the supplied nonce is reused (replacement transaction). -func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, tier feeTier, previousTip *big.Int) (*types.Transaction, error) { +func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest, fixedNonce *uint64, tier feeTier, previousTip, previousBaseFee *big.Int, overrides *RetryOverrides) (*types.Transaction, *big.Int, error) { var nonce uint64 if fixedNonce != nil { @@ -149,18 +206,18 @@ func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest n, err := t.nextNonce(ctx) if err != nil { - return nil, err + return nil, nil, err } nonce = n } - tx, err := t.prepareTransactionForTier(ctx, request, nonce, tier, previousTip) + tx, effectiveBaseFee, err := t.prepareTransactionForTier(ctx, request, nonce, tier, previousTip, previousBaseFee, overrides) if err != nil { - return nil, err + return nil, nil, err } signedTx, err := t.signer.SignTx(tx, t.chainID) if err != nil { - return nil, fmt.Errorf("%w: %w", ErrSignTransaction, err) + return nil, nil, fmt.Errorf("%w: %w", ErrSignTransaction, err) } t.logger.Info("send with retry: broadcast", @@ -176,7 +233,7 @@ func (t *transactionService) broadcastTx(ctx context.Context, request *TxRequest "description", request.Description, ) err = t.backend.SendTransaction(ctx, signedTx) - return signedTx, err + return signedTx, effectiveBaseFee, err } func (t *transactionService) deleteRetryStateAndPending(retryKey string, state TransactionRetryState, keepLast bool) { @@ -194,7 +251,7 @@ func (t *transactionService) deleteRetryStateAndPending(retryKey string, state T } } -func (t *transactionService) retry(ctx context.Context, txRetryKey string, request *TxRequest) (common.Hash, *types.Receipt, error) { +func (t *transactionService) retry(ctx context.Context, txRetryKey string, request *TxRequest, overrides *RetryOverrides) (common.Hash, *types.Receipt, error) { var ( txState TransactionRetryState nonce *uint64 @@ -237,7 +294,7 @@ func (t *transactionService) retry(ctx context.Context, txRetryKey string, reque if terminateTxErr != nil { t.logger.Error(terminateTxErr, "send with retry: finished with error", - "attempt", attempt, + "attempt", attempt+1, "tx_hash", txState.LastTxHash, "nonce", txState.Nonce, "to", request.To.String(), @@ -250,49 +307,89 @@ func (t *transactionService) retry(ctx context.Context, txRetryKey string, reque t.recordRetryComplete(attempt+1, terminateTxErr) }() + retryDelay := t.txRetryDelay + if overrides != nil && overrides.RetryDelay != nil { + retryDelay = overrides.RetryDelay(t.txRetryDelay) + } for _, tier := range tiers { + // track base fee within same tier + // if base_fee would sink between attempts, current_tier+15% might not be enough for tx replacement + var previousBaseFee *big.Int + for k := 0; k < t.attemptsPerTier; k++ { if txState.NonceAssigned { nonce = &txState.Nonce } - signedTx, err := t.broadcastTx(ctx, request, nonce, tier, previousTip) + replaced := true + signedTx, effectiveBaseFee, err := t.broadcastTx(ctx, request, nonce, tier, previousTip, previousBaseFee, overrides) if err != nil { - if isNonRetryable(err) { + switch { + case isNonceTooLow(err): + // The nonce was consumed between our last receipt check and + // this rebroadcast: our previously broadcast tx was most + // likely mined. Try to read its receipt exactly once; if + // it is absent, propagate the error and stop retrying. + if txState.LastTxHash != (common.Hash{}) { + if rec, recErr := t.backend.TransactionReceipt(ctx, txState.LastTxHash); recErr == nil && rec != nil { + if rec.Status == 0 { + terminateTxErr = ErrTransactionReverted + return txState.LastTxHash, rec, terminateTxErr + } + return txState.LastTxHash, rec, nil + } + } terminateTxErr = err return common.Hash{}, nil, terminateTxErr + case isReplacementUnderpriced(err): + // Couldn't replace transaction, keep watching latest one + replaced = false + t.logger.Warning("transaction retry broadcast underpriced, keep watching pending tx", + "attempt", attempt, "tier", tier.String(), "pending_tx", txState.LastTxHash, + "error", err, "to", retryToForLog(request, &txState)) + case isNonRetryable(err): + terminateTxErr = err + return common.Hash{}, nil, terminateTxErr + default: + t.logger.Warning("transaction retry broadcast failed, will retry", + "attempt", attempt, "tier", tier.String(), "error", err, "to", retryToForLog(request, &txState)) } - t.logger.Warning("transaction retry broadcast failed, will retry", - "attempt", attempt, "tier", tier.String(), "error", err, "to", retryToForLog(request, &txState)) } - if terminateTxErr = t.updateStates(signedTx, &txState); terminateTxErr != nil { - return common.Hash{}, nil, terminateTxErr + if replaced { + // update states only in case if previous tx was replaced - otherwise keep watching latest known tx hash + if terminateTxErr = t.updateStates(signedTx, &txState); terminateTxErr != nil { + return common.Hash{}, nil, terminateTxErr + } + + if signedTx != nil { + previousTip = signedTx.GasTipCap() + } + + if effectiveBaseFee != nil { + previousBaseFee = effectiveBaseFee + } } if txState.NonceAssigned { txRetryKey = retryStateKey(txState.Nonce) } - if signedTx != nil { - previousTip = signedTx.GasTipCap() - } - if txState.LastTxHash == (common.Hash{}) { t.logger.Debug("send with retry: no tx hash after broadcast failure, waiting before next attempt", - "attempt", attempt, "nonce", txState.Nonce, "retry_delay", t.txRetryDelay) + "attempt", attempt, "nonce", txState.Nonce, "retry_delay", retryDelay) select { case <-ctx.Done(): err := ctx.Err() terminateTxErr = err return common.Hash{}, nil, err - case <-time.After(t.txRetryDelay): + case <-time.After(retryDelay): attempt++ continue } } - waitCtx, cancel := context.WithTimeout(ctx, t.txRetryDelay) + waitCtx, cancel := context.WithTimeout(ctx, retryDelay) rec, waitErr := t.WaitForReceipt(waitCtx, txState.LastTxHash) cancel() @@ -379,6 +476,14 @@ func (t *transactionService) updateStates(signedTx *types.Transaction, txState * return nil } +func isReplacementUnderpriced(err error) bool { + return err != nil && strings.Contains(err.Error(), "replacement transaction underpriced") +} + +func isNonceTooLow(err error) bool { + return err != nil && strings.Contains(err.Error(), "nonce too low") +} + func isNonRetryable(err error) bool { if errors.Is(err, ErrTransactionReverted) || errors.Is(err, ErrTransactionCancelled) || @@ -391,7 +496,6 @@ func isNonRetryable(err error) bool { s := err.Error() nonRetryable := []string{ "specified gas price", - "nonce too low", "AlreadyCommitted", "AlreadyRevealed", "AlreadyClaimed", @@ -484,7 +588,7 @@ func (t *transactionService) resumeRetryTransactions() error { sk := key st := state t.wg.Go(func() { - if _, _, err := t.retry(t.ctx, sk, nil); err != nil { + if _, _, err := t.retry(t.ctx, sk, nil, nil); err != nil { t.logger.Error(err, "resumed transaction retry aborted", "nonce", st.Nonce, "description", st.Description) } }) diff --git a/pkg/transaction/send_tx_with_retry_test.go b/pkg/transaction/send_tx_with_retry_test.go index 6c5546ac5aa..a829cbe9089 100644 --- a/pkg/transaction/send_tx_with_retry_test.go +++ b/pkg/transaction/send_tx_with_retry_test.go @@ -66,7 +66,7 @@ func TestSuggestGasFeeForTier(t *testing.T) { backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( - backend, nil, context.Background(), int(transaction.FeeTierMarket), nil, + backend, nil, context.Background(), int(transaction.FeeTierMarket), nil, nil, nil, ) require.NoError(t, err) @@ -82,7 +82,7 @@ func TestSuggestGasFeeForTier(t *testing.T) { backend := backendmock.New(headerOption(), feeHistoryOption(&feeHistoryCalls)) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( - backend, nil, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), + backend, nil, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), nil, nil, ) require.NoError(t, err) @@ -100,13 +100,110 @@ func TestSuggestGasFeeForTier(t *testing.T) { backend := backendmock.New(headerOption(), feeHistoryOption(nil)) gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( - backend, maxTxPrice, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), + backend, maxTxPrice, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), nil, nil, ) assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) assert.Nil(t, gasFeeCap) assert.Nil(t, gasTipCap) }) + + t.Run("IgnoreMaxPrice override bypasses limit", func(t *testing.T) { + t.Parallel() + + maxTxPrice := big.NewInt(baseFeeCap + prevTip - 1) + backend := backendmock.New(headerOption(), feeHistoryOption(nil)) + + var receivedFeeCap *big.Int + overrides := &transaction.RetryOverrides{ + IgnoreMaxPrice: func(feeCap *big.Int) bool { + receivedFeeCap = feeCap + return true + }, + } + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( + backend, maxTxPrice, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), nil, overrides, + ) + + require.NoError(t, err, "override should bypass ErrTxMaxPriceExceeded") + assert.Equal(t, escalatedTip, gasTipCap.Int64(), "must use escalated tip despite exceeding max") + assert.Equal(t, baseFeeCap+escalatedTip, gasFeeCap.Int64()) + assert.NotNil(t, receivedFeeCap, "IgnoreMaxPrice must receive gasFeeCap") + assert.Equal(t, gasFeeCap.Int64(), receivedFeeCap.Int64(), + "IgnoreMaxPrice must receive the actual gasFeeCap that would be used") + }) + + t.Run("IgnoreMaxPrice false does not bypass limit", func(t *testing.T) { + t.Parallel() + + maxTxPrice := big.NewInt(baseFeeCap + prevTip - 1) + backend := backendmock.New(headerOption(), feeHistoryOption(nil)) + + overrides := &transaction.RetryOverrides{ + IgnoreMaxPrice: func(_ *big.Int) bool { return false }, + } + + _, _, err := transaction.SuggestGasFeeForTier( + backend, maxTxPrice, context.Background(), int(transaction.FeeTierMarket), big.NewInt(prevTip), nil, overrides, + ) + + assert.ErrorIs(t, err, transaction.ErrTxMaxPriceExceeded) + }) +} + +func TestSuggestGasFeeForTier_baseFeeDropUsesBump(t *testing.T) { + t.Parallel() + + const ( + previousBaseFee = int64(1_039_237_808) + currentBaseFee = int64(1_005_339_867) + previousTip = int64(1_440_000) + lowTip = int64(1_440_000) + ) + + backend := backendmock.New( + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + return &types.Header{BaseFee: big.NewInt(currentBaseFee)}, nil + }), + backendmock.WithSuggestedFeeAndTipsFromHistoryFunc(func(ctx context.Context, lastBlock *big.Int) (*transaction.FeeHistorySuggestedFeeAndTips, error) { + return &transaction.FeeHistorySuggestedFeeAndTips{ + LowTip: big.NewInt(lowTip), + MarketTip: big.NewInt(lowTip * 2), + AggressiveTip: big.NewInt(lowTip * 3), + }, nil + }), + ) + + gasFeeCap, gasTipCap, err := transaction.SuggestGasFeeForTier( + backend, + nil, + context.Background(), + int(transaction.FeeTierLow), + big.NewInt(previousTip), + big.NewInt(previousBaseFee), + nil, + ) + + require.NoError(t, err) + + bumpedBaseFee := transaction.ApplyMempoolBump(big.NewInt(previousBaseFee)) + expectedTip := transaction.ApplyMempoolBump(big.NewInt(previousTip)) + expectedFeeCap := new(big.Int).Add( + new(big.Int).Mul(bumpedBaseFee, big.NewInt(2)), + expectedTip, + ) + + assert.Equal(t, expectedTip.Int64(), gasTipCap.Int64()) + assert.Equal(t, expectedFeeCap.Int64(), gasFeeCap.Int64(), + "gas fee cap must use bumped previous base fee when current base fee dropped") + + firstAttemptFeeCap := new(big.Int).Add( + new(big.Int).Mul(big.NewInt(2), big.NewInt(previousBaseFee)), + big.NewInt(previousTip), + ) + assert.Greater(t, gasFeeCap.Int64(), firstAttemptFeeCap.Int64(), + "replacement fee cap must exceed the first attempt when base fee dropped") } // capturedBroadcast records the parameters of a transaction as seen by SendTransaction. @@ -559,6 +656,65 @@ func TestSendWithRetry_EscalateGasThenSuccess(t *testing.T) { "retry state should be cleaned up on success") } +// Underpriced replacement keeps watching the pending tx hash instead of switching to the rejected one. +func TestSendWithRetry_UnderpricedKeepsPendingTxHash(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + broadcastCount atomic.Int32 + watchCount atomic.Int32 + firstTxHash common.Hash + ) + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + if broadcastCount.Add(1) == 1 { + firstTxHash = tx.Hash() + return nil + } + return errors.New("replacement transaction underpriced") + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + switch watchCount.Add(1) { + case 1: + assert.Equal(t, firstTxHash, txHash, "first wait must watch the accepted broadcast") + return make(chan types.Receipt), make(chan error), nil + default: + assert.Equal(t, firstTxHash, txHash, "after underpriced must keep watching the pending tx") + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + } + }), + ), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.Equal(t, firstTxHash, txHash, "must return receipt for the original pending tx") + assert.Equal(t, int32(2), broadcastCount.Load(), "second broadcast should still be attempted") + assert.Equal(t, int32(2), watchCount.Load(), "must wait for receipt again after underpriced broadcast") +} + // All attempts exhausted, receipt never found → error. // Verifies compound escalation chain, nonce immutability, and gasFeeCap on every attempt. func TestSendWithRetry_AllAttemptsExhausted(t *testing.T) { @@ -621,6 +777,112 @@ func TestSendWithRetry_AllAttemptsExhausted(t *testing.T) { "retry state should be cleaned up after exhaustion") } +// "nonce too low" on a rebroadcast means the nonce was consumed between the +// last receipt check and this broadcast: the previously broadcast tx was most +// likely mined. The service must read its receipt exactly once and stop +// retrying regardless of whether the receipt is found. +func TestSendWithRetry_NonceTooLow(t *testing.T) { + t.Parallel() + + newSvc := func(t *testing.T, store storage.StateStorer, firstTxHash *common.Hash, broadcastCount, receiptCalls *atomic.Int32, receiptFn func(common.Hash) (*types.Receipt, error)) transaction.Service { + t.Helper() + s := newRetryTestSetup() + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + if broadcastCount.Add(1) == 1 { + *firstTxHash = tx.Hash() + return nil + } + return errors.New("nonce too low") + }), + backendmock.WithTransactionReceiptFunc(func(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + receiptCalls.Add(1) + assert.Equal(t, *firstTxHash, txHash, "must read receipt of the previously broadcast tx") + return receiptFn(txHash) + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + // first broadcast's receipt never arrives → escalate to second attempt + monitormock.New(receiptWatchTimeout()), + 0, + s.retryConfig(), + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + return svc + } + + t.Run("receipt found stops retry and returns it", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + firstTxHash common.Hash + broadcastCount atomic.Int32 + receiptCalls atomic.Int32 + ) + + svc := newSvc(t, store, &firstTxHash, &broadcastCount, &receiptCalls, + func(txHash common.Hash) (*types.Receipt, error) { + return &types.Receipt{TxHash: txHash, Status: 1}, nil + }) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + require.NoError(t, err) + require.NotNil(t, receipt) + assert.Equal(t, firstTxHash, txHash, "must return the mined tx hash") + assert.Equal(t, uint64(1), receipt.Status) + assert.Equal(t, int32(2), broadcastCount.Load(), "exactly one rebroadcast, no further retries after nonce too low") + assert.Equal(t, int32(1), receiptCalls.Load(), "receipt must be read exactly once") + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up after success") + }) + + t.Run("receipt not found stops retry and returns error", func(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + var ( + firstTxHash common.Hash + broadcastCount atomic.Int32 + receiptCalls atomic.Int32 + ) + + svc := newSvc(t, store, &firstTxHash, &broadcastCount, &receiptCalls, + func(common.Hash) (*types.Receipt, error) { + return nil, ethereum.NotFound + }) + + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request()) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "nonce too low") + assert.Equal(t, common.Hash{}, txHash) + assert.Nil(t, receipt) + assert.Equal(t, int32(2), broadcastCount.Load(), "exactly one rebroadcast, no further retries after nonce too low") + assert.Equal(t, int32(1), receiptCalls.Load(), "receipt must be read exactly once even when not found") + + var rs transaction.TransactionRetryState + assert.ErrorIs(t, store.Get(transaction.RetryStateKey(s.nonce), &rs), storage.ErrNotFound, + "retry state should be cleaned up after error") + }) +} + // Resume after node restart — transaction is re-sent starting from persisted attempt. // Verifies nonce, escalated tip, gasFeeCap, and that fee history is NOT called. func TestSendWithRetry_ResumeAfterRestart(t *testing.T) { @@ -781,6 +1043,166 @@ func TestSendWithRetry_MaxTxPriceCap(t *testing.T) { "no transaction should be sent when maxTxPrice is below the minimum fee") } +// WithIgnoreMaxPrice override allows transactions to be sent despite exceeding maxTxPrice. +func TestSendWithRetry_IgnoreMaxPriceOverride(t *testing.T) { + t.Parallel() + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + marketTip := s.expectedMarketTip() + maxTxPrice := new(big.Int).Sub(s.expectedGasFeeCap(marketTip), big.NewInt(1)) + + cfg := s.retryConfig() + cfg.MaxTxPrice = maxTxPrice + + var broadcasts []capturedBroadcast + var overrideCalls atomic.Int32 + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + s.headerOption(), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + broadcasts = append(broadcasts, captureTx(tx)) + return nil + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + ch := make(chan types.Receipt, 1) + ch <- types.Receipt{TxHash: txHash, Status: 1} + return ch, nil, nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + var receivedFeeCaps []*big.Int + txHash, receipt, err := svc.SendWithRetry(context.Background(), s.request(), + transaction.WithIgnoreMaxPrice(func(gasFeeCap *big.Int) bool { + overrideCalls.Add(1) + receivedFeeCaps = append(receivedFeeCaps, new(big.Int).Set(gasFeeCap)) + return true + }), + ) + + require.NoError(t, err) + assert.NotEqual(t, common.Hash{}, txHash) + require.NotNil(t, receipt) + assert.Equal(t, uint64(1), receipt.Status) + + require.Len(t, broadcasts, 1, "transaction should be sent despite exceeding maxTxPrice") + assert.Equal(t, marketTip.Int64(), broadcasts[0].GasTipCap.Int64()) + assert.GreaterOrEqual(t, int(overrideCalls.Load()), 1, "override function must have been called") + require.Len(t, receivedFeeCaps, 1, "override should have received gasFeeCap") + assert.Equal(t, broadcasts[0].GasFeeCap.Int64(), receivedFeeCaps[0].Int64(), + "override must receive the same gasFeeCap as the broadcast") +} + +// RetryDelay can be rewritten per SendWithRetry call after the service is constructed. +func TestSendWithRetry_RetryDelayPerTransactionOverride(t *testing.T) { + t.Parallel() + + s := newRetryTestSetup() + store := storemock.NewStateStore() + testutil.CleanupCloser(t, store) + + const ( + serviceRetryDelay = 200 * time.Millisecond + overrideDelay = 50 * time.Millisecond + checkAfter = 300 * time.Millisecond + waitTimeout = 5 * time.Second + ) + + cfg := s.retryConfig() + cfg.AttemptsPerTier = 2 + cfg.RetryDelay = serviceRetryDelay + + var ( + sendCalls atomic.Int32 + deferFirstPrepare atomic.Bool + ) + sendTxErr := errors.New("rpc error") + + svc, err := transaction.NewService(log.Noop, s.sender, + backendmock.New( + s.nonceOption(), + s.feeHistoryOption(nil), + backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { + if deferFirstPrepare.CompareAndSwap(true, false) { + return nil, errors.New("temporary RPC error") + } + return &types.Header{BaseFee: new(big.Int).Set(s.baseFee)}, nil + }), + s.estimateGasOption(), + backendmock.WithSendTransactionFunc(func(ctx context.Context, tx *types.Transaction) error { + sendCalls.Add(1) + return sendTxErr + }), + ), + signermock.New(s.passThroughSigner(), s.signerAddr()), + store, + s.chainID, + monitormock.New( + monitormock.WithWatchTransactionFunc(func(txHash common.Hash, nonce uint64) (<-chan types.Receipt, <-chan error, error) { + return make(chan types.Receipt), make(chan error), nil + }), + ), + 0, + cfg, + ) + require.NoError(t, err) + testutil.CleanupCloser(t, svc) + + ctx := context.Background() + + // Scenario 1: overridden delay (50ms) — both attempts should complete within checkAfter. + done1 := make(chan struct{}) + go func() { + defer close(done1) + _, _, _ = svc.SendWithRetry(ctx, s.request(), + transaction.WithRetryDelay(func(time.Duration) time.Duration { + return overrideDelay + }), + ) + }() + + time.Sleep(checkAfter) + assert.EqualValues(t, 2, sendCalls.Load(), "overridden 50ms delay should allow 2 broadcasts within 300ms") + select { + case <-done1: + case <-time.After(waitTimeout): + t.Fatal("timed out waiting for overridden-delay SendWithRetry to finish") + } + + // Scenario 2: default delay (200ms) with first prepare failing — only 1 send within checkAfter. + sendCalls.Store(0) + deferFirstPrepare.Store(true) + + done2 := make(chan struct{}) + go func() { + defer close(done2) + _, _, _ = svc.SendWithRetry(ctx, s.request()) + }() + + time.Sleep(checkAfter) + assert.EqualValues(t, 1, sendCalls.Load(), "default 200ms delay should yield only 1 broadcast within 300ms") + select { + case <-done2: + case <-time.After(waitTimeout): + t.Fatal("timed out waiting for default-delay SendWithRetry to finish") + } +} + // failOnNthPutStore wraps a StateStorer and fails the Nth Put call with putErr. type failOnNthPutStore struct { storage.StateStorer diff --git a/pkg/transaction/transaction.go b/pkg/transaction/transaction.go index 150082b58ee..54f8c90a851 100644 --- a/pkg/transaction/transaction.go +++ b/pkg/transaction/transaction.go @@ -108,10 +108,12 @@ type StoredTransaction struct { // limit and nonce management. type Service interface { io.Closer + EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) // Send creates a transaction based on the request (with gasprice increased by provided percentage) and sends it. Send(ctx context.Context, request *TxRequest, tipCapBoostPercent int) (txHash common.Hash, err error) // SendWithRetry sends a transaction using fee-history tiers and automatic fee escalation; see send_tx_with_retry.go. - SendWithRetry(ctx context.Context, request *TxRequest) (txHash common.Hash, receipt *types.Receipt, err error) + // Optional RetryOption values can override per-call retry behaviour (e.g. bypass price cap). + SendWithRetry(ctx context.Context, request *TxRequest, opts ...RetryOption) (txHash common.Hash, receipt *types.Receipt, err error) // Call simulate a transaction based on the request. Call(ctx context.Context, request *TxRequest) (result []byte, err error) // WaitForReceipt waits until either the transaction with the given hash has been mined or the context is cancelled. @@ -253,6 +255,15 @@ func (t *transactionService) waitForAllPendingTx() error { return nil } +func (t *transactionService) EstimateTxCost(ctx context.Context, gasUnits int64, tip int) (cost *big.Int, gasFeeCap *big.Int, err error) { + gasFeeCap, _, err = t.backend.SuggestedFeeAndTip(ctx, nil, tip) + if err != nil { + return nil, nil, err + } + cost = new(big.Int).Mul(big.NewInt(gasUnits), gasFeeCap) + return cost, gasFeeCap, nil +} + // Send creates and signs a transaction based on the request and sends it. func (t *transactionService) Send(ctx context.Context, request *TxRequest, boostPercent int) (txHash common.Hash, err error) { loggerV1 := t.logger.V(1).Register() @@ -437,6 +448,18 @@ func (t *transactionService) prepareTransaction(ctx context.Context, request *Tx if err != nil { return nil, err } + if request.GasFeeCap != nil { + if request.GasFeeCap.Sign() <= 0 { + return nil, errors.New("gas fee cap must be greater than zero") + } + if gasFeeCap.Cmp(request.GasFeeCap) > 0 { + return nil, fmt.Errorf("gas fee cap exceeded: suggested=%s requested=%s", gasFeeCap, request.GasFeeCap) + } + gasFeeCap = new(big.Int).Set(request.GasFeeCap) + if gasTipCap.Cmp(gasFeeCap) > 0 { + gasTipCap = new(big.Int).Set(gasFeeCap) + } + } t.logger.Debug("prepared transaction", "to", request.To, @@ -583,6 +606,15 @@ func (t *transactionService) ResendTransaction(ctx context.Context, txHash commo if err != nil { return err } + if storedTransaction.GasFeeCap != nil && gasFeeCap.Cmp(storedTransaction.GasFeeCap) > 0 { + gasFeeCap = new(big.Int).Set(storedTransaction.GasFeeCap) + } + if storedTransaction.GasTipCap != nil && gasTipCap.Cmp(storedTransaction.GasTipCap) > 0 { + gasTipCap = new(big.Int).Set(storedTransaction.GasTipCap) + } + if gasTipCap.Cmp(gasFeeCap) > 0 { + gasTipCap = new(big.Int).Set(gasFeeCap) + } tx := types.NewTx(&types.DynamicFeeTx{ Nonce: storedTransaction.Nonce,