feat: add client builder (#84)

* add client builder

* use client builder in cli

* make build sync

* fix data dir override

* fix tests
This commit is contained in:
Noah Citron 2022-11-03 15:24:17 -04:00 committed by GitHub
parent 7841eb90e0
commit f605f009a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 219 additions and 55 deletions

View File

@ -2,6 +2,7 @@ use std::{
fs, fs,
path::PathBuf, path::PathBuf,
process::exit, process::exit,
str::FromStr,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
@ -11,7 +12,7 @@ use dirs::home_dir;
use env_logger::Env; use env_logger::Env;
use eyre::Result; use eyre::Result;
use client::{database::FileDB, Client}; use client::{database::FileDB, Client, ClientBuilder};
use config::{CliConfig, Config}; use config::{CliConfig, Config};
use futures::executor::block_on; use futures::executor::block_on;
use log::info; use log::info;
@ -21,7 +22,7 @@ async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let config = get_config(); let config = get_config();
let mut client = Client::new(config).await?; let mut client = ClientBuilder::new().config(config).build()?;
client.start().await?; client.start().await?;
@ -82,6 +83,8 @@ struct Cli {
execution_rpc: Option<String>, execution_rpc: Option<String>,
#[clap(short, long, env)] #[clap(short, long, env)]
consensus_rpc: Option<String>, consensus_rpc: Option<String>,
#[clap(short, long, env)]
data_dir: Option<String>,
} }
impl Cli { impl Cli {
@ -116,8 +119,12 @@ impl Cli {
} }
fn get_data_dir(&self) -> PathBuf { fn get_data_dir(&self) -> PathBuf {
if let Some(dir) = &self.data_dir {
PathBuf::from_str(dir).expect("cannot find data dir")
} else {
home_dir() home_dir()
.unwrap() .unwrap()
.join(format!(".helios/data/{}", self.network)) .join(format!(".helios/data/{}", self.network))
} }
} }
}

View File

