feat: add retries to consensus rpc (#87)

* feat: add retries to consensus rpc

* fix tests
This commit is contained in:
Noah Citron 2022-11-04 11:05:18 -04:00 committed by GitHub
parent 20c86907e2
commit eaca764aac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 11 deletions

69
Cargo.lock generated
View File

@ -624,6 +624,8 @@ dependencies = [
"log", "log",
"openssl", "openssl",
"reqwest", "reqwest",
"reqwest-middleware",
"reqwest-retry",
"serde", "serde",
"serde_json", "serde_json",
"ssz-rs", "ssz-rs",
@ -2200,6 +2202,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.5" version = "0.8.5"
@ -2913,6 +2925,7 @@ dependencies = [
"js-sys", "js-sys",
"log", "log",
"mime", "mime",
"mime_guess",
"native-tls", "native-tls",
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
@ -2934,6 +2947,53 @@ dependencies = [
"winreg", "winreg",
] ]
[[package]]
name = "reqwest-middleware"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69539cea4148dce683bec9dc95be3f0397a9bb2c248a49c8296a9d21659a8cdd"
dependencies = [
"anyhow",
"async-trait",
"futures",
"http",
"reqwest",
"serde",
"task-local-extensions",
"thiserror",
]
[[package]]
name = "reqwest-retry"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce246a729eaa6aff5e215aee42845bf5fed9893cc6cd51aeeb712f34e04dd9f3"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"http",
"hyper",
"reqwest",
"reqwest-middleware",
"retry-policies",
"task-local-extensions",
"tokio",
"tracing",
]
[[package]]
name = "retry-policies"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e09bbcb5003282bcb688f0bae741b278e9c7e8f378f561522c9806c58e075d9b"
dependencies = [
"anyhow",
"chrono",
"rand 0.8.5",
]
[[package]] [[package]]
name = "revm" name = "revm"
version = "2.1.0" version = "2.1.0"
@ -3567,6 +3627,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "task-local-extensions"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4167afbec18ae012de40f8cf1b9bf48420abb390678c34821caa07d924941cc4"
dependencies = [
"tokio",
]
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.3.0" version = "3.3.0"

View File

@ -32,7 +32,7 @@ pub struct BaseConfig {
pub fn mainnet() -> BaseConfig { pub fn mainnet() -> BaseConfig {
BaseConfig { BaseConfig {
checkpoint: hex_str_to_bytes( checkpoint: hex_str_to_bytes(
"0x6d41048663adafa064bae3b3768a8448fa1f1b003118fa5887d06da266530cff", "0x428ce0b5f5bbed1fc2b3feb5d4152ae0fe98a80b1bfa8de36681868e81e9222a",
) )
.unwrap(), .unwrap(),
rpc_port: 8545, rpc_port: 8545,
@ -64,7 +64,7 @@ pub fn mainnet() -> BaseConfig {
pub fn goerli() -> BaseConfig { pub fn goerli() -> BaseConfig {
BaseConfig { BaseConfig {
checkpoint: hex_str_to_bytes( checkpoint: hex_str_to_bytes(
"0x1e591af1e90f2db918b2a132991c7c2ee9a4ab26da496bd6e71e4f0bd65ea870", "0xd4344682866dbede543395ecf5adf9443a27f423a4b00f270458e7932686ced1",
) )
.unwrap(), .unwrap(),
rpc_port: 8545, rpc_port: 8545,

View File

@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
eyre = "0.6.8" eyre = "0.6.8"
serde = { version = "1.0.143", features = ["derive"] } serde = { version = "1.0.143", features = ["derive"] }
@ -22,6 +21,9 @@ log = "0.4.17"
chrono = "0.4.22" chrono = "0.4.22"
thiserror = "1.0.37" thiserror = "1.0.37"
openssl = { version = "0.10", features = ["vendored"] } openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.11.12", features = ["json"] }
reqwest-middleware = "0.1.6"
reqwest-retry = "0.1.5"
common = { path = "../common" } common = { path = "../common" }
config = { path = "../config" } config = { path = "../config" }

View File

@ -579,8 +579,11 @@ mod tests {
..Default::default() ..Default::default()
}; };
let mut client = let checkpoint =
ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)).unwrap(); hex::decode("1e591af1e90f2db918b2a132991c7c2ee9a4ab26da496bd6e71e4f0bd65ea870")
.unwrap();
let mut client = ConsensusClient::new("testdata/", &checkpoint, Arc::new(config)).unwrap();
client.bootstrap().await.unwrap(); client.bootstrap().await.unwrap();
client client

View File

@ -1,19 +1,31 @@
use async_trait::async_trait; use async_trait::async_trait;
use common::errors::RpcError; use common::errors::RpcError;
use eyre::Result; use eyre::Result;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use super::ConsensusRpc; use super::ConsensusRpc;
use crate::types::*; use crate::types::*;
pub struct NimbusRpc { pub struct NimbusRpc {
rpc: String, rpc: String,
client: ClientWithMiddleware,
} }
#[async_trait] #[async_trait]
impl ConsensusRpc for NimbusRpc { impl ConsensusRpc for NimbusRpc {
fn new(rpc: &str) -> Self { fn new(rpc: &str) -> Self {
let retry_policy = ExponentialBackoff::builder()
.backoff_exponent(1)
.build_with_max_retries(3);
let client = ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
NimbusRpc { NimbusRpc {
rpc: rpc.to_string(), rpc: rpc.to_string(),
client,
} }
} }
@ -24,7 +36,10 @@ impl ConsensusRpc for NimbusRpc {
self.rpc, root_hex self.rpc, root_hex
); );
let res = reqwest::get(req) let res = self
.client
.get(req)
.send()
.await .await
.map_err(|e| RpcError::new("bootstrap", e))? .map_err(|e| RpcError::new("bootstrap", e))?
.json::<BootstrapResponse>() .json::<BootstrapResponse>()
@ -40,7 +55,10 @@ impl ConsensusRpc for NimbusRpc {
self.rpc, period self.rpc, period
); );
let res = reqwest::get(req) let res = self
.client
.get(req)
.send()
.await .await
.map_err(|e| RpcError::new("updates", e))? .map_err(|e| RpcError::new("updates", e))?
.json::<UpdateResponse>() .json::<UpdateResponse>()
@ -52,7 +70,10 @@ impl ConsensusRpc for NimbusRpc {
async fn get_finality_update(&self) -> Result<FinalityUpdate> { async fn get_finality_update(&self) -> Result<FinalityUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/finality_update", self.rpc); let req = format!("{}/eth/v1/beacon/light_client/finality_update", self.rpc);
let res = reqwest::get(req) let res = self
.client
.get(req)
.send()
.await .await
.map_err(|e| RpcError::new("finality_update", e))? .map_err(|e| RpcError::new("finality_update", e))?
.json::<FinalityUpdateResponse>() .json::<FinalityUpdateResponse>()
@ -64,7 +85,10 @@ impl ConsensusRpc for NimbusRpc {
async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> { async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
let req = format!("{}/eth/v1/beacon/light_client/optimistic_update", self.rpc); let req = format!("{}/eth/v1/beacon/light_client/optimistic_update", self.rpc);
let res = reqwest::get(req) let res = self
.client
.get(req)
.send()
.await .await
.map_err(|e| RpcError::new("optimistic_update", e))? .map_err(|e| RpcError::new("optimistic_update", e))?
.json::<OptimisticUpdateResponse>() .json::<OptimisticUpdateResponse>()
@ -76,7 +100,10 @@ impl ConsensusRpc for NimbusRpc {
async fn get_block(&self, slot: u64) -> Result<BeaconBlock> { async fn get_block(&self, slot: u64) -> Result<BeaconBlock> {
let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot); let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot);
let res = reqwest::get(req) let res = self
.client
.get(req)
.send()
.await .await
.map_err(|e| RpcError::new("blocks", e))? .map_err(|e| RpcError::new("blocks", e))?
.json::<BeaconBlockResponse>() .json::<BeaconBlockResponse>()

View File

@ -13,7 +13,10 @@ async fn setup() -> ConsensusClient<MockRpc> {
..Default::default() ..Default::default()
}; };
ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)).unwrap() let checkpoint =
hex::decode("1e591af1e90f2db918b2a132991c7c2ee9a4ab26da496bd6e71e4f0bd65ea870").unwrap();
ConsensusClient::new("testdata/", &checkpoint, Arc::new(config)).unwrap()
} }
#[tokio::test] #[tokio::test]