diff --git a/ethers-providers/src/transports/retry.rs b/ethers-providers/src/transports/retry.rs
index 8cba35f4..8278fa09 100644
--- a/ethers-providers/src/transports/retry.rs
+++ b/ethers-providers/src/transports/retry.rs
@@ -6,6 +6,7 @@ use crate::{provider::ProviderError, JsonRpcClient};
 use async_trait::async_trait;
 use serde::{de::DeserializeOwned, Serialize};
 use std::{
+    error::Error,
     fmt::Debug,
     sync::atomic::{AtomicU32, Ordering},
     time::Duration,
@@ -22,6 +23,27 @@ pub trait RetryPolicy<E>: Send + Sync + Debug {
 
 /// [RetryClient] presents as a wrapper around [JsonRpcClient] that will retry
 /// requests based with an exponential backoff and filtering based on [RetryPolicy].
+///
+/// The `RetryPolicy`, used mainly for rate-limiting errors, can be adjusted per application and
+/// endpoint. In addition to the `RetryPolicy`, errors caused by connectivity issues, such as
+/// timed out connections or `5xx` responses, can be retried separately.
+///
+/// # Example
+///
+/// ```
+/// # async fn demo() {
+/// use ethers_providers::{Http, RetryClient, RetryClientBuilder, HttpRateLimitRetryPolicy};
+/// use std::time::Duration;
+/// use url::Url;
+///
+/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
+/// let client = RetryClientBuilder::default()
+///     .rate_limit_retries(10)
+///     .timeout_retries(3)
+///     .initial_backoff(Duration::from_millis(500))
+///     .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
+/// # }
+/// ```
 #[derive(Debug)]
 pub struct RetryClient<T>
 where
@@ -30,9 +52,14 @@ where
 {
     inner: T,
     requests_enqueued: AtomicU32,
+    /// The policy to use to determine whether to retry a request due to rate limiting
     policy: Box<dyn RetryPolicy<T::Error>>,
-    max_retry: u32,
-    initial_backoff: u64,
+    /// How many connection `TimedOut` errors should be retried.
+    timeout_retries: u32,
+    /// How many retries for rate limited responses
+    rate_limit_retries: u32,
+    /// How long to wait initially
+    initial_backoff: Duration,
     /// available CPU per second
     compute_units_per_second: u64,
 }
@@ -67,15 +94,10 @@ where
         // in milliseconds
         initial_backoff: u64,
     ) -> Self {
-        Self {
-            inner,
-            requests_enqueued: AtomicU32::new(0),
-            policy,
-            max_retry,
-            initial_backoff,
-            // alchemy max cpus
-            compute_units_per_second: 330,
-        }
+        RetryClientBuilder::default()
+            .initial_backoff(Duration::from_millis(initial_backoff))
+            .rate_limit_retries(max_retry)
+            .build(inner, policy)
     }
 
     /// Sets the free compute units per second limit.
@@ -90,6 +112,87 @@ where
     }
 }
 
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct RetryClientBuilder {
+    /// How many connection `TimedOut` errors should be retried.
+    timeout_retries: u32,
+    /// How many retries for rate limited responses
+    rate_limit_retries: u32,
+    /// How long to wait initially
+    initial_backoff: Duration,
+    /// available CPU per second
+    compute_units_per_second: u64,
+}
+
+// === impl RetryClientBuilder ===
+
+impl RetryClientBuilder {
+    /// Sets the number of retries after a connection times out
+    ///
+    /// **Note:** this will only be used for `reqwest::Error::TimedOut`
+    pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
+        self.timeout_retries = timeout_retries;
+        self
+    }
+
+    /// How many retries for rate limited responses
+    pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
+        self.rate_limit_retries = rate_limit_retries;
+        self
+    }
+
+    /// Sets the number of assumed available compute units per second
+    ///
+    /// See also,
+    pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
+        self.compute_units_per_second = compute_units_per_second;
+        self
+    }
+
+    /// Sets the duration to wait initially before retrying
+    pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
+        self.initial_backoff = initial_backoff;
+        self
+    }
+
+    /// Creates the `RetryClient` with the configured settings
+    pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
+    where
+        T: JsonRpcClient,
+        T::Error: Sync + Send + 'static,
+    {
+        let RetryClientBuilder {
+            timeout_retries,
+            rate_limit_retries,
+            initial_backoff,
+            compute_units_per_second,
+        } = self;
+        RetryClient {
+            inner: client,
+            requests_enqueued: AtomicU32::new(0),
+            policy,
+            timeout_retries,
+            rate_limit_retries,
+            initial_backoff,
+            compute_units_per_second,
+        }
+    }
+}
+
+// Some sensible defaults
+impl Default for RetryClientBuilder {
+    fn default() -> Self {
+        Self {
+            timeout_retries: 3,
+            // this should be enough to even out heavy loads
+            rate_limit_retries: 10,
+            initial_backoff: Duration::from_millis(100),
+            // alchemy max cpus
+            compute_units_per_second: 330,
+        }
+    }
+}
+
 /// Error thrown when:
 /// 1. Internal client throws an error we do not wish to try to recover from.
 /// 2. Params serialization failed.
@@ -137,7 +240,7 @@ where
 
     async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
     where
-        A: std::fmt::Debug + Serialize + Send + Sync,
+        A: Debug + Serialize + Send + Sync,
         R: DeserializeOwned,
     {
         // Helper type that caches the `params` value across several retries
@@ -158,7 +261,8 @@ where
 
         let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
-        let mut retry_number: u32 = 0;
+        let mut rate_limit_retry_number: u32 = 0;
+        let mut timeout_retries: u32 = 0;
 
         loop {
             let err;
 
@@ -179,20 +283,22 @@ where
                 }
             }
 
-            retry_number += 1;
-            if retry_number > self.max_retry {
-                trace!("request timed out after {} retries", self.max_retry);
-                return Err(RetryClientError::TimeoutError)
-            }
-
             let should_retry = self.policy.should_retry(&err);
             if should_retry {
+                rate_limit_retry_number += 1;
+                if rate_limit_retry_number > self.rate_limit_retries {
+                    trace!("request timed out after {} retries", self.rate_limit_retries);
+                    return Err(RetryClientError::TimeoutError)
+                }
+
                 let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
                 // using `retry_number` for creating back pressure because
                 // of already queued requests
                 // this increases exponentially with retries and adds a delay based on how many
                 // requests are currently queued
-                let mut next_backoff = self.initial_backoff * 2u64.pow(retry_number);
+                let mut next_backoff = Duration::from_millis(
+                    self.initial_backoff.as_millis() as u64 * 2u64.pow(rate_limit_retry_number),
+                );
 
                 // requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper
                 // requests are more common some example alchemy weights:
@@ -209,12 +315,17 @@ where
                     current_queued_requests,
                     ahead_in_queue,
                 );
-                // backoff is measured in millis
-                next_backoff += seconds_to_wait_for_compute_budge * 1000;
+                next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budge);
 
-                trace!("retrying and backing off for {}ms", next_backoff);
-                tokio::time::sleep(Duration::from_millis(next_backoff)).await;
+                trace!("retrying and backing off for {:?}", next_backoff);
+                tokio::time::sleep(next_backoff).await;
             } else {
+                if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
+                    timeout_retries += 1;
+                    trace!(err = ?err, "retrying due to spurious network error");
+                    continue
+                }
+
                 trace!(err = ?err, "should not retry");
                 self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
                 return Err(RetryClientError::ProviderError(err))
@@ -266,6 +377,24 @@ fn compute_unit_offset_in_secs(
     }
 }
 
+/// Checks whether the `error` is the result of a connectivity issue, like
+/// `reqwest::Error::TimedOut`
+fn maybe_connectivity(err: &(dyn Error + 'static)) -> bool {
+    if let Some(reqwest_err) = err.downcast_ref::<reqwest::Error>() {
+        if reqwest_err.is_timeout() || reqwest_err.is_connect() {
+            return true
+        }
+        // Error HTTP codes (5xx) are considered connectivity issues and will prompt retry
+        if let Some(status) = reqwest_err.status() {
+            let code = status.as_u16();
+            if (500..600).contains(&code) {
+                return true
+            }
+        }
+    }
+    false
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
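As a usage sketch for the API this patch introduces, the snippet below plugs a hand-rolled policy into `RetryClientBuilder`. It assumes that `RetryPolicy`, `RetryClient`, `RetryClientBuilder`, and `Http` are re-exported from `ethers_providers`, and that the trait's only required method is `should_retry(&self, error: &E) -> bool` as implemented by the existing `HttpRateLimitRetryPolicy`; `MessageRetryPolicy` and its string matching are purely illustrative and not part of the diff.

```rust
use std::time::Duration;

use ethers_providers::{Http, RetryClient, RetryClientBuilder, RetryPolicy};
use url::Url;

/// Hypothetical policy: retry any error whose message mentions rate limiting.
#[derive(Debug)]
struct MessageRetryPolicy;

impl<E> RetryPolicy<E> for MessageRetryPolicy
where
    E: std::error::Error,
{
    // Assumed to be the trait's only required method (mirroring `HttpRateLimitRetryPolicy`).
    fn should_retry(&self, error: &E) -> bool {
        error.to_string().contains("rate limit")
    }
}

fn main() {
    let http = Http::new(Url::parse("http://localhost:8545").unwrap());
    let _client: RetryClient<Http> = RetryClientBuilder::default()
        .rate_limit_retries(5)
        .timeout_retries(2)
        .initial_backoff(Duration::from_millis(250))
        // assumed Alchemy-style compute budget, tune per endpoint
        .compute_units_per_second(330)
        .build(http, Box::new(MessageRetryPolicy));
}
```

Connectivity errors (timeouts, connect failures, `5xx` responses) bypass this policy entirely and are governed by `timeout_retries` via `maybe_connectivity`.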
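For intuition on the rate-limit backoff, this standalone loop mirrors only the exponential term (`initial_backoff` doubled per rate-limit retry) from the request loop above; the real client additionally waits out the compute-unit budget derived from `compute_units_per_second` and the number of queued requests. The 100 ms starting value matches the builder default in this diff.

```rust
use std::time::Duration;

fn main() {
    // Exponential component only: initial_backoff * 2^rate_limit_retry_number.
    let initial_backoff = Duration::from_millis(100); // builder default in this diff
    for rate_limit_retry_number in 1u32..=5 {
        let next_backoff = initial_backoff * 2u32.pow(rate_limit_retry_number);
        println!("retry {rate_limit_retry_number}: back off for {next_backoff:?}");
    }
}
```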