Compare commits

...

3 Commits

Author SHA1 Message Date
Andreas Bigger adcdddf20c 🏗️ p2p 2022-12-16 05:51:36 -07:00
Andreas Bigger 57887c4c36 Merge branch 'master' into @refcell/p2p 2022-12-14 09:58:22 -07:00
Andreas Bigger 3f52dc0cfa 🚀 p2p setup 2022-12-05 15:59:32 -08:00
8 changed files with 3300 additions and 40 deletions

3178
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,10 @@ members = [
"execution",
]
[features]
default = []
p2p = []
[dependencies]
client = { path = "./client" }
config = { path = "./config" }

View File

@ -163,3 +163,4 @@ If you are having trouble with Helios or are considering contributing, feel free
## Disclaimer
_This code is being provided as is. No guarantee, representation or warranty is being made, express or implied, as to the safety or correctness of the code. It has not been audited and as such there can be no assurance it will work as intended, and users may experience delays, failures, errors, omissions or loss of transmitted information. Nothing in this repo should be construed as investment advice or legal advice for any particular facts or circumstances and is not meant to replace competent counsel. It is strongly advised for you to contact a reputable attorney in your jurisdiction for any questions or concerns with respect thereto. a16z is not liable for any use of the foregoing, and users should proceed with caution and use at their own risk. See a16z.com/disclosures for more info._

View File

@ -14,8 +14,11 @@ jsonrpsee = { version = "0.15.1", features = ["full"] }
futures = "0.3.23"
log = "0.4.17"
thiserror = "1.0.37"
discv5 = { version = "0.1.0" }
libp2p = { version = "0.50.0", features = ["full"] }
common = { path = "../common" }
consensus = { path = "../consensus" }
execution = { path = "../execution" }
config = { path = "../config" }

View File

