diff --git a/ethers-providers/src/transports/retry.rs b/ethers-providers/src/transports/retry.rs
index 9b4f15e0..5c496045 100644
--- a/ethers-providers/src/transports/retry.rs
+++ b/ethers-providers/src/transports/retry.rs
@@ -35,6 +35,8 @@ where
     policy: Box<dyn RetryPolicy<T::Error>>,
     max_retry: u32,
     initial_backoff: u64,
+    /// available compute units per second
+    compute_units_per_second: u64,
 }
 
 impl<T> RetryClient<T>
@@ -60,7 +62,26 @@ where
         // in milliseconds
         initial_backoff: u64,
     ) -> Self {
-        Self { inner, requests_enqueued: AtomicU32::new(0), policy, max_retry, initial_backoff }
+        Self {
+            inner,
+            requests_enqueued: AtomicU32::new(0),
+            policy,
+            max_retry,
+            initial_backoff,
+            // alchemy max compute units per second
+            compute_units_per_second: 330,
+        }
+    }
+
+    /// Sets the free compute units per second limit.
+    ///
+    /// This is the maximum number of weighted requests that can be handled per second by the
+    /// endpoint before the rate limit kicks in.
+    ///
+    /// This is used to guesstimate how long to wait before retrying again.
+    pub fn set_compute_units(&mut self, cpus: u64) -> &mut Self {
+        self.compute_units_per_second = cpus;
+        self
     }
 }
 
@@ -114,7 +135,7 @@ where
         A: std::fmt::Debug + Serialize + Send + Sync,
         R: DeserializeOwned,
     {
-        self.requests_enqueued.fetch_add(1, Ordering::SeqCst);
+        let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
 
         let params =
             serde_json::to_value(params).map_err(|err| RetryClientError::SerdeJson(err))?;
@@ -144,11 +165,31 @@ where
             let should_retry = self.policy.should_retry(&err);
             if should_retry {
-                let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst);
-                // using `retry_number + current_queued_requests` for creating back pressure because
+                let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
+                // using `retry_number` for creating back pressure because
                 // of already queued requests
-                let next_backoff =
-                    self.initial_backoff * 2u64.pow(retry_number + current_queued_requests);
+                // this increases exponentially with retries and adds a delay based on how many
+                // requests are currently queued
+                let mut next_backoff = self.initial_backoff * 2u64.pow(retry_number);
+
+                // requests are usually weighted and can vary from 10 CU to several 100 CU, cheaper
+                // requests are more common; some example alchemy weights:
+                // - `eth_getStorageAt`: 17
+                // - `eth_getBlockByNumber`: 16
+                // - `eth_newFilter`: 20
+                //
+                // (coming from forking mode) assuming here that storage requests will be the driver
+                // for rate limits, we choose `17` as the average cost of any request
+                const AVG_COST: u64 = 17u64;
+                let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
+                    AVG_COST,
+                    self.compute_units_per_second,
+                    current_queued_requests,
+                    ahead_in_queue,
+                );
+                // backoff is measured in millis
+                next_backoff += seconds_to_wait_for_compute_budget * 1000;
+                trace!("retrying and backing off for {}ms", next_backoff);
                 tokio::time::sleep(Duration::from_millis(next_backoff)).await;
             } else {
@@ -177,3 +218,82 @@ impl RetryPolicy<ClientError> for HttpRateLimitRetryPolicy {
         }
     }
 }
+
+/// Calculates an offset in seconds by taking into account the number of currently queued requests,
+/// the number of requests that were ahead in the queue when the request was first issued, the
+/// average cost of a weighted request (heuristic), and the available compute units per second.
+///
+/// Returns the number of seconds (the unit in which the remote endpoint measures its compute
+/// budget) a request should wait so that it does not get rate limited. The budget per second is
+/// `compute_units_per_second`; assuming an average cost of `avg_cost`, this allows (in theory)
+/// `compute_units_per_second / avg_cost` requests per second without getting rate limited.
+/// By taking into account the number of concurrent requests and the position in the queue when
+/// the request was first issued, this determines the number of seconds a request should wait, if
+/// at all.
+fn compute_unit_offset_in_secs(
+    avg_cost: u64,
+    compute_units_per_second: u64,
+    current_queued_requests: u64,
+    ahead_in_queue: u64,
+) -> u64 {
+    let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
+    if current_queued_requests > request_capacity_per_second {
+        current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
+    } else {
+        0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    // assumed average cost of a request
+    const AVG_COST: u64 = 17u64;
+    const COMPUTE_UNITS: u64 = 330u64;
+
+    fn compute_offset(current_queued_requests: u64, ahead_in_queue: u64) -> u64 {
+        compute_unit_offset_in_secs(
+            AVG_COST,
+            COMPUTE_UNITS,
+            current_queued_requests,
+            ahead_in_queue,
+        )
+    }
+
+    #[test]
+    fn can_measure_unit_offset_single_request() {
+        let current_queued_requests = 1;
+        let ahead_in_queue = 0;
+        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
+        assert_eq!(to_wait, 0);
+
+        let current_queued_requests = 19;
+        let ahead_in_queue = 18;
+        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
+        assert_eq!(to_wait, 0);
+    }
+
+    #[test]
+    fn can_measure_unit_offset_1x_over_budget() {
+        let current_queued_requests = 20;
+        let ahead_in_queue = 19;
+        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
+        // need to wait 1 second
+        assert_eq!(to_wait, 1);
+    }
+
+    #[test]
+    fn can_measure_unit_offset_2x_over_budget() {
+        let current_queued_requests = 49;
+        let ahead_in_queue = 48;
+        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
+        // need to wait 2 seconds
+        assert_eq!(to_wait, 2);
+
+        let current_queued_requests = 49;
+        let ahead_in_queue = 20;
+        let to_wait = compute_offset(current_queued_requests, ahead_in_queue);
+        // need to wait 1 second
+        assert_eq!(to_wait, 1);
+    }
+}
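For context, with the defaults above the budget works out to roughly 330 / 17 ≈ 19 requests per second, which is why the tests expect a 2-second wait for a request that had 48 requests ahead of it in a queue of 49. The following is a minimal usage sketch, not part of the patch: it wraps the existing `Http` transport in the `RetryClient` from this diff and then raises the compute-unit budget via the new `set_compute_units` setter. The function name, endpoint URL, retry count, backoff, and the 660 CU/s figure are illustrative placeholders, and the import paths and `new` argument order are assumed from the crate layout and the constructor shown in the diff.

    use std::str::FromStr;

    use ethers_providers::{Http, HttpRateLimitRetryPolicy, RetryClient};

    fn build_retrying_transport() -> RetryClient<Http> {
        // hypothetical endpoint; any JSON-RPC URL works here
        let http = Http::from_str("https://eth-mainnet.alchemyapi.io/v2/API_KEY")
            .expect("invalid endpoint url");

        // argument order as in the constructor shown above:
        // inner, policy, max_retry, initial_backoff (in milliseconds)
        let mut client = RetryClient::new(
            http,
            Box::new(HttpRateLimitRetryPolicy),
            10,
            500,
        );

        // paid tiers typically allow more than the 330 CU/s default set in `new`
        client.set_compute_units(660);
        client
    }

The wrapped client can then be used wherever a `JsonRpcClient` is expected; requests that hit a 429 are retried with the exponential backoff plus the compute-unit offset computed by `compute_unit_offset_in_secs`.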