:caution: rfc

This commit is contained in:
Andreas Bigger 2022-12-03 07:58:58 -08:00
parent 4f99cfef95
commit 38091aaa00
28 changed files with 523 additions and 395 deletions

404
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -21,9 +21,6 @@ common = { path = "./common" }
consensus = { path = "./consensus" } consensus = { path = "./consensus" }
execution = { path = "./execution" } execution = { path = "./execution" }
[patch.crates-io]
ethers = { git = "https://github.com/ncitron/ethers-rs", branch = "fix-retry" }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
eyre = "0.6.8" eyre = "0.6.8"

View File

@ -17,7 +17,6 @@ eyre = "0.6.8"
dirs = "4.0.0" dirs = "4.0.0"
env_logger = "0.9.0" env_logger = "0.9.0"
log = "0.4.17" log = "0.4.17"
ctrlc = "3.2.3"
futures = "0.3.23" futures = "0.3.23"
client = { path = "../client" } client = { path = "../client" }

84
cli/src/cli.rs Normal file
View File

@ -0,0 +1,84 @@
use std::{fs, path::PathBuf, str::FromStr};
use clap::Parser;
use common::utils::hex_str_to_bytes;
use dirs::home_dir;
use config::{CliConfig, Config};
#[derive(Parser)]
pub struct Cli {
#[clap(short, long, default_value = "mainnet")]
network: String,
#[clap(short = 'p', long, env)]
rpc_port: Option<u16>,
#[clap(short = 'w', long, env)]
checkpoint: Option<String>,
#[clap(short, long, env)]
execution_rpc: Option<String>,
#[clap(short, long, env)]
consensus_rpc: Option<String>,
#[clap(short, long, env)]
data_dir: Option<String>,
#[clap(short = 'f', long, env)]
fallback: Option<String>,
#[clap(short = 'l', long, env)]
load_external_fallback: bool,
#[clap(short = 's', long, env)]
with_ws: bool,
#[clap(short = 'h', long, env)]
with_http: bool,
}
impl Cli {
pub fn to_config() -> Config {
let cli = Cli::parse();
let config_path = home_dir().unwrap().join(".helios/helios.toml");
let cli_config = cli.as_cli_config();
Config::from_file(&config_path, &cli.network, &cli_config)
}
fn as_cli_config(&self) -> CliConfig {
let checkpoint = match &self.checkpoint {
Some(checkpoint) => Some(hex_str_to_bytes(checkpoint).expect("invalid checkpoint")),
None => self.get_cached_checkpoint(),
};
CliConfig {
checkpoint,
execution_rpc: self.execution_rpc.clone(),
consensus_rpc: self.consensus_rpc.clone(),
data_dir: self.get_data_dir(),
rpc_port: self.rpc_port,
fallback: self.fallback.clone(),
load_external_fallback: self.load_external_fallback,
with_ws: self.with_ws,
with_http: self.with_http,
}
}
fn get_cached_checkpoint(&self) -> Option<Vec<u8>> {
let data_dir = self.get_data_dir();
let checkpoint_file = data_dir.join("checkpoint");
if checkpoint_file.exists() {
let checkpoint_res = fs::read(checkpoint_file);
match checkpoint_res {
Ok(checkpoint) => Some(checkpoint),
Err(_) => None,
}
} else {
None
}
}
fn get_data_dir(&self) -> PathBuf {
if let Some(dir) = &self.data_dir {
PathBuf::from_str(dir).expect("cannot find data dir")
} else {
home_dir()
.unwrap()
.join(format!(".helios/data/{}", self.network))
}
}
}

View File

