diff --git a/client/src/client.rs b/client/src/client.rs index aafecdb..f4314e6 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -108,7 +108,7 @@ impl ClientBuilder { self } - pub fn build(self) -> Result> { + pub async fn build(self) -> Result> { let base_config = if let Some(network) = self.network { network.to_base_config() } else { @@ -208,7 +208,7 @@ impl ClientBuilder { strict_checkpoint_age, }; - Client::new(config) + Client::new(config).await } } @@ -222,7 +222,7 @@ pub struct Client { } impl Client { - fn new(mut config: Config) -> Result { + async fn new(mut config: Config) -> Result { let db = DB::new(&config)?; if config.checkpoint.is_none() { let checkpoint = db.load_checkpoint()?; @@ -230,7 +230,7 @@ impl Client { } let config = Arc::new(config); - let node = Node::::new(config.clone())?; + let node = Node::::new(config.clone()).await?; let node = Arc::new(RwLock::new(node)); #[cfg(not(target_arch = "wasm32"))] @@ -344,7 +344,7 @@ impl Client { // We fail fast here since the node is unrecoverable at this point let config = self.node.read().await.config.clone(); let consensus = - ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; + ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?; self.node.write().await.consensus = consensus; self.node.write().await.sync().await?; @@ -385,7 +385,7 @@ impl Client { // We fail fast here since the node is unrecoverable at this point let config = self.node.read().await.config.clone(); let consensus = - ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; + ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?; self.node.write().await.consensus = consensus; self.node.write().await.sync().await?; Ok(()) diff --git a/client/src/node.rs b/client/src/node.rs index b89d21c..c818bce 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -33,12 +33,13 @@ pub struct Node { } impl Node { - pub fn new(config: Arc) -> Result { + pub async fn new(config: Arc) -> Result { let consensus_rpc = &config.consensus_rpc; let checkpoint_hash = &config.checkpoint.as_ref().unwrap(); let execution_rpc = &config.execution_rpc; let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()) + .await .map_err(NodeError::ConsensusClientCreationError)?; let execution = Arc::new( ExecutionClient::new(execution_rpc).map_err(NodeError::ExecutionClientCreationError)?, diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index d5b9949..4e178bd 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -54,12 +54,12 @@ struct LightClientStore { } impl ConsensusClient { - pub fn new( + pub async fn new( rpc: &str, checkpoint_block_root: &[u8], config: Arc, ) -> Result> { - let network_interface = N::new(rpc); + let network_interface = N::new(rpc).await; Ok(ConsensusClient { network_interface, diff --git a/consensus/src/p2p/discovery/mod.rs b/consensus/src/p2p/discovery/mod.rs index 732f7f2..318c8b7 100644 --- a/consensus/src/p2p/discovery/mod.rs +++ b/consensus/src/p2p/discovery/mod.rs @@ -14,14 +14,16 @@ use futures::{ stream::FuturesUnordered, StreamExt, }; -use std::future::Future; -use std::pin::Pin; -use std::net::SocketAddr; -use std::time::Instant; -use std::collections::HashMap; -use std::task::{Context, Poll}; +use std::{ + future::Future, + pin::Pin, + net::SocketAddr, + time::Instant, + collections::HashMap, + task::{Context, Poll}, + sync::{Arc, Mutex}, +}; use log::{debug, error}; -use std::sync::{Arc, Mutex}; use super::config::Config as ConsensusConfig; mod enr; @@ -42,6 +44,7 @@ enum EventStream { type DiscResult = Result, QueryError>; +#[derive(Debug)] pub enum DiscoveryError { Discv5Error(Discv5Error), UnexpectedError(String), @@ -79,12 +82,14 @@ impl Discovery { local_key: &Keypair, config: ConsensusConfig, ) -> Result { + // convert the keypair to an ENR key let enr_key = key_from_libp2p(local_key)?; let local_enr = build_enr(&enr_key, &config); let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port); let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config.discv5_config)?; + // Add bootnodes to routing table for boot_node_enr in config.boot_nodes_enr.clone() { debug!("Adding boot node: {:?}", boot_node_enr); let repr = boot_node_enr.to_string(); @@ -92,6 +97,8 @@ impl Discovery { error!("Failed to add boot node: {:?}, {:?}", repr, e); }); } + + // Start the discv5 service and obtain an event stream let event_stream = if !config.disable_discovery { discv5 .start(listen_socket) diff --git a/consensus/src/p2p/p2p_network_interface.rs b/consensus/src/p2p/p2p_network_interface.rs index 6ec8c17..afac81a 100644 --- a/consensus/src/p2p/p2p_network_interface.rs +++ b/consensus/src/p2p/p2p_network_interface.rs @@ -1,7 +1,11 @@ use eyre::Result; use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Update}; use async_trait::async_trait; -use libp2p::swarm::NetworkBehaviour; +use libp2p::{ + swarm::NetworkBehaviour, + identity::Keypair, +}; + use crate::p2p::discovery::Discovery; use crate::rpc::ConsensusNetworkInterface; @@ -13,8 +17,12 @@ pub struct P2pNetworkInterface { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for P2pNetworkInterface { - fn new(_path: &str) -> Self { - unimplemented!() + // TODO: Really implement this function + async fn new(_path: &str) -> Self { + let local_key = Keypair::generate_secp256k1(); + let config = super::config::Config::default(); + let discovery = Discovery::new(&local_key, config).await.unwrap(); + Self { discovery } } async fn get_bootstrap(&self, _block_root: &'_ [u8]) -> Result { diff --git a/consensus/src/rpc/mock_rpc.rs b/consensus/src/rpc/mock_rpc.rs index 782c35d..0db95d8 100644 --- a/consensus/src/rpc/mock_rpc.rs +++ b/consensus/src/rpc/mock_rpc.rs @@ -12,7 +12,7 @@ pub struct MockRpc { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for MockRpc { - fn new(path: &str) -> Self { + async fn new(path: &str) -> Self { MockRpc { testdata: PathBuf::from(path), } diff --git a/consensus/src/rpc/mod.rs b/consensus/src/rpc/mod.rs index bbba651..8ba7b65 100644 --- a/consensus/src/rpc/mod.rs +++ b/consensus/src/rpc/mod.rs @@ -10,7 +10,7 @@ use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Upd #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait ConsensusNetworkInterface: Sync + Send + 'static { - fn new(path: &str) -> Self; + async fn new(path: &str) -> Self; async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result; async fn get_updates(&self, period: u64, count: u8) -> Result>; async fn get_finality_update(&self) -> Result; diff --git a/consensus/src/rpc/nimbus_rpc.rs b/consensus/src/rpc/nimbus_rpc.rs index e6f6d15..1885f5e 100644 --- a/consensus/src/rpc/nimbus_rpc.rs +++ b/consensus/src/rpc/nimbus_rpc.rs @@ -15,7 +15,7 @@ pub struct NimbusRpc { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for NimbusRpc { - fn new(rpc: &str) -> Self { + async fn new(rpc: &str) -> Self { NimbusRpc { rpc: rpc.to_string(), } diff --git a/examples/basic.rs b/examples/basic.rs index f4d1c39..0991307 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -4,6 +4,7 @@ use env_logger::Env; use ethers::{types::Address, utils}; use eyre::Result; use helios::{config::networks::Network, prelude::*}; +use consensus::rpc::nimbus_rpc::NimbusRpc; #[tokio::main] async fn main() -> Result<()> { @@ -15,13 +16,14 @@ async fn main() -> Result<()> { let consensus_rpc = "https://www.lightclientdata.org"; log::info!("Using consensus RPC URL: {}", consensus_rpc); - let mut client: Client = ClientBuilder::new() + let mut client: Client = ClientBuilder::new() .network(Network::MAINNET) .consensus_rpc(consensus_rpc) .execution_rpc(untrusted_rpc_url) .load_external_fallback() .data_dir(PathBuf::from("/tmp/helios")) - .build()?; + .build() + .await?; log::info!( "Built client on network \"{}\" with external checkpoint fallbacks", diff --git a/examples/client.rs b/examples/client.rs index 3914408..aaf55dc 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use consensus::rpc::nimbus_rpc::NimbusRpc; use eyre::Result; use helios::prelude::*; @@ -35,7 +36,10 @@ async fn main() -> Result<()> { builder = builder.load_external_fallback(); // Build the client - let _client: Client = builder.build().unwrap(); + let _client: Client = builder + .build() + .await + .unwrap(); println!("Constructed client!"); Ok(()) diff --git a/examples/discovery.rs b/examples/discovery.rs new file mode 100644 index 0000000..27d79b6 --- /dev/null +++ b/examples/discovery.rs @@ -0,0 +1,48 @@ +use std::{path::PathBuf, str::FromStr}; + +use env_logger::Env; +use ethers::{types::Address, utils}; +use eyre::Result; +use helios::{config::networks::Network, prelude::*}; + +use consensus::p2p::P2pNetworkInterface; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + + let untrusted_rpc_url = "https://eth-mainnet.g.alchemy.com/v2/nObEU8Wh4FIT-X_UDFpK9oVGiTHzznML"; + log::info!("Using untrusted RPC URL [REDACTED]"); + + let consensus_rpc = "https://www.lightclientdata.org"; + log::info!("Using consensus RPC URL: {}", consensus_rpc); + + let mut client: Client = ClientBuilder::new() + .network(Network::MAINNET) + .consensus_rpc(consensus_rpc) + .execution_rpc(untrusted_rpc_url) + .load_external_fallback() + .data_dir(PathBuf::from("/tmp/helios")) + .build() + .await?; + + log::info!( + "Built client on network \"{}\" with external checkpoint fallbacks", + Network::MAINNET + ); + + client.start().await?; + + let head_block_num = client.get_block_number().await?; + let addr = Address::from_str("0x00000000219ab540356cBB839Cbe05303d7705Fa")?; + let block = BlockTag::Latest; + let balance = client.get_balance(&addr, block).await?; + + log::info!("synced up to block: {}", head_block_num); + log::info!( + "balance of deposit contract: {}", + utils::format_ether(balance) + ); + + Ok(()) +}