add broken discovery example. Discv5 starts but everything is not quite set up. Change the ConsensusNetworkInterface trait function to async so it works with p2p plus for the rpc setup you can add a feature to retrieve the requisite paths from a cached file.

This commit is contained in:
geemo 2023-03-03 02:58:07 -06:00
parent 7d7ac4b5aa
commit 4a35ad2165
11 changed files with 95 additions and 25 deletions

View File

@ -108,7 +108,7 @@ impl ClientBuilder {
self self
} }
pub fn build<DB: Database, N: ConsensusNetworkInterface>(self) -> Result<Client<DB, N>> { pub async fn build<DB: Database, N: ConsensusNetworkInterface>(self) -> Result<Client<DB, N>> {
let base_config = if let Some(network) = self.network { let base_config = if let Some(network) = self.network {
network.to_base_config() network.to_base_config()
} else { } else {
@ -208,7 +208,7 @@ impl ClientBuilder {
strict_checkpoint_age, strict_checkpoint_age,
}; };
Client::new(config) Client::new(config).await
} }
} }
@ -222,7 +222,7 @@ pub struct Client<DB: Database, N: ConsensusNetworkInterface> {
} }
impl<DB: Database, N: ConsensusNetworkInterface> Client<DB, N> { impl<DB: Database, N: ConsensusNetworkInterface> Client<DB, N> {
fn new(mut config: Config) -> Result<Self> { async fn new(mut config: Config) -> Result<Self> {
let db = DB::new(&config)?; let db = DB::new(&config)?;
if config.checkpoint.is_none() { if config.checkpoint.is_none() {
let checkpoint = db.load_checkpoint()?; let checkpoint = db.load_checkpoint()?;
@ -230,7 +230,7 @@ impl<DB: Database, N: ConsensusNetworkInterface> Client<DB, N> {
} }
let config = Arc::new(config); let config = Arc::new(config);
let node = Node::<N>::new(config.clone())?; let node = Node::<N>::new(config.clone()).await?;
let node = Arc::new(RwLock::new(node)); let node = Arc::new(RwLock::new(node));
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -344,7 +344,7 @@ impl<DB: Database, N: ConsensusNetworkInterface> Client<DB, N> {
// We fail fast here since the node is unrecoverable at this point // We fail fast here since the node is unrecoverable at this point
let config = self.node.read().await.config.clone(); let config = self.node.read().await.config.clone();
let consensus = let consensus =
ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?;
self.node.write().await.consensus = consensus; self.node.write().await.consensus = consensus;
self.node.write().await.sync().await?; self.node.write().await.sync().await?;
@ -385,7 +385,7 @@ impl<DB: Database, N: ConsensusNetworkInterface> Client<DB, N> {
// We fail fast here since the node is unrecoverable at this point // We fail fast here since the node is unrecoverable at this point
let config = self.node.read().await.config.clone(); let config = self.node.read().await.config.clone();
let consensus = let consensus =
ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone())?; ConsensusClient::new(&config.consensus_rpc, checkpoint.as_bytes(), config.clone()).await?;
self.node.write().await.consensus = consensus; self.node.write().await.consensus = consensus;
self.node.write().await.sync().await?; self.node.write().await.sync().await?;
Ok(()) Ok(())

View File

@ -33,12 +33,13 @@ pub struct Node<N: ConsensusNetworkInterface> {
} }
impl<N: ConsensusNetworkInterface> Node<N> { impl<N: ConsensusNetworkInterface> Node<N> {
pub fn new(config: Arc<Config>) -> Result<Self, NodeError> { pub async fn new(config: Arc<Config>) -> Result<Self, NodeError> {
let consensus_rpc = &config.consensus_rpc; let consensus_rpc = &config.consensus_rpc;
let checkpoint_hash = &config.checkpoint.as_ref().unwrap(); let checkpoint_hash = &config.checkpoint.as_ref().unwrap();
let execution_rpc = &config.execution_rpc; let execution_rpc = &config.execution_rpc;
let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()) let consensus = ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone())
.await
.map_err(NodeError::ConsensusClientCreationError)?; .map_err(NodeError::ConsensusClientCreationError)?;
let execution = Arc::new( let execution = Arc::new(
ExecutionClient::new(execution_rpc).map_err(NodeError::ExecutionClientCreationError)?, ExecutionClient::new(execution_rpc).map_err(NodeError::ExecutionClientCreationError)?,

View File

@ -54,12 +54,12 @@ struct LightClientStore {
} }
impl<N: ConsensusNetworkInterface> ConsensusClient<N> { impl<N: ConsensusNetworkInterface> ConsensusClient<N> {
pub fn new( pub async fn new(
rpc: &str, rpc: &str,
checkpoint_block_root: &[u8], checkpoint_block_root: &[u8],
config: Arc<Config>, config: Arc<Config>,
) -> Result<ConsensusClient<N>> { ) -> Result<ConsensusClient<N>> {
let network_interface = N::new(rpc); let network_interface = N::new(rpc).await;
Ok(ConsensusClient { Ok(ConsensusClient {
network_interface, network_interface,

View File

@ -14,14 +14,16 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
StreamExt, StreamExt,
}; };
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::net::SocketAddr; pin::Pin,
use std::time::Instant; net::SocketAddr,
use std::collections::HashMap; time::Instant,
use std::task::{Context, Poll}; collections::HashMap,
task::{Context, Poll},
sync::{Arc, Mutex},
};
use log::{debug, error}; use log::{debug, error};
use std::sync::{Arc, Mutex};
use super::config::Config as ConsensusConfig; use super::config::Config as ConsensusConfig;
mod enr; mod enr;
@ -42,6 +44,7 @@ enum EventStream {
type DiscResult = Result<Vec<Enr>, QueryError>; type DiscResult = Result<Vec<Enr>, QueryError>;
#[derive(Debug)]
pub enum DiscoveryError { pub enum DiscoveryError {
Discv5Error(Discv5Error), Discv5Error(Discv5Error),
UnexpectedError(String), UnexpectedError(String),
@ -79,12 +82,14 @@ impl Discovery {
local_key: &Keypair, local_key: &Keypair,
config: ConsensusConfig, config: ConsensusConfig,
) -> Result<Self, DiscoveryError> { ) -> Result<Self, DiscoveryError> {
// convert the keypair to an ENR key
let enr_key = key_from_libp2p(local_key)?; let enr_key = key_from_libp2p(local_key)?;
let local_enr = build_enr(&enr_key, &config); let local_enr = build_enr(&enr_key, &config);
let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port); let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port);
let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config.discv5_config)?; let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config.discv5_config)?;
// Add bootnodes to routing table
for boot_node_enr in config.boot_nodes_enr.clone() { for boot_node_enr in config.boot_nodes_enr.clone() {
debug!("Adding boot node: {:?}", boot_node_enr); debug!("Adding boot node: {:?}", boot_node_enr);
let repr = boot_node_enr.to_string(); let repr = boot_node_enr.to_string();
@ -92,6 +97,8 @@ impl Discovery {
error!("Failed to add boot node: {:?}, {:?}", repr, e); error!("Failed to add boot node: {:?}, {:?}", repr, e);
}); });
} }
// Start the discv5 service and obtain an event stream
let event_stream = if !config.disable_discovery { let event_stream = if !config.disable_discovery {
discv5 discv5
.start(listen_socket) .start(listen_socket)

View File

@ -1,7 +1,11 @@
use eyre::Result; use eyre::Result;
use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Update}; use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Update};
use async_trait::async_trait; use async_trait::async_trait;
use libp2p::swarm::NetworkBehaviour; use libp2p::{
swarm::NetworkBehaviour,
identity::Keypair,
};
use crate::p2p::discovery::Discovery; use crate::p2p::discovery::Discovery;
use crate::rpc::ConsensusNetworkInterface; use crate::rpc::ConsensusNetworkInterface;
@ -13,8 +17,12 @@ pub struct P2pNetworkInterface {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConsensusNetworkInterface for P2pNetworkInterface { impl ConsensusNetworkInterface for P2pNetworkInterface {
fn new(_path: &str) -> Self { // TODO: Really implement this function
unimplemented!() async fn new(_path: &str) -> Self {
let local_key = Keypair::generate_secp256k1();
let config = super::config::Config::default();
let discovery = Discovery::new(&local_key, config).await.unwrap();
Self { discovery }
} }
async fn get_bootstrap(&self, _block_root: &'_ [u8]) -> Result<Bootstrap> { async fn get_bootstrap(&self, _block_root: &'_ [u8]) -> Result<Bootstrap> {

View File

@ -12,7 +12,7 @@ pub struct MockRpc {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConsensusNetworkInterface for MockRpc { impl ConsensusNetworkInterface for MockRpc {
fn new(path: &str) -> Self { async fn new(path: &str) -> Self {
MockRpc { MockRpc {
testdata: PathBuf::from(path), testdata: PathBuf::from(path),
} }

View File

@ -10,7 +10,7 @@ use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Upd
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait ConsensusNetworkInterface: Sync + Send + 'static { pub trait ConsensusNetworkInterface: Sync + Send + 'static {
fn new(path: &str) -> Self; async fn new(path: &str) -> Self;
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap>; async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap>;
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>>; async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>>;
async fn get_finality_update(&self) -> Result<FinalityUpdate>; async fn get_finality_update(&self) -> Result<FinalityUpdate>;

View File

@ -15,7 +15,7 @@ pub struct NimbusRpc {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConsensusNetworkInterface for NimbusRpc { impl ConsensusNetworkInterface for NimbusRpc {
fn new(rpc: &str) -> Self { async fn new(rpc: &str) -> Self {
NimbusRpc { NimbusRpc {
rpc: rpc.to_string(), rpc: rpc.to_string(),
} }

View File

@ -4,6 +4,7 @@ use env_logger::Env;
use ethers::{types::Address, utils}; use ethers::{types::Address, utils};
use eyre::Result; use eyre::Result;
use helios::{config::networks::Network, prelude::*}; use helios::{config::networks::Network, prelude::*};
use consensus::rpc::nimbus_rpc::NimbusRpc;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -15,13 +16,14 @@ async fn main() -> Result<()> {
let consensus_rpc = "https://www.lightclientdata.org"; let consensus_rpc = "https://www.lightclientdata.org";
log::info!("Using consensus RPC URL: {}", consensus_rpc); log::info!("Using consensus RPC URL: {}", consensus_rpc);
let mut client: Client<FileDB> = ClientBuilder::new() let mut client: Client<FileDB, NimbusRpc> = ClientBuilder::new()
.network(Network::MAINNET) .network(Network::MAINNET)
.consensus_rpc(consensus_rpc) .consensus_rpc(consensus_rpc)
.execution_rpc(untrusted_rpc_url) .execution_rpc(untrusted_rpc_url)
.load_external_fallback() .load_external_fallback()
.data_dir(PathBuf::from("/tmp/helios")) .data_dir(PathBuf::from("/tmp/helios"))
.build()?; .build()
.await?;
log::info!( log::info!(
"Built client on network \"{}\" with external checkpoint fallbacks", "Built client on network \"{}\" with external checkpoint fallbacks",

View File

@ -1,5 +1,6 @@
use std::path::PathBuf; use std::path::PathBuf;
use consensus::rpc::nimbus_rpc::NimbusRpc;
use eyre::Result; use eyre::Result;
use helios::prelude::*; use helios::prelude::*;
@ -35,7 +36,10 @@ async fn main() -> Result<()> {
builder = builder.load_external_fallback(); builder = builder.load_external_fallback();
// Build the client // Build the client
let _client: Client<FileDB> = builder.build().unwrap(); let _client: Client<FileDB, NimbusRpc> = builder
.build()
.await
.unwrap();
println!("Constructed client!"); println!("Constructed client!");
Ok(()) Ok(())

48
examples/discovery.rs Normal file
View File

@ -0,0 +1,48 @@
use std::{path::PathBuf, str::FromStr};
use env_logger::Env;
use ethers::{types::Address, utils};
use eyre::Result;
use helios::{config::networks::Network, prelude::*};
use consensus::p2p::P2pNetworkInterface;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let untrusted_rpc_url = "https://eth-mainnet.g.alchemy.com/v2/nObEU8Wh4FIT-X_UDFpK9oVGiTHzznML";
log::info!("Using untrusted RPC URL [REDACTED]");
let consensus_rpc = "https://www.lightclientdata.org";
log::info!("Using consensus RPC URL: {}", consensus_rpc);
let mut client: Client<FileDB, P2pNetworkInterface> = ClientBuilder::new()
.network(Network::MAINNET)
.consensus_rpc(consensus_rpc)
.execution_rpc(untrusted_rpc_url)
.load_external_fallback()
.data_dir(PathBuf::from("/tmp/helios"))
.build()
.await?;
log::info!(
"Built client on network \"{}\" with external checkpoint fallbacks",
Network::MAINNET
);
client.start().await?;
let head_block_num = client.get_block_number().await?;
let addr = Address::from_str("0x00000000219ab540356cBB839Cbe05303d7705Fa")?;
let block = BlockTag::Latest;
let balance = client.get_balance(&addr, block).await?;
log::info!("synced up to block: {}", head_block_num);
log::info!(
"balance of deposit contract: {}",
utils::format_ether(balance)
);
Ok(())
}