From 57736ed2322da53671734bfbfa4812aa0b292c92 Mon Sep 17 00:00:00 2001 From: James Prestwich <10149425+prestwich@users.noreply.github.com> Date: Fri, 17 Sep 2021 10:38:35 -0700 Subject: [PATCH] feature: first draft timelag middleware (#457) * feature: first draft timelag middleware * chore: add 1 line docstrings * fix: address PR comments - imports to top of file - expect with message - fill_transaction fix * bug: fix fill_transaction error return * refactor: make receipt filtering less ugly * fix: make trait impl wasm compatible Co-authored-by: Georgios Konstantopoulos --- ethers-middleware/src/lib.rs | 5 + ethers-middleware/src/timelag/mod.rs | 386 +++++++++++++++++++++++++++ 2 files changed, 391 insertions(+) create mode 100644 ethers-middleware/src/timelag/mod.rs diff --git a/ethers-middleware/src/lib.rs b/ethers-middleware/src/lib.rs index 4e2bfbf6..94741d16 100644 --- a/ethers-middleware/src/lib.rs +++ b/ethers-middleware/src/lib.rs @@ -81,3 +81,8 @@ pub use signer::SignerMiddleware; /// configured in the `PolicyMiddleware` before sending them. pub mod policy; pub use policy::PolicyMiddleware; + +/// The [TimeLag](crate::TimeLag) provides safety against reorgs by querying state N blocks +/// before the chain tip +pub mod timelag; +pub use timelag::TimeLag; diff --git a/ethers-middleware/src/timelag/mod.rs b/ethers-middleware/src/timelag/mod.rs new file mode 100644 index 00000000..5125fd97 --- /dev/null +++ b/ethers-middleware/src/timelag/mod.rs @@ -0,0 +1,386 @@ +use async_trait::async_trait; +use ethers_core::types::{ + transaction::eip2718::TypedTransaction, Block, BlockId, BlockNumber, Bytes, FilterBlockOption, + NameOrAddress, Transaction, TransactionReceipt, TxHash, U256, +}; +use std::sync::Arc; +use thiserror::Error; + +use ethers_providers::{FromErr, Middleware}; + +type TimeLagResult = Result>; + +/// TimeLage Provider Errors +#[derive(Error, Debug)] +pub enum TimeLagError +where + M: Middleware, +{ + #[error("{0}")] + /// Thrown when an internal middleware errors + MiddlewareError(M::Error), + + #[error("Unsupported RPC. Timelag provider does not support filters or subscriptions.")] + Unsupported, +} + +// Boilerplate +impl FromErr for TimeLagError { + fn from(src: M::Error) -> TimeLagError { + TimeLagError::MiddlewareError(src) + } +} + +/// TimeLag Provider +#[derive(Debug)] +pub struct TimeLag { + inner: Arc, +} + +impl TimeLag +where + M: Middleware, +{ + async fn normalize_block_id(&self, id: Option) -> TimeLagResult, M> { + match id { + Some(BlockId::Number(n)) => { + Ok(self.normalize_block_number(Some(n)).await?.map(Into::into)) + } + _ => Ok(id), + } + } + + async fn normalize_block_number( + &self, + number: Option, + ) -> TimeLagResult, M> { + let tip = self.get_block_number().await?; + match number { + Some(BlockNumber::Latest) => Ok(Some(BlockNumber::Number(tip))), + Some(BlockNumber::Number(n)) => { + if n > tip { + Ok(Some(BlockNumber::Latest)) + } else { + Ok(number) + } + } + _ => Ok(number), + } + } + + async fn normalize_filter_range( + &self, + block_option: FilterBlockOption, + ) -> TimeLagResult { + match block_option { + FilterBlockOption::Range { + from_block: _, + to_block: None, + } => Ok(block_option.set_to_block(self.get_block_number().await?.into())), + _ => Ok(block_option), + } + } +} + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl Middleware for TimeLag +where + M: Middleware, +{ + type Error = TimeLagError; + + type Provider = M::Provider; + + type Inner = M; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn get_block_number(&self) -> Result { + self.inner() + .get_block_number() + .await + .map(|num| num - K) + .map_err(ethers_providers::FromErr::from) + } + + async fn send_transaction + Send + Sync>( + &self, + tx: T, + block: Option, + ) -> Result, Self::Error> { + let block = self.normalize_block_id(block).await?; + self.inner() + .send_transaction(tx, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_block + Send + Sync>( + &self, + block_hash_or_number: T, + ) -> Result>, Self::Error> { + let block_hash_or_number = self + .normalize_block_id(Some(block_hash_or_number.into())) + .await? + .expect("Cannot return None if Some is passed in"); + + self.inner() + .get_block(block_hash_or_number) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_block_with_txs + Send + Sync>( + &self, + block_hash_or_number: T, + ) -> Result>, Self::Error> { + let block_hash_or_number = self + .normalize_block_id(Some(block_hash_or_number.into())) + .await? + .expect("Cannot return None if Some is passed in"); + + self.inner() + .get_block_with_txs(block_hash_or_number) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_uncle_count + Send + Sync>( + &self, + block_hash_or_number: T, + ) -> Result { + let block_hash_or_number = self + .normalize_block_id(Some(block_hash_or_number.into())) + .await? + .expect("Cannot return None if Some is passed in"); + + self.inner() + .get_uncle_count(block_hash_or_number) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_uncle + Send + Sync>( + &self, + block_hash_or_number: T, + idx: ethers_core::types::U64, + ) -> Result>, Self::Error> { + let block_hash_or_number = self + .normalize_block_id(Some(block_hash_or_number.into())) + .await? + .expect("Cannot return None if Some is passed in"); + + self.inner() + .get_uncle(block_hash_or_number, idx) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_transaction_count + Send + Sync>( + &self, + from: T, + block: Option, + ) -> Result { + let block = self.normalize_block_id(block).await?; + + self.inner() + .get_transaction_count(from, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn call( + &self, + tx: &TypedTransaction, + block: Option, + ) -> Result { + let block = self.normalize_block_id(block).await?; + + self.inner() + .call(tx, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_balance + Send + Sync>( + &self, + from: T, + block: Option, + ) -> Result { + let block = self.normalize_block_id(block).await?; + self.inner() + .get_balance(from, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_transaction_receipt>( + &self, + transaction_hash: T, + ) -> Result, Self::Error> { + let receipt = self + .inner() + .get_transaction_receipt(transaction_hash) + .await + .map_err(ethers_providers::FromErr::from)?; + + if receipt.is_none() { + return Ok(None); + } + + let receipt = receipt.expect("checked is_none"); + if receipt.block_number.is_none() { + return Ok(Some(receipt)); + } + + let number = receipt.block_number.expect("checked is_none"); + if number <= self.get_block_number().await? { + Ok(Some(receipt)) + } else { + // Pretend it hasn't confirmed yet. + Ok(None) + } + } + + async fn get_code + Send + Sync>( + &self, + at: T, + block: Option, + ) -> Result { + let block = self.normalize_block_id(block).await?; + + self.inner() + .get_code(at, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_storage_at + Send + Sync>( + &self, + from: T, + location: TxHash, + block: Option, + ) -> Result { + let block = self.normalize_block_id(block).await?; + self.inner() + .get_storage_at(from, location, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn fill_transaction( + &self, + tx: &mut TypedTransaction, + block: Option, + ) -> Result<(), Self::Error> { + let block = self.normalize_block_id(block).await?; + self.inner() + .fill_transaction(tx, block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_block_receipts + Send + Sync>( + &self, + block: T, + ) -> Result, Self::Error> { + let block: BlockNumber = block.into(); + let block = self + .normalize_block_number(Some(block)) + .await? + .expect("Cannot return None if Some is passed in"); + + self.inner() + .get_block_receipts(block) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn get_logs( + &self, + filter: ðers_core::types::Filter, + ) -> Result, Self::Error> { + let mut filter = filter.clone(); + filter.block_option = self.normalize_filter_range(filter.block_option).await?; + + self.inner() + .get_logs(&filter) + .await + .map_err(ethers_providers::FromErr::from) + } + + async fn new_filter( + &self, + _filter: ethers_providers::FilterKind<'_>, + ) -> Result { + Err(TimeLagError::Unsupported) + } + + async fn get_filter_changes(&self, _id: T) -> Result, Self::Error> + where + T: Into + Send + Sync, + R: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + std::fmt::Debug, + { + Err(TimeLagError::Unsupported) + } + + async fn watch_blocks( + &self, + ) -> Result, Self::Error> { + Err(TimeLagError::Unsupported) + } + + async fn subscribe( + &self, + _params: T, + ) -> Result, Self::Error> + where + T: std::fmt::Debug + serde::Serialize + Send + Sync, + R: serde::de::DeserializeOwned + Send + Sync, + Self::Provider: ethers_providers::PubsubClient, + { + Err(TimeLagError::Unsupported) + } + + async fn unsubscribe(&self, _id: T) -> Result + where + T: Into + Send + Sync, + Self::Provider: ethers_providers::PubsubClient, + { + Err(TimeLagError::Unsupported) + } + + async fn subscribe_blocks( + &self, + ) -> Result>, Self::Error> + where + Self::Provider: ethers_providers::PubsubClient, + { + Err(TimeLagError::Unsupported) + } + + async fn subscribe_pending_txs( + &self, + ) -> Result, Self::Error> + where + Self::Provider: ethers_providers::PubsubClient, + { + Err(TimeLagError::Unsupported) + } + + async fn subscribe_logs<'a>( + &'a self, + _filter: ðers_core::types::Filter, + ) -> Result< + ethers_providers::SubscriptionStream<'a, Self::Provider, ethers_core::types::Log>, + Self::Error, + > + where + Self::Provider: ethers_providers::PubsubClient, + { + Err(TimeLagError::Unsupported) + } +}