add discovery protocol but not integrated yet

This commit is contained in:
geemo 2023-02-26 00:49:55 -06:00
parent 1aa4977598
commit e73aa9bc05
3 changed files with 198 additions and 19 deletions

View File

@ -1,13 +1,12 @@
use discv5::{ use discv5::{
enr::{self, CombinedKey}, enr::{self, CombinedKey, CombinedPublicKey},
Enr, Discv5Error, Enr, Discv5Error,
}; };
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use crate::p2p::{ use libp2p::PeerId;
config::Config, use crate::utils::ForkId;
utils::ForkId, use crate::p2p::config::Config;
}; use ssz_rs::{Serialize, Deserialize};
use ssz_rs::Serialize;
pub const ETH2_ENR_KEY: &str = "eth2"; pub const ETH2_ENR_KEY: &str = "eth2";
@ -22,12 +21,52 @@ pub fn build_enr(
enr_builder.tcp4(9000); enr_builder.tcp4(9000);
let mut bytes = vec![]; let mut bytes = vec![];
&ForkId::new().serialize(&mut bytes).unwrap(); ForkId::default().serialize(&mut bytes).unwrap();
enr_builder.add_value(ETH2_ENR_KEY, bytes.as_slice()); enr_builder.add_value(ETH2_ENR_KEY, bytes.as_slice());
enr_builder.build(key).unwrap() enr_builder.build(key).unwrap()
} }
pub trait EnrAsPeerId {
fn as_peer_id(&self) -> PeerId;
}
impl EnrAsPeerId for Enr {
fn as_peer_id(&self) -> PeerId {
let public_key = self.public_key();
match public_key {
CombinedPublicKey::Secp256k1(pk) => {
let pk_bytes = pk.to_bytes();
let libp2p_pk = libp2p::core::PublicKey::Secp256k1(
libp2p::core::identity::secp256k1::PublicKey::decode(&pk_bytes)
.expect("Failed to decode public key"),
);
PeerId::from_public_key(&libp2p_pk)
}
CombinedPublicKey::Ed25519(pk) => {
let pk_bytes = pk.to_bytes();
let libp2p_pk = libp2p::core::PublicKey::Ed25519(
libp2p::core::identity::ed25519::PublicKey::decode(&pk_bytes)
.expect("Failed to decode public key"),
);
PeerId::from_public_key(&libp2p_pk)
}
}
}
}
pub trait EnrForkId {
fn fork_id(&self) -> Result<ForkId, &'static str>;
}
impl EnrForkId for Enr {
fn fork_id(&self) -> Result<ForkId, &'static str> {
let eth2_bytes = self.get(ETH2_ENR_KEY).ok_or("No eth2 enr key")?;
ForkId::deserialize(eth2_bytes).map_err(|_| "Failed to decode fork id")
}
}
// TODO: Do proper error handling // TODO: Do proper error handling
pub fn key_from_libp2p(key: &Keypair) -> Result<CombinedKey, Discv5Error> { pub fn key_from_libp2p(key: &Keypair) -> Result<CombinedKey, Discv5Error> {
match key { match key {

View File

@ -1,19 +1,30 @@
use discv5::{ use discv5::{
enr::NodeId,
Discv5, Enr, Discv5Event, Discv5Error, QueryError, Discv5, Enr, Discv5Event, Discv5Error, QueryError,
}; };
use libp2p::{ use libp2p::{
identity::Keypair, identity::Keypair,
swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters},
PeerId, Multiaddr,
multiaddr::Protocol,
futures::FutureExt,
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
use futures::stream::FuturesUnordered; use futures::{
stream::FuturesUnordered,
StreamExt,
};
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant;
use std::collections::HashMap;
use std::task::{Context, Poll};
use log::{debug, error}; use log::{debug, error};
use super::config::Config as ConsensusConfig; use super::config::Config as ConsensusConfig;
mod enr; mod enr;
use enr::{key_from_libp2p, build_enr}; use enr::{key_from_libp2p, build_enr, EnrForkId, EnrAsPeerId};
enum EventStream { enum EventStream {
Present(mpsc::Receiver<Discv5Event>), Present(mpsc::Receiver<Discv5Event>),
@ -30,15 +41,34 @@ enum EventStream {
type DiscResult = Result<Vec<Enr>, QueryError>; type DiscResult = Result<Vec<Enr>, QueryError>;
enum DiscoveryError { pub enum DiscoveryError {
Discv5Error(Discv5Error), Discv5Error(Discv5Error),
BuildEnrError(String), UnexpectedError(String),
}
impl From<&str> for DiscoveryError {
fn from(e: &str) -> Self {
DiscoveryError::UnexpectedError(e.to_string())
}
}
impl From<String> for DiscoveryError {
fn from(e: String) -> Self {
DiscoveryError::UnexpectedError(e)
}
}
impl From<Discv5Error> for DiscoveryError {
fn from(e: Discv5Error) -> Self {
DiscoveryError::Discv5Error(e)
}
} }
pub struct Discovery { pub struct Discovery {
discv5: Discv5, discv5: Discv5,
local_enr: Enr, local_enr: Enr,
event_stream: EventStream, event_stream: EventStream,
multiaddr_map: HashMap<PeerId, Multiaddr>,
active_queries: FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = DiscResult> + Send>>>, active_queries: FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = DiscResult> + Send>>>,
pub started: bool, pub started: bool,
} }
@ -47,11 +77,8 @@ impl Discovery {
pub async fn new( pub async fn new(
local_key: &Keypair, local_key: &Keypair,
config: ConsensusConfig, config: ConsensusConfig,
) -> Result<Self, Discv5Error> { ) -> Result<Self, DiscoveryError> {
let enr_key = key_from_libp2p(local_key).map_err(|e| { let enr_key = key_from_libp2p(local_key)?;
error!("Failed to build ENR key: {:?}", e);
DiscoveryError::InvalidKey
})?;
let local_enr = build_enr(&enr_key, &config); let local_enr = build_enr(&enr_key, &config);
let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port); let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port);
@ -68,7 +95,7 @@ impl Discovery {
discv5 discv5
.start(listen_socket) .start(listen_socket)
.await .await
.map_err(|e| e.to_string()); .map_err(|e| e.to_string())?;
debug!("Discovery started"); debug!("Discovery started");
EventStream::Awaiting(Box::pin(discv5.event_stream())) EventStream::Awaiting(Box::pin(discv5.event_stream()))
} else { } else {
@ -79,8 +106,122 @@ impl Discovery {
discv5, discv5,
local_enr, local_enr,
event_stream, event_stream,
multiaddr_map: HashMap::new(),
active_queries: FuturesUnordered::new(), active_queries: FuturesUnordered::new(),
started: !config.disable_discovery, started: !config.disable_discovery,
}) })
} }
fn find_peers(&mut self) {
let fork_digest = self.local_enr.fork_id().unwrap().fork_digest;
let predicate: Box<dyn Fn(&Enr) -> bool + Send> = Box::new(move |enr: &Enr| {
enr.fork_id().map(|e| e.fork_digest) == Ok(fork_digest.clone()) && enr.tcp4().is_some()
});
let target = NodeId::random();
let peers_enr = self.discv5.find_node_predicate(target, predicate, 16);
self.active_queries.push(Box::pin(peers_enr));
}
fn get_peers(&mut self, cx: &mut Context) -> Option<DiscoveredPeers> {
while let Poll::Ready(Some(res)) = self.active_queries.poll_next_unpin(cx) {
if res.is_ok() {
self.active_queries = FuturesUnordered::new();
let mut peers: HashMap<PeerId, Option<Instant>> = HashMap::new();
for peer_enr in res.unwrap() {
let peer_id = peer_enr.clone().as_peer_id();
if peer_enr.ip4().is_some() && peer_enr.tcp4().is_some() {
let mut multiaddr: Multiaddr = peer_enr.ip4().unwrap().into();
multiaddr.push(Protocol::Tcp(peer_enr.tcp4().unwrap()));
self.multiaddr_map.insert(peer_id, multiaddr);
}
peers.insert(peer_id, None);
}
return Some(DiscoveredPeers { peers });
}
}
None
}
}
#[derive(Debug, Clone)]
pub struct DiscoveredPeers {
pub peers: HashMap<PeerId, Option<Instant>>,
}
impl NetworkBehaviour for Discovery {
type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
type OutEvent = DiscoveredPeers;
fn new_handler(&mut self) -> Self::ConnectionHandler {
Self::ConnectionHandler {}
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut peer_addresses = Vec::new();
if let Some(address) = self.multiaddr_map.get(peer_id) {
peer_addresses.push(address.clone());
}
peer_addresses
}
fn poll(
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if !self.started {
self.started = true;
self.find_peers();
return Poll::Pending;
}
if let Some(dp) = self.get_peers(cx) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp));
}
// Process the discovery server event stream
match self.event_stream {
EventStream::Awaiting(ref mut fut) => {
// Still awaiting the event stream, poll it
if let Poll::Ready(event_stream) = fut.poll_unpin(cx) {
match event_stream {
Ok(stream) => {
println!("Discv5 event stream ready");
self.event_stream = EventStream::Present(stream);
}
Err(_) => {
println!("Discv5 event stream failed");
self.event_stream = EventStream::InActive;
}
}
}
}
EventStream::InActive => {}
EventStream::Present(ref mut stream) => {
while let Poll::Ready(Some(event)) = stream.poll_recv(cx) {
match event {
Discv5Event::SessionEstablished(_enr, _) => {
// println!("Session Established: {:?}", enr);
}
_ => (),
}
}
}
}
Poll::Pending
}
} }

View File

@ -1,6 +1,5 @@
mod discovery; mod discovery;
mod config; mod config;
mod utils;
pub use discovery::*; pub use discovery::*;
pub use ::config::*; pub use ::config::*;