Prestwich/gas escalator dangle (#2284)

* fix: prevent gas escalator dangling tasks

* chore: lockfile

* refactor: avoid holding lock for a long time

* refactor: cleanup

* chore: changelog and docs

* tests: fix test in name

* docs: add a lot of info to the gasescalator docs

* reset changelog as it'll be automatically managed in the future

---------

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
James Prestwich 2023-03-21 11:18:41 -07:00 committed by GitHub
parent 0356db1fae
commit 080bb2e068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 230 additions and 135 deletions

1
Cargo.lock generated
View File

@ -1368,6 +1368,7 @@ dependencies = [
"ethers-providers", "ethers-providers",
"ethers-signers", "ethers-signers",
"ethers-solc", "ethers-solc",
"futures-channel",
"futures-locks", "futures-locks",
"futures-util", "futures-util",
"hex", "hex",

View File

@ -36,6 +36,7 @@ serde.workspace = true
thiserror.workspace = true thiserror.workspace = true
futures-util.workspace = true futures-util.workspace = true
futures-locks.workspace = true futures-locks.workspace = true
futures-channel.workspace = true
tracing.workspace = true tracing.workspace = true
tracing-futures.workspace = true tracing-futures.workspace = true
instant.workspace = true instant.workspace = true

View File

@ -1,21 +1,28 @@
mod geometric; mod geometric;
use ethers_core::types::transaction::eip2718::TypedTransaction;
pub use geometric::GeometricGasPrice; pub use geometric::GeometricGasPrice;
mod linear; mod linear;
pub use linear::LinearGasPrice; pub use linear::LinearGasPrice;
use async_trait::async_trait; use async_trait::async_trait;
use ethers_core::types::{BlockId, TransactionRequest, TxHash, U256};
use ethers_providers::{interval, Middleware, MiddlewareError, PendingTransaction, StreamExt}; use futures_channel::oneshot;
use futures_util::lock::Mutex; use futures_util::{lock::Mutex, select_biased};
use instant::Instant; use instant::Instant;
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use thiserror::Error; use thiserror::Error;
use tracing_futures::Instrument;
use ethers_core::types::{
transaction::eip2718::TypedTransaction, BlockId, TransactionRequest, TxHash, U256,
};
use ethers_providers::{interval, Middleware, MiddlewareError, PendingTransaction, StreamExt};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use tokio::spawn; use tokio::spawn;
type ToEscalate = Arc<Mutex<Vec<(TxHash, TransactionRequest, Instant, Option<BlockId>)>>>;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
type WatcherFuture<'a> = Pin<Box<dyn futures_util::stream::Stream<Item = ()> + 'a>>; type WatcherFuture<'a> = Pin<Box<dyn futures_util::stream::Stream<Item = ()> + 'a>>;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -29,7 +36,34 @@ pub trait GasEscalator: Send + Sync + std::fmt::Debug {
fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256; fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256;
} }
#[derive(Debug, Clone)] #[derive(Error, Debug)]
/// Error thrown when the GasEscalator interacts with the blockchain
pub enum GasEscalatorError<M: Middleware> {
#[error("{0}")]
/// Thrown when an internal middleware errors
MiddlewareError(M::Error),
#[error("Gas escalation is only supported for EIP2930 or Legacy transactions")]
UnsupportedTxType,
}
// Boilerplate
impl<M: Middleware> MiddlewareError for GasEscalatorError<M> {
type Inner = M::Error;
fn from_err(src: M::Error) -> GasEscalatorError<M> {
GasEscalatorError::MiddlewareError(src)
}
fn as_inner(&self) -> Option<&Self::Inner> {
match self {
GasEscalatorError::MiddlewareError(e) => Some(e),
_ => None,
}
}
}
#[derive(Debug, Clone, Copy)]
/// The frequency at which transactions will be bumped /// The frequency at which transactions will be bumped
pub enum Frequency { pub enum Frequency {
/// On a per block basis using the eth_newBlock filter /// On a per block basis using the eth_newBlock filter
@ -39,9 +73,49 @@ pub enum Frequency {
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct GasEscalatorMiddlewareInternal<M> {
pub(crate) inner: Arc<M>,
/// The transactions which are currently being monitored for escalation
#[allow(clippy::type_complexity)]
pub txs: ToEscalate,
_background: oneshot::Sender<()>,
}
#[derive(Debug, Clone)]
/// A Gas escalator allows bumping transactions' gas price to avoid getting them /// A Gas escalator allows bumping transactions' gas price to avoid getting them
/// stuck in the memory pool. /// stuck in the memory pool.
/// ///
/// GasEscalator runs a background task which monitors the blockchain for tx
/// confirmation, and bumps fees over time if txns do not occur. This task
/// periodically loops over a stored history of sent transactions, and checks
/// if any require fee bumps. If so, it will resend the same transaction with a
/// higher fee.
///
/// Using [`GasEscalatorMiddleware::new`] will create a new instance of the
/// background task. Using [`GasEscalatorMiddleware::clone`] will crate a new
/// instance of the middleware, but will not create a new background task. The
/// background task is shared among all clones.
///
/// ## Footgun
///
/// If you drop the middleware, the background task will be dropped as well,
/// and any transactions you have sent will stop escalating. We recommend
/// holding an instance of the middleware throughout your application's
/// lifecycle, or leaking an `Arc` of it so that it is never dropped.
///
/// ## Outstanding issue
///
/// This task is fallible, and will stop if the provider's connection is lost.
/// If this happens, the middleware will become unable to properly escalate gas
/// prices. Transactions will still be dispatched, but no fee-bumping will
/// happen. This will also cause a memory leak, as the middleware will keep
/// appending to the list of transactions to escalate (and nothing will ever
/// clear that list).
///
/// We intend to fix this issue in a future release.
///
/// ## Example
///
/// ```no_run /// ```no_run
/// use ethers_providers::{Provider, Http}; /// use ethers_providers::{Provider, Http};
/// use ethers_middleware::{ /// use ethers_middleware::{
@ -63,39 +137,22 @@ pub enum Frequency {
/// let gas_oracle = GasNow::new().category(GasCategory::SafeLow); /// let gas_oracle = GasNow::new().category(GasCategory::SafeLow);
/// let provider = GasOracleMiddleware::new(provider, gas_oracle); /// let provider = GasOracleMiddleware::new(provider, gas_oracle);
/// ``` /// ```
pub struct GasEscalatorMiddleware<M, E> { pub struct GasEscalatorMiddleware<M> {
pub(crate) inner: Arc<M>, pub(crate) inner: Arc<GasEscalatorMiddlewareInternal<M>>,
pub(crate) escalator: E,
/// The transactions which are currently being monitored for escalation
#[allow(clippy::type_complexity)]
pub txs: Arc<Mutex<Vec<(TxHash, TransactionRequest, Instant, Option<BlockId>)>>>,
frequency: Frequency,
}
impl<M, E: Clone> Clone for GasEscalatorMiddleware<M, E> {
fn clone(&self) -> Self {
GasEscalatorMiddleware {
inner: self.inner.clone(),
escalator: self.escalator.clone(),
txs: self.txs.clone(),
frequency: self.frequency.clone(),
}
}
} }
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<M, E> Middleware for GasEscalatorMiddleware<M, E> impl<M> Middleware for GasEscalatorMiddleware<M>
where where
M: Middleware, M: Middleware,
E: GasEscalator,
{ {
type Error = GasEscalatorError<M>; type Error = GasEscalatorError<M>;
type Provider = M::Provider; type Provider = M::Provider;
type Inner = M; type Inner = M;
fn inner(&self) -> &M { fn inner(&self) -> &Self::Inner {
&self.inner &self.inner.inner
} }
async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>( async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
@ -103,13 +160,26 @@ where
tx: T, tx: T,
block: Option<BlockId>, block: Option<BlockId>,
) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> { ) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
self.inner.send_transaction(tx, block).await
}
}
impl<M> GasEscalatorMiddlewareInternal<M>
where
M: Middleware,
{
async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
&self,
tx: T,
block: Option<BlockId>,
) -> Result<PendingTransaction<'_, M::Provider>, GasEscalatorError<M>> {
let tx = tx.into(); let tx = tx.into();
let pending_tx = self let pending_tx = self
.inner() .inner
.send_transaction(tx.clone(), block) .send_transaction(tx.clone(), block)
.await .await
.map_err(GasEscalatorError::MiddlewareError)?; .map_err(MiddlewareError::from_err)?;
let tx = match tx { let tx = match tx {
TypedTransaction::Legacy(inner) => inner, TypedTransaction::Legacy(inner) => inner,
@ -125,66 +195,111 @@ where
} }
} }
impl<M, E> GasEscalatorMiddleware<M, E> impl<M> GasEscalatorMiddleware<M>
where where
M: Middleware, M: Middleware,
E: GasEscalator,
{ {
/// Initializes the middleware with the provided gas escalator and the chosen /// Initializes the middleware with the provided gas escalator and the chosen
/// escalation frequency (per block or per second) /// escalation frequency (per block or per second)
#[allow(clippy::let_and_return)] #[allow(clippy::let_and_return)]
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn new(inner: M, escalator: E, frequency: Frequency) -> Self pub fn new<E>(inner: M, escalator: E, frequency: Frequency) -> Self
where where
E: Clone + 'static, E: GasEscalator + 'static,
M: Clone + 'static, M: 'static,
{ {
use tracing_futures::Instrument; let (tx, rx) = oneshot::channel();
let inner = Arc::new(inner);
let this = Self { let txs: ToEscalate = Default::default();
inner: Arc::new(inner),
escalator,
frequency,
txs: Arc::new(Mutex::new(Vec::new())),
};
{ let this = Arc::new(GasEscalatorMiddlewareInternal {
let this2 = this.clone(); inner: inner.clone(),
spawn(async move { txs: txs.clone(),
this2.escalate().instrument(tracing::trace_span!("gas-escalation")).await.unwrap(); _background: tx,
}); });
let esc = EscalationTask { inner, escalator, frequency, txs, shutdown: rx };
{
spawn(esc.escalate().instrument(tracing::trace_span!("gas-escalation")));
} }
this Self { inner: this }
}
} }
/// Re-broadcasts pending transactions with a gas price escalator #[derive(Debug)]
pub async fn escalate(&self) -> Result<(), GasEscalatorError<M>> { pub struct EscalationTask<M, E> {
inner: M,
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
}
impl<M, E> EscalationTask<M, E> {
pub fn new(
inner: M,
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
) -> Self {
Self { inner, escalator, frequency, txs, shutdown }
}
async fn escalate(mut self) -> Result<(), GasEscalatorError<M>>
where
M: Middleware,
E: GasEscalator,
{
// the escalation frequency is either on a per-block basis, or on a duration basis // the escalation frequency is either on a per-block basis, or on a duration basis
let mut watcher: WatcherFuture = match self.frequency { let watcher: WatcherFuture = match self.frequency {
Frequency::PerBlock => Box::pin( Frequency::PerBlock => Box::pin(
self.inner self.inner.watch_blocks().await.map_err(MiddlewareError::from_err)?.map(|_| ()),
.watch_blocks()
.await
.map_err(GasEscalatorError::MiddlewareError)?
.map(|_| ()),
), ),
Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))), Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))),
}; };
while watcher.next().await.is_some() { let mut watcher = watcher.fuse();
let now = Instant::now();
let mut txs = self.txs.lock().await;
let len = txs.len();
loop {
select_biased! {
_ = &mut self.shutdown => {
tracing::debug!("Shutting down escalation task, middleware has gone away");
return Ok(())
}
opt = watcher.next() => {
if opt.is_none() {
tracing::error!("timing future has gone away");
return Ok(());
}
let now = Instant::now();
// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};
let len = txs.len();
// Pop all transactions and re-insert those that have not been included yet // Pop all transactions and re-insert those that have not been included yet
for _ in 0..len { for _ in 0..len {
// this must never panic as we're explicitly within bounds // this must never panic as we're explicitly within bounds
let (tx_hash, mut replacement_tx, time, priority) = let (tx_hash, mut replacement_tx, time, priority) =
txs.pop().expect("should have element in vector"); txs.pop().expect("should have element in vector");
let receipt = self.get_transaction_receipt(tx_hash).await?; let receipt = self
.inner
.get_transaction_receipt(tx_hash)
.await
.map_err(MiddlewareError::from_err)?;
tracing::trace!(tx_hash = ?tx_hash, "checking if exists"); tracing::trace!(tx_hash = ?tx_hash, "checking if exists");
if receipt.is_none() { if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the // Get the new gas price based on how much time passed since the
@ -193,13 +308,14 @@ where
.escalator .escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs()); .get_gas_price(old_gas_price, now.duration_since(time).as_secs());
let new_txhash = if new_gas_price != old_gas_price { let new_txhash = if new_gas_price == old_gas_price {
tx_hash
} else {
// bump the gas price // bump the gas price
replacement_tx.gas_price = Some(new_gas_price); replacement_tx.gas_price = Some(new_gas_price);
// the tx hash will be different so we need to update it // the tx hash will be different so we need to update it
match self.inner().send_transaction(replacement_tx.clone(), priority).await match self.inner.send_transaction(replacement_tx.clone(), priority).await {
{
Ok(new_tx_hash) => { Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash; let new_tx_hash = *new_tx_hash;
tracing::trace!( tracing::trace!(
@ -220,46 +336,23 @@ where
// push it back to the pending txs vector) // push it back to the pending txs vector)
continue continue
} else { } else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
return Err(GasEscalatorError::MiddlewareError(err)) return Err(GasEscalatorError::MiddlewareError(err))
} }
} }
} }
} else {
tx_hash
}; };
txs.push((new_txhash, replacement_tx, time, priority)); txs.push((new_txhash, replacement_tx, time, priority));
} }
} }
} // after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
Ok(()) // items!
} self.txs.lock().await.extend(txs);
} }}
// Boilerplate
impl<M: Middleware> MiddlewareError for GasEscalatorError<M> {
type Inner = M::Error;
fn from_err(src: M::Error) -> GasEscalatorError<M> {
GasEscalatorError::MiddlewareError(src)
}
fn as_inner(&self) -> Option<&Self::Inner> {
match self {
GasEscalatorError::MiddlewareError(e) => Some(e),
_ => None,
} }
} }
} }
#[derive(Error, Debug)]
/// Error thrown when the GasEscalator interacts with the blockchain
pub enum GasEscalatorError<M: Middleware> {
#[error("{0}")]
/// Thrown when an internal middleware errors
MiddlewareError(M::Error),
#[error("Gas escalation is only supported for EIP2930 or Legacy transactions")]
UnsupportedTxType,
}