Compare commits
3 Commits
master
...
refcell/di
Author | SHA1 | Date |
---|---|---|
Andreas Bigger | adcdddf20c | |
Andreas Bigger | 57887c4c36 | |
Andreas Bigger | 3f52dc0cfa |
File diff suppressed because it is too large
Load Diff
|
@ -14,6 +14,10 @@ members = [
|
|||
"execution",
|
||||
]
|
||||
|
||||
[features]
|
||||
default = []
|
||||
p2p = []
|
||||
|
||||
[dependencies]
|
||||
client = { path = "./client" }
|
||||
config = { path = "./config" }
|
||||
|
|
|
@ -163,3 +163,4 @@ If you are having trouble with Helios or are considering contributing, feel free
|
|||
## Disclaimer
|
||||
|
||||
_This code is being provided as is. No guarantee, representation or warranty is being made, express or implied, as to the safety or correctness of the code. It has not been audited and as such there can be no assurance it will work as intended, and users may experience delays, failures, errors, omissions or loss of transmitted information. Nothing in this repo should be construed as investment advice or legal advice for any particular facts or circumstances and is not meant to replace competent counsel. It is strongly advised for you to contact a reputable attorney in your jurisdiction for any questions or concerns with respect thereto. a16z is not liable for any use of the foregoing, and users should proceed with caution and use at their own risk. See a16z.com/disclosures for more info._
|
||||
|
||||
|
|
|
@ -14,8 +14,11 @@ jsonrpsee = { version = "0.15.1", features = ["full"] }
|
|||
futures = "0.3.23"
|
||||
log = "0.4.17"
|
||||
thiserror = "1.0.37"
|
||||
discv5 = { version = "0.1.0" }
|
||||
libp2p = { version = "0.50.0", features = ["full"] }
|
||||
|
||||
common = { path = "../common" }
|
||||
consensus = { path = "../consensus" }
|
||||
execution = { path = "../execution" }
|
||||
config = { path = "../config" }
|
||||
|
||||
|
|
|
@ -30,6 +30,9 @@ pub enum NodeError {
|
|||
#[error("consensus sync error: {0}")]
|
||||
ConsensusSyncError(Report),
|
||||
|
||||
#[error("p2p error: {0}")]
|
||||
P2PError(Report),
|
||||
|
||||
#[error(transparent)]
|
||||
BlockNotFoundError(#[from] BlockNotFoundError),
|
||||
}
|
||||
|
|
|
@ -12,20 +12,27 @@ use config::Config;
|
|||
use consensus::rpc::nimbus_rpc::NimbusRpc;
|
||||
use consensus::types::{ExecutionPayload, Header};
|
||||
use consensus::ConsensusClient;
|
||||
use discv5::{enr, Discv5, Discv5ConfigBuilder, Discv5Event};
|
||||
use execution::evm::Evm;
|
||||
use execution::rpc::http_rpc::HttpRpc;
|
||||
use execution::types::{CallOpts, ExecutionBlock};
|
||||
use execution::ExecutionClient;
|
||||
use futures::StreamExt;
|
||||
use libp2p::swarm::SwarmEvent;
|
||||
use libp2p::{identity, ping, Multiaddr, PeerId, Swarm};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::errors::NodeError;
|
||||
|
||||
pub struct Node {
|
||||
key: identity::Keypair,
|
||||
pub consensus: ConsensusClient<NimbusRpc>,
|
||||
pub execution: Arc<ExecutionClient<HttpRpc>>,
|
||||
pub config: Arc<Config>,
|
||||
payloads: BTreeMap<u64, ExecutionPayload>,
|
||||
finalized_payloads: BTreeMap<u64, ExecutionPayload>,
|
||||
pub history_size: usize,
|
||||
disc: Option<Arc<Discv5>>,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
|
@ -43,16 +50,139 @@ impl Node {
|
|||
let payloads = BTreeMap::new();
|
||||
let finalized_payloads = BTreeMap::new();
|
||||
|
||||
let key = identity::Keypair::generate_ed25519();
|
||||
|
||||
Ok(Node {
|
||||
key,
|
||||
consensus,
|
||||
execution,
|
||||
config,
|
||||
payloads,
|
||||
finalized_payloads,
|
||||
history_size: 64,
|
||||
disc: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "p2p")]
|
||||
/// Syncs
|
||||
pub async fn sync(&mut self) -> Result<(), NodeError> {
|
||||
self.start_p2p().await;
|
||||
self.consensus
|
||||
.sync()
|
||||
.await
|
||||
.map_err(NodeError::ConsensusSyncError)?;
|
||||
self.update_payloads().await
|
||||
}
|
||||
|
||||
/// Starts the p2p discovery server
|
||||
pub async fn start_p2p(&mut self) -> Result<(), NodeError> {
|
||||
// listening address and port
|
||||
let listen_addr = "0.0.0.0:9000"
|
||||
.parse::<SocketAddr>()
|
||||
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
|
||||
// construct a local ENR
|
||||
let enr_key = common::utils::from_libp2p(&self.key)
|
||||
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
let enr = enr::EnrBuilder::new("v4")
|
||||
.build(&enr_key)
|
||||
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
|
||||
// default configuration
|
||||
let config = Discv5ConfigBuilder::new().build();
|
||||
|
||||
// construct the discv5 server
|
||||
let discv5 =
|
||||
Discv5::new(enr, enr_key, config).map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
|
||||
// Set the server
|
||||
let server = Arc::new(discv5);
|
||||
self.disc = Some(Arc::clone(&server));
|
||||
|
||||
// Start
|
||||
let mut cloned = Arc::clone(&server);
|
||||
tokio::spawn(async move {
|
||||
if let Some(serv) = Arc::get_mut(&mut cloned) {
|
||||
if let Err(e) = serv.start(listen_addr).await {
|
||||
log::warn!("Failed to start p2p discovery server. Error: {:?}", e);
|
||||
}
|
||||
|
||||
let mut event_stream = serv.event_stream().await.unwrap();
|
||||
loop {
|
||||
match event_stream.recv().await {
|
||||
Some(Discv5Event::SocketUpdated(addr)) => {
|
||||
println!("Nodes ENR socket address has been updated to: {addr:?}");
|
||||
}
|
||||
Some(Discv5Event::Discovered(enr)) => {
|
||||
println!("A peer has been discovered: {}", enr.node_id());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::warn!("Failed to get mutable reference to the discv5 p2p discovery server inside client node.");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Swarm all connected peers on the discovery server
|
||||
pub async fn p2p_connect(&self) -> Result<(), NodeError> {
|
||||
// Transform the local keypair to a CombinedKey
|
||||
let local_key = &self.key;
|
||||
let local_peer_id = PeerId::from(local_key.public());
|
||||
log::info!("Local peer id: {:?}", local_peer_id);
|
||||
|
||||
let transport = libp2p::development_transport(local_key.clone())
|
||||
.await
|
||||
.map_err(|_| NodeError::P2PError(eyre::eyre!("Failed to create libp2p transport")))?;
|
||||
log::debug!("Created libp2p transport");
|
||||
|
||||
// Create a ping network behaviour.
|
||||
//
|
||||
// For illustrative purposes, the ping protocol is configured to
|
||||
// keep the connection alive, so a continuous sequence of pings
|
||||
// can be observed.
|
||||
let behaviour = ping::Behaviour::new(ping::Config::new());
|
||||
let mut swarm = Swarm::with_threadpool_executor(transport, behaviour, local_peer_id);
|
||||
log::debug!("Created libp2p swarm");
|
||||
|
||||
// Tell the swarm to listen on all interfaces and a random, OS-assigned
|
||||
// port.
|
||||
let addr = "/ip4/0.0.0.0/tcp/0"
|
||||
.parse()
|
||||
.map_err(|_| NodeError::P2PError(eyre::eyre!("Failed to parse Multiaddr string.")))?;
|
||||
log::debug!("Swarm listening on {addr:?}");
|
||||
swarm
|
||||
.listen_on(addr)
|
||||
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
|
||||
// Dial the peer identified by the multi-address given as the second
|
||||
// command-line argument, if any.
|
||||
if let Some(addr) = std::env::args().nth(1) {
|
||||
let remote: Multiaddr = addr.parse().map_err(|_| {
|
||||
NodeError::P2PError(eyre::eyre!("Failed to parse Multiaddr string."))
|
||||
})?;
|
||||
swarm
|
||||
.dial(remote)
|
||||
.map_err(|e| NodeError::P2PError(eyre::eyre!(e)))?;
|
||||
log::info!("Dialed {}", addr)
|
||||
}
|
||||
|
||||
loop {
|
||||
match swarm.select_next_some().await {
|
||||
SwarmEvent::NewListenAddr { address, .. } => {
|
||||
log::info!("Listening on {address:?}")
|
||||
}
|
||||
SwarmEvent::Behaviour(event) => log::info!("{event:?}"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "p2p"))]
|
||||
pub async fn sync(&mut self) -> Result<(), NodeError> {
|
||||
self.consensus
|
||||
.sync()
|
||||
|
|
|
@ -10,3 +10,5 @@ hex = "0.4.3"
|
|||
ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "cb08f18ca919cc1b685b861d0fa9e2daabe89737" }
|
||||
ethers = "1.0.2"
|
||||
thiserror = "1.0.37"
|
||||
libp2p-core = { version = "0.38.0", features = ["secp256k1"] }
|
||||
discv5 = { version = "0.1.0", features = ["libp2p", "libp2p-core"] }
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use discv5::enr::{self, CombinedKey};
|
||||
use ethers::prelude::Address;
|
||||
use eyre::Result;
|
||||
use libp2p_core::identity::Keypair;
|
||||
use ssz_rs::{Node, Vector};
|
||||
|
||||
use super::types::Bytes32;
|
||||
|
@ -24,3 +26,20 @@ pub fn address_to_hex_string(address: &Address) -> String {
|
|||
pub fn u64_to_hex_string(val: u64) -> String {
|
||||
format!("0x{val:x}")
|
||||
}
|
||||
|
||||
/// Transforms a [Keypair](libp2p_core::identity::Keypair) into a [CombinedKey].
|
||||
pub fn from_libp2p(key: &libp2p_core::identity::Keypair) -> Result<CombinedKey, &'static str> {
|
||||
match key {
|
||||
Keypair::Secp256k1(key) => {
|
||||
let secret = enr::k256::ecdsa::SigningKey::from_bytes(&key.secret().to_bytes())
|
||||
.expect("libp2p key must be valid");
|
||||
Ok(CombinedKey::Secp256k1(secret))
|
||||
}
|
||||
Keypair::Ed25519(key) => {
|
||||
let ed_keypair = enr::ed25519_dalek::SecretKey::from_bytes(&key.encode()[..32])
|
||||
.expect("libp2p key must be valid");
|
||||
Ok(CombinedKey::from(ed_keypair))
|
||||
}
|
||||
_ => Err("ENR: Unsupported libp2p key type"),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue