From 62b7ce4366a77574f4f8bc49cbb1eba35090fe7d Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Thu, 8 Oct 2020 18:56:36 +0300 Subject: [PATCH] feat: Transaction Gas Price Escalator middleware (#81) * fix(signers): make Signer send by blocking on Ledger calls * fix(providers): use Arc in WS impl to allow cloning * feat(middleware): add geometric gas price escalator * test(middleware): ensure that we can still stack everything up * fix(middleware): default to tokio/async-std * chore: fix clippy * docs(middleware): add docs and rename middlewares * chore: fix doctests * feat: add linear gas escalator https://github.com/makerdao/pymaker/blob/master/tests/test_gas.py\#L107 https://github.com/makerdao/pymaker/blob/master/pymaker/gas.py\#L129 * feat: add constructors to gas escalators --- Cargo.lock | 4 + ethers-contract/tests/common/mod.rs | 6 +- ethers-contract/tests/contract.rs | 4 +- ethers-middleware/Cargo.toml | 5 + .../src/gas_escalator/geometric.rs | 114 +++++++++ ethers-middleware/src/gas_escalator/linear.rs | 78 ++++++ ethers-middleware/src/gas_escalator/mod.rs | 231 ++++++++++++++++++ .../src/gas_oracle/middleware.rs | 3 +- ethers-middleware/src/lib.rs | 70 +++++- ethers-middleware/src/nonce_manager.rs | 14 +- .../src/{client.rs => signer.rs} | 60 ++--- ethers-middleware/tests/gas_escalator.rs | 50 ++++ ethers-middleware/tests/nonce_manager.rs | 6 +- ethers-middleware/tests/signer.rs | 6 +- ethers-middleware/tests/stack.rs | 16 +- ethers-providers/src/lib.rs | 10 +- ethers-providers/src/provider.rs | 2 +- ethers-providers/src/transports/ws.rs | 14 +- ethers-signers/Cargo.toml | 1 + ethers-signers/src/ledger/app.rs | 9 +- ethers-signers/src/ledger/mod.rs | 2 +- ethers-signers/src/lib.rs | 2 +- ethers-signers/src/wallet/mod.rs | 2 +- ethers/examples/contract.rs | 2 +- ethers/examples/ens.rs | 2 +- ethers/examples/ledger.rs | 2 +- ethers/examples/local_signer.rs | 2 +- ethers/examples/yubi.rs | 2 +- 28 files changed, 636 insertions(+), 83 deletions(-) create mode 100644 ethers-middleware/src/gas_escalator/geometric.rs create mode 100644 ethers-middleware/src/gas_escalator/linear.rs create mode 100644 ethers-middleware/src/gas_escalator/mod.rs rename ethers-middleware/src/{client.rs => signer.rs} (85%) create mode 100644 ethers-middleware/tests/gas_escalator.rs diff --git a/Cargo.lock b/Cargo.lock index f65871ff..77be0279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -839,11 +839,13 @@ dependencies = [ name = "ethers-middleware" version = "0.1.3" dependencies = [ + "async-std", "async-trait", "ethers", "ethers-core", "ethers-providers", "ethers-signers", + "futures-executor", "futures-util", "reqwest", "rustc-hex", @@ -888,6 +890,7 @@ dependencies = [ "elliptic-curve", "ethers", "ethers-core", + "futures-executor", "futures-util", "rand", "rustc-hex", @@ -1029,6 +1032,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] diff --git a/ethers-contract/tests/common/mod.rs b/ethers-contract/tests/common/mod.rs index 582268cc..363cb237 100644 --- a/ethers-contract/tests/common/mod.rs +++ b/ethers-contract/tests/common/mod.rs @@ -5,7 +5,7 @@ use ethers_core::{ use ethers_contract::{Contract, ContractFactory}; use ethers_core::utils::{GanacheInstance, Solc}; -use ethers_middleware::Client; +use ethers_middleware::signer::SignerMiddleware; use ethers_providers::{Http, Middleware, Provider}; use ethers_signers::LocalWallet; use std::{convert::TryFrom, sync::Arc, time::Duration}; @@ -44,7 +44,7 @@ pub fn compile_contract(name: &str, filename: &str) -> (Abi, Bytes) { (contract.abi.clone(), contract.bytecode.clone()) } -type HttpWallet = Client, LocalWallet>; +type HttpWallet = SignerMiddleware, LocalWallet>; /// connects the private key to http://localhost:8545 pub fn connect(ganache: &GanacheInstance, idx: usize) -> Arc { @@ -52,7 +52,7 @@ pub fn connect(ganache: &GanacheInstance, idx: usize) -> Arc { .unwrap() .interval(Duration::from_millis(10u64)); let wallet: LocalWallet = ganache.keys()[idx].clone().into(); - Arc::new(Client::new(provider, wallet)) + Arc::new(SignerMiddleware::new(provider, wallet)) } /// Launches a ganache instance and deploys the SimpleStorage contract diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index 11625ebf..4dec14ae 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -342,7 +342,7 @@ mod eth_tests { mod celo_tests { use super::*; use ethers::{ - middleware::Client, + middleware::signer::SignerMiddleware, providers::{Http, Provider}, signers::LocalWallet, types::BlockNumber, @@ -363,7 +363,7 @@ mod celo_tests { .parse::() .unwrap(); - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); let client = Arc::new(client); let factory = ContractFactory::new(abi, bytecode, client); diff --git a/ethers-middleware/Cargo.toml b/ethers-middleware/Cargo.toml index f6b6c6eb..001dda01 100644 --- a/ethers-middleware/Cargo.toml +++ b/ethers-middleware/Cargo.toml @@ -29,8 +29,13 @@ serde-aux = "0.6.1" reqwest = { version = "0.10.4", default-features = false, features = ["json", "rustls-tls"] } url = { version = "2.1.1", default-features = false } +# optional for runtime +tokio = { version = "0.2.22", optional = true } +async-std = { version = "1.6.5", optional = true } + [dev-dependencies] ethers = { version = "0.1.3", path = "../ethers" } +futures-executor = { version = "0.3.5", features = ["thread-pool"] } rustc-hex = "2.1.0" tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] } diff --git a/ethers-middleware/src/gas_escalator/geometric.rs b/ethers-middleware/src/gas_escalator/geometric.rs new file mode 100644 index 00000000..dcbba6c7 --- /dev/null +++ b/ethers-middleware/src/gas_escalator/geometric.rs @@ -0,0 +1,114 @@ +use super::GasEscalator; +use ethers_core::types::U256; + +/// Geometrically increasing gas price. +/// +/// Start with `initial_price`, then increase it every 'every_secs' seconds by a fixed coefficient. +/// Coefficient defaults to 1.125 (12.5%), the minimum increase for Parity to replace a transaction. +/// Coefficient can be adjusted, and there is an optional upper limit. +/// +/// https://github.com/makerdao/pymaker/blob/master/pymaker/gas.py#L168 +#[derive(Clone, Debug)] +pub struct GeometricGasPrice { + every_secs: u64, + coefficient: f64, + max_price: Option, +} + +impl GeometricGasPrice { + /// Constructor + /// + /// Note: Providing `None` to `max_price` requires giving it a type-hint, so you'll need + /// to call this like `GeometricGasPrice::new(1.125, 60u64, None::)`. + pub fn new, K: Into>( + coefficient: f64, + every_secs: K, + max_price: Option, + ) -> Self { + GeometricGasPrice { + every_secs: every_secs.into(), + coefficient, + max_price: max_price.map(Into::into), + } + } +} + +impl GasEscalator for GeometricGasPrice { + fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256 { + let mut result = initial_price.as_u64() as f64; + + if time_elapsed >= self.every_secs { + let iters = time_elapsed / self.every_secs; + for _ in 0..iters { + result *= self.coefficient; + } + } + + let mut result = U256::from(result.ceil() as u64); + if let Some(max_price) = self.max_price { + result = std::cmp::min(result, max_price); + } + result + } +} + +#[cfg(test)] +// https://github.com/makerdao/pymaker/blob/master/tests/test_gas.py#L165 +mod tests { + use super::*; + + #[test] + fn gas_price_increases_with_time() { + let oracle = GeometricGasPrice::new(1.125, 10u64, None::); + let initial_price = U256::from(100); + + assert_eq!(oracle.get_gas_price(initial_price, 0), 100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1), 100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 10), 113.into()); + assert_eq!(oracle.get_gas_price(initial_price, 15), 113.into()); + assert_eq!(oracle.get_gas_price(initial_price, 20), 127.into()); + assert_eq!(oracle.get_gas_price(initial_price, 30), 143.into()); + assert_eq!(oracle.get_gas_price(initial_price, 50), 181.into()); + assert_eq!(oracle.get_gas_price(initial_price, 100), 325.into()); + } + + #[test] + fn gas_price_should_obey_max_value() { + let oracle = GeometricGasPrice::new(1.125, 60u64, Some(2500)); + let initial_price = U256::from(1000); + + assert_eq!(oracle.get_gas_price(initial_price, 0), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 59), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 60), 1125.into()); + assert_eq!(oracle.get_gas_price(initial_price, 119), 1125.into()); + assert_eq!(oracle.get_gas_price(initial_price, 120), 1266.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1200), 2500.into()); + assert_eq!(oracle.get_gas_price(initial_price, 3000), 2500.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1000000), 2500.into()); + } + + #[test] + fn behaves_with_realistic_values() { + let oracle = GeometricGasPrice::new(1.25, 10u64, None::); + const GWEI: f64 = 1000000000.0; + let initial_price = U256::from(100 * GWEI as u64); + + for seconds in &[0u64, 1, 10, 12, 30, 60] { + println!( + "gas price after {} seconds is {}", + seconds, + oracle.get_gas_price(initial_price, *seconds).as_u64() as f64 / GWEI + ); + } + + let normalized = |time| oracle.get_gas_price(initial_price, time).as_u64() as f64 / GWEI; + + assert_eq!(normalized(0), 100.0); + assert_eq!(normalized(1), 100.0); + assert_eq!(normalized(10), 125.0); + assert_eq!(normalized(12), 125.0); + assert_eq!(normalized(30), 195.3125); + assert_eq!(normalized(60), 381.469726563); + } +} diff --git a/ethers-middleware/src/gas_escalator/linear.rs b/ethers-middleware/src/gas_escalator/linear.rs new file mode 100644 index 00000000..e47b5bbd --- /dev/null +++ b/ethers-middleware/src/gas_escalator/linear.rs @@ -0,0 +1,78 @@ +use super::GasEscalator; +use ethers_core::types::U256; + +/// Linearly increasing gas price. +/// +/// +/// Start with `initial_price`, then increase it by fixed amount `increase_by` every `every_secs` seconds +/// until the transaction gets confirmed. There is an optional upper limit. +/// +/// https://github.com/makerdao/pymaker/blob/master/pymaker/gas.py#L129 +#[derive(Clone, Debug)] +pub struct LinearGasPrice { + every_secs: u64, + increase_by: U256, + max_price: Option, +} + +impl LinearGasPrice { + /// Constructor + pub fn new>( + increase_by: T, + every_secs: impl Into, + max_price: Option, + ) -> Self { + LinearGasPrice { + every_secs: every_secs.into(), + increase_by: increase_by.into(), + max_price: max_price.map(Into::into), + } + } +} + +impl GasEscalator for LinearGasPrice { + fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256 { + let mut result = initial_price + self.increase_by * (time_elapsed / self.every_secs) as u64; + dbg!(time_elapsed, self.every_secs); + if let Some(max_price) = self.max_price { + result = std::cmp::min(result, max_price); + } + result + } +} + +#[cfg(test)] +// https://github.com/makerdao/pymaker/blob/master/tests/test_gas.py#L107 +mod tests { + use super::*; + + #[test] + fn gas_price_increases_with_time() { + let oracle = LinearGasPrice::new(100, 60u64, None); + let initial_price = U256::from(1000); + + assert_eq!(oracle.get_gas_price(initial_price, 0), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 59), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 60), 1100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 119), 1100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 120), 1200.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1200), 3000.into()); + } + + #[test] + fn gas_price_should_obey_max_value() { + let oracle = LinearGasPrice::new(100, 60u64, Some(2500)); + let initial_price = U256::from(1000); + + assert_eq!(oracle.get_gas_price(initial_price, 0), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 59), 1000.into()); + assert_eq!(oracle.get_gas_price(initial_price, 60), 1100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 119), 1100.into()); + assert_eq!(oracle.get_gas_price(initial_price, 120), 1200.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1200), 2500.into()); + assert_eq!(oracle.get_gas_price(initial_price, 3000), 2500.into()); + assert_eq!(oracle.get_gas_price(initial_price, 1000000), 2500.into()); + } +} diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs new file mode 100644 index 00000000..991c3cb7 --- /dev/null +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -0,0 +1,231 @@ +mod geometric; +pub use geometric::GeometricGasPrice; + +mod linear; +pub use linear::LinearGasPrice; + +use async_trait::async_trait; +use ethers_core::types::{BlockNumber, TransactionRequest, TxHash, U256}; +use ethers_providers::{interval, FromErr, Middleware, StreamExt}; +use futures_util::lock::Mutex; +use std::sync::Arc; +use std::{pin::Pin, time::Instant}; +use thiserror::Error; + +#[cfg(all(not(feature = "tokio"), feature = "async-std"))] +use async_std::task::spawn; +#[cfg(all(feature = "tokio", not(feature = "async-std")))] +use tokio::spawn; +#[cfg(all(feature = "tokio", all(feature = "async-std")))] +// this should never happen, used to silence clippy warnings +fn spawn(_: T) { + unimplemented!("do not use both tokio and async-std!") +} + +/// Trait for fetching updated gas prices after a transaction has been first +/// broadcast +pub trait GasEscalator: Send + Sync + std::fmt::Debug { + /// Given the initial gas price and the time elapsed since the transaction's + /// first broadcast, it returns the new gas price + fn get_gas_price(&self, initial_price: U256, time_elapsed: u64) -> U256; +} + +#[derive(Debug, Clone)] +/// The frequency at which transactions will be bumped +pub enum Frequency { + /// On a per block basis using the eth_newBlock filter + PerBlock, + /// On a duration basis (in milliseconds) + Duration(u64), +} + +#[derive(Debug, Clone)] +/// A Gas escalator allows bumping transactions' gas price to avoid getting them +/// stuck in the memory pool. +/// +/// If the crate is compiled with the `tokio` or `async-std` features, it will +/// automatically start bumping transactions in the background. Otherwise, you need +/// to spawn the `escalate` call yourself with an executor of choice. +/// +/// ```no_run +/// use ethers::{ +/// providers::{Provider, Http}, +/// middleware::{ +/// gas_escalator::{GeometricGasPrice, Frequency, GasEscalatorMiddleware}, +/// gas_oracle::{GasNow, GasCategory, GasOracleMiddleware}, +/// }, +/// }; +/// use std::{convert::TryFrom, time::Duration, sync::Arc}; +/// +/// let provider = Provider::try_from("http://localhost:8545") +/// .unwrap() +/// .interval(Duration::from_millis(2000u64)); +/// +/// let provider = { +/// let escalator = GeometricGasPrice::new(5.0, 10u64, None::); +/// GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock) +/// }; +/// +/// // ... proceed to wrap it in other middleware +/// let gas_oracle = GasNow::new().category(GasCategory::SafeLow); +/// let provider = GasOracleMiddleware::new(provider, gas_oracle); +/// ``` +pub struct GasEscalatorMiddleware { + pub(crate) inner: Arc, + pub(crate) escalator: E, + /// The transactions which are currently being monitored for escalation + #[allow(clippy::type_complexity)] + pub txs: Arc)>>>, + frequency: Frequency, +} + +#[async_trait] +impl Middleware for GasEscalatorMiddleware +where + M: Middleware, + E: GasEscalator, +{ + type Error = GasEscalatorError; + type Provider = M::Provider; + type Inner = M; + + fn inner(&self) -> &M { + &self.inner + } + + async fn send_transaction( + &self, + tx: TransactionRequest, + block: Option, + ) -> Result { + let tx_hash = self + .inner() + .send_transaction(tx.clone(), block) + .await + .map_err(GasEscalatorError::MiddlewareError)?; + + // insert the tx in the pending txs + let mut lock = self.txs.lock().await; + lock.push((tx_hash, tx, Instant::now(), block)); + + Ok(tx_hash) + } +} + +impl GasEscalatorMiddleware +where + M: Middleware, + E: GasEscalator, +{ + /// Initializes the middleware with the provided gas escalator and the chosen + /// escalation frequency (per block or per second) + #[allow(clippy::let_and_return)] + pub fn new(inner: M, escalator: E, frequency: Frequency) -> Self + where + E: Clone + 'static, + M: Clone + 'static, + { + let this = Self { + inner: Arc::new(inner), + escalator, + frequency, + txs: Arc::new(Mutex::new(Vec::new())), + }; + + #[cfg(any(feature = "async-std", feature = "tokio"))] + { + let this2 = this.clone(); + spawn(async move { + this2.escalate().await.unwrap(); + }); + } + + this + } + + /// Re-broadcasts pending transactions with a gas price escalator + pub async fn escalate(&self) -> Result<(), GasEscalatorError> { + // the escalation frequency is either on a per-block basis, or on a duratoin basis + let mut watcher: Pin + Send>> = + match self.frequency { + Frequency::PerBlock => Box::pin( + self.inner + .watch_blocks() + .await + .map_err(GasEscalatorError::MiddlewareError)? + .map(|_| ()), + ), + Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))), + }; + + while watcher.next().await.is_some() { + let now = Instant::now(); + let mut txs = self.txs.lock().await; + let len = txs.len(); + + // Pop all transactions and re-insert those that have not been included yet + for _ in 0..len { + // this must never panic as we're explicitly within bounds + let (tx_hash, mut replacement_tx, time, priority) = + txs.pop().expect("should have element in vector"); + + let receipt = self.get_transaction_receipt(tx_hash).await?; + if receipt.is_none() { + // Get the new gas price based on how much time passed since the + // tx was last broadcast + let new_gas_price = self.escalator.get_gas_price( + replacement_tx.gas_price.expect("gas price must be set"), + now.duration_since(time).as_secs(), + ); + + let new_txhash = if Some(new_gas_price) != replacement_tx.gas_price { + // bump the gas price + replacement_tx.gas_price = Some(new_gas_price); + + // the tx hash will be different so we need to update it + match self + .inner() + .send_transaction(replacement_tx.clone(), priority) + .await + { + Ok(tx_hash) => tx_hash, + Err(err) => { + if err.to_string().contains("nonce too low") { + // ignore "nonce too low" errors because they + // may happen if we try to broadcast a higher + // gas price tx when one of the previous ones + // was already mined (meaning we also do not + // push it back to the pending txs vector) + continue; + } else { + return Err(GasEscalatorError::MiddlewareError(err)); + } + } + } + } else { + tx_hash + }; + + txs.push((new_txhash, replacement_tx, time, priority)); + } + } + } + + Ok(()) + } +} + +// Boilerplate +impl FromErr for GasEscalatorError { + fn from(src: M::Error) -> GasEscalatorError { + GasEscalatorError::MiddlewareError(src) + } +} + +#[derive(Error, Debug)] +/// Error thrown when the GasEscalator interacts with the blockchain +pub enum GasEscalatorError { + #[error("{0}")] + /// Thrown when an internal middleware errors + MiddlewareError(M::Error), +} diff --git a/ethers-middleware/src/gas_oracle/middleware.rs b/ethers-middleware/src/gas_oracle/middleware.rs index c1b2aeaf..fed24e0c 100644 --- a/ethers-middleware/src/gas_oracle/middleware.rs +++ b/ethers-middleware/src/gas_oracle/middleware.rs @@ -5,6 +5,7 @@ use ethers_providers::{FromErr, Middleware}; use thiserror::Error; #[derive(Debug)] +/// Middleware used for fetching gas prices over an API instead of `eth_gasPrice` pub struct GasOracleMiddleware { inner: M, gas_oracle: G, @@ -35,7 +36,7 @@ impl FromErr for MiddlewareError { } } -#[async_trait(?Send)] +#[async_trait] impl Middleware for GasOracleMiddleware where M: Middleware, diff --git a/ethers-middleware/src/lib.rs b/ethers-middleware/src/lib.rs index d37d6ed1..f976c199 100644 --- a/ethers-middleware/src/lib.rs +++ b/ethers-middleware/src/lib.rs @@ -1,21 +1,67 @@ -//! Ethers Middleware +//! # Ethers Middleware //! //! Ethers uses a middleware architecture. You start the middleware stack with -//! a [`Provider`], and wrap it with additional middleware functionalities that -//! you need. +//! a [`Provider`](ethers_providers::Provider), and wrap it with additional +//! middleware functionalities that you need. //! -//! # Middlewares +//! ## Available Middleware +//! - Signer +//! - Nonce Manager +//! - Gas Escalator +//! - Gas Oracle //! -//! ## Gas Oracle +//! ## Example of a middleware stack //! -//! ## Signer +//! ```no_run +//! use ethers::{ +//! providers::{Provider, Http}, +//! signers::LocalWallet, +//! middleware::{ +//! gas_escalator::{GasEscalatorMiddleware, GeometricGasPrice, Frequency}, +//! gas_oracle::{GasOracleMiddleware, GasNow, GasCategory}, +//! signer::SignerMiddleware, +//! nonce_manager::NonceManagerMiddleware, +//! }, +//! core::rand, +//! }; +//! use std::convert::TryFrom; //! -//! ## Nonce Manager +//! // Start the stack +//! let provider = Provider::::try_from("http://localhost:8545").unwrap(); +//! +//! // Escalate gas prices +//! let escalator = GeometricGasPrice::new(1.125, 60u64, None::); +//! let provider = +//! GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock); +//! +//! // Sign transactions with a private key +//! let signer = LocalWallet::new(&mut rand::thread_rng()); +//! let address = signer.address(); +//! let provider = SignerMiddleware::new(provider, signer); +//! +//! // Use GasNow as the gas oracle +//! let gas_oracle = GasNow::new().category(GasCategory::SafeLow); +//! let provider = GasOracleMiddleware::new(provider, gas_oracle); +//! +//! // Manage nonces locally +//! let provider = NonceManagerMiddleware::new(provider, address); +//! +//! // ... do something with the provider +//! ``` + +/// The gas escalator middleware is used to re-broadcast transactions with an +/// increasing gas price to guarantee their timely inclusion +pub mod gas_escalator; + +/// The gas oracle middleware is used to get the gas price from a list of gas oracles +/// instead of using eth_gasPrice pub mod gas_oracle; -pub use gas_oracle::GasOracleMiddleware; -pub mod client; -pub use client::Client; +/// The nonce manager middleware is used to locally calculate nonces instead of +/// using eth_getTransactionCount +pub mod nonce_manager; -mod nonce_manager; -pub use nonce_manager::NonceManager; +/// The signer middleware is used to locally sign transactions and messages +/// instead of using eth_sendTransaction and eth_sign +pub mod signer; +pub use signer::SignerMiddleware; diff --git a/ethers-middleware/src/nonce_manager.rs b/ethers-middleware/src/nonce_manager.rs index c2fb634f..f6319d07 100644 --- a/ethers-middleware/src/nonce_manager.rs +++ b/ethers-middleware/src/nonce_manager.rs @@ -5,20 +5,22 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use thiserror::Error; #[derive(Debug)] -pub struct NonceManager { +/// Middleware used for calculating nonces locally, useful for signing multiple +/// consecutive transactions without waiting for them to hit the mempool +pub struct NonceManagerMiddleware { pub inner: M, pub initialized: AtomicBool, pub nonce: AtomicU64, pub address: Address, } -impl NonceManager +impl NonceManagerMiddleware where M: Middleware, { /// Instantiates the nonce manager with a 0 nonce. pub fn new(inner: M, address: Address) -> Self { - NonceManager { + Self { initialized: false.into(), nonce: 0.into(), inner, @@ -52,7 +54,9 @@ where } #[derive(Error, Debug)] +/// Thrown when an error happens at the Nonce Manager pub enum NonceManagerError { + /// Thrown when the internal middleware errors #[error("{0}")] MiddlewareError(M::Error), } @@ -63,8 +67,8 @@ impl FromErr for NonceManagerError { } } -#[async_trait(?Send)] -impl Middleware for NonceManager +#[async_trait] +impl Middleware for NonceManagerMiddleware where M: Middleware, { diff --git a/ethers-middleware/src/client.rs b/ethers-middleware/src/signer.rs similarity index 85% rename from ethers-middleware/src/client.rs rename to ethers-middleware/src/signer.rs index 669463dc..bf76297e 100644 --- a/ethers-middleware/src/client.rs +++ b/ethers-middleware/src/signer.rs @@ -23,7 +23,7 @@ use thiserror::Error; /// use ethers::{ /// providers::{Middleware, Provider, Http}, /// signers::LocalWallet, -/// middleware::Client, +/// middleware::SignerMiddleware, /// types::{Address, TransactionRequest}, /// }; /// use std::convert::TryFrom; @@ -37,15 +37,12 @@ use thiserror::Error; /// let wallet: LocalWallet = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc" /// .parse()?; /// -/// let mut client = Client::new(provider, wallet); +/// let mut client = SignerMiddleware::new(provider, wallet); /// -/// // since it derefs to `Provider`, we can just call any of the JSON-RPC API methods -/// let block = client.get_block(100u64).await?; -/// -/// // You can use the node's `eth_sign` and `eth_sendTransaction` calls by calling the -/// // internal provider's method. +/// // You can sign messages with the key /// let signed_msg = client.sign(b"hello".to_vec(), &client.address()).await?; /// +/// // ...and sign transactions /// let tx = TransactionRequest::pay("vitalik.eth", 100); /// let tx_hash = client.send_transaction(tx, None).await?; /// @@ -71,21 +68,21 @@ use thiserror::Error; /// ``` /// /// [`Provider`]: ethers_providers::Provider -pub struct Client { +pub struct SignerMiddleware { pub(crate) inner: M, pub(crate) signer: S, pub(crate) address: Address, } -impl FromErr for ClientError { - fn from(src: M::Error) -> ClientError { - ClientError::MiddlewareError(src) +impl FromErr for SignerMiddlewareError { + fn from(src: M::Error) -> SignerMiddlewareError { + SignerMiddlewareError::MiddlewareError(src) } } #[derive(Error, Debug)] /// Error thrown when the client interacts with the blockchain -pub enum ClientError { +pub enum SignerMiddlewareError { #[error("{0}")] /// Thrown when the internal call to the signer fails SignerError(S::Error), @@ -106,7 +103,7 @@ pub enum ClientError { } // Helper functions for locally signing transactions -impl Client +impl SignerMiddleware where M: Middleware, S: Signer, @@ -114,7 +111,7 @@ where /// Creates a new client from the provider and signer. pub fn new(inner: M, signer: S) -> Self { let address = signer.address(); - Client { + SignerMiddleware { inner, signer, address, @@ -124,17 +121,17 @@ where async fn sign_transaction( &self, tx: TransactionRequest, - ) -> Result> { + ) -> Result> { // The nonce, gas and gasprice fields must already be populated - let nonce = tx.nonce.ok_or(ClientError::NonceMissing)?; - let gas_price = tx.gas_price.ok_or(ClientError::GasPriceMissing)?; - let gas = tx.gas.ok_or(ClientError::GasMissing)?; + let nonce = tx.nonce.ok_or(SignerMiddlewareError::NonceMissing)?; + let gas_price = tx.gas_price.ok_or(SignerMiddlewareError::GasPriceMissing)?; + let gas = tx.gas.ok_or(SignerMiddlewareError::GasMissing)?; let signature = self .signer .sign_transaction(&tx) .await - .map_err(ClientError::SignerError)?; + .map_err(SignerMiddlewareError::SignerError)?; // Get the actual transaction hash let rlp = tx.rlp_signed(&signature); @@ -180,7 +177,7 @@ where &self, tx: &mut TransactionRequest, block: Option, - ) -> Result<(), ClientError> { + ) -> Result<(), SignerMiddlewareError> { // set the `from` field if tx.from.is_none() { tx.from = Some(self.address()); @@ -195,9 +192,9 @@ where self.inner.get_transaction_count(self.address(), block) ), ); - tx.gas_price = Some(gas_price.map_err(ClientError::MiddlewareError)?); - tx.gas = Some(gas.map_err(ClientError::MiddlewareError)?); - tx.nonce = Some(nonce.map_err(ClientError::MiddlewareError)?); + tx.gas_price = Some(gas_price.map_err(SignerMiddlewareError::MiddlewareError)?); + tx.gas = Some(gas.map_err(SignerMiddlewareError::MiddlewareError)?); + tx.nonce = Some(nonce.map_err(SignerMiddlewareError::MiddlewareError)?); Ok(()) } @@ -224,13 +221,13 @@ where } } -#[async_trait(?Send)] -impl Middleware for Client +#[async_trait] +impl Middleware for SignerMiddleware where M: Middleware, S: Signer, { - type Error = ClientError; + type Error = SignerMiddlewareError; type Provider = M::Provider; type Inner = M; @@ -252,7 +249,7 @@ where .inner .resolve_name(&ens_name) .await - .map_err(ClientError::MiddlewareError)?; + .map_err(SignerMiddlewareError::MiddlewareError)?; tx.to = Some(addr.into()) } } @@ -268,7 +265,7 @@ where self.inner .send_raw_transaction(&signed_tx) .await - .map_err(ClientError::MiddlewareError) + .map_err(SignerMiddlewareError::MiddlewareError) } /// Signs a message with the internal signer, or if none is present it will make a call to @@ -278,7 +275,10 @@ where data: T, _: &Address, ) -> Result { - Ok(self.signer.sign_message(data.into()).await.unwrap()) + self.signer + .sign_message(data.into()) + .await + .map_err(SignerMiddlewareError::SignerError) } } @@ -326,7 +326,7 @@ mod tests { .parse::() .unwrap() .set_chain_id(chain_id); - let client = Client::new(provider, key); + let client = SignerMiddleware::new(provider, key); let tx = client.sign_transaction(tx).await.unwrap(); diff --git a/ethers-middleware/tests/gas_escalator.rs b/ethers-middleware/tests/gas_escalator.rs new file mode 100644 index 00000000..d4750cfe --- /dev/null +++ b/ethers-middleware/tests/gas_escalator.rs @@ -0,0 +1,50 @@ +use ethers_core::types::*; +use ethers_middleware::{ + gas_escalator::{Frequency, GasEscalatorMiddleware, GeometricGasPrice}, + signer::SignerMiddleware, +}; +use ethers_providers::{Middleware, Provider, Ws}; +use ethers_signers::LocalWallet; +use std::time::Duration; + +#[tokio::test] +#[ignore] +async fn gas_escalator_live() { + // connect to ropsten for getting bad block times + let ws = Ws::connect("wss://ropsten.infura.io/ws/v3/fd8b88b56aa84f6da87b60f5441d6778") + .await + .unwrap(); + let provider = Provider::new(ws).interval(Duration::from_millis(2000u64)); + let wallet = "fdb33e2105f08abe41a8ee3b758726a31abdd57b7a443f470f23efce853af169" + .parse::() + .unwrap(); + let address = wallet.address(); + let provider = SignerMiddleware::new(provider, wallet); + + let escalator = GeometricGasPrice::new(5.0, 10u64, Some(2000_000_000_000u64)); + + let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::Duration(3000)); + + let nonce = provider.get_transaction_count(address, None).await.unwrap(); + let tx = TransactionRequest::pay(Address::zero(), 1u64).gas_price(10_000_000); + + // broadcast 3 txs + provider + .send_transaction(tx.clone().nonce(nonce), None) + .await + .unwrap(); + provider + .send_transaction(tx.clone().nonce(nonce + 1), None) + .await + .unwrap(); + provider + .send_transaction(tx.clone().nonce(nonce + 2), None) + .await + .unwrap(); + + // Wait a bunch of seconds and refresh etherscan to see the transactions get bumped + tokio::time::delay_for(std::time::Duration::from_secs(100)).await; + + // TODO: Figure out how to test this behavior properly in a local network. If the gas price was bumped + // then the tx hash will be different +} diff --git a/ethers-middleware/tests/nonce_manager.rs b/ethers-middleware/tests/nonce_manager.rs index 65cf2082..ef422530 100644 --- a/ethers-middleware/tests/nonce_manager.rs +++ b/ethers-middleware/tests/nonce_manager.rs @@ -2,7 +2,7 @@ #[cfg(not(feature = "celo"))] async fn nonce_manager() { use ethers_core::types::*; - use ethers_middleware::{Client, NonceManager}; + use ethers_middleware::{nonce_manager::NonceManagerMiddleware, signer::SignerMiddleware}; use ethers_providers::{Http, Middleware, Provider}; use ethers_signers::LocalWallet; use std::convert::TryFrom; @@ -18,11 +18,11 @@ async fn nonce_manager() { .unwrap(); let address = wallet.address(); - let provider = Client::new(provider, wallet); + let provider = SignerMiddleware::new(provider, wallet); // the nonce manager must be over the Client so that it overrides the nonce // before the client gets it - let provider = NonceManager::new(provider, address); + let provider = NonceManagerMiddleware::new(provider, address); let nonce = provider .get_transaction_count(address, Some(BlockNumber::Pending)) diff --git a/ethers-middleware/tests/signer.rs b/ethers-middleware/tests/signer.rs index 93322723..2e84f0ac 100644 --- a/ethers-middleware/tests/signer.rs +++ b/ethers-middleware/tests/signer.rs @@ -1,7 +1,7 @@ use ethers_providers::{Http, Middleware, Provider}; use ethers_core::types::TransactionRequest; -use ethers_middleware::Client; +use ethers_middleware::signer::SignerMiddleware; use ethers_signers::LocalWallet; use std::{convert::TryFrom, time::Duration}; @@ -20,7 +20,7 @@ async fn send_eth() { let provider = Provider::::try_from(ganache.endpoint()) .unwrap() .interval(Duration::from_millis(10u64)); - let provider = Client::new(provider, wallet); + let provider = SignerMiddleware::new(provider, wallet); // craft the transaction let tx = TransactionRequest::new().to(wallet2.address()).value(10000); @@ -54,7 +54,7 @@ async fn test_send_transaction() { let wallet = "d652abb81e8c686edba621a895531b1f291289b63b5ef09a94f686a5ecdd5db1" .parse::() .unwrap(); - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); let balance_before = client.get_balance(client.address(), None).await.unwrap(); let tx = TransactionRequest::pay(client.address(), 100); diff --git a/ethers-middleware/tests/stack.rs b/ethers-middleware/tests/stack.rs index 6dd0038d..915e42ce 100644 --- a/ethers-middleware/tests/stack.rs +++ b/ethers-middleware/tests/stack.rs @@ -3,8 +3,10 @@ async fn can_stack_middlewares() { use ethers_core::{types::TransactionRequest, utils::Ganache}; use ethers_middleware::{ - gas_oracle::{GasCategory, GasNow}, - Client, GasOracleMiddleware, NonceManager, + gas_escalator::{Frequency, GasEscalatorMiddleware, GeometricGasPrice}, + gas_oracle::{GasCategory, GasNow, GasOracleMiddleware}, + nonce_manager::NonceManagerMiddleware, + signer::SignerMiddleware, }; use ethers_providers::{Http, Middleware, Provider}; use ethers_signers::LocalWallet; @@ -19,15 +21,21 @@ async fn can_stack_middlewares() { let provider = Provider::::try_from(ganache.endpoint()).unwrap(); let provider_clone = provider.clone(); + // the Gas Price escalator middleware is the first middleware above the provider, + // so that it receives the transaction last, after all the other middleware + // have modified it accordingly + let escalator = GeometricGasPrice::new(1.125, 60u64, None::); + let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock); + // The gas price middleware MUST be below the signing middleware for things to work let provider = GasOracleMiddleware::new(provider, gas_oracle); // The signing middleware signs txs - let provider = Client::new(provider, signer); + let provider = SignerMiddleware::new(provider, signer); // The nonce manager middleware MUST be above the signing middleware so that it overrides // the nonce and the signer does not make any eth_getTransaction count calls - let provider = NonceManager::new(provider, address); + let provider = NonceManagerMiddleware::new(provider, address); let tx = TransactionRequest::new(); let mut tx_hash = None; diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 84def62c..96c64648 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -112,7 +112,7 @@ pub use pending_transaction::PendingTransaction; mod stream; pub use futures_util::StreamExt; -pub use stream::{FilterWatcher, DEFAULT_POLL_INTERVAL}; +pub use stream::{interval, FilterWatcher, DEFAULT_POLL_INTERVAL}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -121,7 +121,8 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin}; pub use provider::{FilterKind, Provider, ProviderError}; // Helper type alias -pub(crate) type PinBoxFut<'a, T> = Pin> + 'a>>; +pub(crate) type PinBoxFut<'a, T> = + Pin> + Send + 'a>>; #[async_trait] /// Trait which must be implemented by data transports to be used with the Ethereum @@ -138,14 +139,13 @@ pub trait JsonRpcClient: Debug + Send + Sync { } use ethers_core::types::*; - pub trait FromErr { fn from(src: T) -> Self; } -#[async_trait(?Send)] +#[async_trait] pub trait Middleware: Sync + Send + Debug { - type Error: Error + FromErr<::Error>; + type Error: Send + Error + FromErr<::Error>; type Provider: JsonRpcClient; type Inner: Middleware; diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 73177d0e..ecea8089 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -115,7 +115,7 @@ impl Provider

{ } } -#[async_trait(?Send)] +#[async_trait] impl Middleware for Provider

