implementing discovery step 1: change name of the ConsensusRpc to reflect gnerality add a feature flag to conditionally support p2p and use extra crates add cli flag for consensus p2p

This commit is contained in:
geemo 2023-02-23 03:18:09 -06:00
parent 8da632f8f2
commit 1aa4977598
16 changed files with 3586 additions and 111 deletions

3385
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -45,6 +45,9 @@ lto = true
codegen-units = 1
panic = "abort"
[features]
consensus_p2p = ["consensus/p2p"]
######################################
# Examples
######################################

View File

@ -157,22 +157,22 @@ Client ----> Node
Node ----> ConsensusClient
Node ----> ExecutionClient
ExecutionClient ----> ExecutionRpc
ConsensusClient ----> ConsensusRpc
ConsensusClient ----> ConsensusNetworkInterface
Node ----> Evm
Evm ----> ExecutionClient
ExecutionRpc --> UntrustedExecutionRpc
ConsensusRpc --> UntrustedConsensusRpc
ConsensusNetworkInterface --> UntrustedConsensusNetworkInterface
classDef node fill:#f9f,stroke:#333,stroke-width:4px, color:black;
class Node,Client node
classDef execution fill:#f0f,stroke:#333,stroke-width:4px;
class ExecutionClient,ExecutionRpc execution
classDef consensus fill:#ff0,stroke:#333,stroke-width:4px;
class ConsensusClient,ConsensusRpc consensus
class ConsensusClient,ConsensusNetworkInterface consensus
classDef evm fill:#0ff,stroke:#333,stroke-width:4px;
class Evm evm
classDef providerC fill:#ffc
class UntrustedConsensusRpc providerC
class UntrustedConsensusNetworkInterface providerC
classDef providerE fill:#fbf
class UntrustedExecutionRpc providerE
classDef rpc fill:#e10
@ -181,7 +181,7 @@ class Rpc rpc
subgraph "External Network"
UntrustedExecutionRpc
UntrustedConsensusRpc
UntrustedConsensusNetworkInterface
end
```

View File

@ -99,6 +99,8 @@ struct Cli {
load_external_fallback: bool,
#[clap(short = 's', long, env)]
strict_checkpoint_age: bool,
#[clap(short = 'q', long, env)]
p2p_enabled: bool,
}
impl Cli {
@ -117,6 +119,7 @@ impl Cli {
fallback: self.fallback.clone(),
load_external_fallback: self.load_external_fallback,
strict_checkpoint_age: self.strict_checkpoint_age,
p2p_enabled: self.p2p_enabled,
}
}

View File

@ -14,6 +14,7 @@ pub struct CliConfig {
pub fallback: Option<String>,
pub load_external_fallback: bool,
pub strict_checkpoint_age: bool,
pub p2p_enabled: bool,
}
impl CliConfig {

View File

@ -19,6 +19,12 @@ log = "0.4.17"
chrono = "0.4.22"
thiserror = "1.0.37"
reqwest = { version = "0.11.13", features = ["json"] }
discv5 = { version = "0.1.0", optional = true }
# TODO: only import the necessary features
libp2p = { version = "0.50.0", features = ["full"], optional = true }
tree_hash = { version = "0.4.0", optional = true }
tree_hash_derive = { version = "0.4.0", optional = true }
ssz-rs-derive = { git = "https://github.com/ralexstokes/ssz-rs", rev = "d09f55b4f8554491e3431e01af1c32347a8781cd", optional = true }
common = { path = "../common" }
config = { path = "../config" }
@ -30,3 +36,12 @@ tokio = { version = "1", features = ["full"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-timer = "0.2.5"
[features]
p2p = [
"dep:discv5",
"dep:libp2p",
"dep:tree_hash",
"dep:tree_hash_derive",
"dep:ssz-rs-derive",
]

View File

@ -17,7 +17,7 @@ use config::Config;
use crate::constants::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use crate::errors::ConsensusError;
use super::rpc::ConsensusRpc;
use super::rpc::ConsensusNetworkInterface;
use super::types::*;
use super::utils::*;
@ -35,8 +35,8 @@ use wasm_timer::UNIX_EPOCH;
// does not implement force updates
#[derive(Debug)]
pub struct ConsensusClient<R: ConsensusRpc> {
rpc: R,
pub struct ConsensusClient<R: ConsensusNetworkInterface> {
network_interface: R,
store: LightClientStore,
initial_checkpoint: Vec<u8>,
pub last_checkpoint: Option<Vec<u8>>,
@ -53,16 +53,16 @@ struct LightClientStore {
current_max_active_participants: u64,
}
impl<R: ConsensusRpc> ConsensusClient<R> {
impl<R: ConsensusNetworkInterface> ConsensusClient<R> {
pub fn new(
rpc: &str,
checkpoint_block_root: &[u8],
config: Arc<Config>,
) -> Result<ConsensusClient<R>> {
let rpc = R::new(rpc);
let network_interface = R::new(rpc);
Ok(ConsensusClient {
rpc,
network_interface,
store: LightClientStore::default(),
last_checkpoint: None,
config,
@ -71,7 +71,7 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
}
pub async fn check_rpc(&self) -> Result<()> {
let chain_id = self.rpc.chain_id().await?;
let chain_id = self.network_interface.chain_id().await?;
if chain_id != self.config.chain.chain_id {
Err(ConsensusError::IncorrectRpcNetwork.into())
@ -82,7 +82,7 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
pub async fn get_execution_payload(&self, slot: &Option<u64>) -> Result<ExecutionPayload> {
let slot = slot.unwrap_or(self.store.optimistic_header.slot);
let mut block = self.rpc.get_block(slot).await?;
let mut block = self.network_interface.get_block(slot).await?;
let block_hash = block.hash_tree_root()?;
let latest_slot = self.store.optimistic_header.slot;
@ -114,9 +114,9 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
) -> Result<Vec<ExecutionPayload>> {
let payloads_fut = (start_slot..end_slot)
.rev()
.map(|slot| self.rpc.get_block(slot));
.map(|slot| self.network_interface.get_block(slot));
let mut prev_parent_hash: Bytes32 = self
.rpc
.network_interface
.get_block(end_slot)
.await?
.body
@ -157,7 +157,7 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
let current_period = calc_sync_period(self.store.finalized_header.slot);
let updates = self
.rpc
.network_interface
.get_updates(current_period, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
.await?;
@ -166,11 +166,11 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
self.apply_update(&update);
}
let finality_update = self.rpc.get_finality_update().await?;
let finality_update = self.network_interface.get_finality_update().await?;
self.verify_finality_update(&finality_update)?;
self.apply_finality_update(&finality_update);
let optimistic_update = self.rpc.get_optimistic_update().await?;
let optimistic_update = self.network_interface.get_optimistic_update().await?;
self.verify_optimistic_update(&optimistic_update)?;
self.apply_optimistic_update(&optimistic_update);
@ -183,18 +183,18 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
}
pub async fn advance(&mut self) -> Result<()> {
let finality_update = self.rpc.get_finality_update().await?;
let finality_update = self.network_interface.get_finality_update().await?;
self.verify_finality_update(&finality_update)?;
self.apply_finality_update(&finality_update);
let optimistic_update = self.rpc.get_optimistic_update().await?;
let optimistic_update = self.network_interface.get_optimistic_update().await?;
self.verify_optimistic_update(&optimistic_update)?;
self.apply_optimistic_update(&optimistic_update);
if self.store.next_sync_committee.is_none() {
debug!("checking for sync committee update");
let current_period = calc_sync_period(self.store.finalized_header.slot);
let mut updates = self.rpc.get_updates(current_period, 1).await?;
let mut updates = self.network_interface.get_updates(current_period, 1).await?;
if updates.len() == 1 {
let update = updates.get_mut(0).unwrap();
@ -212,7 +212,7 @@ impl<R: ConsensusRpc> ConsensusClient<R> {
async fn bootstrap(&mut self) -> Result<()> {
let mut bootstrap = self
.rpc
.network_interface
.get_bootstrap(&self.initial_checkpoint)
.await
.map_err(|_| eyre!("could not fetch bootstrap"))?;
@ -648,7 +648,7 @@ mod tests {
use crate::{
consensus::calc_sync_period,
errors::ConsensusError,
rpc::{mock_rpc::MockRpc, ConsensusRpc},
rpc::{mock_rpc::MockRpc, ConsensusNetworkInterface},
types::Header,
ConsensusClient,
};
@ -679,7 +679,7 @@ mod tests {
let client = get_client(false).await;
let period = calc_sync_period(client.store.finalized_header.slot);
let updates = client
.rpc
.network_interface
.get_updates(period, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
.await
.unwrap();
@ -693,7 +693,7 @@ mod tests {
let client = get_client(false).await;
let period = calc_sync_period(client.store.finalized_header.slot);
let updates = client
.rpc
.network_interface
.get_updates(period, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
.await
.unwrap();
@ -713,7 +713,7 @@ mod tests {
let client = get_client(false).await;
let period = calc_sync_period(client.store.finalized_header.slot);
let updates = client
.rpc
.network_interface
.get_updates(period, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
.await
.unwrap();
@ -733,7 +733,7 @@ mod tests {
let client = get_client(false).await;
let period = calc_sync_period(client.store.finalized_header.slot);
let updates = client
.rpc
.network_interface
.get_updates(period, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
.await
.unwrap();
@ -753,7 +753,7 @@ mod tests {
let mut client = get_client(false).await;
client.sync().await.unwrap();
let update = client.rpc.get_finality_update().await.unwrap();
let update = client.network_interface.get_finality_update().await.unwrap();
client.verify_finality_update(&update).unwrap();
}
@ -763,7 +763,7 @@ mod tests {
let mut client = get_client(false).await;
client.sync().await.unwrap();
let mut update = client.rpc.get_finality_update().await.unwrap();
let mut update = client.network_interface.get_finality_update().await.unwrap();
update.finalized_header = Header::default();
let err = client.verify_finality_update(&update).err().unwrap();
@ -778,7 +778,7 @@ mod tests {
let mut client = get_client(false).await;
client.sync().await.unwrap();
let mut update = client.rpc.get_finality_update().await.unwrap();
let mut update = client.network_interface.get_finality_update().await.unwrap();
update.sync_aggregate.sync_committee_signature = Vector::default();
let err = client.verify_finality_update(&update).err().unwrap();
@ -793,7 +793,7 @@ mod tests {
let mut client = get_client(false).await;
client.sync().await.unwrap();
let update = client.rpc.get_optimistic_update().await.unwrap();
let update = client.network_interface.get_optimistic_update().await.unwrap();
client.verify_optimistic_update(&update).unwrap();
}
@ -802,7 +802,7 @@ mod tests {
let mut client = get_client(false).await;
client.sync().await.unwrap();
let mut update = client.rpc.get_optimistic_update().await.unwrap();
let mut update = client.network_interface.get_optimistic_update().await.unwrap();
update.sync_aggregate.sync_committee_signature = Vector::default();
let err = client.verify_optimistic_update(&update).err().unwrap();

View File

@ -1,6 +1,8 @@
pub mod errors;
pub mod rpc;
pub mod types;
#[cfg(feature = "p2p")]
pub mod p2p;
mod consensus;
pub use crate::consensus::*;

View File

@ -0,0 +1,60 @@
use discv5::{Discv5Config, Discv5ConfigBuilder, Enr};
use std::time::Duration;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
pub listen_addr: std::net::IpAddr,
pub libp2p_port: u16,
pub discovery_port: u16,
pub target_peers: usize,
#[serde(skip)]
pub discv5_config: Discv5Config,
pub boot_nodes_enr: Vec<Enr>,
pub disable_discovery: bool,
}
impl Default for Config {
// TODO: Consider defaults more closely these are lighthouse defaults practically
fn default() -> Self {
let filter_rate_limiter = Some(
discv5::RateLimiterBuilder::new()
.total_n_every(10, Duration::from_secs(1))
.ip_n_every(9, Duration::from_secs(1))
.node_n_every(8, Duration::from_secs(1))
.build()
.expect("The total rate limit has been specified"),
);
let discv5_config = Discv5ConfigBuilder::new()
.enable_packet_filter()
.session_cache_capacity(5000)
.request_timeout(Duration::from_secs(1))
.query_peer_timeout(Duration::from_secs(2))
.query_timeout(Duration::from_secs(30))
.request_retries(1)
.enr_peer_update_min(10)
.query_parallelism(5)
.disable_report_discovered_peers()
.ip_limit()
.incoming_bucket_limit(8)
.filter_rate_limiter(filter_rate_limiter)
.filter_max_bans_per_ip(Some(5))
.filter_max_nodes_per_ip(Some(10))
//.table_filter(|enr| enr.ip4().map_or(false, |ip| is_global(&ip)))
.ban_duration(Some(Duration::from_secs(3600)))
.ping_interval(Duration::from_secs(300))
.build();
Config {
listen_addr: "0.0.0.0".parse().expect("valid ip address"),
libp2p_port: 9000,
discovery_port: 9000,
target_peers: 50,
discv5_config,
boot_nodes_enr: vec![],
disable_discovery: false,
}
}
}

View File

@ -0,0 +1,41 @@
use discv5::{
enr::{self, CombinedKey},
Enr, Discv5Error,
};
use libp2p::identity::Keypair;
use crate::p2p::{
config::Config,
utils::ForkId,
};
use ssz_rs::Serialize;
pub const ETH2_ENR_KEY: &str = "eth2";
pub fn build_enr(
key: &CombinedKey,
config: &Config,
) -> Enr {
let mut enr_builder = enr::EnrBuilder::new("v4");
enr_builder.ip("0.0.0.0".parse().unwrap());
enr_builder.udp4(9000);
enr_builder.tcp4(9000);
let mut bytes = vec![];
&ForkId::new().serialize(&mut bytes).unwrap();
enr_builder.add_value(ETH2_ENR_KEY, bytes.as_slice());
enr_builder.build(key).unwrap()
}
// TODO: Do proper error handling
pub fn key_from_libp2p(key: &Keypair) -> Result<CombinedKey, Discv5Error> {
match key {
Keypair::Secp256k1(key) => {
let secret = discv5::enr::k256::ecdsa::SigningKey::from_bytes(&key.secret().to_bytes())
.map_err(|_| Discv5Error::KeyDerivationFailed)?;
Ok(CombinedKey::Secp256k1(secret))
}
_ => Err(Discv5Error::KeyTypeNotSupported("The only supported key type is Secp256k1")),
}
}

View File

@ -0,0 +1,86 @@
use discv5::{
Discv5, Enr, Discv5Event, Discv5Error, QueryError,
};
use libp2p::{
identity::Keypair,
};
use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use log::{debug, error};
use super::config::Config as ConsensusConfig;
mod enr;
use enr::{key_from_libp2p, build_enr};
enum EventStream {
Present(mpsc::Receiver<Discv5Event>),
InActive,
Awaiting(
Pin<
Box<
dyn Future<Output = Result<mpsc::Receiver<Discv5Event>, Discv5Error>>
+ Send,
>,
>,
),
}
type DiscResult = Result<Vec<Enr>, QueryError>;
enum DiscoveryError {
Discv5Error(Discv5Error),
BuildEnrError(String),
}
pub struct Discovery {
discv5: Discv5,
local_enr: Enr,
event_stream: EventStream,
active_queries: FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = DiscResult> + Send>>>,
pub started: bool,
}
impl Discovery {
pub async fn new(
local_key: &Keypair,
config: ConsensusConfig,
) -> Result<Self, Discv5Error> {
let enr_key = key_from_libp2p(local_key).map_err(|e| {
error!("Failed to build ENR key: {:?}", e);
DiscoveryError::InvalidKey
})?;
let local_enr = build_enr(&enr_key, &config);
let listen_socket = SocketAddr::new(config.listen_addr, config.discovery_port);
let mut discv5 = Discv5::new(local_enr.clone(), enr_key, config.discv5_config)?;
for boot_node_enr in config.boot_nodes_enr.clone() {
debug!("Adding boot node: {:?}", boot_node_enr);
let repr = boot_node_enr.to_string();
let _ = discv5.add_enr(boot_node_enr).map_err(|e| {
error!("Failed to add boot node: {:?}, {:?}", repr, e);
});
}
let event_stream = if !config.disable_discovery {
discv5
.start(listen_socket)
.await
.map_err(|e| e.to_string());
debug!("Discovery started");
EventStream::Awaiting(Box::pin(discv5.event_stream()))
} else {
EventStream::InActive
};
Ok(Self {
discv5,
local_enr,
event_stream,
active_queries: FuturesUnordered::new(),
started: !config.disable_discovery,
})
}
}

6
consensus/src/p2p/mod.rs Normal file
View File

@ -0,0 +1,6 @@
mod discovery;
mod config;
mod utils;
pub use discovery::*;
pub use ::config::*;

View File

@ -1,6 +1,6 @@
use std::{fs::read_to_string, path::PathBuf};
use super::ConsensusRpc;
use super::ConsensusNetworkInterface;
use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Update};
use async_trait::async_trait;
use eyre::Result;
@ -10,7 +10,7 @@ pub struct MockRpc {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConsensusRpc for MockRpc {
impl ConsensusNetworkInterface for MockRpc {
fn new(path: &str) -> Self {
MockRpc {
testdata: PathBuf::from(path),

View File

@ -9,7 +9,7 @@ use crate::types::{BeaconBlock, Bootstrap, FinalityUpdate, OptimisticUpdate, Upd
// implements https://github.com/ethereum/beacon-APIs/tree/master/apis/beacon/light_client
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait ConsensusRpc {
pub trait ConsensusNetworkInterface {
fn new(path: &str) -> Self;
async fn get_bootstrap(&self, block_root: &'_ [u8]) -> Result<Bootstrap>;
async fn get_updates(&self, period: u64, count: u8) -> Result<Vec<Update>>;

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use std::cmp;
use super::ConsensusRpc;
use super::ConsensusNetworkInterface;
use crate::constants::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use crate::types::*;
use common::errors::RpcError;
@ -14,7 +14,7 @@ pub struct NimbusRpc {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConsensusRpc for NimbusRpc {
impl ConsensusNetworkInterface for NimbusRpc {
fn new(rpc: &str) -> Self {
NimbusRpc {
rpc: rpc.to_string(),

View File

@ -53,6 +53,13 @@ struct ForkData {
genesis_validator_root: Bytes32,
}
#[derive(SimpleSerialize, Default, Debug)]
pub struct ForkId {
pub fork_digest: Vector<u8, 4>,
pub next_fork_version: Vector<u8, 4>,
pub next_fork_epoch: u64,
}
pub fn compute_signing_root(object_root: Bytes32, domain: Bytes32) -> Result<Node> {
let mut data = SigningData {
object_root,
@ -73,7 +80,7 @@ pub fn compute_domain(
Ok(d.to_vec().try_into().unwrap())
}
fn compute_fork_data_root(
pub fn compute_fork_data_root(
current_version: Vector<u8, 4>,
genesis_validator_root: Bytes32,
) -> Result<Node> {
@ -84,6 +91,18 @@ fn compute_fork_data_root(
Ok(fork_data.hash_tree_root()?)
}
pub fn compute_fork_digest(
current_version: Vector<u8, 4>,
genesis_validators_root: Bytes32,
) -> Result<Vector<u8, 4>> {
let root = compute_fork_data_root(current_version, genesis_validators_root)?;
Ok(root.as_bytes()
.iter()
.take(4)
.copied()
.collect::<Vector<u8, 4>>())
}
pub fn branch_to_nodes(branch: Vec<Bytes32>) -> Result<Vec<Node>> {
branch
.iter()