@ -1,142 +1,19 @@
use std::{
fs,
path::PathBuf,
process::exit,
str::FromStr,
sync::{Arc, Mutex},
};
use clap::Parser;
use common::utils::hex_str_to_bytes;
use dirs::home_dir;
use env_logger::Env; use env_logger::Env;
use eyre::Result; use eyre::Result;
use client::{database::FileDB, Client, ClientBuilder}; use client::{Client, ClientBuilder};
use config::{CliConfig, Config};
use futures::executor::block_on; mod cli;
use log::info;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let config = get_config(); let config = cli::Cli::to_config();
let mut client = ClientBuilder::new().config(config).build()?; let mut client = ClientBuilder::new().config(config).build()?;
client.start().await?; client.start().await?;
register_shutdown_handler(client); Client::register_shutdown_handler(client);
std::future::pending().await std::future::pending().await
} }
fn register_shutdown_handler(client: Client<FileDB>) {
let client = Arc::new(client);
let shutdown_counter = Arc::new(Mutex::new(0));
ctrlc::set_handler(move || {
let mut counter = shutdown_counter.lock().unwrap();
*counter += 1;
let counter_value = *counter;
if counter_value == 3 {
info!("forced shutdown");
exit(0);
}
info!(
"shutting down... press ctrl-c {} more times to force quit",
3 - counter_value
);
if counter_value == 1 {
let client = client.clone();
std::thread::spawn(move || {
block_on(client.shutdown());
exit(0);
});
}
})
.expect("could not register shutdown handler");
}
fn get_config() -> Config {
let cli = Cli::parse();
let config_path = home_dir().unwrap().join(".helios/helios.toml");
let cli_config = cli.as_cli_config();
Config::from_file(&config_path, &cli.network, &cli_config)
}
#[derive(Parser)]
struct Cli {
#[clap(short, long, default_value = "mainnet")]
network: String,
#[clap(short = 'p', long, env)]
rpc_port: Option<u16>,
#[clap(short = 'w', long, env)]
checkpoint: Option<String>,
#[clap(short, long, env)]
execution_rpc: Option<String>,
#[clap(short, long, env)]
consensus_rpc: Option<String>,
#[clap(short, long, env)]
data_dir: Option<String>,
#[clap(short = 'f', long, env)]
fallback: Option<String>,
#[clap(short = 'l', long, env)]
load_external_fallback: bool,
#[clap(short = 's', long, env)]
with_ws: bool,
#[clap(short = 'h', long, env)]
with_http: bool,
}
impl Cli {
fn as_cli_config(&self) -> CliConfig {
let checkpoint = match &self.checkpoint {
Some(checkpoint) => Some(hex_str_to_bytes(checkpoint).expect("invalid checkpoint")),
None => self.get_cached_checkpoint(),
};
CliConfig {
checkpoint,
execution_rpc: self.execution_rpc.clone(),
consensus_rpc: self.consensus_rpc.clone(),
data_dir: self.get_data_dir(),
rpc_port: self.rpc_port,
fallback: self.fallback.clone(),
load_external_fallback: self.load_external_fallback,
with_ws: self.with_ws,
with_http: self.with_http,
}
}
fn get_cached_checkpoint(&self) -> Option<Vec<u8>> {
let data_dir = self.get_data_dir();
let checkpoint_file = data_dir.join("checkpoint");
if checkpoint_file.exists() {
let checkpoint_res = fs::read(checkpoint_file);
match checkpoint_res {
Ok(checkpoint) => Some(checkpoint),
Err(_) => None,
}
} else {
None
}
}
fn get_data_dir(&self) -> PathBuf {
if let Some(dir) = &self.data_dir {
PathBuf::from_str(dir).expect("cannot find data dir")
} else {
home_dir()
.unwrap()
.join(format!(".helios/data/{}", self.network))
}
}
}

View File

