From aa71f4ac17c8408499e8f7c64de3661f5085610c Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Wed, 28 Sep 2022 16:48:24 -0400 Subject: [PATCH] refactor: core consensus (#61) * try to update next sync committee periodically * perform verification through generic updates * apply updates with generic update struct * better logging * fix checkpoint save * clean up * better update timing --- client/src/client.rs | 4 +- client/src/node.rs | 8 + config/src/networks.rs | 2 +- consensus/src/consensus.rs | 433 ++++++++++++++++++++----------------- consensus/src/types.rs | 52 +++++ 5 files changed, 296 insertions(+), 203 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index 18ce440..bceeca1 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{Transaction, TransactionReceipt, H256}; @@ -59,7 +58,8 @@ impl Client { warn!("{}", err); } - sleep(Duration::from_secs(10)).await; + let next_update = node.read().await.duration_until_next_update(); + sleep(next_update).await; } }); diff --git a/client/src/node.rs b/client/src/node.rs index 3bd38f7..ebf5549 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; use std::sync::Arc; +use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{Transaction, TransactionReceipt, H256}; @@ -56,6 +57,13 @@ impl Node { self.update_payloads().await } + pub fn duration_until_next_update(&self) -> Duration { + self.consensus + .duration_until_next_update() + .to_std() + .unwrap() + } + async fn update_payloads(&mut self) -> Result<()> { let latest_header = self.consensus.get_header(); let latest_payload = self diff --git a/config/src/networks.rs b/config/src/networks.rs index 50e0004..47a1214 100644 --- a/config/src/networks.rs +++ b/config/src/networks.rs @@ -12,7 +12,7 @@ pub fn mainnet() -> Config { ) .unwrap(), checkpoint: hex_str_to_bytes( - "0x03e315e11b3f88cd63dfb62c74a313c4a65949ce9e37599e0ee66533ceceadfd", + "0x5ca31c7c795d8f2de2e844718cdb08835639c644365427b9f20f82083e7dac9a", ) .unwrap(), consensus_rpc: "http://testing.mainnet.beacon-api.nimbus.team".to_string(), diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 8841ad9..e7756f7 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -6,7 +6,7 @@ use blst::min_pk::{PublicKey, Signature}; use blst::BLST_ERROR; use chrono::Duration; use eyre::{eyre, Result}; -use log::info; +use log::{debug, info}; use ssz_rs::prelude::*; use common::types::*; @@ -134,256 +134,256 @@ impl ConsensusClient { self.verify_optimistic_update(&optimistic_update)?; self.apply_optimistic_update(&optimistic_update); + if self.store.next_sync_committee.is_none() { + debug!("checking for sync committee update"); + let current_period = calc_sync_period(self.store.finalized_header.slot); + let mut updates = self.rpc.get_updates(current_period).await?; + + if updates.len() == 1 { + let mut update = updates.get_mut(0).unwrap(); + let res = self.verify_update(&mut update); + + if res.is_ok() { + info!("updating sync committee"); + self.apply_update(&update); + } + } + } + Ok(()) } - fn verify_update(&mut self, update: &mut Update) -> Result<()> { + fn verify_generic_update(&self, update: &GenericUpdate) -> Result<()> { + let bits = get_bits(&update.sync_aggregate.sync_committee_bits); + if bits == 0 { + return Err(eyre!("Insufficient Participation")); + } + + let update_finalized_slot = update.finalized_header.clone().unwrap_or_default().slot; + let valid_time = self.current_slot() >= update.signature_slot + && update.signature_slot > update.attested_header.slot + && update.attested_header.slot >= update_finalized_slot; + + if !valid_time { + return Err(eyre!("Invalid Timestamp")); + } + let store_period = calc_sync_period(self.store.finalized_header.slot); - let update_signature_period = calc_sync_period(update.signature_slot); + let update_sig_period = calc_sync_period(update.signature_slot); + let valid_period = if self.store.next_sync_committee.is_some() { + update_sig_period == store_period || update_sig_period == store_period + 1 + } else { + update_sig_period == store_period + }; - if !(update_signature_period == store_period + 1 || update_signature_period == store_period) + if !valid_period { + return Err(eyre!("Invalid Period")); + } + + let update_attested_period = calc_sync_period(update.attested_header.slot); + let update_has_next_committee = self.store.next_sync_committee.is_none() + && update.next_sync_committee.is_some() + && update_attested_period == store_period; + + if update.attested_header.slot <= self.store.finalized_header.slot + && !update_has_next_committee { - return Err(eyre!("Invalid Update")); + return Err(eyre!("Update Not Relevent")); } - if !(update.signature_slot > update.attested_header.slot - && update.attested_header.slot > update.finalized_header.slot) - { - return Err(eyre!("Invalid Update")); + if update.finalized_header.is_some() && update.finality_branch.is_some() { + let is_valid = is_finality_proof_valid( + &update.attested_header, + &mut update.finalized_header.clone().unwrap(), + &update.finality_branch.clone().unwrap(), + ); + + if !is_valid { + return Err(eyre!("Invalid Finality Proof")); + } } - let finality_branch_valid = is_finality_proof_valid( - &update.attested_header, - &mut update.finalized_header, - &update.finality_branch, - ); + if update.next_sync_committee.is_some() && update.next_sync_committee_branch.is_some() { + let is_valid = is_next_committee_proof_valid( + &update.attested_header, + &mut update.next_sync_committee.clone().unwrap(), + &update.next_sync_committee_branch.clone().unwrap(), + ); - if !(finality_branch_valid) { - return Err(eyre!("Invalid Update")); + if !is_valid { + return Err(eyre!("Invalid Next Sync Committee Proof")); + } } - let next_committee_branch_valid = is_next_committee_proof_valid( - &update.attested_header, - &mut update.next_sync_committee, - &update.next_sync_committee_branch, - ); - - if !next_committee_branch_valid { - return Err(eyre!("Invalid Update")); - } - - let sync_committee = if store_period == update_signature_period { + let sync_committee = if update_sig_period == store_period { &self.store.current_sync_committee } else { self.store.next_sync_committee.as_ref().unwrap() }; let pks = - get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; - + get_participating_keys(sync_committee, &update.sync_aggregate.sync_committee_bits)?; let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); - let committee_quorum = pks.len() > 1; - if !committee_quorum { - return Err(eyre!("Invalid Update")); - } - - let header_root = bytes_to_bytes32(update.attested_header.hash_tree_root()?.as_bytes()); + let header_root = + bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes()); let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?; let sig = &update.sync_aggregate.sync_committee_signature; let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks); if !is_valid_sig { - return Err(eyre!("Invalid Update")); + return Err(eyre!("Invalid Signature")); } Ok(()) } + fn verify_update(&self, update: &Update) -> Result<()> { + let update = GenericUpdate::from(update); + self.verify_generic_update(&update) + } + fn verify_finality_update(&self, update: &FinalityUpdate) -> Result<()> { - let store_period = calc_sync_period(self.store.finalized_header.slot); - let update_signature_period = calc_sync_period(update.signature_slot); - - if !(update_signature_period == store_period + 1 || update_signature_period == store_period) - { - return Err(eyre!("Invalid Update")); - } - - if !(update.signature_slot > update.attested_header.slot - && update.attested_header.slot > update.finalized_header.slot) - { - return Err(eyre!("Invalid Update")); - } - - let finality_branch_valid = is_finality_proof_valid( - &update.attested_header, - &mut update.finalized_header.clone(), - &update.finality_branch, - ); - - if !(finality_branch_valid) { - return Err(eyre!("Invalid Update")); - } - - let sync_committee = &self.store.current_sync_committee; - - let pks = - get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; - - let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); - - let committee_quorum = pks.len() > 1; - if !committee_quorum { - return Err(eyre!("Invalid Update")); - } - - let header_root = - bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes()); - let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?; - let sig = &update.sync_aggregate.sync_committee_signature; - let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks); - - if !is_valid_sig { - return Err(eyre!("Invalid Update")); - } - - Ok(()) + let update = GenericUpdate::from(update); + self.verify_generic_update(&update) } fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> { - let store_period = calc_sync_period(self.store.finalized_header.slot); - let update_signature_period = calc_sync_period(update.signature_slot); + let update = GenericUpdate::from(update); + self.verify_generic_update(&update) + } - if !(update_signature_period == store_period + 1 || update_signature_period == store_period) - { - return Err(eyre!("Invalid Update")); + fn apply_generic_update(&mut self, update: &GenericUpdate) { + let committee_bits = get_bits(&update.sync_aggregate.sync_committee_bits); + + self.store.current_max_active_participants = + u64::max(self.store.current_max_active_participants, committee_bits); + + let should_update_optimistic = committee_bits > self.safety_theshhold() + && update.attested_header.slot > self.store.optimistic_header.slot; + + if should_update_optimistic { + self.store.optimistic_header = update.attested_header.clone(); + self.log_optimistic_update(update); } - if !(update.signature_slot > update.attested_header.slot) { - return Err(eyre!("Invalid Update")); + let update_attested_period = calc_sync_period(update.attested_header.slot); + + let update_finalized_slot = update + .finalized_header + .as_ref() + .map(|h| h.slot) + .unwrap_or(0); + + let update_finalized_period = calc_sync_period(update_finalized_slot); + + let update_has_finalized_next_committee = self.store.next_sync_committee.is_none() + && self.has_sync_update(update) + && self.has_finality_update(update) + && update_finalized_period == update_attested_period; + + let should_apply_update = { + let has_majority = committee_bits * 3 >= 512 * 2; + let update_is_newer = update_finalized_slot > self.store.finalized_header.slot; + let good_update = update_is_newer || update_has_finalized_next_committee; + + has_majority && good_update + }; + + if should_apply_update { + let store_period = calc_sync_period(self.store.finalized_header.slot); + + if self.store.next_sync_committee.is_none() { + self.store.next_sync_committee = update.next_sync_committee.clone(); + } else if update_finalized_period == store_period + 1 { + info!("sync committee updated"); + self.store.current_sync_committee = self.store.next_sync_committee.clone().unwrap(); + self.store.next_sync_committee = update.next_sync_committee.clone(); + self.store.previous_max_active_participants = + self.store.current_max_active_participants; + self.store.current_max_active_participants = 0; + } + + if update_finalized_slot > self.store.finalized_header.slot { + self.store.finalized_header = update.finalized_header.clone().unwrap(); + self.log_finality_update(update); + + if self.store.finalized_header.slot % 32 == 0 { + let checkpoint_res = self.store.finalized_header.hash_tree_root(); + if let Ok(checkpoint) = checkpoint_res { + self.last_checkpoint = Some(checkpoint.as_bytes().to_vec()); + } + } + + if self.store.finalized_header.slot > self.store.optimistic_header.slot { + self.store.optimistic_header = self.store.finalized_header.clone(); + } + } } - - let sync_committee = &self.store.current_sync_committee; - - let pks = - get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?; - - let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); - - let committee_quorum = pks.len() > 1; - if !committee_quorum { - return Err(eyre!("Invalid Update")); - } - - let header_root = - bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes()); - let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?; - let sig = &update.sync_aggregate.sync_committee_signature; - let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks); - - if !is_valid_sig { - return Err(eyre!("Invalid Update")); - } - - Ok(()) } fn apply_update(&mut self, update: &Update) { - let store_period = calc_sync_period(self.store.finalized_header.slot); - let update_signature_period = calc_sync_period(update.signature_slot); - - self.store.finalized_header = update.finalized_header.clone(); - - if update.finalized_header.slot % 32 == 0 { - let n = update.finalized_header.clone().hash_tree_root().unwrap(); - let checkpoint = n.as_bytes().to_vec(); - self.last_checkpoint = Some(checkpoint); - } - - if self.store.next_sync_committee.is_none() { - self.store.next_sync_committee = Some(update.next_sync_committee.clone()); - } else if update_signature_period == store_period + 1 { - self.store.current_sync_committee = - self.store.next_sync_committee.as_ref().unwrap().clone(); - self.store.next_sync_committee = Some(update.next_sync_committee.clone()); - } - - let participation = - get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; - let delay = self.get_delay(self.store.finalized_header.slot); - - info!( - "applying update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}", - self.store.finalized_header.slot, - participation, - delay.num_hours(), - delay.num_minutes(), - delay.num_seconds(), - ); + let update = GenericUpdate::from(update); + self.apply_generic_update(&update); } fn apply_finality_update(&mut self, update: &FinalityUpdate) { - if self.store.finalized_header.slot != update.finalized_header.slot { - self.store.finalized_header = update.finalized_header.clone(); - self.store.previous_max_active_participants = - self.store.current_max_active_participants; - self.store.current_max_active_participants = - get_bits(&update.sync_aggregate.sync_committee_bits); + let update = GenericUpdate::from(update); + self.apply_generic_update(&update); + } - let participation = - get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; - let delay = self.get_delay(self.store.finalized_header.slot); + fn log_finality_update(&self, update: &GenericUpdate) { + let participation = + get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; + let decimals = if participation == 100.0 { 1 } else { 2 }; + let age = self.age(self.store.finalized_header.slot); - if update.finalized_header.slot % 32 == 0 { - let n = update.finalized_header.clone().hash_tree_root().unwrap(); - let checkpoint = n.as_bytes().to_vec(); - self.last_checkpoint = Some(checkpoint); - } - - info!( - "applying finality update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}", - self.store.finalized_header.slot, - participation, - delay.num_hours(), - delay.num_minutes(), - delay.num_seconds(), - ); - } - - if self.store.finalized_header.slot > self.store.optimistic_header.slot { - self.store.optimistic_header = self.store.finalized_header.clone(); - } + info!( + "finalized slot slot={} confidence={:.decimals$}% age={:02}:{:02}:{:02}:{:02}", + self.store.finalized_header.slot, + participation, + age.num_days(), + age.num_hours() % 24, + age.num_minutes() % 60, + age.num_seconds() % 60, + ); } fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) { - let votes = get_bits(&update.sync_aggregate.sync_committee_bits); - if votes > self.store.current_max_active_participants { - self.store.current_max_active_participants = votes; - } + let update = GenericUpdate::from(update); + self.apply_generic_update(&update); + } - let safety_theshhold = cmp::max( + fn log_optimistic_update(&self, update: &GenericUpdate) { + let participation = + get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; + let decimals = if participation == 100.0 { 1 } else { 2 }; + let age = self.age(self.store.optimistic_header.slot); + + info!( + "updated head slot={} confidence={:.decimals$}% age={:02}:{:02}:{:02}:{:02}", + self.store.optimistic_header.slot, + participation, + age.num_days(), + age.num_hours() % 24, + age.num_minutes() % 60, + age.num_seconds() % 60, + ); + } + + fn has_finality_update(&self, update: &GenericUpdate) -> bool { + update.finalized_header.is_some() && update.finality_branch.is_some() + } + + fn has_sync_update(&self, update: &GenericUpdate) -> bool { + update.next_sync_committee.is_some() && update.next_sync_committee_branch.is_some() + } + + fn safety_theshhold(&self) -> u64 { + cmp::max( self.store.current_max_active_participants, self.store.previous_max_active_participants, - ) / 2; - - if votes > safety_theshhold - && update.attested_header.slot > self.store.optimistic_header.slot - { - self.store.optimistic_header = update.attested_header.clone(); - - let participation = - get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32; - let delay = self.get_delay(update.attested_header.slot); - - info!( - "applying optimistic update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}", - self.store.optimistic_header.slot, - participation, - delay.num_hours(), - delay.num_minutes(), - delay.num_seconds(), - ); - } + ) / 2 } fn compute_committee_sign_root(&self, header: Bytes32, slot: u64) -> Result { @@ -401,14 +401,47 @@ impl ConsensusClient { compute_signing_root(header, domain) } - fn get_delay(&self, slot: u64) -> Duration { - let expected_time = slot * 12 + self.config.general.genesis_time; + fn age(&self, slot: u64) -> Duration { + let expected_time = self.slot_timestamp(slot); let now = std::time::SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap(); let delay = now - std::time::Duration::from_secs(expected_time); chrono::Duration::from_std(delay).unwrap() } + + fn current_slot(&self) -> u64 { + let now = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap(); + + let genesis_time = self.config.general.genesis_time; + let since_genesis = now - std::time::Duration::from_secs(genesis_time); + + since_genesis.as_secs() / 12 + } + + fn slot_timestamp(&self, slot: u64) -> u64 { + slot * 12 + self.config.general.genesis_time + } + + /// Gets the duration until the next update + /// Updates are scheduled for 4 seconds into each slot + pub fn duration_until_next_update(&self) -> Duration { + let current_slot = self.current_slot(); + let next_slot = current_slot + 1; + let next_slot_timestamp = self.slot_timestamp(next_slot); + + let now = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let time_to_next_slot = next_slot_timestamp - now; + let next_update = time_to_next_slot + 4; + + Duration::seconds(next_update as i64) + } } fn get_participating_keys( diff --git a/consensus/src/types.rs b/consensus/src/types.rs index afeae56..a4e534f 100644 --- a/consensus/src/types.rs +++ b/consensus/src/types.rs @@ -256,6 +256,58 @@ pub struct SyncAggregate { pub sync_committee_signature: SignatureBytes, } +pub struct GenericUpdate { + pub attested_header: Header, + pub sync_aggregate: SyncAggregate, + pub signature_slot: u64, + pub next_sync_committee: Option, + pub next_sync_committee_branch: Option>, + pub finalized_header: Option
, + pub finality_branch: Option>, +} + +impl From<&Update> for GenericUpdate { + fn from(update: &Update) -> Self { + Self { + attested_header: update.attested_header.clone(), + sync_aggregate: update.sync_aggregate.clone(), + signature_slot: update.signature_slot.clone(), + next_sync_committee: Some(update.next_sync_committee.clone()), + next_sync_committee_branch: Some(update.next_sync_committee_branch.clone()), + finalized_header: Some(update.finalized_header.clone()), + finality_branch: Some(update.finality_branch.clone()), + } + } +} + +impl From<&FinalityUpdate> for GenericUpdate { + fn from(update: &FinalityUpdate) -> Self { + Self { + attested_header: update.attested_header.clone(), + sync_aggregate: update.sync_aggregate.clone(), + signature_slot: update.signature_slot.clone(), + next_sync_committee: None, + next_sync_committee_branch: None, + finalized_header: Some(update.finalized_header.clone()), + finality_branch: Some(update.finality_branch.clone()), + } + } +} + +impl From<&OptimisticUpdate> for GenericUpdate { + fn from(update: &OptimisticUpdate) -> Self { + Self { + attested_header: update.attested_header.clone(), + sync_aggregate: update.sync_aggregate.clone(), + signature_slot: update.signature_slot.clone(), + next_sync_committee: None, + next_sync_committee_branch: None, + finalized_header: None, + finality_branch: None, + } + } +} + fn pubkey_deserialize<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>,