feat: add checkpoint caching (#41)

* add checkpoint caching

* add data dir override to cli

* move checkpointing into database

* move logging to client

* clean up
This commit is contained in:
Noah Citron 2022-09-16 15:32:15 -04:00 committed by GitHub
parent c4e222b319
commit 0579855141
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 178 additions and 18 deletions

24
Cargo.lock generated
View File

@ -629,6 +629,16 @@ dependencies = [
"cipher", "cipher",
] ]
[[package]]
name = "ctrlc"
version = "3.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d91974fbbe88ec1df0c24a4f00f99583667a7e2e6272b2b92d294d81e462173"
dependencies = [
"nix",
"winapi",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.6.0" version = "0.6.0"
@ -1942,9 +1952,11 @@ dependencies = [
"client", "client",
"common", "common",
"config", "config",
"ctrlc",
"dirs", "dirs",
"env_logger", "env_logger",
"eyre", "eyre",
"futures",
"log", "log",
"tokio", "tokio",
] ]
@ -2016,6 +2028,18 @@ dependencies = [
"tempfile", "tempfile",
] ]
[[package]]
name = "nix"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e322c04a9e3440c327fca7b6c8a63e6890a32fa2ad689db972425f07e0d22abb"
dependencies = [
"autocfg",
"bitflags",
"cfg-if",
"libc",
]
[[package]] [[package]]
name = "num" name = "num"
version = "0.4.0" version = "0.4.0"

View File

@ -10,6 +10,8 @@ eyre = "0.6.8"
dirs = "4.0.0" dirs = "4.0.0"
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.17" log = "0.4.17"
ctrlc = "3.2.3"
futures = "0.3.23"
client = { path = "../client" } client = { path = "../client" }
config = { path = "../config" } config = { path = "../config" }

View File

