feat: detect requested backoff (#1711)

Matthias Seitz 2022-09-17 21:06:29 +02:00 committed by GitHub
parent 74272ca3f5
commit f208fb9cd3
1 changed file with 64 additions and 14 deletions

@@ -18,6 +18,9 @@ use tracing::trace;
 pub trait RetryPolicy<E>: Send + Sync + Debug {
     /// Whether to retry the request based on the given `error`
     fn should_retry(&self, error: &E) -> bool;
+
+    /// Providers may include the `backoff` in the error response directly
+    fn backoff_hint(&self, error: &E) -> Option<Duration>;
 }
 
 /// [RetryClient] presents as a wrapper around [JsonRpcClient] that will retry
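
The new `backoff_hint` method is the only change to the trait's surface, so every existing `RetryPolicy` implementor needs one extra method. A minimal sketch of what a downstream implementation might look like; `AlwaysRetry` is hypothetical, and the trait is restated so the snippet stands alone:

use std::{fmt::Debug, time::Duration};

// the trait as it looks after this diff, restated for a self-contained example
pub trait RetryPolicy<E>: Send + Sync + Debug {
    /// Whether to retry the request based on the given `error`
    fn should_retry(&self, error: &E) -> bool;
    /// Providers may include the `backoff` in the error response directly
    fn backoff_hint(&self, error: &E) -> Option<Duration>;
}

/// Hypothetical policy that retries every error and never suggests a backoff.
#[derive(Debug)]
struct AlwaysRetry;

impl<E> RetryPolicy<E> for AlwaysRetry {
    fn should_retry(&self, _error: &E) -> bool {
        true
    }

    /// Returning `None` lets the client fall back to its computed exponential backoff.
    fn backoff_hint(&self, _error: &E) -> Option<Duration> {
        None
    }
}
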
@@ -282,13 +285,18 @@ where
             }
             let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
-            // using `retry_number` for creating back pressure because
-            // of already queued requests
-            // this increases exponentially with retries and adds a delay based on how many
-            // requests are currently queued
-            let mut next_backoff = Duration::from_millis(
-                self.initial_backoff.as_millis().pow(rate_limit_retry_number) as u64,
-            );
+            // try to extract the requested backoff from the error or compute the next backoff
+            // based on retry count
+            let mut next_backoff = self.policy.backoff_hint(&err).unwrap_or_else(|| {
+                // using `retry_number` for creating back pressure because
+                // of already queued requests
+                // this increases exponentially with retries and adds a delay based on how many
+                // requests are currently queued
+                Duration::from_millis(
+                    self.initial_backoff.as_millis().pow(rate_limit_retry_number) as u64,
+                )
+            });
 
             // requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper
             // requests are more common some example alchemy weights:
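
When `backoff_hint` yields nothing, the `unwrap_or_else` closure reproduces the previous behavior: a delay derived from `initial_backoff` and the retry count alone. A standalone sketch of that fallback arithmetic; the 100ms initial backoff is an illustrative value, not a default from this diff:

use std::time::Duration;

/// Fallback backoff as computed in the closure above: the initial backoff,
/// in milliseconds, raised to the power of the rate-limit retry number.
fn fallback_backoff(initial_backoff: Duration, rate_limit_retry_number: u32) -> Duration {
    Duration::from_millis(initial_backoff.as_millis().pow(rate_limit_retry_number) as u64)
}

fn main() {
    let initial = Duration::from_millis(100);
    // 100^1 ms on the first retry, 100^2 ms on the second: the delay
    // escalates steeply, which is what creates the back pressure
    assert_eq!(fallback_backoff(initial, 1), Duration::from_millis(100));
    assert_eq!(fallback_backoff(initial, 2), Duration::from_millis(10_000));
}
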
@@ -299,13 +307,13 @@ where
             // (coming from forking mode) assuming here that storage request will be the driver
             // for Rate limits we choose `17` as the average cost of any request
             const AVG_COST: u64 = 17u64;
-            let seconds_to_wait_for_compute_budge = compute_unit_offset_in_secs(
+            let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
                 AVG_COST,
                 self.compute_units_per_second,
                 current_queued_requests,
                 ahead_in_queue,
             );
-            next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budge);
+            next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budget);
 
             trace!("retrying and backing off for {:?}", next_backoff);
             tokio::time::sleep(next_backoff).await;
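
The body of `compute_unit_offset_in_secs` sits outside this hunk; the surrounding comments say it converts the depth of the request queue into whole seconds of extra wait against a compute-unit budget. The arithmetic below is a sketch consistent with those comments, not necessarily the crate's actual implementation:

/// Sketch only: estimate how many whole seconds the requests ahead of this
/// one need to clear, given an average request cost and a per-second budget.
fn compute_unit_offset_in_secs(
    avg_cost: u64,
    compute_units_per_second: u64,
    current_queued_requests: u64,
    ahead_in_queue: u64,
) -> u64 {
    // how many average-cost requests fit into one second of budget
    let requests_per_second = (compute_units_per_second / avg_cost).max(1);
    // only wait if the queue is deeper than one second's worth of capacity
    if current_queued_requests > requests_per_second {
        current_queued_requests.min(ahead_in_queue) / requests_per_second
    } else {
        0
    }
}

fn main() {
    // e.g. 330 CU/s budget at 17 CU average cost => ~19 requests/s capacity;
    // 100 requests queued with 40 ahead of us => 40 / 19 = 2 whole seconds
    assert_eq!(compute_unit_offset_in_secs(17, 330, 100, 40), 2);
}
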
@@ -339,20 +347,41 @@ impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
             ClientError::ReqwestError(err) => {
                 err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
             }
-            ClientError::JsonRpcError(JsonRpcError { code, message, data: _ }) => {
+            ClientError::JsonRpcError(JsonRpcError { code, message, .. }) => {
                 // alchemy throws it this way
                 if *code == 429 {
                     return true
                 }
-                // this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
-                if message == "header not found" {
-                    return true
-                }
-                false
+                match message.as_str() {
+                    // this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
+                    "header not found" => true,
+                    // also thrown by infura if out of budget for the day and ratelimited
+                    "daily request count exceeded, request rate limited" => true,
+                    _ => false,
+                }
             }
             _ => false,
         }
     }
+
+    fn backoff_hint(&self, error: &ClientError) -> Option<Duration> {
+        if let ClientError::JsonRpcError(JsonRpcError { data, .. }) = error {
+            let data = data.as_ref()?;
+
+            // if daily rate limit exceeded, infura returns the requested backoff in the error
+            // response
+            let backoff_seconds = &data["rate"]["backoff_seconds"];
+            // infura rate limit error
+            if let Some(seconds) = backoff_seconds.as_u64() {
+                return Some(Duration::from_secs(seconds))
+            }
+            if let Some(seconds) = backoff_seconds.as_f64() {
+                return Some(Duration::from_secs(seconds as u64 + 1))
+            }
+        }
+
+        None
+    }
 }
 
 /// Calculates an offset in seconds by taking into account the number of currently queued requests,
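
`backoff_hint` leans on `serde_json::Value`'s `Index` implementation: indexing a missing key or a non-object yields `Value::Null` rather than panicking, so the chained `["rate"]["backoff_seconds"]` lookup is safe on arbitrary error payloads. A small standalone demonstration of that behavior:

use serde_json::{json, Value};

fn main() {
    let data: Value = json!({ "rate": { "backoff_seconds": 30 } });
    // key present: extract the requested backoff as a u64
    assert_eq!(data["rate"]["backoff_seconds"].as_u64(), Some(30));

    // non-object payloads and missing keys index to Value::Null, no panic
    let other = Value::String("blocked".to_string());
    assert!(other["rate"]["backoff_seconds"].is_null());
    assert_eq!(other["rate"]["backoff_seconds"].as_u64(), None);
}
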
@@ -450,4 +479,25 @@ mod tests {
         // need to wait 1 second
         assert_eq!(to_wait, 1);
     }
+
+    #[test]
+    fn can_extract_backoff() {
+        let resp = r#"{"rate": {"allowed_rps": 1, "backoff_seconds": 30, "current_rps": 1.1}, "see": "https://infura.io/dashboard"}"#;
+
+        let err = ClientError::JsonRpcError(JsonRpcError {
+            code: 0,
+            message: "daily request count exceeded, request rate limited".to_string(),
+            data: Some(serde_json::from_str(resp).unwrap()),
+        });
+        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err).unwrap();
+        assert_eq!(backoff, Duration::from_secs(30));
+
+        let err = ClientError::JsonRpcError(JsonRpcError {
+            code: 0,
+            message: "daily request count exceeded, request rate limited".to_string(),
+            data: Some(serde_json::Value::String("blocked".to_string())),
+        });
+        let backoff = HttpRateLimitRetryPolicy.backoff_hint(&err);
+        assert!(backoff.is_none());
+    }
 }
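
For context, a policy like this is consumed by wrapping a transport in the [RetryClient] mentioned at the top of the file. A hedged wiring sketch; the `RetryClient::new` argument list and the endpoint URL are assumptions that may differ across ethers versions:

use ethers_providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // hypothetical endpoint; substitute a real provider URL
    let http: Http = "https://mainnet.infura.io/v3/<api-key>".parse()?;
    // assumed arguments: (inner transport, policy, max rate-limit retries,
    // initial backoff in milliseconds) -- check against your version
    let client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy::default()), 10, 500);
    let _provider = Provider::new(client);
    Ok(())
}
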