diff --git a/Cargo.lock b/Cargo.lock index a436fa3d..96a617dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1368,6 +1368,7 @@ dependencies = [ "ethers-providers", "ethers-signers", "ethers-solc", + "futures-channel", "futures-locks", "futures-util", "hex", diff --git a/ethers-middleware/Cargo.toml b/ethers-middleware/Cargo.toml index fce13218..a8710206 100644 --- a/ethers-middleware/Cargo.toml +++ b/ethers-middleware/Cargo.toml @@ -36,6 +36,7 @@ serde.workspace = true thiserror.workspace = true futures-util.workspace = true futures-locks.workspace = true +futures-channel.workspace = true tracing.workspace = true tracing-futures.workspace = true instant.workspace = true diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 384f0291..fb55c7c1 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -1,21 +1,28 @@ mod geometric; -use ethers_core::types::transaction::eip2718::TypedTransaction; pub use geometric::GeometricGasPrice; mod linear; pub use linear::LinearGasPrice; use async_trait::async_trait; -use ethers_core::types::{BlockId, TransactionRequest, TxHash, U256}; -use ethers_providers::{interval, Middleware, MiddlewareError, PendingTransaction, StreamExt}; -use futures_util::lock::Mutex; + +use futures_channel::oneshot; +use futures_util::{lock::Mutex, select_biased}; use instant::Instant; use std::{pin::Pin, sync::Arc}; use thiserror::Error; +use tracing_futures::Instrument; + +use ethers_core::types::{ + transaction::eip2718::TypedTransaction, BlockId, TransactionRequest, TxHash, U256, +}; +use ethers_providers::{interval, Middleware, MiddlewareError, PendingTransaction, StreamExt}; #[cfg(not(target_arch = "wasm32"))] use tokio::spawn; +type ToEscalate = Arc)>>>; + #[cfg(target_arch = "wasm32")] type WatcherFuture<'a> = Pin + 'a>>; #[cfg(not(target_arch = "wasm32"))] @@ -29,7 +36,34 @@ pub trait GasEscalator: Send + Sync + std::fmt::Debug { fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256; } -#[derive(Debug, Clone)] +#[derive(Error, Debug)] +/// Error thrown when the GasEscalator interacts with the blockchain +pub enum GasEscalatorError { + #[error("{0}")] + /// Thrown when an internal middleware errors + MiddlewareError(M::Error), + + #[error("Gas escalation is only supported for EIP2930 or Legacy transactions")] + UnsupportedTxType, +} + +// Boilerplate +impl MiddlewareError for GasEscalatorError { + type Inner = M::Error; + + fn from_err(src: M::Error) -> GasEscalatorError { + GasEscalatorError::MiddlewareError(src) + } + + fn as_inner(&self) -> Option<&Self::Inner> { + match self { + GasEscalatorError::MiddlewareError(e) => Some(e), + _ => None, + } + } +} + +#[derive(Debug, Clone, Copy)] /// The frequency at which transactions will be bumped pub enum Frequency { /// On a per block basis using the eth_newBlock filter @@ -39,9 +73,49 @@ pub enum Frequency { } #[derive(Debug)] +pub(crate) struct GasEscalatorMiddlewareInternal { + pub(crate) inner: Arc, + /// The transactions which are currently being monitored for escalation + #[allow(clippy::type_complexity)] + pub txs: ToEscalate, + _background: oneshot::Sender<()>, +} + +#[derive(Debug, Clone)] /// A Gas escalator allows bumping transactions' gas price to avoid getting them /// stuck in the memory pool. /// +/// GasEscalator runs a background task which monitors the blockchain for tx +/// confirmation, and bumps fees over time if txns do not occur. This task +/// periodically loops over a stored history of sent transactions, and checks +/// if any require fee bumps. If so, it will resend the same transaction with a +/// higher fee. +/// +/// Using [`GasEscalatorMiddleware::new`] will create a new instance of the +/// background task. Using [`GasEscalatorMiddleware::clone`] will crate a new +/// instance of the middleware, but will not create a new background task. The +/// background task is shared among all clones. +/// +/// ## Footgun +/// +/// If you drop the middleware, the background task will be dropped as well, +/// and any transactions you have sent will stop escalating. We recommend +/// holding an instance of the middleware throughout your application's +/// lifecycle, or leaking an `Arc` of it so that it is never dropped. +/// +/// ## Outstanding issue +/// +/// This task is fallible, and will stop if the provider's connection is lost. +/// If this happens, the middleware will become unable to properly escalate gas +/// prices. Transactions will still be dispatched, but no fee-bumping will +/// happen. This will also cause a memory leak, as the middleware will keep +/// appending to the list of transactions to escalate (and nothing will ever +/// clear that list). +/// +/// We intend to fix this issue in a future release. +/// +/// ## Example +/// /// ```no_run /// use ethers_providers::{Provider, Http}; /// use ethers_middleware::{ @@ -63,39 +137,22 @@ pub enum Frequency { /// let gas_oracle = GasNow::new().category(GasCategory::SafeLow); /// let provider = GasOracleMiddleware::new(provider, gas_oracle); /// ``` -pub struct GasEscalatorMiddleware { - pub(crate) inner: Arc, - pub(crate) escalator: E, - /// The transactions which are currently being monitored for escalation - #[allow(clippy::type_complexity)] - pub txs: Arc)>>>, - frequency: Frequency, -} - -impl Clone for GasEscalatorMiddleware { - fn clone(&self) -> Self { - GasEscalatorMiddleware { - inner: self.inner.clone(), - escalator: self.escalator.clone(), - txs: self.txs.clone(), - frequency: self.frequency.clone(), - } - } +pub struct GasEscalatorMiddleware { + pub(crate) inner: Arc>, } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] -impl Middleware for GasEscalatorMiddleware +impl Middleware for GasEscalatorMiddleware where M: Middleware, - E: GasEscalator, { type Error = GasEscalatorError; type Provider = M::Provider; type Inner = M; - fn inner(&self) -> &M { - &self.inner + fn inner(&self) -> &Self::Inner { + &self.inner.inner } async fn send_transaction + Send + Sync>( @@ -103,13 +160,26 @@ where tx: T, block: Option, ) -> Result, Self::Error> { + self.inner.send_transaction(tx, block).await + } +} + +impl GasEscalatorMiddlewareInternal +where + M: Middleware, +{ + async fn send_transaction + Send + Sync>( + &self, + tx: T, + block: Option, + ) -> Result, GasEscalatorError> { let tx = tx.into(); let pending_tx = self - .inner() + .inner .send_transaction(tx.clone(), block) .await - .map_err(GasEscalatorError::MiddlewareError)?; + .map_err(MiddlewareError::from_err)?; let tx = match tx { TypedTransaction::Legacy(inner) => inner, @@ -125,141 +195,164 @@ where } } -impl GasEscalatorMiddleware +impl GasEscalatorMiddleware where M: Middleware, - E: GasEscalator, { /// Initializes the middleware with the provided gas escalator and the chosen /// escalation frequency (per block or per second) #[allow(clippy::let_and_return)] #[cfg(not(target_arch = "wasm32"))] - pub fn new(inner: M, escalator: E, frequency: Frequency) -> Self + pub fn new(inner: M, escalator: E, frequency: Frequency) -> Self where - E: Clone + 'static, - M: Clone + 'static, + E: GasEscalator + 'static, + M: 'static, { - use tracing_futures::Instrument; + let (tx, rx) = oneshot::channel(); + let inner = Arc::new(inner); - let this = Self { - inner: Arc::new(inner), - escalator, - frequency, - txs: Arc::new(Mutex::new(Vec::new())), - }; + let txs: ToEscalate = Default::default(); + + let this = Arc::new(GasEscalatorMiddlewareInternal { + inner: inner.clone(), + txs: txs.clone(), + _background: tx, + }); + + let esc = EscalationTask { inner, escalator, frequency, txs, shutdown: rx }; { - let this2 = this.clone(); - spawn(async move { - this2.escalate().instrument(tracing::trace_span!("gas-escalation")).await.unwrap(); - }); + spawn(esc.escalate().instrument(tracing::trace_span!("gas-escalation"))); } - this + Self { inner: this } + } +} + +#[derive(Debug)] +pub struct EscalationTask { + inner: M, + escalator: E, + frequency: Frequency, + txs: ToEscalate, + shutdown: oneshot::Receiver<()>, +} + +impl EscalationTask { + pub fn new( + inner: M, + escalator: E, + frequency: Frequency, + txs: ToEscalate, + shutdown: oneshot::Receiver<()>, + ) -> Self { + Self { inner, escalator, frequency, txs, shutdown } } - /// Re-broadcasts pending transactions with a gas price escalator - pub async fn escalate(&self) -> Result<(), GasEscalatorError> { + async fn escalate(mut self) -> Result<(), GasEscalatorError> + where + M: Middleware, + E: GasEscalator, + { // the escalation frequency is either on a per-block basis, or on a duration basis - let mut watcher: WatcherFuture = match self.frequency { + let watcher: WatcherFuture = match self.frequency { Frequency::PerBlock => Box::pin( - self.inner - .watch_blocks() - .await - .map_err(GasEscalatorError::MiddlewareError)? - .map(|_| ()), + self.inner.watch_blocks().await.map_err(MiddlewareError::from_err)?.map(|_| ()), ), Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))), }; - while watcher.next().await.is_some() { - let now = Instant::now(); - let mut txs = self.txs.lock().await; - let len = txs.len(); + let mut watcher = watcher.fuse(); - // Pop all transactions and re-insert those that have not been included yet - for _ in 0..len { - // this must never panic as we're explicitly within bounds - let (tx_hash, mut replacement_tx, time, priority) = - txs.pop().expect("should have element in vector"); + loop { + select_biased! { + _ = &mut self.shutdown => { + tracing::debug!("Shutting down escalation task, middleware has gone away"); + return Ok(()) + } + opt = watcher.next() => { + if opt.is_none() { + tracing::error!("timing future has gone away"); + return Ok(()); + } + let now = Instant::now(); - let receipt = self.get_transaction_receipt(tx_hash).await?; - tracing::trace!(tx_hash = ?tx_hash, "checking if exists"); - if receipt.is_none() { - let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); - // Get the new gas price based on how much time passed since the - // tx was last broadcast - let new_gas_price = self - .escalator - .get_gas_price(old_gas_price, now.duration_since(time).as_secs()); + // We take the contents of the mutex, and then add them back in + // later. + let mut txs: Vec<_> = { + let mut txs = self.txs.lock().await; + std::mem::take(&mut (*txs)) + // Lock scope ends + }; - let new_txhash = if new_gas_price != old_gas_price { - // bump the gas price - replacement_tx.gas_price = Some(new_gas_price); + let len = txs.len(); + // Pop all transactions and re-insert those that have not been included yet + for _ in 0..len { + // this must never panic as we're explicitly within bounds + let (tx_hash, mut replacement_tx, time, priority) = + txs.pop().expect("should have element in vector"); - // the tx hash will be different so we need to update it - match self.inner().send_transaction(replacement_tx.clone(), priority).await - { - Ok(new_tx_hash) => { - let new_tx_hash = *new_tx_hash; - tracing::trace!( - old_tx_hash = ?tx_hash, - new_tx_hash = ?new_tx_hash, - old_gas_price = ?old_gas_price, - new_gas_price = ?new_gas_price, - "escalated" - ); - new_tx_hash - } - Err(err) => { - if err.to_string().contains("nonce too low") { - // ignore "nonce too low" errors because they - // may happen if we try to broadcast a higher - // gas price tx when one of the previous ones - // was already mined (meaning we also do not - // push it back to the pending txs vector) - continue - } else { - return Err(GasEscalatorError::MiddlewareError(err)) + let receipt = self + .inner + .get_transaction_receipt(tx_hash) + .await + .map_err(MiddlewareError::from_err)?; + + tracing::trace!(tx_hash = ?tx_hash, "checking if exists"); + + if receipt.is_none() { + let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); + // Get the new gas price based on how much time passed since the + // tx was last broadcast + let new_gas_price = self + .escalator + .get_gas_price(old_gas_price, now.duration_since(time).as_secs()); + + let new_txhash = if new_gas_price == old_gas_price { + tx_hash + } else { + // bump the gas price + replacement_tx.gas_price = Some(new_gas_price); + + // the tx hash will be different so we need to update it + match self.inner.send_transaction(replacement_tx.clone(), priority).await { + Ok(new_tx_hash) => { + let new_tx_hash = *new_tx_hash; + tracing::trace!( + old_tx_hash = ?tx_hash, + new_tx_hash = ?new_tx_hash, + old_gas_price = ?old_gas_price, + new_gas_price = ?new_gas_price, + "escalated" + ); + new_tx_hash + } + Err(err) => { + if err.to_string().contains("nonce too low") { + // ignore "nonce too low" errors because they + // may happen if we try to broadcast a higher + // gas price tx when one of the previous ones + // was already mined (meaning we also do not + // push it back to the pending txs vector) + continue + } else { + tracing::error!( + err = %err, + "Killing escalator backend" + ); + return Err(GasEscalatorError::MiddlewareError(err)) + } } } - } - } else { - tx_hash - }; - - txs.push((new_txhash, replacement_tx, time, priority)); + }; + txs.push((new_txhash, replacement_tx, time, priority)); + } } - } - } - - Ok(()) - } -} - -// Boilerplate -impl MiddlewareError for GasEscalatorError { - type Inner = M::Error; - - fn from_err(src: M::Error) -> GasEscalatorError { - GasEscalatorError::MiddlewareError(src) - } - - fn as_inner(&self) -> Option<&Self::Inner> { - match self { - GasEscalatorError::MiddlewareError(e) => Some(e), - _ => None, + // after this big ugly loop, we dump everything back in + // we don't replace here, as the vec in the mutex may contain + // items! + self.txs.lock().await.extend(txs); + }} } } } - -#[derive(Error, Debug)] -/// Error thrown when the GasEscalator interacts with the blockchain -pub enum GasEscalatorError { - #[error("{0}")] - /// Thrown when an internal middleware errors - MiddlewareError(M::Error), - - #[error("Gas escalation is only supported for EIP2930 or Legacy transactions")] - UnsupportedTxType, -}