feat: backfill payloads (#189)

* Loop over all missing slots since last update

* Adjust get_block_header function to allow getting headers of past blocks

* Compare parent hashes when backfilling blocks (see the sketch after this list)

* Backfill blocks concurrently

* Do not rehash backfilled blocks

* Revert "Adjust get_block_header function to allow getting headers of past blocks"

This reverts commit 5895118046.

* Move get_payloads to consensus module

* Continue with the next block instead of failing the request, to recover from skipped blocks

* clippy and rustfmt

* clippy

* Remove redundant get_block_from_rpc method
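
Taken together, the bullets above describe one technique: fetch a range of past blocks concurrently, then verify them newest-to-oldest by checking that each block's hash equals the parent_hash recorded by the block after it. Below is a minimal sketch of that chain check, using a hypothetical simplified Payload type rather than the real consensus::types::ExecutionPayload:

struct Payload {
    block_hash: [u8; 32],
    parent_hash: [u8; 32],
}

// Walk fetch results from newest to oldest, accepting only payloads that
// link into the hash chain established by the block after them.
fn verify_chain(newest_parent: [u8; 32], fetched: Vec<Option<Payload>>) -> Vec<Payload> {
    let mut prev_parent = newest_parent;
    let mut accepted = Vec::new();
    for maybe in fetched {
        // A failed fetch (None here) is skipped, so a missed slot does not
        // abort the whole backfill -- the recovery behavior described above.
        let Some(payload) = maybe else { continue };
        // A hash mismatch means the chain is broken: stop rather than
        // accept blocks that cannot be verified.
        if payload.block_hash != prev_parent {
            break;
        }
        prev_parent = payload.parent_hash;
        accepted.push(payload);
    }
    accepted
}

fn main() {
    // Hypothetical three-block chain, newest first: c <- b <- a.
    let a = Payload { block_hash: [1; 32], parent_hash: [0; 32] };
    let b = Payload { block_hash: [2; 32], parent_hash: [1; 32] };
    let c = Payload { block_hash: [3; 32], parent_hash: [2; 32] };
    // The caller already knows the parent hash of the block following c.
    let verified = verify_chain([3; 32], vec![Some(c), None, Some(b), Some(a)]);
    assert_eq!(verified.len(), 3);
}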
christn committed 2023-02-08 06:36:29 +08:00 via GitHub
parent a0032835f3 · commit 1b1a540340
5 changed files with 58 additions and 1 deletion

Cargo.lock (generated)

@@ -687,6 +687,7 @@ dependencies = [
  "config",
  "ethers",
  "eyre",
+ "futures",
  "hex",
  "log",
  "milagro_bls",

client/src/node.rs

@@ -9,6 +9,7 @@ use eyre::{eyre, Result};
 use common::errors::BlockNotFoundError;
 use common::types::BlockTag;
 use config::Config;
 use consensus::rpc::nimbus_rpc::NimbusRpc;
 use consensus::types::{ExecutionPayload, Header};
 use consensus::ConsensusClient;
@@ -25,6 +26,7 @@ pub struct Node {
     pub config: Arc<Config>,
     payloads: BTreeMap<u64, ExecutionPayload>,
     finalized_payloads: BTreeMap<u64, ExecutionPayload>,
+    current_slot: Option<u64>,
     pub history_size: usize,
 }
@@ -49,6 +51,7 @@ impl Node {
             config,
             payloads,
             finalized_payloads,
+            current_slot: None,
             history_size: 64,
         })
     }
@@ -110,6 +113,20 @@ impl Node {
         self.finalized_payloads
             .insert(finalized_payload.block_number, finalized_payload);
 
+        let start_slot = self
+            .current_slot
+            .unwrap_or(latest_header.slot - self.history_size as u64);
+        let backfill_payloads = self
+            .consensus
+            .get_payloads(start_slot, latest_header.slot)
+            .await
+            .map_err(NodeError::ConsensusPayloadError)?;
+        for payload in backfill_payloads {
+            self.payloads.insert(payload.block_number, payload);
+        }
+
+        self.current_slot = Some(latest_header.slot);
+
         while self.payloads.len() > self.history_size {
             self.payloads.pop_first();
         }
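
Since payloads is a BTreeMap keyed by block number, pop_first always evicts the lowest key, i.e. the oldest block, so the loop above trims the map to the most recent history_size entries. A standalone sketch of that pruning pattern (values here are illustrative; the real node keeps 64):

use std::collections::BTreeMap;

fn main() {
    let history_size = 3; // hypothetical; Node::new sets 64
    let mut payloads: BTreeMap<u64, &str> = BTreeMap::new();
    for (block_number, payload) in [(100, "a"), (101, "b"), (102, "c"), (103, "d"), (104, "e")] {
        payloads.insert(block_number, payload);
        // Keys are block numbers, so pop_first always drops the oldest block.
        while payloads.len() > history_size {
            payloads.pop_first();
        }
    }
    assert_eq!(payloads.keys().copied().collect::<Vec<_>>(), vec![102, 103, 104]);
}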

consensus/Cargo.toml

@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 eyre = "0.6.8"
+futures = "0.3.23"
 serde = { version = "1.0.143", features = ["derive"] }
 serde_json = "1.0.85"
 hex = "0.4.3"

consensus/src/consensus.rs

@@ -4,6 +4,7 @@ use std::sync::Arc;
 use chrono::Duration;
 use eyre::eyre;
 use eyre::Result;
+use futures::future::join_all;
 use log::warn;
 use log::{debug, info};
 use milagro_bls::PublicKey;
@@ -106,6 +107,43 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
         }
     }
 
+    pub async fn get_payloads(
+        &self,
+        start_slot: u64,
+        end_slot: u64,
+    ) -> Result<Vec<ExecutionPayload>> {
+        let payloads_fut = (start_slot..end_slot)
+            .rev()
+            .map(|slot| self.rpc.get_block(slot));
+        let mut prev_parent_hash: Bytes32 = self
+            .rpc
+            .get_block(end_slot)
+            .await?
+            .body
+            .execution_payload
+            .parent_hash;
+        let mut payloads: Vec<ExecutionPayload> = Vec::new();
+        for result in join_all(payloads_fut).await {
+            if result.is_err() {
+                continue;
+            }
+            let payload = result.unwrap().body.execution_payload;
+            if payload.block_hash != prev_parent_hash {
+                warn!(
+                    "error while backfilling blocks: {}",
+                    ConsensusError::InvalidHeaderHash(
+                        format!("{prev_parent_hash:02X?}"),
+                        format!("{:02X?}", payload.parent_hash),
+                    )
+                );
+                break;
+            }
+            prev_parent_hash = payload.parent_hash.clone();
+            payloads.push(payload);
+        }
+        Ok(payloads)
+    }
+
     pub fn get_header(&self) -> &Header {
         &self.store.optimistic_header
     }
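
A detail the sequential hash check relies on: futures::future::join_all drives the request futures concurrently but yields their results in the order the futures were supplied, so the loop still walks slots in descending order. A small sketch of that ordering guarantee, using only the futures crate:

use futures::executor::block_on;
use futures::future::join_all;

fn main() {
    // join_all polls all futures concurrently, but the output Vec is in
    // the order the futures were supplied -- here, descending "slots".
    let futs = (0u64..5).rev().map(|slot| async move { slot });
    let results = block_on(join_all(futs));
    assert_eq!(results, vec![4, 3, 2, 1, 0]);
}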

execution/src/evm.rs

@@ -175,7 +175,7 @@ impl<'a, R: ExecutionRpc> Evm<'a, R> {
         env.tx.transact_to = TransactTo::Call(opts.to);
         env.tx.caller = opts.from.unwrap_or(Address::zero());
         env.tx.value = opts.value.unwrap_or(U256::from(0));
-        env.tx.data = Bytes::from(opts.data.clone().unwrap_or(vec![]));
+        env.tx.data = Bytes::from(opts.data.clone().unwrap_or_default());
         env.tx.gas_limit = opts.gas.map(|v| v.as_u64()).unwrap_or(u64::MAX);
         env.tx.gas_price = opts.gas_price.unwrap_or(U256::zero());
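
The final hunk is one of the clippy cleanups: unwrap_or(vec![]) and unwrap_or_default() produce the same empty Vec when the option is None (an empty Vec does not allocate either way), so the change is purely stylistic. A tiny sketch of the equivalence:

fn main() {
    let data: Option<Vec<u8>> = None;
    // Both forms yield an empty Vec for None; clippy prefers the second.
    assert_eq!(data.clone().unwrap_or(vec![]), Vec::<u8>::new());
    assert_eq!(data.unwrap_or_default(), Vec::<u8>::new());
}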