diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml
index 213e3b54..77cc3ef0 100644
--- a/ethers-providers/Cargo.toml
+++ b/ethers-providers/Cargo.toml
@@ -58,7 +58,7 @@ wasm-timer = "0.2"
 parking_lot = { version = "0.11", features = ["wasm-bindgen"] }
 
 [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
-tokio = { version = "1.5", default-features = false, features = ["rt", "macros"] }
+tokio = { version = "1.5", default-features = false, features = ["rt", "macros", "time"] }
 tempfile = "3.3.0"
 
 [features]
diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs
index a75e7cdf..e84d86f5 100644
--- a/ethers-providers/src/provider.rs
+++ b/ethers-providers/src/provider.rs
@@ -6,6 +6,9 @@ use crate::{
     PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
 };
 
+#[cfg(not(target_arch = "wasm32"))]
+use crate::transports::{HttpRateLimitRetryPolicy, RetryClient};
+
 #[cfg(feature = "celo")]
 use crate::CeloMiddleware;
 use crate::Middleware;
@@ -1343,6 +1346,18 @@ impl<'a> TryFrom<&'a String> for Provider<HttpProvider> {
     }
 }
 
+#[cfg(not(target_arch = "wasm32"))]
+impl Provider<RetryClient<HttpProvider>> {
+    pub fn new_client(src: &str, max_retry: u32, initial_backoff: u64) -> Result<Self, ParseError> {
+        Ok(Provider::new(RetryClient::new(
+            HttpProvider::new(Url::parse(src)?),
+            Box::new(HttpRateLimitRetryPolicy),
+            max_retry,
+            initial_backoff,
+        )))
+    }
+}
+
 /// A middleware supporting development-specific JSON RPC methods
 ///
 /// # Example
diff --git a/ethers-providers/src/transports/mod.rs b/ethers-providers/src/transports/mod.rs
index 1a369b40..6fc97dba 100644
--- a/ethers-providers/src/transports/mod.rs
+++ b/ethers-providers/src/transports/mod.rs
@@ -39,5 +39,10 @@ pub use quorum::{Quorum, QuorumError, QuorumProvider, WeightedProvider};
 mod rw;
 pub use rw::{RwClient, RwClientError};
 
+#[cfg(not(target_arch = "wasm32"))]
+mod retry;
+#[cfg(not(target_arch = "wasm32"))]
+pub use retry::*;
+
 mod mock;
 pub use mock::{MockError, MockProvider};
diff --git a/ethers-providers/src/transports/retry.rs b/ethers-providers/src/transports/retry.rs
new file mode 100644
index 00000000..17655c56
--- /dev/null
+++ b/ethers-providers/src/transports/retry.rs
@@ -0,0 +1,175 @@
+//! A [JsonRpcClient] implementation that retries requests filtered by [RetryPolicy]
+//! with an exponential backoff.
+
+use super::{common::JsonRpcError, http::ClientError};
+use crate::{provider::ProviderError, JsonRpcClient};
+
+use std::{
+    clone::Clone,
+    fmt::Debug,
+    sync::atomic::{AtomicU32, Ordering},
+    time::Duration,
+};
+
+use async_trait::async_trait;
+use serde::{de::DeserializeOwned, Serialize};
+use thiserror::Error;
+
+/// [RetryPolicy] defines the logic that decides which [JsonRpcClient::Error]
+/// instances the client should retry the request for and attempt to recover from.
+pub trait RetryPolicy<E>: Send + Sync + Debug {
+    fn should_retry(&self, error: &E) -> bool;
+}
+
+/// [RetryClient] is a wrapper around a [JsonRpcClient] that retries requests with
+/// an exponential backoff, filtered by the given [RetryPolicy].
+#[derive(Debug)]
+pub struct RetryClient<T>
+where
+    T: JsonRpcClient,
+    T::Error: Sync + Send + 'static,
+{
+    inner: T,
+    requests_enqueued: AtomicU32,
+    policy: Box<dyn RetryPolicy<T::Error>>,
+    max_retry: u32,
+    initial_backoff: u64,
+}
+
+impl<T> RetryClient<T>
+where
+    T: JsonRpcClient,
+    T::Error: Sync + Send + 'static,
+{
+    /// Example:
+    ///
+    /// ```no_run
+    /// use ethers_providers::{Http, RetryClient, HttpRateLimitRetryPolicy};
+    /// use std::time::Duration;
+    /// use url::Url;
+    ///
+    /// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
+    /// let delay = Duration::new(10, 0);
+    /// let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy), 10, 1);
+    /// ```
+    pub fn new(
+        inner: T,
+        policy: Box<dyn RetryPolicy<T::Error>>,
+        max_retry: u32,
+        // in milliseconds
+        initial_backoff: u64,
+    ) -> Self {
+        Self { inner, requests_enqueued: AtomicU32::new(0), policy, max_retry, initial_backoff }
+    }
+}
+
+/// Error thrown when:
+/// 1. Internal client throws an error we do not wish to try to recover from.
+/// 2. Params serialization failed.
+/// 3. Request timed out, i.e. the maximum number of retries was exhausted.
+#[derive(Error, Debug)]
+pub enum RetryClientError<T>
+where
+    T: JsonRpcClient,
+    T::Error: Sync + Send + 'static,
+{
+    #[error(transparent)]
+    ProviderError(T::Error),
+    TimeoutError,
+    #[error(transparent)]
+    SerdeJson(serde_json::Error),
+}
+
+impl<T> std::fmt::Display for RetryClientError<T>
+where
+    T: JsonRpcClient,
+    <T as JsonRpcClient>::Error: Sync + Send + 'static,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl<T> From<RetryClientError<T>> for ProviderError
+where
+    T: JsonRpcClient + 'static,
+    <T as JsonRpcClient>::Error: Sync + Send + 'static,
+{
+    fn from(src: RetryClientError<T>) -> Self {
+        ProviderError::JsonRpcClientError(Box::new(src))
+    }
+}
+
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+impl<T> JsonRpcClient for RetryClient<T>
+where
+    T: JsonRpcClient + 'static,
+    T::Error: Sync + Send + 'static,
+{
+    type Error = RetryClientError<T>;
+
+    async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
+    where
+        A: std::fmt::Debug + Serialize + Send + Sync,
+        R: DeserializeOwned,
+    {
+        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
+
+        let params =
+            serde_json::to_value(params).map_err(|err| RetryClientError::SerdeJson(err))?;
+
+        let mut retry_number: u32 = 0;
+
+        loop {
+            let err;
+
+            // hack to not hold `R` across an await in the sleep future and prevent requiring
+            // R: Send + Sync
+            {
+                match self.inner.request(method, params.clone()).await {
+                    Ok(ret) => {
+                        self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
+                        return Ok(ret)
+                    }
+                    Err(err_) => err = err_,
+                }
+            }
+
+            retry_number += 1;
+            if retry_number > self.max_retry {
+                return Err(RetryClientError::TimeoutError)
+            }
+
+            let should_retry = self.policy.should_retry(&err);
+            if should_retry {
+                let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst);
+                // use `retry_number + current_queued_requests` in the exponent to create
+                // back pressure when other requests are already queued up
+                let next_backoff =
+                    self.initial_backoff * 2u64.pow(retry_number + current_queued_requests);
+                tokio::time::sleep(Duration::from_millis(next_backoff)).await;
+            } else {
+                self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
+                return Err(RetryClientError::ProviderError(err))
+            }
+        }
+    }
+}
+
+/// Implements [RetryPolicy] by retrying requests that errored with
+/// HTTP status code 429, i.e. TOO_MANY_REQUESTS
+#[derive(Debug)]
+pub struct HttpRateLimitRetryPolicy;
+
+impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
+    fn should_retry(&self, error: &ClientError) -> bool {
+        match error {
+            ClientError::ReqwestError(err) => {
+                err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
+            }
+            // Alchemy throws it this way
+            ClientError::JsonRpcError(JsonRpcError { code, message: _, data: _ }) => *code == 429,
+            _ => false,
+        }
+    }
+}
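
Reviewer note (not part of the patch): a minimal usage sketch of the new retry transport. It assumes the retry re-exports land in ethers_providers as shown in transports/mod.rs above, that the caller runs on a tokio runtime with the rt and macros features, and that the endpoint URL and the retry parameters (10 retries, 1 ms initial backoff) are illustrative placeholders rather than recommended values.

// Hypothetical example, not shipped with this patch.
use ethers_providers::{Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient};
use url::Url;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Wrap the plain HTTP transport: retry up to 10 times, starting from a 1 ms
    // backoff that doubles per attempt (and per already-queued request).
    let http = Http::new(Url::parse("http://localhost:8545")?);
    let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy), 10, 1);
    let provider = Provider::new(client);

    // Equivalent shortcut via the constructor added to Provider in this patch.
    let provider_alt = Provider::new_client("http://localhost:8545", 10, 1)?;
    let _ = provider_alt;

    // Any Middleware call now transparently retries rate-limited requests.
    let block = provider.get_block_number().await?;
    println!("current block: {}", block);
    Ok(())
}

Design note: because HttpRateLimitRetryPolicy only matches 429 responses (HTTP status or JSON-RPC error code), all other transport failures are returned immediately as RetryClientError::ProviderError instead of being retried.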