@ -30,6 +30,9 @@ pub enum NodeError {
#[error("consensus sync error: {0}")]
ConsensusSyncError(Report),
#[error("p2p error: {0}")]
P2PError(Report),
#[error(transparent)]
BlockNotFoundError(#[from] BlockNotFoundError),
}

View File

@ -12,20 +12,27 @@ use config::Config;
use consensus::rpc::nimbus_rpc::NimbusRpc;
use consensus::types::{ExecutionPayload, Header};
use consensus::ConsensusClient;
use discv5::{enr, Discv5, Discv5ConfigBuilder, Discv5Event};
use execution::evm::Evm;
use execution::rpc::http_rpc::HttpRpc;
use execution::types::{CallOpts, ExecutionBlock};
use execution::ExecutionClient;
use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, ping, Multiaddr, PeerId, Swarm};
use std::net::SocketAddr;
use crate::errors::NodeError;
pub struct Node {
key: identity::Keypair,
pub consensus: ConsensusClient<NimbusRpc>,
pub execution: Arc<ExecutionClient<HttpRpc>>,
pub config: Arc<Config>,
payloads: BTreeMap<u64, ExecutionPayload>,
finalized_payloads: BTreeMap<u64, ExecutionPayload>,
pub history_size: usize,
disc: Option<Arc<Discv5>>,
}
impl Node {
@ -43,16 +50,139 @@ impl Node {
let payloads = BTreeMap::new();
let finalized_payloads = BTreeMap::new();
let key = identity::Keypair::generate_ed25519();
Ok(Node {
key,
consensus,
execution,
config,
payloads,
finalized_payloads,
history_size: 64,
disc: None,
})
}
#[cfg(feature = "p2p")]
/// Syncs
pub async fn sync(&mut self) -> Result<(), NodeError> {
self.start_p2p().await;
self.consensus
.sync()
.await
.map_err(NodeError::ConsensusSyncError)?;
self.update_payloads().await
}
/// Starts the p2p discovery server
pub async fn start_p2p(&mut self) -> Result<(), NodeError> {
// listening address and port
let listen_addr = "0.0.0.0:9000"
.parse::<SocketAddr>()
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
// construct a local ENR
let enr_key = common::utils::from_libp2p(&self.key)
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
let enr = enr::EnrBuilder::new("v4")
.build(&enr_key)
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
// default configuration
let config = Discv5ConfigBuilder::new().build();
// construct the discv5 server
let discv5 =
Discv5::new(enr, enr_key, config).map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
// Set the server
let server = Arc::new(discv5);
self.disc = Some(Arc::clone(&server));
// Start
let mut cloned = Arc::clone(&server);
tokio::spawn(async move {
if let Some(serv) = Arc::get_mut(&mut cloned) {
if let Err(e) = serv.start(listen_addr).await {
log::warn!("Failed to start p2p discovery server. Error: {:?}", e);
}
let mut event_stream = serv.event_stream().await.unwrap();
loop {
match event_stream.recv().await {
Some(Discv5Event::SocketUpdated(addr)) => {
println!("Nodes ENR socket address has been updated to: {addr:?}");
}
Some(Discv5Event::Discovered(enr)) => {
println!("A peer has been discovered: {}", enr.node_id());
}
_ => {}
}
}
} else {
log::warn!("Failed to get mutable reference to the discv5 p2p discovery server inside client node.");
}
});
Ok(())
}
/// Swarm all connected peers on the discovery server
pub async fn p2p_connect(&self) -> Result<(), NodeError> {
// Transform the local keypair to a CombinedKey
let local_key = &self.key;
let local_peer_id = PeerId::from(local_key.public());
log::info!("Local peer id: {:?}", local_peer_id);
let transport = libp2p::development_transport(local_key.clone())
.await
.map_err(|_| NodeError::P2PError(eyre::eyre!("Failed to create libp2p transport")))?;
log::debug!("Created libp2p transport");
// Create a ping network behaviour.
//
// For illustrative purposes, the ping protocol is configured to
// keep the connection alive, so a continuous sequence of pings
// can be observed.
let behaviour = ping::Behaviour::new(ping::Config::new());
let mut swarm = Swarm::with_threadpool_executor(transport, behaviour, local_peer_id);
log::debug!("Created libp2p swarm");
// Tell the swarm to listen on all interfaces and a random, OS-assigned
// port.
let addr = "/ip4/0.0.0.0/tcp/0"
.parse()
.map_err(|_| NodeError::P2PError(eyre::eyre!("Failed to parse Multiaddr string.")))?;
log::debug!("Swarm listening on {addr:?}");
swarm
.listen_on(addr)
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
// Dial the peer identified by the multi-address given as the second
// command-line argument, if any.
if let Some(addr) = std::env::args().nth(1) {
let remote: Multiaddr = addr.parse().map_err(|_| {
NodeError::P2PError(eyre::eyre!("Failed to parse Multiaddr string."))
})?;
swarm
.dial(remote)
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
log::info!("Dialed {}", addr)
}
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => {
log::info!("Listening on {address:?}")
}
SwarmEvent::Behaviour(event) => log::info!("{event:?}"),
_ => {}
}
}
}
#[cfg(not(feature = "p2p"))]
pub async fn sync(&mut self) -> Result<(), NodeError> {
self.consensus
.sync()

View File

@ -10,3 +10,5 @@ hex = "0.4.3"
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
ethers = "1.0.2"
thiserror = "1.0.37"
libp2p-core = { version = "0.38.0", features = ["secp256k1"] }
discv5 = { version = "0.1.0", features = ["libp2p", "libp2p-core"] }

View File

@ -1,5 +1,7 @@
use discv5::enr::{self, CombinedKey};
use ethers::prelude::Address;
use eyre::Result;
use libp2p_core::identity::Keypair;
use ssz_rs::{Node, Vector};
use super::types::Bytes32;
@ -24,3 +26,20 @@ pub fn address_to_hex_string(address: &Address) -> String {
pub fn u64_to_hex_string(val: u64) -> String {
format!("0x{val:x}")
}
/// Transforms a [Keypair](libp2p_core::identity::Keypair) into a [CombinedKey].
pub fn from_libp2p(key: &libp2p_core::identity::Keypair) -> Result<CombinedKey, &'static str> {
match key {
Keypair::Secp256k1(key) => {
let secret = enr::k256::ecdsa::SigningKey::from_bytes(&key.secret().to_bytes())
.expect("libp2p key must be valid");
Ok(CombinedKey::Secp256k1(secret))
}
Keypair::Ed25519(key) => {
let ed_keypair = enr::ed25519_dalek::SecretKey::from_bytes(&key.encode()[..32])
.expect("libp2p key must be valid");
Ok(CombinedKey::from(ed_keypair))
}
_ => Err("ENR: Unsupported libp2p key type"),
}
}