diff --git a/cli/src/main.rs b/cli/src/main.rs index dcb6f13..eb5a12d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use clap::Parser; use dirs::home_dir; use eyre::Result; -use tokio::time::sleep; +use tokio::{sync::Mutex, time::sleep}; use client::{rpc::Rpc, Client}; use config::{networks, Config}; @@ -23,13 +23,16 @@ async fn main() -> Result<()> { let mut client = Client::new(Arc::new(config)).await?; client.sync().await?; - let mut rpc = Rpc::new(Arc::new(client), cli.port.unwrap_or(8545)); + let client = Arc::new(Mutex::new(client)); + + let mut rpc = Rpc::new(client.clone(), cli.port.unwrap_or(8545)); let addr = rpc.start().await?; println!("started rpc at: {}", addr); - sleep(Duration::from_secs(300)).await; - - Ok(()) + loop { + sleep(Duration::from_secs(10)).await; + client.lock().await.advance().await? + } } #[derive(Parser)] diff --git a/client/src/client.rs b/client/src/client.rs index 71b6dbe..3620fb1 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -36,6 +36,10 @@ impl Client { self.consensus.sync().await } + pub async fn advance(&mut self) -> Result<()> { + self.consensus.advance().await + } + pub async fn call(&self, to: &Address, calldata: &Vec, value: U256) -> Result> { let payload = self.consensus.get_execution_payload().await?; let mut evm = Evm::new(self.execution.clone(), payload); diff --git a/client/src/rpc.rs b/client/src/rpc.rs index f421114..7790338 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -5,6 +5,7 @@ use ethers::{ use eyre::Result; use serde::{Deserialize, Serialize}; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; +use tokio::sync::Mutex; use jsonrpsee::{ core::{async_trait, Error}, @@ -17,13 +18,13 @@ use common::utils::{hex_str_to_bytes, u64_to_hex_string}; use super::Client; pub struct Rpc { - client: Arc, + client: Arc>, handle: Option, port: u16, } impl Rpc { - pub fn new(client: Arc, port: u16) -> Self { + pub fn new(client: Arc>, port: u16) -> Self { Rpc { client, handle: None, @@ -55,7 +56,7 @@ trait EthRpc { #[method(name = "estimateGas")] async fn estimate_gas(&self, opts: CallOpts) -> Result; #[method(name = "chainId")] - fn chain_id(&self) -> Result; + async fn chain_id(&self) -> Result; #[method(name = "gasPrice")] async fn gas_price(&self) -> Result; #[method(name = "maxPriorityFeePerGas")] @@ -65,7 +66,7 @@ trait EthRpc { } struct RpcInner { - client: Arc, + client: Arc>, port: u16, } @@ -75,7 +76,8 @@ impl EthRpcServer for RpcInner { match block { "latest" => { let address = convert_err(Address::from_str(address))?; - let balance = convert_err(self.client.get_balance(&address).await)?; + let client = self.client.lock().await; + let balance = convert_err(client.get_balance(&address).await)?; Ok(balance.encode_hex()) } @@ -87,7 +89,8 @@ impl EthRpcServer for RpcInner { match block { "latest" => { let address = convert_err(Address::from_str(address))?; - let nonce = convert_err(self.client.get_nonce(&address).await)?; + let client = self.client.lock().await; + let nonce = convert_err(client.get_nonce(&address).await)?; Ok(nonce.encode_hex()) } @@ -99,7 +102,8 @@ impl EthRpcServer for RpcInner { match block { "latest" => { let address = convert_err(Address::from_str(address))?; - let code = convert_err(self.client.get_code(&address).await)?; + let client = self.client.lock().await; + let code = convert_err(client.get_code(&address).await)?; Ok(hex::encode(code)) } @@ -117,7 +121,8 @@ impl EthRpcServer for RpcInner { 16, ))?; - let res = convert_err(self.client.call(&to, &data, value).await)?; + let client = self.client.lock().await; + let res = convert_err(client.call(&to, &data, value).await)?; Ok(hex::encode(res)) } _ => Err(Error::Custom("Invalid Block Number".to_string())), @@ -132,27 +137,32 @@ impl EthRpcServer for RpcInner { 16, ))?; - let gas = convert_err(self.client.estimate_gas(&to, &data, value).await)?; + let client = self.client.lock().await; + let gas = convert_err(client.estimate_gas(&to, &data, value).await)?; Ok(u64_to_hex_string(gas)) } - fn chain_id(&self) -> Result { - let id = self.client.chain_id(); + async fn chain_id(&self) -> Result { + let client = self.client.lock().await; + let id = client.chain_id(); Ok(u64_to_hex_string(id)) } async fn gas_price(&self) -> Result { - let gas_price = convert_err(self.client.get_gas_price().await)?; + let client = self.client.lock().await; + let gas_price = convert_err(client.get_gas_price().await)?; Ok(gas_price.encode_hex()) } async fn max_priority_fee_per_gas(&self) -> Result { - let tip = convert_err(self.client.get_priority_fee().await)?; + let client = self.client.lock().await; + let tip = convert_err(client.get_priority_fee().await)?; Ok(tip.encode_hex()) } async fn block_number(&self) -> Result { - let num = convert_err(self.client.get_block_number().await)?; + let client = self.client.lock().await; + let num = convert_err(client.get_block_number().await)?; Ok(u64_to_hex_string(num)) } } diff --git a/config/src/networks.rs b/config/src/networks.rs index 8dc0dcb..849ecd9 100644 --- a/config/src/networks.rs +++ b/config/src/networks.rs @@ -11,10 +11,10 @@ pub fn goerli() -> Config { ) .unwrap(), checkpoint: hex_str_to_bytes( - "0x172128eadf1da46467f4d6a822206698e2d3f957af117dd650954780d680dc99", + "0x1e591af1e90f2db918b2a132991c7c2ee9a4ab26da496bd6e71e4f0bd65ea870", ) .unwrap(), - consensus_rpc: "http://testing.prater.beacon-api.nimbus.team".to_string(), + consensus_rpc: "http://34.207.158.131:5052".to_string(), execution_rpc: "https://eth-goerli.g.alchemy.com:443/v2/o_8Qa9kgwDPf9G8sroyQ-uQtyhyWa3ao" .to_string(), diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 7f1dcd6..67b60be 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -1,3 +1,4 @@ +use std::cmp; use std::sync::Arc; use blst::min_pk::{PublicKey, Signature}; @@ -20,9 +21,12 @@ pub struct ConsensusClient { #[derive(Debug)] struct Store { - header: Header, + finalized_header: Header, current_sync_committee: SyncCommittee, next_sync_committee: Option, + optimistic_header: Header, + previous_max_active_participants: u64, + current_max_active_participants: u64, } impl ConsensusClient { @@ -50,19 +54,22 @@ impl ConsensusClient { } let store = Store { - header: bootstrap.header, + finalized_header: bootstrap.header.clone(), current_sync_committee: bootstrap.current_sync_committee, next_sync_committee: None, + optimistic_header: bootstrap.header, + previous_max_active_participants: 0, + current_max_active_participants: 0, }; Ok(ConsensusClient { rpc, store, config }) } pub async fn get_execution_payload(&self) -> Result { - let slot = self.store.header.slot; + let slot = self.store.optimistic_header.slot; let mut block = self.rpc.get_block(slot).await?.clone(); let block_hash = block.hash_tree_root()?; - let verified_block_hash = self.store.header.clone().hash_tree_root()?; + let verified_block_hash = self.store.optimistic_header.clone().hash_tree_root()?; if verified_block_hash != block_hash { Err(eyre::eyre!("Block Root Mismatch")) @@ -72,11 +79,11 @@ impl ConsensusClient { } pub fn get_head(&self) -> &Header { - &self.store.header + &self.store.optimistic_header } pub async fn sync(&mut self) -> Result<()> { - let current_period = calc_sync_period(self.store.header.slot); + let current_period = calc_sync_period(self.store.finalized_header.slot); let updates = self.rpc.get_updates(current_period).await?; for mut update in updates { @@ -85,32 +92,34 @@ impl ConsensusClient { } let finality_update = self.rpc.get_finality_update().await?; - let mut finality_update_generic = Update { - attested_header: finality_update.attested_header, - next_sync_committee: None, - next_sync_committee_branch: Vec::new(), - finalized_header: finality_update.finalized_header, - finality_branch: finality_update.finality_branch, - sync_aggregate: finality_update.sync_aggregate, - signature_slot: finality_update.signature_slot, - }; + self.verify_finality_update(&finality_update)?; + self.apply_finality_update(&finality_update); - self.verify_update(&mut finality_update_generic)?; - self.apply_update(&finality_update_generic); + let optimistic_update = self.rpc.get_optimistic_update().await?; + self.verify_optimistic_update(&optimistic_update)?; + self.apply_optimistic_update(&optimistic_update); - self.rpc.get_block(self.store.header.slot).await?; + Ok(()) + } + + pub async fn advance(&mut self) -> Result<()> { + let finality_update = self.rpc.get_finality_update().await?; + self.verify_finality_update(&finality_update)?; + self.apply_finality_update(&finality_update); + + let optimistic_update = self.rpc.get_optimistic_update().await?; + self.verify_optimistic_update(&optimistic_update)?; + self.apply_optimistic_update(&optimistic_update); Ok(()) } fn verify_update(&mut self, update: &mut Update) -> Result<()> { - let current_slot = self.store.header.slot; - let update_slot = update.finalized_header.slot; + let store_period = calc_sync_period(self.store.finalized_header.slot); + let update_signature_period = calc_sync_period(update.signature_slot); - let current_period = calc_sync_period(current_slot); - let update_period = calc_sync_period(update_slot); - - if !(update_period == current_period + 1 || update_period == current_period) { + if !(update_signature_period == store_period + 1 || update_signature_period == store_period) + { return Err(eyre::eyre!("Invalid Update")); } @@ -130,26 +139,25 @@ impl ConsensusClient { return Err(eyre::eyre!("Invalid Update")); } - if update.next_sync_committee.is_some() { - let next_committee_branch_valid = is_next_committee_proof_valid( - &update.attested_header, - &mut update.next_sync_committee.clone().unwrap(), - &update.next_sync_committee_branch, - ); + let next_committee_branch_valid = is_next_committee_proof_valid( + &update.attested_header, + &mut update.next_sync_committee, + &update.next_sync_committee_branch, + ); - if !next_committee_branch_valid { - return Err(eyre::eyre!("Invalid Update")); - } + if !next_committee_branch_valid { + return Err(eyre::eyre!("Invalid Update")); } - let sync_committee = if current_period == update_period { + let sync_committee = if store_period == update_signature_period { &self.store.current_sync_committee } else { self.store.next_sync_committee.as_ref().unwrap() }; let pks = - get_participating_keys(sync_committee, &update.sync_aggregate.sync_committee_bits)?; + get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; + let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); let committee_quorum = pks.len() > 1; @@ -169,20 +177,152 @@ impl ConsensusClient { Ok(()) } - fn apply_update(&mut self, update: &Update) { - let current_period = calc_sync_period(self.store.header.slot); - let update_period = calc_sync_period(update.finalized_header.slot); + fn verify_finality_update(&self, update: &FinalityUpdate) -> Result<()> { + let store_period = calc_sync_period(self.store.finalized_header.slot); + let update_signature_period = calc_sync_period(update.signature_slot); - self.store.header = update.finalized_header.clone(); + if !(update_signature_period == store_period + 1 || update_signature_period == store_period) + { + return Err(eyre::eyre!("Invalid Update")); + } + + if !(update.signature_slot > update.attested_header.slot + && update.attested_header.slot > update.finalized_header.slot) + { + return Err(eyre::eyre!("Invalid Update")); + } + + let finality_branch_valid = is_finality_proof_valid( + &update.attested_header, + &mut update.finalized_header.clone(), + &update.finality_branch, + ); + + if !(finality_branch_valid) { + return Err(eyre::eyre!("Invalid Update")); + } + + let sync_committee = &self.store.current_sync_committee; + + let pks = + get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; + + let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); + + let committee_quorum = pks.len() > 1; + if !committee_quorum { + return Err(eyre::eyre!("Invalid Update")); + } + + let header_root = + bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes()); + let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?; + let sig = &update.sync_aggregate.sync_committee_signature; + let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks); + + if !is_valid_sig { + return Err(eyre::eyre!("Invalid Update")); + } + + Ok(()) + } + + fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> { + let store_period = calc_sync_period(self.store.finalized_header.slot); + let update_signature_period = calc_sync_period(update.signature_slot); + + if !(update_signature_period == store_period + 1 || update_signature_period == store_period) + { + return Err(eyre::eyre!("Invalid Update")); + } + + if !(update.signature_slot > update.attested_header.slot) { + return Err(eyre::eyre!("Invalid Update")); + } + + let sync_committee = &self.store.current_sync_committee; + + let pks = + get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; + + let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); + + let committee_quorum = pks.len() > 1; + if !committee_quorum { + return Err(eyre::eyre!("Invalid Update")); + } + + let header_root = + bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes()); + let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?; + let sig = &update.sync_aggregate.sync_committee_signature; + let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks); + + if !is_valid_sig { + return Err(eyre::eyre!("Invalid Update")); + } + + Ok(()) + } + + fn apply_update(&mut self, update: &Update) { + let store_period = calc_sync_period(self.store.finalized_header.slot); + let update_signature_period = calc_sync_period(update.signature_slot); + + self.store.finalized_header = update.finalized_header.clone(); if self.store.next_sync_committee.is_none() { - self.store.next_sync_committee = - Some(update.next_sync_committee.as_ref().unwrap().clone()); - } else if update_period == current_period + 1 { + self.store.next_sync_committee = Some(update.next_sync_committee.clone()); + } else if update_signature_period == store_period + 1 { self.store.current_sync_committee = self.store.next_sync_committee.as_ref().unwrap().clone(); - self.store.next_sync_committee = - Some(update.next_sync_committee.as_ref().unwrap().clone()); + self.store.next_sync_committee = Some(update.next_sync_committee.clone()); + } + + println!( + "applying update for slot: {}", + self.store.finalized_header.slot + ); + } + + fn apply_finality_update(&mut self, update: &FinalityUpdate) { + if self.store.finalized_header.slot != update.finalized_header.slot { + self.store.finalized_header = update.finalized_header.clone(); + self.store.previous_max_active_participants = + self.store.current_max_active_participants; + self.store.current_max_active_participants = + get_bits(&update.sync_aggregate.sync_committee_bits); + + println!( + "applying finality update for slot: {}", + self.store.finalized_header.slot + ); + } + + if self.store.finalized_header.slot > self.store.optimistic_header.slot { + self.store.optimistic_header = self.store.finalized_header.clone(); + } + } + + fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) { + let votes = get_bits(&update.sync_aggregate.sync_committee_bits); + if votes > self.store.current_max_active_participants { + self.store.current_max_active_participants = votes; + } + + let safety_theshhold = cmp::max( + self.store.current_max_active_participants, + self.store.previous_max_active_participants, + ) / 2; + + if votes > safety_theshhold + && update.attested_header.slot > self.store.optimistic_header.slot + { + self.store.optimistic_header = update.attested_header.clone(); + println!( + "applying optimistic update for slot: {}", + self.store.optimistic_header.slot + ); } } @@ -218,6 +358,17 @@ fn get_participating_keys( Ok(pks) } +fn get_bits(bitfield: &Bitvector<512>) -> u64 { + let mut count = 0; + bitfield.iter().for_each(|bit| { + if bit == true { + count += 1; + } + }); + + count +} + fn is_aggregate_valid(sig_bytes: &SignatureBytes, msg: &[u8], pks: &[&PublicKey]) -> bool { let dst: &[u8] = b"BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_"; let sig_res = Signature::from_bytes(&sig_bytes); diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index f1f16a3..7fa0ef4 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -41,6 +41,15 @@ impl Rpc { Ok(res.data) } + pub async fn get_optimistic_update(&self) -> Result { + let req = format!("{}/eth/v0/beacon/light_client/optimistic_update", self.rpc); + let res = reqwest::get(req) + .await? + .json::() + .await?; + Ok(res.data) + } + pub async fn get_block(&self, slot: u64) -> Result { let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot); let res = reqwest::get(req) @@ -71,6 +80,11 @@ struct FinalityUpdateResponse { data: FinalityUpdate, } +#[derive(serde::Deserialize, Debug)] +struct OptimisticUpdateResponse { + data: OptimisticUpdate, +} + #[derive(serde::Deserialize, Debug)] struct BootstrapResponse { data: BootstrapData, diff --git a/consensus/src/types.rs b/consensus/src/types.rs index 08e6c52..8f46cb3 100644 --- a/consensus/src/types.rs +++ b/consensus/src/types.rs @@ -130,7 +130,7 @@ pub struct Bootstrap { #[derive(serde::Deserialize, Debug, Clone)] pub struct Update { pub attested_header: Header, - pub next_sync_committee: Option, + pub next_sync_committee: SyncCommittee, #[serde(deserialize_with = "branch_deserialize")] pub next_sync_committee_branch: Vec, pub finalized_header: Header, @@ -152,6 +152,14 @@ pub struct FinalityUpdate { pub signature_slot: u64, } +#[derive(serde::Deserialize, Debug)] +pub struct OptimisticUpdate { + pub attested_header: Header, + pub sync_aggregate: SyncAggregate, + #[serde(deserialize_with = "u64_deserialize")] + pub signature_slot: u64, +} + #[derive(serde::Deserialize, Debug, Clone, Default, SimpleSerialize)] pub struct Header { #[serde(deserialize_with = "u64_deserialize")]