From e73aa9bc056440bcbc1827bfaadd4995d5b9ba55 Mon Sep 17 00:00:00 2001 From: geemo Date: Sun, 26 Feb 2023 00:49:55 -0600 Subject: [PATCH] add discovery protocol but not integrated yet --- consensus/src/p2p/discovery/enr.rs | 53 ++++++++-- consensus/src/p2p/discovery/mod.rs | 163 +++++++++++++++++++++++++++-- consensus/src/p2p/mod.rs | 1 - 3 files changed, 198 insertions(+), 19 deletions(-) diff --git a/consensus/src/p2p/discovery/enr.rs b/consensus/src/p2p/discovery/enr.rs index 8f4a28d..67eba40 100644 --- a/consensus/src/p2p/discovery/enr.rs +++ b/consensus/src/p2p/discovery/enr.rs @@ -1,13 +1,12 @@ use discv5::{ - enr::{self, CombinedKey}, + enr::{self, CombinedKey, CombinedPublicKey}, Enr, Discv5Error, }; use libp2p::identity::Keypair; -use crate::p2p::{ - config::Config, - utils::ForkId, -}; -use ssz_rs::Serialize; +use libp2p::PeerId; +use crate::utils::ForkId; +use crate::p2p::config::Config; +use ssz_rs::{Serialize, Deserialize}; pub const ETH2_ENR_KEY: &str = "eth2"; @@ -22,12 +21,52 @@ pub fn build_enr( enr_builder.tcp4(9000); let mut bytes = vec![]; - &ForkId::new().serialize(&mut bytes).unwrap(); + ForkId::default().serialize(&mut bytes).unwrap(); enr_builder.add_value(ETH2_ENR_KEY, bytes.as_slice()); enr_builder.build(key).unwrap() } +pub trait EnrAsPeerId { + fn as_peer_id(&self) -> PeerId; +} + +impl EnrAsPeerId for Enr { + fn as_peer_id(&self) -> PeerId { + let public_key = self.public_key(); + + match public_key { + CombinedPublicKey::Secp256k1(pk) => { + let pk_bytes = pk.to_bytes(); + let libp2p_pk = libp2p::core::PublicKey::Secp256k1( + libp2p::core::identity::secp256k1::PublicKey::decode(&pk_bytes) + .expect("Failed to decode public key"), + ); + PeerId::from_public_key(&libp2p_pk) + } + CombinedPublicKey::Ed25519(pk) => { + let pk_bytes = pk.to_bytes(); + let libp2p_pk = libp2p::core::PublicKey::Ed25519( + libp2p::core::identity::ed25519::PublicKey::decode(&pk_bytes) + .expect("Failed to decode public key"), + ); + PeerId::from_public_key(&libp2p_pk) + } + } + } +} + +pub trait EnrForkId { + fn fork_id(&self) -> Result; +} + +impl EnrForkId for Enr { + fn fork_id(&self) -> Result { + let eth2_bytes = self.get(ETH2_ENR_KEY).ok_or("No eth2 enr key")?; + ForkId::deserialize(eth2_bytes).map_err(|_| "Failed to decode fork id") + } +} + // TODO: Do proper error handling pub fn key_from_libp2p(key: &Keypair) -> Result { match key { diff --git a/consensus/src/p2p/discovery/mod.rs b/consensus/src/p2p/discovery/mod.rs index f22fe5b..1eaf105 100644 --- a/consensus/src/p2p/discovery/mod.rs +++ b/consensus/src/p2p/discovery/mod.rs @@ -1,19 +1,30 @@ use discv5::{ + enr::NodeId, Discv5, Enr, Discv5Event, Discv5Error, QueryError, }; use libp2p::{ - identity::Keypair, + identity::Keypair, + swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}, + PeerId, Multiaddr, + multiaddr::Protocol, + futures::FutureExt, }; use tokio::sync::mpsc; -use futures::stream::FuturesUnordered; +use futures::{ + stream::FuturesUnordered, + StreamExt, +}; use std::future::Future; use std::pin::Pin; use std::net::SocketAddr; +use std::time::Instant; +use std::collections::HashMap; +use std::task::{Context, Poll}; use log::{debug, error}; use super::config::Config as ConsensusConfig; mod enr; -use enr::{key_from_libp2p, build_enr}; +use enr::{key_from_libp2p, build_enr, EnrForkId, EnrAsPeerId}; enum EventStream { Present(mpsc::Receiver), @@ -30,15 +41,34 @@ enum EventStream { type DiscResult = Result, QueryError>; -enum DiscoveryError { +pub enum DiscoveryError { Discv5Error(Discv5Error), - BuildEnrError(String), + UnexpectedError(String), +} + +impl From<&str> for DiscoveryError { + fn from(e: &str) -> Self { + DiscoveryError::UnexpectedError(e.to_string()) + } +} + +impl From for DiscoveryError { + fn from(e: String) -> Self { + DiscoveryError::UnexpectedError(e) + } +} + +impl From for DiscoveryError { + fn from(e: Discv5Error) -> Self { + DiscoveryError::Discv5Error(e) + } } pub struct Discovery { discv5: Discv5, local_enr: Enr, event_stream: EventStream, + multiaddr_map: HashMap, active_queries: FuturesUnordered + Send>>>, pub started: bool, } @@ -47,11 +77,8 @@ impl Discovery { pub async fn new( local_key: &Keypair, config: ConsensusConfig, - ) -> Result { - let enr_key = key_from_libp2p(local_key).map_err(|e| { - error!("Failed to build ENR key: {:?}", e); - DiscoveryError::InvalidKey - })?; + ) -> Result { + let enr_key = key_from_libp2p(local_key)?; let local_enr = build_enr(&enr_key, &config); let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port); @@ -68,7 +95,7 @@ impl Discovery { discv5 .start(listen_socket) .await - .map_err(|e| e.to_string()); + .map_err(|e| e.to_string())?; debug!("Discovery started"); EventStream::Awaiting(Box::pin(discv5.event_stream())) } else { @@ -79,8 +106,122 @@ impl Discovery { discv5, local_enr, event_stream, + multiaddr_map: HashMap::new(), active_queries: FuturesUnordered::new(), started: !config.disable_discovery, }) } + + fn find_peers(&mut self) { + let fork_digest = self.local_enr.fork_id().unwrap().fork_digest; + + let predicate: Box bool + Send> = Box::new(move |enr: &Enr| { + enr.fork_id().map(|e| e.fork_digest) == Ok(fork_digest.clone()) && enr.tcp4().is_some() + }); + + let target = NodeId::random(); + + let peers_enr = self.discv5.find_node_predicate(target, predicate, 16); + + self.active_queries.push(Box::pin(peers_enr)); + } + + fn get_peers(&mut self, cx: &mut Context) -> Option { + while let Poll::Ready(Some(res)) = self.active_queries.poll_next_unpin(cx) { + if res.is_ok() { + self.active_queries = FuturesUnordered::new(); + + let mut peers: HashMap> = HashMap::new(); + + for peer_enr in res.unwrap() { + let peer_id = peer_enr.clone().as_peer_id(); + + if peer_enr.ip4().is_some() && peer_enr.tcp4().is_some() { + let mut multiaddr: Multiaddr = peer_enr.ip4().unwrap().into(); + + multiaddr.push(Protocol::Tcp(peer_enr.tcp4().unwrap())); + + self.multiaddr_map.insert(peer_id, multiaddr); + } + + peers.insert(peer_id, None); + } + + return Some(DiscoveredPeers { peers }); + } + } + + None + } +} + +#[derive(Debug, Clone)] +pub struct DiscoveredPeers { + pub peers: HashMap>, +} + +impl NetworkBehaviour for Discovery { + type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler; + type OutEvent = DiscoveredPeers; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + Self::ConnectionHandler {} + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut peer_addresses = Vec::new(); + + if let Some(address) = self.multiaddr_map.get(peer_id) { + peer_addresses.push(address.clone()); + } + + peer_addresses + } + + fn poll( + &mut self, + cx: &mut Context, + _: &mut impl PollParameters, + ) -> Poll> { + if !self.started { + self.started = true; + self.find_peers(); + + return Poll::Pending; + } + + if let Some(dp) = self.get_peers(cx) { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp)); + } + // Process the discovery server event stream + match self.event_stream { + EventStream::Awaiting(ref mut fut) => { + // Still awaiting the event stream, poll it + if let Poll::Ready(event_stream) = fut.poll_unpin(cx) { + match event_stream { + Ok(stream) => { + println!("Discv5 event stream ready"); + self.event_stream = EventStream::Present(stream); + } + Err(_) => { + println!("Discv5 event stream failed"); + self.event_stream = EventStream::InActive; + } + } + } + } + EventStream::InActive => {} + EventStream::Present(ref mut stream) => { + while let Poll::Ready(Some(event)) = stream.poll_recv(cx) { + match event { + Discv5Event::SessionEstablished(_enr, _) => { + // println!("Session Established: {:?}", enr); + } + _ => (), + } + } + } + } + Poll::Pending + } } diff --git a/consensus/src/p2p/mod.rs b/consensus/src/p2p/mod.rs index cb0f8ff..eb38942 100644 --- a/consensus/src/p2p/mod.rs +++ b/consensus/src/p2p/mod.rs @@ -1,6 +1,5 @@ mod discovery; mod config; -mod utils; pub use discovery::*; pub use ::config::*;