From 05798551412a9f590fc4202f2475eab71529dfa9 Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Fri, 16 Sep 2022 15:32:15 -0400 Subject: [PATCH] feat: add checkpoint caching (#41) * add checkpoint caching * add data dir override to cli * move checkpointing into database * move logging to client * clean up --- Cargo.lock | 24 ++++++++++++++++++ cli/Cargo.toml | 2 ++ cli/src/main.rs | 52 +++++++++++++++++++++++++++++++++----- client/src/client.rs | 43 ++++++++++++++++++++++++++----- client/src/database.rs | 33 ++++++++++++++++++++++++ client/src/lib.rs | 1 + client/src/node.rs | 4 +++ config/src/lib.rs | 11 +++++++- config/src/networks.rs | 4 ++- consensus/src/consensus.rs | 22 ++++++++++++++-- 10 files changed, 178 insertions(+), 18 deletions(-) create mode 100644 client/src/database.rs diff --git a/Cargo.lock b/Cargo.lock index 44f76d2..16455a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -629,6 +629,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctrlc" +version = "3.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d91974fbbe88ec1df0c24a4f00f99583667a7e2e6272b2b92d294d81e462173" +dependencies = [ + "nix", + "winapi", +] + [[package]] name = "der" version = "0.6.0" @@ -1942,9 +1952,11 @@ dependencies = [ "client", "common", "config", + "ctrlc", "dirs", "env_logger", "eyre", + "futures", "log", "tokio", ] @@ -2016,6 +2028,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "num" version = "0.4.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 9c067be..54566a3 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -10,6 +10,8 @@ eyre = "0.6.8" dirs = "4.0.0" env_logger = "0.9.0" log = "0.4.17" +ctrlc = "3.2.3" +futures = "0.3.23" client = { path = "../client" } config = { path = "../config" } diff --git a/cli/src/main.rs b/cli/src/main.rs index 49270df..f97f896 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,3 +1,5 @@ +use std::{fs, path::PathBuf, process::exit}; + use clap::Parser; use common::utils::hex_str_to_bytes; use dirs::home_dir; @@ -6,19 +8,26 @@ use eyre::Result; use client::Client; use config::{networks, Config}; +use futures::executor::block_on; #[tokio::main] async fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - let config = get_config()?; + let config = get_config(); let mut client = Client::new(config).await?; + client.start().await?; + ctrlc::set_handler(move || { + block_on(client.shutdown()); + exit(0); + })?; + std::future::pending().await } -fn get_config() -> Result { +fn get_config() -> Config { let cli = Cli::parse(); let mut config = match cli.network.as_str() { "mainnet" => networks::mainnet(), @@ -26,13 +35,16 @@ fn get_config() -> Result { _ => { let home = home_dir().unwrap(); let config_path = home.join(format!(".lightclient/configs/{}.toml", cli.network)); - Config::from_file(&config_path).unwrap() + Config::from_file(&config_path).expect("could not read network config") } }; - if let Some(checkpoint) = cli.checkpoint { - config.general.checkpoint = hex_str_to_bytes(&checkpoint)?; - } + let data_dir = get_data_dir(&cli); + + config.general.checkpoint = match cli.checkpoint { + Some(checkpoint) => hex_str_to_bytes(&checkpoint).expect("invalid checkpoint"), + None => get_cached_checkpoint(&data_dir).unwrap_or(config.general.checkpoint), + }; if let Some(port) = cli.port { config.general.rpc_port = Some(port); @@ -46,7 +58,31 @@ fn get_config() -> Result { config.general.consensus_rpc = consensus_rpc; } - Ok(config) + config.machine.data_dir = Some(data_dir); + + config +} + +fn get_data_dir(cli: &Cli) -> PathBuf { + match &cli.data_dir { + Some(dir) => PathBuf::from(dir), + None => home_dir() + .unwrap() + .join(format!(".lightclient/data/{}", cli.network)), + } +} + +fn get_cached_checkpoint(data_dir: &PathBuf) -> Option> { + let checkpoint_file = data_dir.join("checkpoint"); + if checkpoint_file.exists() { + let checkpoint_res = fs::read(checkpoint_file); + match checkpoint_res { + Ok(checkpoint) => Some(checkpoint), + Err(_) => None, + } + } else { + None + } } #[derive(Parser)] @@ -61,4 +97,6 @@ struct Cli { execution_rpc: Option, #[clap(short, long)] consensus_rpc: Option, + #[clap(long)] + data_dir: Option, } diff --git a/client/src/client.rs b/client/src/client.rs index 0b0f923..d413d04 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -3,25 +3,27 @@ use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{Transaction, TransactionReceipt, H256}; -use eyre::Result; +use eyre::{eyre, Result}; use config::Config; use consensus::types::Header; use execution::types::{CallOpts, ExecutionBlock}; -use log::warn; +use log::{info, warn}; use tokio::spawn; use tokio::sync::Mutex; use tokio::time::sleep; +use crate::database::{Database, FileDB}; use crate::node::{BlockTag, Node}; use crate::rpc::Rpc; -pub struct Client { +pub struct Client { node: Arc>, rpc: Option, + db: DB, } -impl Client { +impl Client { pub async fn new(config: Config) -> Result { let config = Arc::new(config); let node = Node::new(config.clone()).await?; @@ -33,15 +35,24 @@ impl Client { None }; - Ok(Client { node, rpc }) - } + let data_dir = config.machine.data_dir.clone(); + let db = FileDB::new(data_dir.ok_or(eyre!("data dir not found"))?); + Ok(Client { node, rpc, db }) + } +} + +impl Client { pub async fn start(&mut self) -> Result<()> { self.rpc.as_mut().unwrap().start().await?; - self.node.lock().await.sync().await?; let node = self.node.clone(); spawn(async move { + let res = node.lock().await.sync().await; + if let Err(err) = res { + warn!("{}", err); + } + loop { let res = node.lock().await.advance().await; if let Err(err) = res { @@ -55,6 +66,24 @@ impl Client { Ok(()) } + pub async fn shutdown(&self) { + println!(); + info!("shutting down"); + + let node = self.node.lock().await; + let checkpoint = if let Some(checkpoint) = node.get_last_checkpoint() { + checkpoint + } else { + return; + }; + + info!("saving last checkpoint hash"); + let res = self.db.save_checkpoint(checkpoint); + if res.is_err() { + warn!("checkpoint save failed"); + } + } + pub async fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result> { self.node.lock().await.call(opts, block) } diff --git a/client/src/database.rs b/client/src/database.rs new file mode 100644 index 0000000..48373b2 --- /dev/null +++ b/client/src/database.rs @@ -0,0 +1,33 @@ +use std::{fs, io::Write, path::PathBuf}; + +use eyre::Result; + +pub trait Database { + fn save_checkpoint(&self, checkpoint: Vec) -> Result<()>; +} + +pub struct FileDB { + data_dir: PathBuf, +} + +impl FileDB { + pub fn new(data_dir: PathBuf) -> Self { + FileDB { data_dir } + } +} + +impl Database for FileDB { + fn save_checkpoint(&self, checkpoint: Vec) -> Result<()> { + fs::create_dir_all(&self.data_dir)?; + + let mut f = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(self.data_dir.join("checkpoint"))?; + + f.write_all(checkpoint.as_slice())?; + + Ok(()) + } +} diff --git a/client/src/lib.rs b/client/src/lib.rs index ff4e291..4f290ef 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -3,6 +3,7 @@ mod client; pub use crate::client::*; +pub mod database; pub mod rpc; mod node; diff --git a/client/src/node.rs b/client/src/node.rs index 78eadca..d60b608 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -190,6 +190,10 @@ impl Node { self.consensus.get_header() } + pub fn get_last_checkpoint(&self) -> Option> { + self.consensus.last_checkpoint.clone() + } + fn get_payload(&self, block: &BlockTag) -> Result<&ExecutionPayload> { match block { BlockTag::Latest => { diff --git a/config/src/lib.rs b/config/src/lib.rs index 18bdddd..6152c30 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -1,6 +1,9 @@ pub mod networks; -use std::{fs, path::Path}; +use std::{ + fs, + path::{Path, PathBuf}, +}; use eyre::Result; use serde::Deserialize; @@ -11,6 +14,7 @@ use common::utils::hex_str_to_bytes; pub struct Config { pub general: General, pub forks: Forks, + pub machine: Machine, } #[derive(Deserialize, Debug)] @@ -40,6 +44,11 @@ pub struct Fork { pub fork_version: Vec, } +#[derive(Deserialize, Debug)] +pub struct Machine { + pub data_dir: Option, +} + impl Config { pub fn from_file(path: &Path) -> Result { let contents = fs::read_to_string(path)?; diff --git a/config/src/networks.rs b/config/src/networks.rs index 765fd30..c8733e5 100644 --- a/config/src/networks.rs +++ b/config/src/networks.rs @@ -1,6 +1,6 @@ use common::utils::hex_str_to_bytes; -use crate::{Config, Fork, Forks, General}; +use crate::{Config, Fork, Forks, General, Machine}; pub fn mainnet() -> Config { Config { @@ -34,6 +34,7 @@ pub fn mainnet() -> Config { fork_version: hex_str_to_bytes("0x02000000").unwrap(), }, }, + machine: Machine { data_dir: None }, } } @@ -70,5 +71,6 @@ pub fn goerli() -> Config { fork_version: hex_str_to_bytes("0x02001020").unwrap(), }, }, + machine: Machine { data_dir: None }, } } diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 40f12ec..8841ad9 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -19,7 +19,8 @@ use super::types::*; pub struct ConsensusClient { rpc: R, store: Store, - config: Arc, + pub last_checkpoint: Option>, + pub config: Arc, } #[derive(Debug)] @@ -65,7 +66,12 @@ impl ConsensusClient { current_max_active_participants: 0, }; - Ok(ConsensusClient { rpc, store, config }) + Ok(ConsensusClient { + rpc, + store, + last_checkpoint: None, + config, + }) } pub async fn get_execution_payload(&self, slot: &Option) -> Result { @@ -288,6 +294,12 @@ impl ConsensusClient { self.store.finalized_header = update.finalized_header.clone(); + if update.finalized_header.slot % 32 == 0 { + let n = update.finalized_header.clone().hash_tree_root().unwrap(); + let checkpoint = n.as_bytes().to_vec(); + self.last_checkpoint = Some(checkpoint); + } + if self.store.next_sync_committee.is_none() { self.store.next_sync_committee = Some(update.next_sync_committee.clone()); } else if update_signature_period == store_period + 1 { @@ -322,6 +334,12 @@ impl ConsensusClient { get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; let delay = self.get_delay(self.store.finalized_header.slot); + if update.finalized_header.slot % 32 == 0 { + let n = update.finalized_header.clone().hash_tree_root().unwrap(); + let checkpoint = n.as_bytes().to_vec(); + self.last_checkpoint = Some(checkpoint); + } + info!( "applying finality update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}", self.store.finalized_header.slot,