From 1b1a54034077d06af945e579282c5a243a333d98 Mon Sep 17 00:00:00 2001 From: christn Date: Wed, 8 Feb 2023 06:36:29 +0800 Subject: [PATCH] feat: backfill payloads (#189) * Loop over all missing slots since last update * Adjust get_block_header function to allow getting headers of past blocks * Compare parent hashes when backfilling blocks * Backfill blocks concurrently * Do not rehash backfilled blocks * Revert "Adjust get_block_header function to allow getting headers of past blocks" This reverts commit 5895118046a0778a973f034f86757997d596ef35. * Move get_payloads to consensus module * Continue with the next block instead of request failure to recover from skipped blocks * clippy and rustfmt * clippy * Remove redundant get_block_from_rpc method --- Cargo.lock | 1 + client/src/node.rs | 17 +++++++++++++++++ consensus/Cargo.toml | 1 + consensus/src/consensus.rs | 38 ++++++++++++++++++++++++++++++++++++++ execution/src/evm.rs | 2 +- 5 files changed, 58 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 2f4c8dc..733b499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,6 +687,7 @@ dependencies = [ "config", "ethers", "eyre", + "futures", "hex", "log", "milagro_bls", diff --git a/client/src/node.rs b/client/src/node.rs index 93d6b41..cb4e35d 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -9,6 +9,7 @@ use eyre::{eyre, Result}; use common::errors::BlockNotFoundError; use common::types::BlockTag; use config::Config; + use consensus::rpc::nimbus_rpc::NimbusRpc; use consensus::types::{ExecutionPayload, Header}; use consensus::ConsensusClient; @@ -25,6 +26,7 @@ pub struct Node { pub config: Arc, payloads: BTreeMap, finalized_payloads: BTreeMap, + current_slot: Option, pub history_size: usize, } @@ -49,6 +51,7 @@ impl Node { config, payloads, finalized_payloads, + current_slot: None, history_size: 64, }) } @@ -110,6 +113,20 @@ impl Node { self.finalized_payloads .insert(finalized_payload.block_number, finalized_payload); + let start_slot = self + .current_slot + .unwrap_or(latest_header.slot - self.history_size as u64); + let backfill_payloads = self + .consensus + .get_payloads(start_slot, latest_header.slot) + .await + .map_err(NodeError::ConsensusPayloadError)?; + for payload in backfill_payloads { + self.payloads.insert(payload.block_number, payload); + } + + self.current_slot = Some(latest_header.slot); + while self.payloads.len() > self.history_size { self.payloads.pop_first(); } diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 41e1a1c..c261ff4 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] eyre = "0.6.8" +futures = "0.3.23" serde = { version = "1.0.143", features = ["derive"] } serde_json = "1.0.85" hex = "0.4.3" diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 86c60a1..ea0051e 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use chrono::Duration; use eyre::eyre; use eyre::Result; +use futures::future::join_all; use log::warn; use log::{debug, info}; use milagro_bls::PublicKey; @@ -106,6 +107,43 @@ impl ConsensusClient { } } + pub async fn get_payloads( + &self, + start_slot: u64, + end_slot: u64, + ) -> Result> { + let payloads_fut = (start_slot..end_slot) + .rev() + .map(|slot| self.rpc.get_block(slot)); + let mut prev_parent_hash: Bytes32 = self + .rpc + .get_block(end_slot) + .await? + .body + .execution_payload + .parent_hash; + let mut payloads: Vec = Vec::new(); + for result in join_all(payloads_fut).await { + if result.is_err() { + continue; + } + let payload = result.unwrap().body.execution_payload; + if payload.block_hash != prev_parent_hash { + warn!( + "error while backfilling blocks: {}", + ConsensusError::InvalidHeaderHash( + format!("{prev_parent_hash:02X?}"), + format!("{:02X?}", payload.parent_hash), + ) + ); + break; + } + prev_parent_hash = payload.parent_hash.clone(); + payloads.push(payload); + } + Ok(payloads) + } + pub fn get_header(&self) -> &Header { &self.store.optimistic_header } diff --git a/execution/src/evm.rs b/execution/src/evm.rs index 33095fa..223b330 100644 --- a/execution/src/evm.rs +++ b/execution/src/evm.rs @@ -175,7 +175,7 @@ impl<'a, R: ExecutionRpc> Evm<'a, R> { env.tx.transact_to = TransactTo::Call(opts.to); env.tx.caller = opts.from.unwrap_or(Address::zero()); env.tx.value = opts.value.unwrap_or(U256::from(0)); - env.tx.data = Bytes::from(opts.data.clone().unwrap_or(vec![])); + env.tx.data = Bytes::from(opts.data.clone().unwrap_or_default()); env.tx.gas_limit = opts.gas.map(|v| v.as_u64()).unwrap_or(u64::MAX); env.tx.gas_price = opts.gas_price.unwrap_or(U256::zero());