@ -9,11 +9,12 @@ eyre = "0.6.8"
serde = { version = "1.0.143", features = ["derive"] } serde = { version = "1.0.143", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" } ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
ethers = "1.0.0" ethers = { version = "1.0.2", features = [ "ws", "default" ] }
jsonrpsee = { version = "0.15.1", features = ["full"] } jsonrpsee = { version = "0.15.1", features = ["full"] }
futures = "0.3.23" futures = "0.3.23"
log = "0.4.17" log = "0.4.17"
thiserror = "1.0.37" thiserror = "1.0.37"
ctrlc = "3.2.3"
common = { path = "../common" } common = { path = "../common" }
consensus = { path = "../consensus" } consensus = { path = "../consensus" }

View File

@ -1,7 +1,7 @@
use std::path::PathBuf; use std::path::PathBuf;
use config::{Network, Config}; use config::{Config, Network};
use execution::rpc::ExecutionRpc; use execution::rpc::WsRpc;
use crate::{database::FileDB, Client}; use crate::{database::FileDB, Client};
@ -100,7 +100,7 @@ impl ClientBuilder {
self self
} }
pub fn build(self) -> eyre::Result<Client<FileDB, _>> { fn build_base_config(&self) -> eyre::Result<Config> {
let base_config = if let Some(network) = self.network { let base_config = if let Some(network) = self.network {
network.to_base_config() network.to_base_config()
} else { } else {
@ -111,7 +111,7 @@ impl ClientBuilder {
config.to_base_config() config.to_base_config()
}; };
let consensus_rpc = self.consensus_rpc.unwrap_or_else(|| { let consensus_rpc = self.consensus_rpc.clone().unwrap_or_else(|| {
self.config self.config
.as_ref() .as_ref()
.expect("missing consensus rpc") .expect("missing consensus rpc")
@ -119,7 +119,7 @@ impl ClientBuilder {
.clone() .clone()
}); });
let execution_rpc = self.execution_rpc.unwrap_or_else(|| { let execution_rpc = self.execution_rpc.clone().unwrap_or_else(|| {
self.config self.config
.as_ref() .as_ref()
.expect("missing execution rpc") .expect("missing execution rpc")
@ -127,8 +127,8 @@ impl ClientBuilder {
.clone() .clone()
}); });
let checkpoint = if let Some(checkpoint) = self.checkpoint { let checkpoint = if let Some(checkpoint) = &self.checkpoint {
checkpoint checkpoint.clone()
} else if let Some(config) = &self.config { } else if let Some(config) = &self.config {
config.checkpoint.clone() config.checkpoint.clone()
} else { } else {
@ -144,7 +144,7 @@ impl ClientBuilder {
}; };
let data_dir = if self.data_dir.is_some() { let data_dir = if self.data_dir.is_some() {
self.data_dir self.data_dir.clone()
} else if let Some(config) = &self.config { } else if let Some(config) = &self.config {
config.data_dir.clone() config.data_dir.clone()
} else { } else {
@ -152,7 +152,7 @@ impl ClientBuilder {
}; };
let fallback = if self.fallback.is_some() { let fallback = if self.fallback.is_some() {
self.fallback self.fallback.clone()
} else if let Some(config) = &self.config { } else if let Some(config) = &self.config {
config.fallback.clone() config.fallback.clone()
} else { } else {
@ -177,7 +177,7 @@ impl ClientBuilder {
self.with_http self.with_http
}; };
let config = Config { Ok(Config {
consensus_rpc, consensus_rpc,
execution_rpc, execution_rpc,
checkpoint, checkpoint,
@ -190,8 +190,13 @@ impl ClientBuilder {
load_external_fallback, load_external_fallback,
with_ws, with_ws,
with_http, with_http,
}; })
}
}
impl ClientBuilder {
pub fn build(self) -> eyre::Result<Client<FileDB, WsRpc>> {
let config = self.build_base_config()?;
Client::new(config) Client::new(config)
} }
} }

View File

@ -1,4 +1,5 @@
use std::sync::Arc; use std::process::exit;
use std::sync::{Arc, Mutex};
use config::networks::Network; use config::networks::Network;
use ethers::prelude::{Address, U256}; use ethers::prelude::{Address, U256};
@ -7,8 +8,9 @@ use ethers::types::{Filter, Log, Transaction, TransactionReceipt, H256};
use common::types::BlockTag; use common::types::BlockTag;
use config::{CheckpointFallback, Config}; use config::{CheckpointFallback, Config};
use consensus::{types::Header, ConsensusClient}; use consensus::{types::Header, ConsensusClient};
use execution::rpc::ExecutionRpc; use execution::rpc::{ExecutionRpc, WsRpc};
use execution::types::{CallOpts, ExecutionBlock}; use execution::types::{CallOpts, ExecutionBlock};
use futures::executor::block_on;
use log::{info, warn}; use log::{info, warn};
use tokio::spawn; use tokio::spawn;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -28,9 +30,10 @@ pub struct Client<DB: Database, R: ExecutionRpc> {
http: bool, http: bool,
} }
impl<R> Client<FileDB, R> where R: ExecutionRpc { impl Client<FileDB, WsRpc> {
pub fn new(config: Config) -> eyre::Result<Self> { pub fn new(config: Config) -> eyre::Result<Self> {
let config = Arc::new(config); let config = Arc::new(config);
let node = Node::new(config.clone())?; let node = Node::new(config.clone())?;
let node = Arc::new(RwLock::new(node)); let node = Arc::new(RwLock::new(node));
@ -53,11 +56,48 @@ impl<R> Client<FileDB, R> where R: ExecutionRpc {
} }
} }
impl Client<FileDB, WsRpc> {
pub fn register_shutdown_handler(client: Client<FileDB, WsRpc>) {
let client = Arc::new(client);
let shutdown_counter = Arc::new(Mutex::new(0));
ctrlc::set_handler(move || {
let mut counter = shutdown_counter.lock().unwrap();
*counter += 1;
let counter_value = *counter;
if counter_value == 3 {
info!("forced shutdown");
exit(0);
}
info!(
"shutting down... press ctrl-c {} more times to force quit",
3 - counter_value
);
if counter_value == 1 {
let client = client.clone();
std::thread::spawn(move || {
block_on(client.shutdown());
exit(0);
});
}
})
.expect("could not register shutdown handler");
}
}
impl<DB: Database, R: ExecutionRpc> Client<DB, R> { impl<DB: Database, R: ExecutionRpc> Client<DB, R> {
pub async fn start(&mut self) -> eyre::Result<()> { pub async fn start(&mut self) -> eyre::Result<()> {
if let Some(rpc) = &mut self.rpc { if let Some(rpc) = &mut self.rpc {
if self.ws { rpc.start_ws().await?; } if self.ws {
if self.http { rpc.start_http().await?; } rpc.start_ws().await?;
}
if self.http {
rpc.start_http().await?;
}
} }
if self.node.write().await.sync().await.is_err() { if self.node.write().await.sync().await.is_err() {
@ -222,7 +262,10 @@ impl<DB: Database, R: ExecutionRpc> Client<DB, R> {
.await .await
} }
pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> eyre::Result<Option<Transaction>> { pub async fn get_transaction_by_hash(
&self,
tx_hash: &H256,
) -> eyre::Result<Option<Transaction>> {
self.node self.node
.read() .read()
.await .await

View File

@ -1,4 +1,3 @@
/// Re-export builder logic /// Re-export builder logic
mod builder; mod builder;
pub use crate::builder::*; pub use crate::builder::*;

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use ethers::prelude::{Address, U256}; use ethers::prelude::{Address, U256};
use ethers::types::{Filter, Log, Transaction, TransactionReceipt, H256}; use ethers::types::{Filter, Log, Transaction, TransactionReceipt, H256};
use execution::rpc::ExecutionRpc; use execution::rpc::{ExecutionRpc, WsRpc};
use eyre::{eyre, Result}; use eyre::{eyre, Result};
use common::errors::BlockNotFoundError; use common::errors::BlockNotFoundError;
@ -20,7 +20,7 @@ use execution::ExecutionClient;
use crate::errors::NodeError; use crate::errors::NodeError;
pub struct Node<R> where R: ExecutionRpc, { pub struct Node<R: ExecutionRpc> {
pub consensus: ConsensusClient<NimbusRpc>, pub consensus: ConsensusClient<NimbusRpc>,
pub execution: Arc<ExecutionClient<R>>, pub execution: Arc<ExecutionClient<R>>,
pub config: Arc<Config>, pub config: Arc<Config>,
@ -29,7 +29,7 @@ pub struct Node<R> where R: ExecutionRpc, {
pub history_size: usize, pub history_size: usize,
} }
impl<R> Node<R> where R: ExecutionRpc { impl Node<WsRpc> {
pub fn new(config: Arc<Config>) -> Result<Self, NodeError> { pub fn new(config: Arc<Config>) -> Result<Self, NodeError> {
let consensus_rpc = &config.consensus_rpc; let consensus_rpc = &config.consensus_rpc;
let checkpoint_hash = &config.checkpoint; let checkpoint_hash = &config.checkpoint;
@ -37,9 +37,18 @@ impl<R> Node<R> where R: ExecutionRpc {
let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()) let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone())
.map_err(NodeError::ConsensusClientCreationError)?; .map_err(NodeError::ConsensusClientCreationError)?;
let execution = Arc::new(
ExecutionClient::new(execution_rpc).map_err(NodeError::ExecutionClientCreationError)?, let execution = if config.with_ws {
); Arc::new(
ExecutionClient::new_with_ws(execution_rpc)
.map_err(NodeError::ExecutionClientCreationError)?,
)
} else {
Arc::new(
ExecutionClient::new(execution_rpc)
.map_err(NodeError::ExecutionClientCreationError)?,
)
};
let payloads = BTreeMap::new(); let payloads = BTreeMap::new();
let finalized_payloads = BTreeMap::new(); let finalized_payloads = BTreeMap::new();
@ -53,7 +62,12 @@ impl<R> Node<R> where R: ExecutionRpc {
history_size: 64, history_size: 64,
}) })
} }
}
impl<R> Node<R>
where
R: ExecutionRpc,
{
pub async fn sync(&mut self) -> Result<(), NodeError> { pub async fn sync(&mut self) -> Result<(), NodeError> {
self.consensus self.consensus
.sync() .sync()

View File

@ -16,9 +16,12 @@ use common::{
types::BlockTag, types::BlockTag,
utils::{hex_str_to_bytes, u64_to_hex_string}, utils::{hex_str_to_bytes, u64_to_hex_string},
}; };
use execution::{types::{CallOpts, ExecutionBlock}, rpc::ExecutionRpc}; use execution::{
rpc::ExecutionRpc,
types::{CallOpts, ExecutionBlock},
};
pub struct Rpc<R> where R: ExecutionRpc { pub struct Rpc<R: ExecutionRpc> {
node: Arc<RwLock<Node<R>>>, node: Arc<RwLock<Node<R>>>,
http_handle: Option<HttpServerHandle>, http_handle: Option<HttpServerHandle>,
ws_handle: Option<WsServerHandle>, ws_handle: Option<WsServerHandle>,
@ -27,7 +30,7 @@ pub struct Rpc<R> where R: ExecutionRpc {
port: u16, port: u16,
} }
impl<R> Rpc<R> where R: ExecutionRpc { impl<R: ExecutionRpc> Rpc<R> {
pub fn new(node: Arc<RwLock<Node<R>>>, with_http: bool, with_ws: bool, port: u16) -> Self { pub fn new(node: Arc<RwLock<Node<R>>>, with_http: bool, with_ws: bool, port: u16) -> Self {
Rpc { Rpc {
node, node,
@ -112,13 +115,13 @@ trait NetRpc {
} }
#[derive(Clone)] #[derive(Clone)]
struct RpcInner<R> where R: ExecutionRpc { struct RpcInner<R: ExecutionRpc> {
node: Arc<RwLock<Node<R>>>, node: Arc<RwLock<Node<R>>>,
http_port: u16, http_port: u16,
ws_port: u16, ws_port: u16,
} }
impl<R> From<&Rpc<R>> for RpcInner<R> where R: ExecutionRpc { impl<R: ExecutionRpc> From<&Rpc<R>> for RpcInner<R> {
fn from(rpc: &Rpc<R>) -> Self { fn from(rpc: &Rpc<R>) -> Self {
RpcInner { RpcInner {
node: Arc::clone(&rpc.node), node: Arc::clone(&rpc.node),
@ -128,7 +131,7 @@ impl<R> From<&Rpc<R>> for RpcInner<R> where R: ExecutionRpc {
} }
} }
impl<R> RpcInner<R> where R: ExecutionRpc { impl<R: ExecutionRpc> RpcInner<R> {
pub async fn start_http(&self) -> Result<(HttpServerHandle, SocketAddr)> { pub async fn start_http(&self) -> Result<(HttpServerHandle, SocketAddr)> {
let addr = format!("127.0.0.1:{}", self.http_port); let addr = format!("127.0.0.1:{}", self.http_port);
let server = HttpServerBuilder::default().build(addr).await?; let server = HttpServerBuilder::default().build(addr).await?;
@ -167,7 +170,7 @@ impl<R> RpcInner<R> where R: ExecutionRpc {
} }
#[async_trait] #[async_trait]
impl<R> EthRpcServer for RpcInner<R> where R: ExecutionRpc { impl<R: ExecutionRpc> EthRpcServer for RpcInner<R> {
async fn get_balance(&self, address: &str, block: BlockTag) -> Result<String, Error> { async fn get_balance(&self, address: &str, block: BlockTag) -> Result<String, Error> {
let address = convert_err(Address::from_str(address))?; let address = convert_err(Address::from_str(address))?;
let node = self.node.read().await; let node = self.node.read().await;
@ -181,7 +184,7 @@ impl<R> EthRpcServer for RpcInner<R> where R: ExecutionRpc {
let node = self.node.read().await; let node = self.node.read().await;
let nonce = convert_err(node.get_nonce(&address, block).await)?; let nonce = convert_err(node.get_nonce(&address, block).await)?;
Ok(format!("0x{:x}", nonce)) Ok(format!("0x{nonce:x}"))
} }
async fn get_code(&self, address: &str, block: BlockTag) -> Result<String, Error> { async fn get_code(&self, address: &str, block: BlockTag) -> Result<String, Error> {
@ -288,7 +291,7 @@ impl<R> EthRpcServer for RpcInner<R> where R: ExecutionRpc {
} }
#[async_trait] #[async_trait]
impl<R> NetRpcServer for RpcInner<R> where R: ExecutionRpc { impl<R: ExecutionRpc> NetRpcServer for RpcInner<R> {
async fn version(&self) -> Result<String, Error> { async fn version(&self) -> Result<String, Error> {
let node = self.node.read().await; let node = self.node.read().await;
Ok(node.chain_id().to_string()) Ok(node.chain_id().to_string())

View File

@ -8,5 +8,5 @@ eyre = "0.6.8"
serde = { version = "1.0.143", features = ["derive"] } serde = { version = "1.0.143", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" } ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
ethers = "1.0.0" ethers = { version = "1.0.2", features = [ "ws", "default" ] }
thiserror = "1.0.37" thiserror = "1.0.37"

View File

@ -20,7 +20,7 @@ impl Display for BlockTag {
Self::Number(num) => num.to_string(), Self::Number(num) => num.to_string(),
}; };
write!(f, "{}", formatted) write!(f, "{formatted}")
} }
} }

View File

@ -14,7 +14,7 @@ pub fn format_hex(num: &U256) -> String {
.unwrap() .unwrap()
.trim_start_matches('0') .trim_start_matches('0')
.to_string(); .to_string();
format!("0x{}", stripped) format!("0x{stripped}")
} }
pub fn hex_str_to_bytes(s: &str) -> Result<Vec<u8>> { pub fn hex_str_to_bytes(s: &str) -> Result<Vec<u8>> {
@ -35,5 +35,5 @@ pub fn address_to_hex_string(address: &Address) -> String {
} }
pub fn u64_to_hex_string(val: u64) -> String { pub fn u64_to_hex_string(val: u64) -> String {
format!("0x{:x}", val) format!("0x{val:x}")
} }

View File

@ -10,7 +10,7 @@ eyre = "0.6.8"
serde = { version = "1.0.143", features = ["derive"] } serde = { version = "1.0.143", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" } ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
ethers = "1.0.0" ethers = { version = "1.0.2", features = [ "ws", "default" ] }
figment = { version = "0.10.7", features = ["toml", "env"] } figment = { version = "0.10.7", features = ["toml", "env"] }
thiserror = "1.0.37" thiserror = "1.0.37"
log = "0.4.17" log = "0.4.17"

View File

@ -101,7 +101,7 @@ impl CheckpointFallback {
let service_list = list let service_list = list
.get(network.to_string().to_lowercase()) .get(network.to_string().to_lowercase())
.ok_or_else(|| { .ok_or_else(|| {
eyre::eyre!(format!("missing {} fallback checkpoint services", network)) eyre::eyre!(format!("missing {network} fallback checkpoint services"))
})?; })?;
let parsed: Vec<CheckpointFallbackService> = let parsed: Vec<CheckpointFallbackService> =
serde_yaml::from_value(service_list.clone())?; serde_yaml::from_value(service_list.clone())?;
@ -202,7 +202,7 @@ impl CheckpointFallback {
/// assert_eq!("https://sync-mainnet.beaconcha.in/checkpointz/v1/beacon/slots", url); /// assert_eq!("https://sync-mainnet.beaconcha.in/checkpointz/v1/beacon/slots", url);
/// ``` /// ```
pub fn construct_url(endpoint: &str) -> String { pub fn construct_url(endpoint: &str) -> String {
format!("{}/checkpointz/v1/beacon/slots", endpoint) format!("{endpoint}/checkpointz/v1/beacon/slots")
} }
/// Returns a list of all checkpoint fallback endpoints. /// Returns a list of all checkpoint fallback endpoints.

View File

@ -57,20 +57,14 @@ impl Config {
figment::error::Kind::MissingField(field) => { figment::error::Kind::MissingField(field) => {
let field = field.replace('_', "-"); let field = field.replace('_', "-");
println!( println!("\x1b[91merror\x1b[0m: missing configuration field: {field}");
"\x1b[91merror\x1b[0m: missing configuration field: {}",
field
);
println!( println!("\n\ttry supplying the propoper command line argument: --{field}");
"\n\ttry supplying the propoper command line argument: --{}",
field
);
println!("\talternatively, you can add the field to your helios.toml file or as an environment variable"); println!("\talternatively, you can add the field to your helios.toml file or as an environment variable");
println!("\nfor more information, check the github README"); println!("\nfor more information, check the github README");
} }
_ => println!("cannot parse configuration: {}", err), _ => println!("cannot parse configuration: {err}"),
} }
exit(1); exit(1);
} }

View File

@ -50,9 +50,9 @@ async fn test_get_all_fallback_endpoints() {
.await .await
.unwrap(); .unwrap();
let urls = cf.get_all_fallback_endpoints(&networks::Network::MAINNET); let urls = cf.get_all_fallback_endpoints(&networks::Network::MAINNET);
assert!(urls.len() > 0); assert!(!urls.is_empty());
let urls = cf.get_all_fallback_endpoints(&networks::Network::GOERLI); let urls = cf.get_all_fallback_endpoints(&networks::Network::GOERLI);
assert!(urls.len() > 0); assert!(!urls.is_empty());
} }
#[tokio::test] #[tokio::test]
@ -62,7 +62,7 @@ async fn test_get_healthy_fallback_endpoints() {
.await .await
.unwrap(); .unwrap();
let urls = cf.get_healthy_fallback_endpoints(&networks::Network::MAINNET); let urls = cf.get_healthy_fallback_endpoints(&networks::Network::MAINNET);
assert!(urls.len() > 0); assert!(!urls.is_empty());
let urls = cf.get_healthy_fallback_endpoints(&networks::Network::GOERLI); let urls = cf.get_healthy_fallback_endpoints(&networks::Network::GOERLI);
assert!(urls.len() > 0); assert!(!urls.is_empty());
} }

View File

@ -11,7 +11,7 @@ serde_json = "1.0.85"
hex = "0.4.3" hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" } ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
blst = "0.3.10" blst = "0.3.10"
ethers = "1.0.0" ethers = { version = "1.0.2", features = [ "ws", "default" ] }
bytes = "1.2.1" bytes = "1.2.1"
toml = "0.5.9" toml = "0.5.9"
async-trait = "0.1.57" async-trait = "0.1.57"

View File

@ -661,7 +661,7 @@ mod tests {
let mut update = updates[0].clone(); let mut update = updates[0].clone();
update.finalized_header = Header::default(); update.finalized_header = Header::default();
let err = client.verify_update(&mut update).err().unwrap(); let err = client.verify_update(&update).err().unwrap();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
ConsensusError::InvalidFinalityProof.to_string() ConsensusError::InvalidFinalityProof.to_string()
@ -681,7 +681,7 @@ mod tests {
let mut update = updates[0].clone(); let mut update = updates[0].clone();
update.sync_aggregate.sync_committee_signature = Vector::default(); update.sync_aggregate.sync_committee_signature = Vector::default();
let err = client.verify_update(&mut update).err().unwrap(); let err = client.verify_update(&update).err().unwrap();
assert_eq!( assert_eq!(
err.to_string(), err.to_string(),
ConsensusError::InvalidSignature.to_string() ConsensusError::InvalidSignature.to_string()

View File

@ -19,24 +19,24 @@ async fn main() -> Result<()> {
.fetch_latest_checkpoint(&networks::Network::GOERLI) .fetch_latest_checkpoint(&networks::Network::GOERLI)
.await .await
.unwrap(); .unwrap();
println!("Fetched latest goerli checkpoint: {}", goerli_checkpoint); println!("Fetched latest goerli checkpoint: {goerli_checkpoint}");
// Fetch the latest mainnet checkpoint // Fetch the latest mainnet checkpoint
let mainnet_checkpoint = cf let mainnet_checkpoint = cf
.fetch_latest_checkpoint(&networks::Network::MAINNET) .fetch_latest_checkpoint(&networks::Network::MAINNET)
.await .await
.unwrap(); .unwrap();
println!("Fetched latest mainnet checkpoint: {}", mainnet_checkpoint); println!("Fetched latest mainnet checkpoint: {mainnet_checkpoint}");
// Let's get a list of all the fallback service endpoints for mainnet // Let's get a list of all the fallback service endpoints for mainnet
let endpoints = cf.get_all_fallback_endpoints(&networks::Network::MAINNET); let endpoints = cf.get_all_fallback_endpoints(&networks::Network::MAINNET);
println!("Fetched all mainnet fallback endpoints: {:?}", endpoints); println!("Fetched all mainnet fallback endpoints: {endpoints:?}");
// Since we built the checkpoint fallback services, we can also just get the raw checkpoint fallback services. // Since we built the checkpoint fallback services, we can also just get the raw checkpoint fallback services.
// The `get_fallback_services` method returns a reference to the internal list of CheckpointFallbackService objects // The `get_fallback_services` method returns a reference to the internal list of CheckpointFallbackService objects
// for the given network. // for the given network.
let services = cf.get_fallback_services(&networks::Network::MAINNET); let services = cf.get_fallback_services(&networks::Network::MAINNET);
println!("Fetched all mainnet fallback services: {:?}", services); println!("Fetched all mainnet fallback services: {services:?}");
Ok(()) Ok(())
} }

View File

@ -8,7 +8,7 @@ async fn main() -> Result<()> {
// Load the config from the global config file // Load the config from the global config file
let config_path = home::home_dir().unwrap().join(".helios/helios.toml"); let config_path = home::home_dir().unwrap().join(".helios/helios.toml");
let config = Config::from_file(&config_path, "mainnet", &CliConfig::default()); let config = Config::from_file(&config_path, "mainnet", &CliConfig::default());
println!("Constructed config: {:#?}", config); println!("Constructed config: {config:#?}");
Ok(()) Ok(())
} }

View File

@ -11,7 +11,7 @@ serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.85" serde_json = "1.0.85"
hex = "0.4.3" hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" } ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
ethers = "1.0.0" ethers = { version = "1.0.2", features = [ "ws", "default" ] }
revm = "2.1.0" revm = "2.1.0"
bytes = "1.2.1" bytes = "1.2.1"
futures = "0.3.23" futures = "0.3.23"

View File

@ -15,6 +15,7 @@ use revm::KECCAK_EMPTY;
use triehash_ethereum::ordered_trie_root; use triehash_ethereum::ordered_trie_root;
use crate::errors::ExecutionError; use crate::errors::ExecutionError;
use crate::rpc::WsRpc;
use crate::types::Transactions; use crate::types::Transactions;
use super::proof::{encode_account, verify_proof}; use super::proof::{encode_account, verify_proof};
@ -30,6 +31,13 @@ pub struct ExecutionClient<R: ExecutionRpc> {
pub rpc: R, pub rpc: R,
} }
impl ExecutionClient<WsRpc> {
pub fn new_with_ws(rpc: &str) -> Result<Self> {
let rpc = WsRpc::new(rpc)?;
Ok(Self { rpc })
}
}
impl<R: ExecutionRpc> ExecutionClient<R> { impl<R: ExecutionRpc> ExecutionClient<R> {
pub fn new(rpc: &str) -> Result<Self> { pub fn new(rpc: &str) -> Result<Self> {
let rpc = ExecutionRpc::new(rpc)?; let rpc = ExecutionRpc::new(rpc)?;

View File

@ -90,7 +90,7 @@ fn get_rest_path(p: &Vec<u8>, s: usize) -> String {
let mut ret = String::new(); let mut ret = String::new();
for i in s..p.len() * 2 { for i in s..p.len() * 2 {
let n = get_nibble(p, i); let n = get_nibble(p, i);
ret += &format!("{:01x}", n); ret += &format!("{n:01x}");
} }
ret ret
} }

View File

@ -8,6 +8,9 @@ use eyre::Result;
use crate::types::CallOpts; use crate::types::CallOpts;
pub mod http_rpc; pub mod http_rpc;
pub use http_rpc::*;
pub mod ws_rpc;
pub use ws_rpc::*;
pub mod mock_rpc; pub mod mock_rpc;
#[async_trait] #[async_trait]

View File

@ -1,14 +1,8 @@
use std::str::FromStr;
use async_trait::async_trait; use async_trait::async_trait;
use common::errors::RpcError; use common::errors::RpcError;
use ethers::prelude::{Address, Ws}; use ethers::{
use ethers::providers::{HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient}; prelude::*,
use ethers::types::transaction::eip2718::TypedTransaction; types::transaction::{eip2718::TypedTransaction, eip2930::AccessList},
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{
BlockId, Bytes, EIP1186ProofResponse, Eip1559TransactionRequest, Filter, Log, Transaction,
TransactionReceipt, H256, U256,
}; };
use eyre::Result; use eyre::Result;
@ -30,9 +24,9 @@ impl Clone for WsRpc {
#[async_trait] #[async_trait]
impl ExecutionRpc for WsRpc { impl ExecutionRpc for WsRpc {
fn new(rpc: &str) -> Result<Self> { fn new(rpc: &str) -> Result<Self> {
Ok(WsRpc { Ok(Self {
url: rpc.to_string(), url: rpc.to_string(),
provider: None, provider: None::<Provider<Ws>>,
}) })
} }
@ -51,6 +45,11 @@ impl ExecutionRpc for WsRpc {
let block = Some(BlockId::from(block)); let block = Some(BlockId::from(block));
let proof_response = self let proof_response = self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"get_proof",
eyre::eyre!("Provider not connected!"),
))?
.get_proof(*address, slots.to_vec(), block) .get_proof(*address, slots.to_vec(), block)
.await .await
.map_err(|e| RpcError::new("get_proof", e))?; .map_err(|e| RpcError::new("get_proof", e))?;
@ -76,6 +75,11 @@ impl ExecutionRpc for WsRpc {
let tx = TypedTransaction::Eip1559(raw_tx); let tx = TypedTransaction::Eip1559(raw_tx);
let list = self let list = self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"create_access_list",
eyre::eyre!("Provider not connected!"),
))?
.create_access_list(&tx, block) .create_access_list(&tx, block)
.await .await
.map_err(|e| RpcError::new("create_access_list", e))?; .map_err(|e| RpcError::new("create_access_list", e))?;
@ -87,6 +91,11 @@ impl ExecutionRpc for WsRpc {
let block = Some(BlockId::from(block)); let block = Some(BlockId::from(block));
let code = self let code = self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"get_code",
eyre::eyre!("Provider not connected!"),
))?
.get_code(*address, block) .get_code(*address, block)
.await .await
.map_err(|e| RpcError::new("get_code", e))?; .map_err(|e| RpcError::new("get_code", e))?;
@ -98,6 +107,11 @@ impl ExecutionRpc for WsRpc {
let bytes = Bytes::from(bytes.to_owned()); let bytes = Bytes::from(bytes.to_owned());
let tx = self let tx = self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"send_raw_transaction",
eyre::eyre!("Provider not connected!"),
))?
.send_raw_transaction(bytes) .send_raw_transaction(bytes)
.await .await
.map_err(|e| RpcError::new("send_raw_transaction", e))?; .map_err(|e| RpcError::new("send_raw_transaction", e))?;
@ -108,6 +122,11 @@ impl ExecutionRpc for WsRpc {
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>> { async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>> {
let receipt = self let receipt = self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"get_transaction_receipt",
eyre::eyre!("Provider not connected!"),
))?
.get_transaction_receipt(*tx_hash) .get_transaction_receipt(*tx_hash)
.await .await
.map_err(|e| RpcError::new("get_transaction_receipt", e))?; .map_err(|e| RpcError::new("get_transaction_receipt", e))?;
@ -118,6 +137,11 @@ impl ExecutionRpc for WsRpc {
async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> { async fn get_transaction(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
Ok(self Ok(self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"get_transaction",
eyre::eyre!("Provider not connected!"),
))?
.get_transaction(*tx_hash) .get_transaction(*tx_hash)
.await .await
.map_err(|e| RpcError::new("get_transaction", e))?) .map_err(|e| RpcError::new("get_transaction", e))?)
@ -126,6 +150,11 @@ impl ExecutionRpc for WsRpc {
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> { async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>> {
Ok(self Ok(self
.provider .provider
.as_ref()
.ok_or(RpcError::new(
"get_logs",
eyre::eyre!("Provider not connected!"),
))?
.get_logs(filter) .get_logs(filter)
.await .await
.map_err(|e| RpcError::new("get_logs", e))?) .map_err(|e| RpcError::new("get_logs", e))?)

View File

@ -18,11 +18,13 @@ async fn test_get_account() {
let execution = get_client(); let execution = get_client();
let address = Address::from_str("14f9D4aF749609c1438528C0Cce1cC3f6D411c47").unwrap(); let address = Address::from_str("14f9D4aF749609c1438528C0Cce1cC3f6D411c47").unwrap();
let mut payload = ExecutionPayload::default(); let payload = ExecutionPayload {
payload.state_root = Vector::from_iter( state_root: Vector::from_iter(
hex_str_to_bytes("0xaa02f5db2ee75e3da400d10f3c30e894b6016ce8a2501680380a907b6674ce0d") hex_str_to_bytes("0xaa02f5db2ee75e3da400d10f3c30e894b6016ce8a2501680380a907b6674ce0d")
.unwrap(), .unwrap(),
); ),
..Default::default()
};
let account = execution let account = execution
.get_account(&address, None, &payload) .get_account(&address, None, &payload)
@ -102,11 +104,13 @@ async fn test_get_tx_not_included() {
#[tokio::test] #[tokio::test]
async fn test_get_logs() { async fn test_get_logs() {
let execution = get_client(); let execution = get_client();
let mut payload = ExecutionPayload::default(); let mut payload = ExecutionPayload {
payload.receipts_root = Vector::from_iter( receipts_root: Vector::from_iter(
hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f") hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f")
.unwrap(), .unwrap(),
); ),
..ExecutionPayload::default()
};
payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap())); payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap()));
@ -130,11 +134,13 @@ async fn test_get_receipt() {
let tx_hash = let tx_hash =
H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap(); H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap();
let mut payload = ExecutionPayload::default(); let mut payload = ExecutionPayload {
payload.receipts_root = Vector::from_iter( receipts_root: Vector::from_iter(
hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f") hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f")
.unwrap(), .unwrap(),
); ),
..ExecutionPayload::default()
};
payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap())); payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap()));
@ -185,8 +191,10 @@ async fn test_get_receipt_not_included() {
#[tokio::test] #[tokio::test]
async fn test_get_block() { async fn test_get_block() {
let execution = get_client(); let execution = get_client();
let mut payload = ExecutionPayload::default(); let payload = ExecutionPayload {
payload.block_number = 12345; block_number: 12345,
..ExecutionPayload::default()
};
let block = execution.get_block(&payload, false).await.unwrap(); let block = execution.get_block(&payload, false).await.unwrap();