From 4a35ad2165b5bfbeade5f2c9ac76dbe2716eed8e Mon Sep 17 00:00:00 2001 From: geemo Date: Fri, 3 Mar 2023 02:58:07 -0600 Subject: [PATCH] add broken discovery example. Discv5 starts but everything is not quite set up. Change the ConsensusNetworkInterface trait function to async so it works with p2p plus for the rpc setup you can add a feature to retrieve the requisite paths from a cached file. --- client/src/client.rs | 12 +++--- client/src/node.rs | 3 +- consensus/src/consensus.rs | 4 +- consensus/src/p2p/discovery/mod.rs | 21 ++++++---- consensus/src/p2p/p2p_network_interface.rs | 14 +++++-- consensus/src/rpc/mock_rpc.rs | 2 +- consensus/src/rpc/mod.rs | 2 +- consensus/src/rpc/nimbus_rpc.rs | 2 +- examples/basic.rs | 6 ++- examples/client.rs | 6 ++- examples/discovery.rs | 48 ++++++++++++++++++++++ 11 files changed, 95 insertions(+), 25 deletions(-) create mode 100644 examples/discovery.rs diff --git a/client/src/client.rs b/client/src/client.rs index aafecdb..f4314e6 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -108,7 +108,7 @@ impl ClientBuilder { self } - pub fn build(self) -> Result> { + pub async fn build(self) -> Result> { let base_config = if let Some(network) = self.network { network.to_base_config() } else { @@ -208,7 +208,7 @@ impl ClientBuilder { strict_checkpoint_age, }; - Client::new(config) + Client::new(config).await } } @@ -222,7 +222,7 @@ pub struct Client { } impl Client { - fn new(mut config: Config) -> Result { + async fn new(mut config: Config) -> Result { let db = DB::new(&config)?; if config.checkpoint.is_none() { let checkpoint = db.load_checkpoint()?; @@ -230,7 +230,7 @@ impl Client { } let config = Arc::new(config); - let node = Node::::new(config.clone())?; + let node = Node::::new(config.clone()).await?; let node = Arc::new(RwLock::new(node)); #[cfg(not(target_arch = "wasm32"))] @@ -344,7 +344,7 @@ impl Client { // We fail fast here since the node is unrecoverable at this point let config = self.node.read().await.config.clone(); let consensus = - ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; + ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?; self.node.write().await.consensus = consensus; self.node.write().await.sync().await?; @@ -385,7 +385,7 @@ impl Client { // We fail fast here since the node is unrecoverable at this point let config = self.node.read().await.config.clone(); let consensus = - ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; + ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?; self.node.write().await.consensus = consensus; self.node.write().await.sync().await?; Ok(()) diff --git a/client/src/node.rs b/client/src/node.rs index b89d21c..c818bce 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -33,12 +33,13 @@ pub struct Node { } impl Node { - pub fn new(config: Arc) -> Result { + pub async fn new(config: Arc) -> Result { let consensus_rpc = &config.consensus_rpc; let checkpoint_hash = &config.checkpoint.as_ref().unwrap(); let execution_rpc = &config.execution_rpc; let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()) + .await .map_err(NodeError::ConsensusClientCreationError)?; let execution = Arc::new( ExecutionClient::new(execution_rpc).map_err(NodeError::ExecutionClientCreationError)?, diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index d5b9949..4e178bd 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -54,12 +54,12 @@ struct LightClientStore { } impl ConsensusClient { - pub fn new( + pub async fn new( rpc: &str, checkpoint_block_root: &[u8], config: Arc, ) -> Result> { - let network_interface = N::new(rpc); + let network_interface = N::new(rpc).await; Ok(ConsensusClient { network_interface, diff --git a/consensus/src/p2p/discovery/mod.rs b/consensus/src/p2p/discovery/mod.rs index 732f7f2..318c8b7 100644 --- a/consensus/src/p2p/discovery/mod.rs +++ b/consensus/src/p2p/discovery/mod.rs @@ -14,14 +14,16 @@ use futures::{ stream::FuturesUnordered, StreamExt, }; -use std::future::Future; -use std::pin::Pin; -use std::net::SocketAddr; -use std::time::Instant; -use std::collections::HashMap; -use std::task::{Context, Poll}; +use std::{ + future::Future, + pin::Pin, + net::SocketAddr, + time::Instant, + collections::HashMap, + task::{Context, Poll}, + sync::{Arc, Mutex}, +}; use log::{debug, error}; -use std::sync::{Arc, Mutex}; use super::config::Config as ConsensusConfig; mod enr; @@ -42,6 +44,7 @@ enum EventStream { type DiscResult = Result, QueryError>; +#[derive(Debug)] pub enum DiscoveryError { Discv5Error(Discv5Error), UnexpectedError(String), @@ -79,12 +82,14 @@ impl Discovery { local_key: &Keypair, config: ConsensusConfig, ) -> Result { + // convert the keypair to an ENR key let enr_key = key_from_libp2p(local_key)?; let local_enr = build_enr(&enr_key, &config); let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port); let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config.discv5_config)?; + // Add bootnodes to routing table for boot_node_enr in config.boot_nodes_enr.clone() { debug!("Adding boot node: {:?}", boot_node_enr); let repr = boot_node_enr.to_string(); @@ -92,6 +97,8 @@ impl Discovery { error!("Failed to add boot node: {:?}, {:?}", repr, e); }); } + + // Start the discv5 service and obtain an event stream let event_stream = if !config.disable_discovery { discv5 .start(listen_socket) diff --git a/consensus/src/p2p/p2p_network_interface.rs b/consensus/src/p2p/p2p_network_interface.rs index 6ec8c17..afac81a 100644 --- a/consensus/src/p2p/p2p_network_interface.rs +++ b/consensus/src/p2p/p2p_network_interface.rs @@ -1,7 +1,11 @@ use eyre::Result; use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Update}; use async_trait::async_trait; -use libp2p::swarm::NetworkBehaviour; +use libp2p::{ + swarm::NetworkBehaviour, + identity::Keypair, +}; + use crate::p2p::discovery::Discovery; use crate::rpc::ConsensusNetworkInterface; @@ -13,8 +17,12 @@ pub struct P2pNetworkInterface { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for P2pNetworkInterface { - fn new(_path: &str) -> Self { - unimplemented!() + // TODO: Really implement this function + async fn new(_path: &str) -> Self { + let local_key = Keypair::generate_secp256k1(); + let config = super::config::Config::default(); + let discovery = Discovery::new(&local_key, config).await.unwrap(); + Self { discovery } } async fn get_bootstrap(&self, _block_root: &'_ [u8]) -> Result { diff --git a/consensus/src/rpc/mock_rpc.rs b/consensus/src/rpc/mock_rpc.rs index 782c35d..0db95d8 100644 --- a/consensus/src/rpc/mock_rpc.rs +++ b/consensus/src/rpc/mock_rpc.rs @@ -12,7 +12,7 @@ pub struct MockRpc { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for MockRpc { - fn new(path: &str) -> Self { + async fn new(path: &str) -> Self { MockRpc { testdata: PathBuf::from(path), } diff --git a/consensus/src/rpc/mod.rs b/consensus/src/rpc/mod.rs index bbba651..8ba7b65 100644 --- a/consensus/src/rpc/mod.rs +++ b/consensus/src/rpc/mod.rs @@ -10,7 +10,7 @@ use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Upd #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait ConsensusNetworkInterface: Sync + Send + 'static { - fn new(path: &str) -> Self; + async fn new(path: &str) -> Self; async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result; async fn get_updates(&self, period: u64, count: u8) -> Result>; async fn get_finality_update(&self) -> Result; diff --git a/consensus/src/rpc/nimbus_rpc.rs b/consensus/src/rpc/nimbus_rpc.rs index e6f6d15..1885f5e 100644 --- a/consensus/src/rpc/nimbus_rpc.rs +++ b/consensus/src/rpc/nimbus_rpc.rs @@ -15,7 +15,7 @@ pub struct NimbusRpc { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl ConsensusNetworkInterface for NimbusRpc { - fn new(rpc: &str) -> Self { + async fn new(rpc: &str) -> Self { NimbusRpc { rpc: rpc.to_string(), } diff --git a/examples/basic.rs b/examples/basic.rs index f4d1c39..0991307 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -4,6 +4,7 @@ use env_logger::Env; use ethers::{types::Address, utils}; use eyre::Result; use helios::{config::networks::Network, prelude::*}; +use consensus::rpc::nimbus_rpc::NimbusRpc; #[tokio::main] async fn main() -> Result<()> { @@ -15,13 +16,14 @@ async fn main() -> Result<()> { let consensus_rpc = "https://www.lightclientdata.org"; log::info!("Using consensus RPC URL: {}", consensus_rpc); - let mut client: Client = ClientBuilder::new() + let mut client: Client = ClientBuilder::new() .network(Network::MAINNET) .consensus_rpc(consensus_rpc) .execution_rpc(untrusted_rpc_url) .load_external_fallback() .data_dir(PathBuf::from("/tmp/helios")) - .build()?; + .build() + .await?; log::info!( "Built client on network \"{}\" with external checkpoint fallbacks", diff --git a/examples/client.rs b/examples/client.rs index 3914408..aaf55dc 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use consensus::rpc::nimbus_rpc::NimbusRpc; use eyre::Result; use helios::prelude::*; @@ -35,7 +36,10 @@ async fn main() -> Result<()> { builder = builder.load_external_fallback(); // Build the client - let _client: Client = builder.build().unwrap(); + let _client: Client = builder + .build() + .await + .unwrap(); println!("Constructed client!"); Ok(()) diff --git a/examples/discovery.rs b/examples/discovery.rs new file mode 100644 index 0000000..27d79b6 --- /dev/null +++ b/examples/discovery.rs @@ -0,0 +1,48 @@ +use std::{path::PathBuf, str::FromStr}; + +use env_logger::Env; +use ethers::{types::Address, utils}; +use eyre::Result; +use helios::{config::networks::Network, prelude::*}; + +use consensus::p2p::P2pNetworkInterface; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + + let untrusted_rpc_url = "https://eth-mainnet.g.alchemy.com/v2/nObEU8Wh4FIT-X_UDFpK9oVGiTHzznML"; + log::info!("Using untrusted RPC URL [REDACTED]"); + + let consensus_rpc = "https://www.lightclientdata.org"; + log::info!("Using consensus RPC URL: {}", consensus_rpc); + + let mut client: Client = ClientBuilder::new() + .network(Network::MAINNET) + .consensus_rpc(consensus_rpc) + .execution_rpc(untrusted_rpc_url) + .load_external_fallback() + .data_dir(PathBuf::from("/tmp/helios")) + .build() + .await?; + + log::info!( + "Built client on network \"{}\" with external checkpoint fallbacks", + Network::MAINNET + ); + + client.start().await?; + + let head_block_num = client.get_block_number().await?; + let addr = Address::from_str("0x00000000219ab540356cBB839Cbe05303d7705Fa")?; + let block = BlockTag::Latest; + let balance = client.get_balance(&addr, block).await?; + + log::info!("synced up to block: {}", head_block_num); + log::info!( + "balance of deposit contract: {}", + utils::format_ether(balance) + ); + + Ok(()) +}