feat: support retrying connection errors (#1629)

This commit is contained in:
Matthias Seitz 2022-08-22 18:47:26 +02:00 committed by GitHub
parent 71b4893a3d
commit 98174863c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 153 additions and 24 deletions

View File

@ -6,6 +6,7 @@ use crate::{provider::ProviderError, JsonRpcClient};
use async_trait::async_trait; use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use std::{ use std::{
error::Error,
fmt::Debug, fmt::Debug,
sync::atomic::{AtomicU32, Ordering}, sync::atomic::{AtomicU32, Ordering},
time::Duration, time::Duration,
@ -22,6 +23,27 @@ pub trait RetryPolicy<E>: Send + Sync + Debug {
/// [RetryClient] presents as a wrapper around [JsonRpcClient] that will retry /// [RetryClient] presents as a wrapper around [JsonRpcClient] that will retry
/// requests based with an exponential backoff and filtering based on [RetryPolicy]. /// requests based with an exponential backoff and filtering based on [RetryPolicy].
///
/// The `RetryPolicy`, mainly for rate-limiting errors, can be adjusted for specific applications,
/// endpoints. In addition to the `RetryPolicy` errors due to connectivity issues, like timed out
/// connections or responses in range `5xx` can be retried separately.
///
/// # Example
///
/// ```
/// # async fn demo() {
/// use ethers_providers::{Http, RetryClient, RetryClientBuilder, HttpRateLimitRetryPolicy};
/// use std::time::Duration;
/// use url::Url;
///
/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
/// let client = RetryClientBuilder::default()
/// .rate_limit_retries(10)
/// .timeout_retries(3)
/// .initial_backoff(Duration::from_millis(500))
/// .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
/// # }
/// ```
#[derive(Debug)] #[derive(Debug)]
pub struct RetryClient<T> pub struct RetryClient<T>
where where
@ -30,9 +52,14 @@ where
{ {
inner: T, inner: T,
requests_enqueued: AtomicU32, requests_enqueued: AtomicU32,
/// The policy to use to determine whether to retry a request due to rate limiting
policy: Box<dyn RetryPolicy<T::Error>>, policy: Box<dyn RetryPolicy<T::Error>>,
max_retry: u32, /// How many connection `TimedOut` should be retried.
initial_backoff: u64, timeout_retries: u32,
/// How many retries for rate limited responses
rate_limit_retries: u32,
/// How long to wait initially
initial_backoff: Duration,
/// available CPU per second /// available CPU per second
compute_units_per_second: u64, compute_units_per_second: u64,
} }
@ -67,15 +94,10 @@ where
// in milliseconds // in milliseconds
initial_backoff: u64, initial_backoff: u64,
) -> Self { ) -> Self {
Self { RetryClientBuilder::default()
inner, .initial_backoff(Duration::from_millis(initial_backoff))
requests_enqueued: AtomicU32::new(0), .rate_limit_retries(max_retry)
policy, .build(inner, policy)
max_retry,
initial_backoff,
// alchemy max cpus <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: 330,
}
} }
/// Sets the free compute units per second limit. /// Sets the free compute units per second limit.
@ -90,6 +112,87 @@ where
} }
} }
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RetryClientBuilder {
/// How many connection `TimedOut` should be retried.
timeout_retries: u32,
/// How many retries for rate limited responses
rate_limit_retries: u32,
/// How long to wait initially
initial_backoff: Duration,
/// available CPU per second
compute_units_per_second: u64,
}
// === impl RetryClientBuilder ===
impl RetryClientBuilder {
/// Sets the number of retries after a connection times out
///
/// **Note:** this will only be used for `request::Error::TimedOut`
pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
self.timeout_retries = timeout_retries;
self
}
/// How many retries for rate limited responses
pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
self.rate_limit_retries = rate_limit_retries;
self
}
/// Sets the number of assumed available compute units per second
///
/// See also, <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
self.compute_units_per_second = compute_units_per_second;
self
}
/// Sets the duration to wait initially before retrying
pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
self.initial_backoff = initial_backoff;
self
}
/// Creates the `RetryClient` with the configured settings
pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
where
T: JsonRpcClient,
T::Error: Sync + Send + 'static,
{
let RetryClientBuilder {
timeout_retries,
rate_limit_retries,
initial_backoff,
compute_units_per_second,
} = self;
RetryClient {
inner: client,
requests_enqueued: AtomicU32::new(0),
policy,
timeout_retries,
rate_limit_retries,
initial_backoff,
compute_units_per_second,
}
}
}
// Some sensible defaults
impl Default for RetryClientBuilder {
fn default() -> Self {
Self {
timeout_retries: 3,
// this should be enough to even out heavy loads
rate_limit_retries: 10,
initial_backoff: Duration::from_millis(100),
// alchemy max cpus <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: 330,
}
}
}
/// Error thrown when: /// Error thrown when:
/// 1. Internal client throws an error we do not wish to try to recover from. /// 1. Internal client throws an error we do not wish to try to recover from.
/// 2. Params serialization failed. /// 2. Params serialization failed.
@ -137,7 +240,7 @@ where
async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error> async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
where where
A: std::fmt::Debug + Serialize + Send + Sync, A: Debug + Serialize + Send + Sync,
R: DeserializeOwned, R: DeserializeOwned,
{ {
// Helper type that caches the `params` value across several retries // Helper type that caches the `params` value across several retries
@ -158,7 +261,8 @@ where
let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
let mut retry_number: u32 = 0; let mut rate_limit_retry_number: u32 = 0;
let mut timeout_retries: u32 = 0;
loop { loop {
let err; let err;
@ -179,20 +283,22 @@ where
} }
} }
retry_number += 1;
if retry_number > self.max_retry {
trace!("request timed out after {} retries", self.max_retry);
return Err(RetryClientError::TimeoutError)
}
let should_retry = self.policy.should_retry(&err); let should_retry = self.policy.should_retry(&err);
if should_retry { if should_retry {
rate_limit_retry_number += 1;
if rate_limit_retry_number > self.rate_limit_retries {
trace!("request timed out after {} retries", self.rate_limit_retries);
return Err(RetryClientError::TimeoutError)
}
let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64; let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
// using `retry_number` for creating back pressure because // using `retry_number` for creating back pressure because
// of already queued requests // of already queued requests
// this increases exponentially with retries and adds a delay based on how many // this increases exponentially with retries and adds a delay based on how many
// requests are currently queued // requests are currently queued
let mut next_backoff = self.initial_backoff * 2u64.pow(retry_number); let mut next_backoff = Duration::from_millis(
self.initial_backoff.as_millis().pow(rate_limit_retry_number) as u64,
);
// requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper // requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper
// requests are more common some example alchemy weights: // requests are more common some example alchemy weights:
@ -209,12 +315,17 @@ where
current_queued_requests, current_queued_requests,
ahead_in_queue, ahead_in_queue,
); );
// backoff is measured in millis next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budge);
next_backoff += seconds_to_wait_for_compute_budge * 1000;
trace!("retrying and backing off for {}ms", next_backoff); trace!("retrying and backing off for {:?}", next_backoff);
tokio::time::sleep(Duration::from_millis(next_backoff)).await; tokio::time::sleep(next_backoff).await;
} else { } else {
if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
timeout_retries += 1;
trace!(err = ?err, "retrying due to spurious network");
continue
}
trace!(err = ?err, "should not retry"); trace!(err = ?err, "should not retry");
self.requests_enqueued.fetch_sub(1, Ordering::SeqCst); self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Err(RetryClientError::ProviderError(err)) return Err(RetryClientError::ProviderError(err))
@ -266,6 +377,24 @@ fn compute_unit_offset_in_secs(
} }
} }
/// Checks whether the `error` is the result of a connectivity issue, like
/// `request::Error::TimedOut`
fn maybe_connectivity(err: &(dyn Error + 'static)) -> bool {
if let Some(reqwest_err) = err.downcast_ref::<reqwest::Error>() {
if reqwest_err.is_timeout() || reqwest_err.is_connect() {
return true
}
// Error HTTP codes (5xx) are considered connectivity issues and will prompt retry
if let Some(status) = reqwest_err.status() {
let code = status.as_u16();
if (500..600).contains(&code) {
return true
}
}
}
false
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;