feat(providers): add `RetryClient` (#1302)

* feat(providers): add RateAwareClient

* feat(providers): add RetryClient

* remove RateAwareClient

* feat(providers): add RetryPolicy, Backoff traits and some implementations

* remove backoff abstractions

* fix doc tests

* add helper methods for RetryClient Provider

* remove backoff wasm dependency

* remove conflicting impl for TryFrom<&str>, etc.

* update docs and remove backoff crate dependency

* fix tests

* fix backoff formula

* use value, string does not lead to correct serialization

* catch 429 error thrown as JsonRpcError

* fix requests_enqueued, make backoff and max retries configurable

* fix doc tests

* fix tests

* use match statement

* revert incorrect change

* ms precision backoffs, remove redundant continue
Meet Mangukiya 2022-05-28 02:32:16 +05:30 committed by GitHub
parent b3c387090c
commit e3ab2feada
4 changed files with 196 additions and 1 deletion


@@ -58,7 +58,7 @@ wasm-timer = "0.2"
parking_lot = { version = "0.11", features = ["wasm-bindgen"] }
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1.5", default-features = false, features = ["rt", "macros"] }
tokio = { version = "1.5", default-features = false, features = ["rt", "macros", "time"] }
tempfile = "3.3.0"
[features]


@@ -6,6 +6,9 @@ use crate::{
PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
};
#[cfg(not(target_arch = "wasm32"))]
use crate::transports::{HttpRateLimitRetryPolicy, RetryClient};
#[cfg(feature = "celo")]
use crate::CeloMiddleware;
use crate::Middleware;
@@ -1343,6 +1346,18 @@ impl<'a> TryFrom<&'a String> for Provider<HttpProvider> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl Provider<RetryClient<HttpProvider>> {
pub fn new_client(src: &str, max_retry: u32, initial_backoff: u64) -> Result<Self, ParseError> {
Ok(Provider::new(RetryClient::new(
HttpProvider::new(Url::parse(src)?),
Box::new(HttpRateLimitRetryPolicy),
max_retry,
initial_backoff,
)))
}
}
/// A middleware supporting development-specific JSON RPC methods
///
/// # Example

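For reference, a minimal sketch of how the new `Provider::new_client` helper is meant to be used; the endpoint URL and the retry/backoff values (10 retries, 100 ms initial backoff) are illustrative assumptions, not part of this commit:

use ethers_providers::{Http, Provider, RetryClient};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Wrap an HTTP transport in a RetryClient using HttpRateLimitRetryPolicy,
    // allowing up to 10 retries with a 100 ms initial backoff (values are illustrative).
    let provider: Provider<RetryClient<Http>> =
        Provider::new_client("http://localhost:8545", 10, 100)?;
    let _ = provider;
    Ok(())
}

The helper always installs HttpRateLimitRetryPolicy; callers who need a different policy would construct RetryClient::new directly, as in the doc example further down.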

@@ -39,5 +39,10 @@ pub use quorum::{Quorum, QuorumError, QuorumProvider, WeightedProvider};
mod rw;
pub use rw::{RwClient, RwClientError};
#[cfg(not(target_arch = "wasm32"))]
mod retry;
#[cfg(not(target_arch = "wasm32"))]
pub use retry::*;
mod mock;
pub use mock::{MockError, MockProvider};


@@ -0,0 +1,175 @@
//! A [JsonRpcClient] implementation that retries requests filtered by [RetryPolicy]
//! with an exponential backoff.
use super::{common::JsonRpcError, http::ClientError};
use crate::{provider::ProviderError, JsonRpcClient};
use std::{
clone::Clone,
fmt::Debug,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use thiserror::Error;
/// [RetryPolicy] defines which [JsonRpcClient::Error] instances the client
/// should retry the request for and attempt to recover from.
pub trait RetryPolicy<E>: Send + Sync + Debug {
fn should_retry(&self, error: &E) -> bool;
}
/// [RetryClient] is a wrapper around a [JsonRpcClient] that retries requests with an
/// exponential backoff, filtering retryable errors through a [RetryPolicy].
#[derive(Debug)]
pub struct RetryClient<T>
where
T: JsonRpcClient,
T::Error: Sync + Send + 'static,
{
inner: T,
requests_enqueued: AtomicU32,
policy: Box<dyn RetryPolicy<T::Error>>,
max_retry: u32,
initial_backoff: u64,
}
impl<T> RetryClient<T>
where
T: JsonRpcClient,
T::Error: Sync + Send + 'static,
{
/// Example:
///
/// ```no_run
/// use ethers_providers::{Http, RetryClient, HttpRateLimitRetryPolicy};
/// use std::time::Duration;
/// use url::Url;
///
/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
/// let delay = Duration::new(10, 0);
/// let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy), 10, 1);
/// ```
pub fn new(
inner: T,
policy: Box<dyn RetryPolicy<T::Error>>,
max_retry: u32,
// in milliseconds
initial_backoff: u64,
) -> Self {
Self { inner, requests_enqueued: AtomicU32::new(0), policy, max_retry, initial_backoff }
}
}
/// Error thrown when:
/// 1. Internal client throws an error we do not wish to try to recover from.
/// 2. Params serialization failed.
/// 3. Request timed out i.e. max retries were already made.
#[derive(Error, Debug)]
pub enum RetryClientError<T>
where
T: JsonRpcClient,
T::Error: Sync + Send + 'static,
{
#[error(transparent)]
ProviderError(T::Error),
TimeoutError,
#[error(transparent)]
SerdeJson(serde_json::Error),
}
impl<T> std::fmt::Display for RetryClientError<T>
where
T: JsonRpcClient,
<T as JsonRpcClient>::Error: Sync + Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl<T> From<RetryClientError<T>> for ProviderError
where
T: JsonRpcClient + 'static,
<T as JsonRpcClient>::Error: Sync + Send + 'static,
{
fn from(src: RetryClientError<T>) -> Self {
ProviderError::JsonRpcClientError(Box::new(src))
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<T> JsonRpcClient for RetryClient<T>
where
T: JsonRpcClient + 'static,
T::Error: Sync + Send + 'static,
{
type Error = RetryClientError<T>;
async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
where
A: std::fmt::Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
let params =
serde_json::to_value(params).map_err(|err| RetryClientError::SerdeJson(err))?;
let mut retry_number: u32 = 0;
loop {
let err;
// hack to not hold `R` across an await in the sleep future and prevent requiring
// R: Send + Sync
{
match self.inner.request(method, params.clone()).await {
Ok(ret) => {
self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Ok(ret)
}
Err(err_) => err = err_,
}
}
retry_number += 1;
if retry_number > self.max_retry {
return Err(RetryClientError::TimeoutError)
}
let should_retry = self.policy.should_retry(&err);
if should_retry {
let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst);
// using `retry_number + current_queued_requests` for creating back pressure because
// of already queued requests
let next_backoff =
self.initial_backoff * 2u64.pow(retry_number + current_queued_requests);
tokio::time::sleep(Duration::from_millis(next_backoff)).await;
} else {
self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Err(RetryClientError::ProviderError(err))
}
}
}
}
/// A [RetryPolicy] implementation that retries requests which failed with
/// status code 429, i.e. TOO_MANY_REQUESTS
#[derive(Debug)]
pub struct HttpRateLimitRetryPolicy;
impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
fn should_retry(&self, error: &ClientError) -> bool {
match error {
ClientError::ReqwestError(err) => {
err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
}
// alchemy throws it this way
ClientError::JsonRpcError(JsonRpcError { code, message: _, data: _ }) => *code == 429,
_ => false,
}
}
}
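
As a closing note, a small standalone sketch of the backoff schedule produced by the loop in `request` above; the helper function name and the sample values are illustrative assumptions, not part of the commit:

// Mirrors the formula used above:
// next_backoff = initial_backoff * 2^(retry_number + current_queued_requests), in ms.
fn next_backoff_ms(initial_backoff: u64, retry_number: u32, queued_requests: u32) -> u64 {
    initial_backoff * 2u64.pow(retry_number + queued_requests)
}

fn main() {
    // With initial_backoff = 100 ms and a single request in flight
    // (requests_enqueued stays at 1 because the retrying request itself is counted),
    // successive retries sleep 400 ms, 800 ms, 1600 ms, ...
    for retry in 1..=3 {
        println!("retry {}: sleep {} ms", retry, next_backoff_ms(100, retry, 1));
    }
}

The `requests_enqueued` term means concurrent callers back off more aggressively than a single caller would, which is the back-pressure behaviour described in the comment above.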