@ -1,5 +1,7 @@
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use config::networks::Network;
use ethers::prelude::{Address, U256}; use ethers::prelude::{Address, U256};
use ethers::types::{Transaction, TransactionReceipt, H256}; use ethers::types::{Transaction, TransactionReceipt, H256};
use eyre::{eyre, Result}; use eyre::{eyre, Result};
@ -24,9 +26,9 @@ pub struct Client<DB: Database> {
} }
impl Client<FileDB> { impl Client<FileDB> {
pub async fn new(config: Config) -> Result<Self> { fn new(config: Config) -> Result<Self> {
let config = Arc::new(config); let config = Arc::new(config);
let node = Node::new(config.clone()).await?; let node = Node::new(config.clone())?;
let node = Arc::new(RwLock::new(node)); let node = Arc::new(RwLock::new(node));
let rpc = if let Some(port) = config.rpc_port { let rpc = if let Some(port) = config.rpc_port {
@ -42,6 +44,130 @@ impl Client<FileDB> {
} }
} }
pub struct ClientBuilder {
network: Option<Network>,
consensus_rpc: Option<String>,
execution_rpc: Option<String>,
checkpoint: Option<Vec<u8>>,
rpc_port: Option<u16>,
data_dir: Option<PathBuf>,
config: Option<Config>,
}
impl ClientBuilder {
pub fn new() -> Self {
Self {
network: None,
consensus_rpc: None,
execution_rpc: None,
checkpoint: None,
rpc_port: None,
data_dir: None,
config: None,
}
}
pub fn network(mut self, network: Network) -> Self {
self.network = Some(network);
self
}
pub fn consensus_rpc(mut self, consensus_rpc: &str) -> Self {
self.consensus_rpc = Some(consensus_rpc.to_string());
self
}
pub fn execution_rpc(mut self, execution_rpc: &str) -> Self {
self.execution_rpc = Some(execution_rpc.to_string());
self
}
pub fn checkpoint(mut self, checkpoint: &str) -> Self {
let checkpoint = hex::decode(checkpoint).expect("cannot parse checkpoint");
self.checkpoint = Some(checkpoint);
self
}
pub fn rpc_port(mut self, port: u16) -> Self {
self.rpc_port = Some(port);
self
}
pub fn data_dir(mut self, data_dir: PathBuf) -> Self {
self.data_dir = Some(data_dir);
self
}
pub fn config(mut self, config: Config) -> Self {
self.config = Some(config);
self
}
pub fn build(self) -> Result<Client<FileDB>> {
let base_config = if let Some(network) = self.network {
network.to_base_config()
} else {
let config = self
.config
.as_ref()
.ok_or(eyre!("missing network config"))?;
config.to_base_config()
};
let consensus_rpc = self.consensus_rpc.unwrap_or(
self.config
.as_ref()
.ok_or(eyre!("missing consensus rpc"))?
.consensus_rpc
.clone(),
);
let execution_rpc = self.execution_rpc.unwrap_or(
self.config
.as_ref()
.ok_or(eyre!("missing execution rpc"))?
.execution_rpc
.clone(),
);
let checkpoint = self.checkpoint.unwrap_or(
self.config
.as_ref()
.ok_or(eyre!("missing checkpoint"))?
.checkpoint
.clone(),
);
let rpc_port = if self.rpc_port.is_some() {
self.rpc_port
} else if let Some(config) = &self.config {
config.rpc_port
} else {
None
};
let data_dir = if self.data_dir.is_some() {
self.data_dir
} else if let Some(config) = &self.config {
config.data_dir.clone()
} else {
None
};
let config = Config {
consensus_rpc,
execution_rpc,
checkpoint,
rpc_port,
data_dir,
chain: base_config.chain,
forks: base_config.forks,
};
Client::new(config)
}
}
impl<DB: Database> Client<DB> { impl<DB: Database> Client<DB> {
pub async fn start(&mut self) -> Result<()> { pub async fn start(&mut self) -> Result<()> {
self.rpc.as_mut().unwrap().start().await?; self.rpc.as_mut().unwrap().start().await?;

View File

@ -27,13 +27,12 @@ pub struct Node {
} }
impl Node { impl Node {
pub async fn new(config: Arc<Config>) -> Result<Self> { pub fn new(config: Arc<Config>) -> Result<Self> {
let consensus_rpc = &config.consensus_rpc; let consensus_rpc = &config.consensus_rpc;
let checkpoint_hash = &config.checkpoint; let checkpoint_hash = &config.checkpoint;
let execution_rpc = &config.execution_rpc; let execution_rpc = &config.execution_rpc;
let consensus = let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone())?;
ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()).await?;
let execution = Arc::new(ExecutionClient::new(execution_rpc)?); let execution = Arc::new(ExecutionClient::new(execution_rpc)?);
let payloads = BTreeMap::new(); let payloads = BTreeMap::new();

View File

@ -85,6 +85,15 @@ impl Config {
self.forks.genesis.fork_version.clone() self.forks.genesis.fork_version.clone()
} }
} }
pub fn to_base_config(&self) -> BaseConfig {
BaseConfig {
rpc_port: self.rpc_port.unwrap_or(8545),
checkpoint: self.checkpoint.clone(),
chain: self.chain.clone(),
forks: self.forks.clone(),
}
}
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -122,7 +131,7 @@ impl CliConfig {
} }
} }
#[derive(Serialize, Deserialize, Debug, Default)] #[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct ChainConfig { pub struct ChainConfig {
pub chain_id: u64, pub chain_id: u64,
pub genesis_time: u64, pub genesis_time: u64,
@ -133,14 +142,14 @@ pub struct ChainConfig {
pub genesis_root: Vec<u8>, pub genesis_root: Vec<u8>,
} }
#[derive(Serialize, Deserialize, Debug, Default)] #[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct Forks { pub struct Forks {
pub genesis: Fork, pub genesis: Fork,
pub altair: Fork, pub altair: Fork,
pub bellatrix: Fork, pub bellatrix: Fork,
} }
#[derive(Serialize, Deserialize, Debug, Default)] #[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct Fork { pub struct Fork {
pub epoch: u64, pub epoch: u64,
#[serde( #[serde(

View File