@ -1,3 +1,5 @@
use std::{fs, path::PathBuf, process::exit};
use clap::Parser; use clap::Parser;
use common::utils::hex_str_to_bytes; use common::utils::hex_str_to_bytes;
use dirs::home_dir; use dirs::home_dir;
@ -6,19 +8,26 @@ use eyre::Result;
use client::Client; use client::Client;
use config::{networks, Config}; use config::{networks, Config};
use futures::executor::block_on;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let config = get_config()?; let config = get_config();
let mut client = Client::new(config).await?; let mut client = Client::new(config).await?;
client.start().await?; client.start().await?;
ctrlc::set_handler(move || {
block_on(client.shutdown());
exit(0);
})?;
std::future::pending().await std::future::pending().await
} }
fn get_config() -> Result<Config> { fn get_config() -> Config {
let cli = Cli::parse(); let cli = Cli::parse();
let mut config = match cli.network.as_str() { let mut config = match cli.network.as_str() {
"mainnet" => networks::mainnet(), "mainnet" => networks::mainnet(),
@ -26,13 +35,16 @@ fn get_config() -> Result<Config> {
_ => { _ => {
let home = home_dir().unwrap(); let home = home_dir().unwrap();
let config_path = home.join(format!(".lightclient/configs/{}.toml", cli.network)); let config_path = home.join(format!(".lightclient/configs/{}.toml", cli.network));
Config::from_file(&config_path).unwrap() Config::from_file(&config_path).expect("could not read network config")
} }
}; };
if let Some(checkpoint) = cli.checkpoint { let data_dir = get_data_dir(&cli);
config.general.checkpoint = hex_str_to_bytes(&checkpoint)?;
} config.general.checkpoint = match cli.checkpoint {
Some(checkpoint) => hex_str_to_bytes(&checkpoint).expect("invalid checkpoint"),
None => get_cached_checkpoint(&data_dir).unwrap_or(config.general.checkpoint),
};
if let Some(port) = cli.port { if let Some(port) = cli.port {
config.general.rpc_port = Some(port); config.general.rpc_port = Some(port);
@ -46,7 +58,31 @@ fn get_config() -> Result<Config> {
config.general.consensus_rpc = consensus_rpc; config.general.consensus_rpc = consensus_rpc;
} }
Ok(config) config.machine.data_dir = Some(data_dir);
config
}
fn get_data_dir(cli: &Cli) -> PathBuf {
match &cli.data_dir {
Some(dir) => PathBuf::from(dir),
None => home_dir()
.unwrap()
.join(format!(".lightclient/data/{}", cli.network)),
}
}
fn get_cached_checkpoint(data_dir: &PathBuf) -> Option<Vec<u8>> {
let checkpoint_file = data_dir.join("checkpoint");
if checkpoint_file.exists() {
let checkpoint_res = fs::read(checkpoint_file);
match checkpoint_res {
Ok(checkpoint) => Some(checkpoint),
Err(_) => None,
}
} else {
None
}
} }
#[derive(Parser)] #[derive(Parser)]
@ -61,4 +97,6 @@ struct Cli {
execution_rpc: Option<String>, execution_rpc: Option<String>,
#[clap(short, long)] #[clap(short, long)]
consensus_rpc: Option<String>, consensus_rpc: Option<String>,
#[clap(long)]
data_dir: Option<String>,
} }

View File

@ -3,25 +3,27 @@ use std::time::Duration;
use ethers::prelude::{Address, U256}; use ethers::prelude::{Address, U256};
use ethers::types::{Transaction, TransactionReceipt, H256}; use ethers::types::{Transaction, TransactionReceipt, H256};
use eyre::Result; use eyre::{eyre, Result};
use config::Config; use config::Config;
use consensus::types::Header; use consensus::types::Header;
use execution::types::{CallOpts, ExecutionBlock}; use execution::types::{CallOpts, ExecutionBlock};
use log::warn; use log::{info, warn};
use tokio::spawn; use tokio::spawn;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::sleep; use tokio::time::sleep;
use crate::database::{Database, FileDB};
use crate::node::{BlockTag, Node}; use crate::node::{BlockTag, Node};
use crate::rpc::Rpc; use crate::rpc::Rpc;
pub struct Client { pub struct Client<DB: Database> {
node: Arc<Mutex<Node>>, node: Arc<Mutex<Node>>,
rpc: Option<Rpc>, rpc: Option<Rpc>,
db: DB,
} }
impl Client { impl Client<FileDB> {
pub async fn new(config: Config) -> Result<Self> { pub async fn new(config: Config) -> Result<Self> {
let config = Arc::new(config); let config = Arc::new(config);
let node = Node::new(config.clone()).await?; let node = Node::new(config.clone()).await?;
@ -33,15 +35,24 @@ impl Client {
None None
}; };
Ok(Client { node, rpc }) let data_dir = config.machine.data_dir.clone();
} let db = FileDB::new(data_dir.ok_or(eyre!("data dir not found"))?);
Ok(Client { node, rpc, db })
}
}
impl<DB: Database> Client<DB> {
pub async fn start(&mut self) -> Result<()> { pub async fn start(&mut self) -> Result<()> {
self.rpc.as_mut().unwrap().start().await?; self.rpc.as_mut().unwrap().start().await?;
self.node.lock().await.sync().await?;
let node = self.node.clone(); let node = self.node.clone();
spawn(async move { spawn(async move {
let res = node.lock().await.sync().await;
if let Err(err) = res {
warn!("{}", err);
}
loop { loop {
let res = node.lock().await.advance().await; let res = node.lock().await.advance().await;
if let Err(err) = res { if let Err(err) = res {
@ -55,6 +66,24 @@ impl Client {
Ok(()) Ok(())
} }
pub async fn shutdown(&self) {
println!();
info!("shutting down");
let node = self.node.lock().await;
let checkpoint = if let Some(checkpoint) = node.get_last_checkpoint() {
checkpoint
} else {
return;
};
info!("saving last checkpoint hash");
let res = self.db.save_checkpoint(checkpoint);
if res.is_err() {
warn!("checkpoint save failed");
}
}
pub async fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> { pub async fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> {
self.node.lock().await.call(opts, block) self.node.lock().await.call(opts, block)
} }

33
client/src/database.rs Normal file
View File

@ -0,0 +1,33 @@
use std::{fs, io::Write, path::PathBuf};
use eyre::Result;
pub trait Database {
fn save_checkpoint(&self, checkpoint: Vec<u8>) -> Result<()>;
}
pub struct FileDB {
data_dir: PathBuf,
}
impl FileDB {
pub fn new(data_dir: PathBuf) -> Self {
FileDB { data_dir }
}
}
impl Database for FileDB {
fn save_checkpoint(&self, checkpoint: Vec<u8>) -> Result<()> {
fs::create_dir_all(&self.data_dir)?;
let mut f = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(self.data_dir.join("checkpoint"))?;
f.write_all(checkpoint.as_slice())?;
Ok(())
}
}

View File

@ -3,6 +3,7 @@
mod client; mod client;
pub use crate::client::*; pub use crate::client::*;
pub mod database;
pub mod rpc; pub mod rpc;
mod node; mod node;

View File

@ -190,6 +190,10 @@ impl Node {
self.consensus.get_header() self.consensus.get_header()
} }
pub fn get_last_checkpoint(&self) -> Option<Vec<u8>> {
self.consensus.last_checkpoint.clone()
}
fn get_payload(&self, block: &BlockTag) -> Result<&ExecutionPayload> { fn get_payload(&self, block: &BlockTag) -> Result<&ExecutionPayload> {
match block { match block {
BlockTag::Latest => { BlockTag::Latest => {

View File

@ -1,6 +1,9 @@
pub mod networks; pub mod networks;
use std::{fs, path::Path}; use std::{
fs,
path::{Path, PathBuf},
};
use eyre::Result; use eyre::Result;
use serde::Deserialize; use serde::Deserialize;
@ -11,6 +14,7 @@ use common::utils::hex_str_to_bytes;
pub struct Config { pub struct Config {
pub general: General, pub general: General,
pub forks: Forks, pub forks: Forks,
pub machine: Machine,
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -40,6 +44,11 @@ pub struct Fork {
pub fork_version: Vec<u8>, pub fork_version: Vec<u8>,
} }
#[derive(Deserialize, Debug)]
pub struct Machine {
pub data_dir: Option<PathBuf>,
}
impl Config { impl Config {
pub fn from_file(path: &Path) -> Result<Self> { pub fn from_file(path: &Path) -> Result<Self> {
let contents = fs::read_to_string(path)?; let contents = fs::read_to_string(path)?;

View File

@ -1,6 +1,6 @@
use common::utils::hex_str_to_bytes; use common::utils::hex_str_to_bytes;
use crate::{Config, Fork, Forks, General}; use crate::{Config, Fork, Forks, General, Machine};
pub fn mainnet() -> Config { pub fn mainnet() -> Config {
Config { Config {
@ -34,6 +34,7 @@ pub fn mainnet() -> Config {
fork_version: hex_str_to_bytes("0x02000000").unwrap(), fork_version: hex_str_to_bytes("0x02000000").unwrap(),
}, },
}, },
machine: Machine { data_dir: None },
} }
} }
@ -70,5 +71,6 @@ pub fn goerli() -> Config {
fork_version: hex_str_to_bytes("0x02001020").unwrap(), fork_version: hex_str_to_bytes("0x02001020").unwrap(),
}, },
}, },
machine: Machine { data_dir: None },
} }
} }

View File

@ -19,7 +19,8 @@ use super::types::*;
pub struct ConsensusClient<R: Rpc> { pub struct ConsensusClient<R: Rpc> {
rpc: R, rpc: R,
store: Store, store: Store,
config: Arc<Config>, pub last_checkpoint: Option<Vec<u8>>,
pub config: Arc<Config>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -65,7 +66,12 @@ impl<R: Rpc> ConsensusClient<R> {
current_max_active_participants: 0, current_max_active_participants: 0,
}; };
Ok(ConsensusClient { rpc, store, config }) Ok(ConsensusClient {
rpc,
store,
last_checkpoint: None,
config,
})
} }
pub async fn get_execution_payload(&self, slot: &Option<u64>) -> Result<ExecutionPayload> { pub async fn get_execution_payload(&self, slot: &Option<u64>) -> Result<ExecutionPayload> {
@ -288,6 +294,12 @@ impl<R: Rpc> ConsensusClient<R> {
self.store.finalized_header = update.finalized_header.clone(); self.store.finalized_header = update.finalized_header.clone();
if update.finalized_header.slot % 32 == 0 {
let n = update.finalized_header.clone().hash_tree_root().unwrap();
let checkpoint = n.as_bytes().to_vec();
self.last_checkpoint = Some(checkpoint);
}
if self.store.next_sync_committee.is_none() { if self.store.next_sync_committee.is_none() {
self.store.next_sync_committee = Some(update.next_sync_committee.clone()); self.store.next_sync_committee = Some(update.next_sync_committee.clone());
} else if update_signature_period == store_period + 1 { } else if update_signature_period == store_period + 1 {
@ -322,6 +334,12 @@ impl<R: Rpc> ConsensusClient<R> {
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
let delay = self.get_delay(self.store.finalized_header.slot); let delay = self.get_delay(self.store.finalized_header.slot);
if update.finalized_header.slot % 32 == 0 {
let n = update.finalized_header.clone().hash_tree_root().unwrap();
let checkpoint = n.as_bytes().to_vec();
self.last_checkpoint = Some(checkpoint);
}
info!( info!(
"applying finality update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}", "applying finality update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}",
self.store.finalized_header.slot, self.store.finalized_header.slot,