feat(retry): more sensible backoff calc (#1413)

This commit is contained in:
Matthias Seitz 2022-06-24 01:18:33 +02:00 committed by GitHub
parent c7e1237a6b
commit 7603731d87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 126 additions and 6 deletions

View File

@ -35,6 +35,8 @@ where
policy: Box<dyn RetryPolicy<T::Error>>,
max_retry: u32,
initial_backoff: u64,
/// available CPU per second
compute_units_per_second: u64,
}
impl<T> RetryClient<T>
@ -60,7 +62,26 @@ where
// in milliseconds
initial_backoff: u64,
) -> Self {
Self { inner, requests_enqueued: AtomicU32::new(0), policy, max_retry, initial_backoff }
Self {
inner,
requests_enqueued: AtomicU32::new(0),
policy,
max_retry,
initial_backoff,
// alchemy max cpus <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: 330,
}
}
/// Sets the free compute units per second limit.
///
/// This is the maximum number of weighted request that can be handled per second by the
/// endpoint before rate limit kicks in.
///
/// This is used to guesstimate how long to wait until to retry again
pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
self.compute_units_per_second = cpus;
self
}
}
@ -114,7 +135,7 @@ where
A: std::fmt::Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
let params =
serde_json::to_value(params).map_err(|err| RetryClientError::SerdeJson(err))?;
@ -144,11 +165,31 @@ where
let should_retry = self.policy.should_retry(&err);
if should_retry {
let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst);
// using `retry_number + current_queued_requests` for creating back pressure because
let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
// using `retry_number` for creating back pressure because
// of already queued requests
let next_backoff =
self.initial_backoff * 2u64.pow(retry_number + current_queued_requests);
// this increases exponentially with retries and adds a delay based on how many
// requests are currently queued
let mut next_backoff = self.initial_backoff * 2u64.pow(retry_number);
// requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper
// requests are more common some example alchemy weights:
// - `eth_getStorageAt`: 17
// - `eth_getBlockByNumber`: 16
// - `eth_newFilter`: 20
//
// (coming from forking mode) assuming here that storage request will be the driver
// for Rate limits we choose `17` as the average cost of any request
const AVG_COST: u64 = 17u64;
let seconds_to_wait_for_compute_budge = compute_unit_offset_in_secs(
AVG_COST,
self.compute_units_per_second,
current_queued_requests,
ahead_in_queue,
);
// backoff is measured in millis
next_backoff += seconds_to_wait_for_compute_budge * 1000;
trace!("retrying and backing off for {}ms", next_backoff);
tokio::time::sleep(Duration::from_millis(next_backoff)).await;
} else {
@ -177,3 +218,82 @@ impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
}
}
}
/// Calculates an offset in seconds by taking into account the number of currently queued requests,
/// number of requests that were ahead in the queue when the request was first issued, the average
/// cost a weighted request (heuristic), and the number of available compute units per seconds.
///
/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request
/// is supposed to wait to not get rate limited. The budget per second is
/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory)
/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited.
/// By taking into account the number of concurrent request and the position in queue when the
/// request was first issued and determine the number of seconds a request is supposed to wait, if
/// at all
fn compute_unit_offset_in_secs(
avg_cost: u64,
compute_units_per_second: u64,
current_queued_requests: u64,
ahead_in_queue: u64,
) -> u64 {
let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
if current_queued_requests > request_capacity_per_second {
current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
} else {
0
}
}
#[cfg(test)]
mod tests {
use super::*;
// assumed average cost of a request
const AVG_COST: u64 = 17u64;
const COMPUTE_UNITS: u64 = 330u64;
fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
compute_unit_offset_in_secs(
AVG_COST,
COMPUTE_UNITS,
current_queued_requests,
ahead_in_queue,
)
}
#[test]
fn can_measure_unit_offset_single_request() {
let current_queued_requests = 1;
let ahead_in_queue = 0;
let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
assert_eq!(to_wait, 0);
let current_queued_requests = 19;
let ahead_in_queue = 18;
let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
assert_eq!(to_wait, 0);
}
#[test]
fn can_measure_unit_offset_1x_over_budget() {
let current_queued_requests = 20;
let ahead_in_queue = 19;
let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
// need to wait 1 second
assert_eq!(to_wait, 1);
}
#[test]
fn can_measure_unit_offset_2x_over_budget() {
let current_queued_requests = 49;
let ahead_in_queue = 48;
let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
// need to wait 1 second
assert_eq!(to_wait, 2);
let current_queued_requests = 49;
let ahead_in_queue = 20;
let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
// need to wait 1 second
assert_eq!(to_wait, 1);
}
}