@ -3,9 +3,23 @@ use serde::Serialize;
use crate::{bytes_serialize, ChainConfig, Fork, Forks}; use crate::{bytes_serialize, ChainConfig, Fork, Forks};
use common::utils::hex_str_to_bytes; use common::utils::hex_str_to_bytes;
pub enum Network {
MAINNET,
GOERLI,
}
impl Network {
pub fn to_base_config(&self) -> BaseConfig {
match self {
Self::MAINNET => mainnet(),
Self::GOERLI => goerli(),
}
}
}
#[derive(Serialize, Default)] #[derive(Serialize, Default)]
pub struct BaseConfig { pub struct BaseConfig {
rpc_port: u16, pub rpc_port: u16,
#[serde( #[serde(
deserialize_with = "bytes_deserialize", deserialize_with = "bytes_deserialize",
serialize_with = "bytes_serialize" serialize_with = "bytes_serialize"

View File

@ -25,11 +25,12 @@ use super::utils::*;
pub struct ConsensusClient<R: ConsensusRpc> { pub struct ConsensusClient<R: ConsensusRpc> {
rpc: R, rpc: R,
store: LightClientStore, store: LightClientStore,
initial_checkpoint: Vec<u8>,
pub last_checkpoint: Option<Vec<u8>>, pub last_checkpoint: Option<Vec<u8>>,
pub config: Arc<Config>, pub config: Arc<Config>,
} }
#[derive(Debug)] #[derive(Debug, Default)]
struct LightClientStore { struct LightClientStore {
finalized_header: Header, finalized_header: Header,
current_sync_committee: SyncCommittee, current_sync_committee: SyncCommittee,
@ -40,50 +41,19 @@ struct LightClientStore {
} }
impl<R: ConsensusRpc> ConsensusClient<R> { impl<R: ConsensusRpc> ConsensusClient<R> {
pub async fn new( pub fn new(
rpc: &str, rpc: &str,
checkpoint_block_root: &Vec<u8>, checkpoint_block_root: &Vec<u8>,
config: Arc<Config>, config: Arc<Config>,
) -> Result<ConsensusClient<R>> { ) -> Result<ConsensusClient<R>> {
let rpc = R::new(rpc); let rpc = R::new(rpc);
let mut bootstrap = rpc
.get_bootstrap(checkpoint_block_root)
.await
.map_err(|_| eyre!("could not fetch bootstrap"))?;
let committee_valid = is_current_committee_proof_valid(
&bootstrap.header,
&mut bootstrap.current_sync_committee,
&bootstrap.current_sync_committee_branch,
);
let header_hash = bootstrap.header.hash_tree_root()?.to_string();
let expected_hash = format!("0x{}", hex::encode(checkpoint_block_root));
let header_valid = header_hash == expected_hash;
if !header_valid {
return Err(ConsensusError::InvalidHeaderHash(expected_hash, header_hash).into());
}
if !committee_valid {
return Err(ConsensusError::InvalidCurrentSyncCommitteeProof.into());
}
let store = LightClientStore {
finalized_header: bootstrap.header.clone(),
current_sync_committee: bootstrap.current_sync_committee,
next_sync_committee: None,
optimistic_header: bootstrap.header.clone(),
previous_max_active_participants: 0,
current_max_active_participants: 0,
};
Ok(ConsensusClient { Ok(ConsensusClient {
rpc, rpc,
store, store: LightClientStore::default(),
last_checkpoint: None, last_checkpoint: None,
config, config,
initial_checkpoint: checkpoint_block_root.clone(),
}) })
} }
@ -123,6 +93,8 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
} }
pub async fn sync(&mut self) -> Result<()> { pub async fn sync(&mut self) -> Result<()> {
self.bootstrap().await?;
let current_period = calc_sync_period(self.store.finalized_header.slot); let current_period = calc_sync_period(self.store.finalized_header.slot);
let updates = self.rpc.get_updates(current_period).await?; let updates = self.rpc.get_updates(current_period).await?;
@ -170,6 +142,43 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
Ok(()) Ok(())
} }
async fn bootstrap(&mut self) -> Result<()> {
let mut bootstrap = self
.rpc
.get_bootstrap(&self.initial_checkpoint)
.await
.map_err(|_| eyre!("could not fetch bootstrap"))?;
let committee_valid = is_current_committee_proof_valid(
&bootstrap.header,
&mut bootstrap.current_sync_committee,
&bootstrap.current_sync_committee_branch,
);
let header_hash = bootstrap.header.hash_tree_root()?.to_string();
let expected_hash = format!("0x{}", hex::encode(&self.initial_checkpoint));
let header_valid = header_hash == expected_hash;
if !header_valid {
return Err(ConsensusError::InvalidHeaderHash(expected_hash, header_hash).into());
}
if !committee_valid {
return Err(ConsensusError::InvalidCurrentSyncCommitteeProof.into());
}
self.store = LightClientStore {
finalized_header: bootstrap.header.clone(),
current_sync_committee: bootstrap.current_sync_committee,
next_sync_committee: None,
optimistic_header: bootstrap.header.clone(),
previous_max_active_participants: 0,
current_max_active_participants: 0,
};
Ok(())
}
// implements checks from validate_light_client_update and process_light_client_update in the // implements checks from validate_light_client_update and process_light_client_update in the
// specification // specification
fn verify_generic_update(&self, update: &GenericUpdate) -> Result<()> { fn verify_generic_update(&self, update: &GenericUpdate) -> Result<()> {
@ -570,9 +579,11 @@ mod tests {
..Default::default() ..Default::default()
}; };
ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)) let mut client =
.await ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)).unwrap();
.unwrap() client.bootstrap().await.unwrap();
client
} }
#[tokio::test] #[tokio::test]

View File

@ -13,9 +13,7 @@ async fn setup() -> ConsensusClient<MockRpc> {
..Default::default() ..Default::default()
}; };
ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)) ConsensusClient::new("testdata/", &base_config.checkpoint, Arc::new(config)).unwrap()
.await
.unwrap()
} }
#[tokio::test] #[tokio::test]