{ type Error = ProviderError; type Provider = P; diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index 25362045..83ffa8d0 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -10,6 +10,7 @@ use futures_util::{ use serde::{Deserialize, Serialize}; use std::fmt::{self, Debug}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use thiserror::Error; use super::common::{JsonRpcError, Request, ResponseData}; @@ -92,7 +93,16 @@ pub type MaybeTlsStream = StreamSwitcher>; /// for your runtime. pub struct Provider { id: AtomicU64, - ws: Mutex, + ws: Arc>, +} + +impl Clone for Provider { + fn clone(&self) -> Self { + Self { + id: AtomicU64::new(self.id.load(Ordering::SeqCst)), + ws: self.ws.clone(), + } + } } impl Debug for Provider { @@ -129,7 +139,7 @@ where pub fn new(ws: S) -> Self { Self { id: AtomicU64::new(0), - ws: Mutex::new(ws), + ws: Arc::new(Mutex::new(ws)), } } } diff --git a/ethers-signers/Cargo.toml b/ethers-signers/Cargo.toml index e3526157..774710ba 100644 --- a/ethers-signers/Cargo.toml +++ b/ethers-signers/Cargo.toml @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] ethers-core = { version = "0.1.3", path = "../ethers-core" } thiserror = { version = "1.0.15", default-features = false } futures-util = { version = "0.3.5", default-features = false } +futures-executor = { version = "0.3.5", default-features = false } serde = { version = "1.0.112", default-features = false } coins-ledger = { git = "https://github.com/summa-tx/bitcoins-rs", optional = true } diff --git a/ethers-signers/src/ledger/app.rs b/ethers-signers/src/ledger/app.rs index c5aeb022..a85f7a57 100644 --- a/ethers-signers/src/ledger/app.rs +++ b/ethers-signers/src/ledger/app.rs @@ -3,6 +3,7 @@ use coins_ledger::{ common::{APDUAnswer, APDUCommand, APDUData}, transports::{Ledger, LedgerAsync}, }; +use futures_executor::block_on; use futures_util::lock::Mutex; use ethers_core::{ @@ -88,7 +89,7 @@ impl LedgerEthereum { response_len: None, }; - let answer = transport.exchange(&command).await?; + let answer = block_on(transport.exchange(&command))?; let result = answer.data().ok_or(LedgerError::UnexpectedNullResponse)?; let address = { @@ -113,7 +114,7 @@ impl LedgerEthereum { response_len: None, }; - let answer = transport.exchange(&command).await?; + let answer = block_on(transport.exchange(&command))?; let result = answer.data().ok_or(LedgerError::UnexpectedNullResponse)?; Ok(format!("{}.{}.{}", result[1], result[2], result[3])) @@ -164,7 +165,7 @@ impl LedgerEthereum { let data = payload.drain(0..chunk_size).collect::>(); command.data = APDUData::new(&data); - let answer = transport.exchange(&command).await?; + let answer = block_on(transport.exchange(&command))?; result = answer .data() .ok_or(LedgerError::UnexpectedNullResponse)? @@ -201,7 +202,7 @@ impl LedgerEthereum { } } -#[cfg(all(test, feature = "ledger-tests"))] +#[cfg(all(test, feature = "ledger"))] mod tests { use super::*; use crate::Signer; diff --git a/ethers-signers/src/ledger/mod.rs b/ethers-signers/src/ledger/mod.rs index 503527b3..7b12018e 100644 --- a/ethers-signers/src/ledger/mod.rs +++ b/ethers-signers/src/ledger/mod.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use ethers_core::types::{Address, Signature, TransactionRequest}; use types::LedgerError; -#[async_trait(?Send)] +#[async_trait] impl Signer for LedgerEthereum { type Error = LedgerError; diff --git a/ethers-signers/src/lib.rs b/ethers-signers/src/lib.rs index 9ea7cc43..0e7e9465 100644 --- a/ethers-signers/src/lib.rs +++ b/ethers-signers/src/lib.rs @@ -63,7 +63,7 @@ use std::error::Error; /// Trait for signing transactions and messages /// /// Implement this trait to support different signing modes, e.g. Ledger, hosted etc. -#[async_trait(?Send)] +#[async_trait] pub trait Signer: std::fmt::Debug + Send + Sync { type Error: Error + Send + Sync; /// Signs the hash of the provided message after prefixing it diff --git a/ethers-signers/src/wallet/mod.rs b/ethers-signers/src/wallet/mod.rs index 9f60fa82..97cfbd26 100644 --- a/ethers-signers/src/wallet/mod.rs +++ b/ethers-signers/src/wallet/mod.rs @@ -62,7 +62,7 @@ pub struct Wallet> { pub(crate) chain_id: Option, } -#[async_trait(?Send)] +#[async_trait] impl> Signer for Wallet { type Error = std::convert::Infallible; diff --git a/ethers/examples/contract.rs b/ethers/examples/contract.rs index ecc73ae3..d9ddb30c 100644 --- a/ethers/examples/contract.rs +++ b/ethers/examples/contract.rs @@ -31,7 +31,7 @@ async fn main() -> Result<()> { Provider::::try_from(ganache.endpoint())?.interval(Duration::from_millis(10u64)); // 5. instantiate the client with the wallet - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); let client = Arc::new(client); // 6. create a factory which will be used to deploy instances of the contract diff --git a/ethers/examples/ens.rs b/ethers/examples/ens.rs index 0d096329..196f0311 100644 --- a/ethers/examples/ens.rs +++ b/ethers/examples/ens.rs @@ -12,7 +12,7 @@ async fn main() -> Result<()> { // create a wallet and connect it to the provider let wallet = "dcf2cbdd171a21c480aa7f53d77f31bb102282b3ff099c78e3118b37348c72f7" .parse::()?; - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); // craft the transaction let tx = TransactionRequest::new().to("vitalik.eth").value(100_000); diff --git a/ethers/examples/ledger.rs b/ethers/examples/ledger.rs index f7f786f4..a08d75a8 100644 --- a/ethers/examples/ledger.rs +++ b/ethers/examples/ledger.rs @@ -10,7 +10,7 @@ async fn main() -> anyhow::Result<()> { // index or supply the full HD path string. You may also provide the chain_id // (here: mainnet) for EIP155 support. let ledger = Ledger::new(HDPath::LedgerLive(0), Some(1)).await?; - let client = Client::new(provider, ledger); + let client = SignerMiddleware::new(provider, ledger); // Create and broadcast a transaction (ENS enabled!) // (this will require confirming the tx on the device) diff --git a/ethers/examples/local_signer.rs b/ethers/examples/local_signer.rs index a92bf574..cedfaff5 100644 --- a/ethers/examples/local_signer.rs +++ b/ethers/examples/local_signer.rs @@ -13,7 +13,7 @@ async fn main() -> Result<()> { let provider = Provider::::try_from(ganache.endpoint())?; // connect the wallet to the provider - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); // craft the transaction let tx = TransactionRequest::new().to(wallet2.address()).value(10000); diff --git a/ethers/examples/yubi.rs b/ethers/examples/yubi.rs index b9645ecf..af0caa30 100644 --- a/ethers/examples/yubi.rs +++ b/ethers/examples/yubi.rs @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> { // `from_key` method to upload a key you already have, or the `new` method // to generate a new keypair. let wallet = YubiWallet::connect(connector, Credentials::default(), 0); - let client = Client::new(provider, wallet); + let client = SignerMiddleware::new(provider, wallet); // Create and broadcast a transaction (ENS enabled!) let tx = TransactionRequest::new()