refactor: core consensus (#61)
* try to update next sync committee periodically * perform verification through generic updates * apply updates with generic update struct * better logging * fix checkpoint save * clean up * better update timing
This commit is contained in:
parent
c57c866c17
commit
aa71f4ac17
|
@ -1,5 +1,4 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use ethers::prelude::{Address, U256};
|
use ethers::prelude::{Address, U256};
|
||||||
use ethers::types::{Transaction, TransactionReceipt, H256};
|
use ethers::types::{Transaction, TransactionReceipt, H256};
|
||||||
|
@ -59,7 +58,8 @@ impl<DB: Database> Client<DB> {
|
||||||
warn!("{}", err);
|
warn!("{}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(Duration::from_secs(10)).await;
|
let next_update = node.read().await.duration_until_next_update();
|
||||||
|
sleep(next_update).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use ethers::prelude::{Address, U256};
|
use ethers::prelude::{Address, U256};
|
||||||
use ethers::types::{Transaction, TransactionReceipt, H256};
|
use ethers::types::{Transaction, TransactionReceipt, H256};
|
||||||
|
@ -56,6 +57,13 @@ impl Node {
|
||||||
self.update_payloads().await
|
self.update_payloads().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn duration_until_next_update(&self) -> Duration {
|
||||||
|
self.consensus
|
||||||
|
.duration_until_next_update()
|
||||||
|
.to_std()
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
async fn update_payloads(&mut self) -> Result<()> {
|
async fn update_payloads(&mut self) -> Result<()> {
|
||||||
let latest_header = self.consensus.get_header();
|
let latest_header = self.consensus.get_header();
|
||||||
let latest_payload = self
|
let latest_payload = self
|
||||||
|
|
|
@ -12,7 +12,7 @@ pub fn mainnet() -> Config {
|
||||||
)
|
)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
checkpoint: hex_str_to_bytes(
|
checkpoint: hex_str_to_bytes(
|
||||||
"0x03e315e11b3f88cd63dfb62c74a313c4a65949ce9e37599e0ee66533ceceadfd",
|
"0x5ca31c7c795d8f2de2e844718cdb08835639c644365427b9f20f82083e7dac9a",
|
||||||
)
|
)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
consensus_rpc: "http://testing.mainnet.beacon-api.nimbus.team".to_string(),
|
consensus_rpc: "http://testing.mainnet.beacon-api.nimbus.team".to_string(),
|
||||||
|
|
|
@ -6,7 +6,7 @@ use blst::min_pk::{PublicKey, Signature};
|
||||||
use blst::BLST_ERROR;
|
use blst::BLST_ERROR;
|
||||||
use chrono::Duration;
|
use chrono::Duration;
|
||||||
use eyre::{eyre, Result};
|
use eyre::{eyre, Result};
|
||||||
use log::info;
|
use log::{debug, info};
|
||||||
use ssz_rs::prelude::*;
|
use ssz_rs::prelude::*;
|
||||||
|
|
||||||
use common::types::*;
|
use common::types::*;
|
||||||
|
@ -134,256 +134,256 @@ impl<R: Rpc> ConsensusClient<R> {
|
||||||
self.verify_optimistic_update(&optimistic_update)?;
|
self.verify_optimistic_update(&optimistic_update)?;
|
||||||
self.apply_optimistic_update(&optimistic_update);
|
self.apply_optimistic_update(&optimistic_update);
|
||||||
|
|
||||||
|
if self.store.next_sync_committee.is_none() {
|
||||||
|
debug!("checking for sync committee update");
|
||||||
|
let current_period = calc_sync_period(self.store.finalized_header.slot);
|
||||||
|
let mut updates = self.rpc.get_updates(current_period).await?;
|
||||||
|
|
||||||
|
if updates.len() == 1 {
|
||||||
|
let mut update = updates.get_mut(0).unwrap();
|
||||||
|
let res = self.verify_update(&mut update);
|
||||||
|
|
||||||
|
if res.is_ok() {
|
||||||
|
info!("updating sync committee");
|
||||||
|
self.apply_update(&update);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_update(&mut self, update: &mut Update) -> Result<()> {
|
fn verify_generic_update(&self, update: &GenericUpdate) -> Result<()> {
|
||||||
|
let bits = get_bits(&update.sync_aggregate.sync_committee_bits);
|
||||||
|
if bits == 0 {
|
||||||
|
return Err(eyre!("Insufficient Participation"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let update_finalized_slot = update.finalized_header.clone().unwrap_or_default().slot;
|
||||||
|
let valid_time = self.current_slot() >= update.signature_slot
|
||||||
|
&& update.signature_slot > update.attested_header.slot
|
||||||
|
&& update.attested_header.slot >= update_finalized_slot;
|
||||||
|
|
||||||
|
if !valid_time {
|
||||||
|
return Err(eyre!("Invalid Timestamp"));
|
||||||
|
}
|
||||||
|
|
||||||
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
||||||
let update_signature_period = calc_sync_period(update.signature_slot);
|
let update_sig_period = calc_sync_period(update.signature_slot);
|
||||||
|
let valid_period = if self.store.next_sync_committee.is_some() {
|
||||||
|
update_sig_period == store_period || update_sig_period == store_period + 1
|
||||||
|
} else {
|
||||||
|
update_sig_period == store_period
|
||||||
|
};
|
||||||
|
|
||||||
if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
|
if !valid_period {
|
||||||
{
|
return Err(eyre!("Invalid Period"));
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !(update.signature_slot > update.attested_header.slot
|
let update_attested_period = calc_sync_period(update.attested_header.slot);
|
||||||
&& update.attested_header.slot > update.finalized_header.slot)
|
let update_has_next_committee = self.store.next_sync_committee.is_none()
|
||||||
|
&& update.next_sync_committee.is_some()
|
||||||
|
&& update_attested_period == store_period;
|
||||||
|
|
||||||
|
if update.attested_header.slot <= self.store.finalized_header.slot
|
||||||
|
&& !update_has_next_committee
|
||||||
{
|
{
|
||||||
return Err(eyre!("Invalid Update"));
|
return Err(eyre!("Update Not Relevent"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let finality_branch_valid = is_finality_proof_valid(
|
if update.finalized_header.is_some() && update.finality_branch.is_some() {
|
||||||
|
let is_valid = is_finality_proof_valid(
|
||||||
&update.attested_header,
|
&update.attested_header,
|
||||||
&mut update.finalized_header,
|
&mut update.finalized_header.clone().unwrap(),
|
||||||
&update.finality_branch,
|
&update.finality_branch.clone().unwrap(),
|
||||||
);
|
);
|
||||||
|
|
||||||
if !(finality_branch_valid) {
|
if !is_valid {
|
||||||
return Err(eyre!("Invalid Update"));
|
return Err(eyre!("Invalid Finality Proof"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let next_committee_branch_valid = is_next_committee_proof_valid(
|
if update.next_sync_committee.is_some() && update.next_sync_committee_branch.is_some() {
|
||||||
|
let is_valid = is_next_committee_proof_valid(
|
||||||
&update.attested_header,
|
&update.attested_header,
|
||||||
&mut update.next_sync_committee,
|
&mut update.next_sync_committee.clone().unwrap(),
|
||||||
&update.next_sync_committee_branch,
|
&update.next_sync_committee_branch.clone().unwrap(),
|
||||||
);
|
);
|
||||||
|
|
||||||
if !next_committee_branch_valid {
|
if !is_valid {
|
||||||
return Err(eyre!("Invalid Update"));
|
return Err(eyre!("Invalid Next Sync Committee Proof"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let sync_committee = if store_period == update_signature_period {
|
let sync_committee = if update_sig_period == store_period {
|
||||||
&self.store.current_sync_committee
|
&self.store.current_sync_committee
|
||||||
} else {
|
} else {
|
||||||
self.store.next_sync_committee.as_ref().unwrap()
|
self.store.next_sync_committee.as_ref().unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
let pks =
|
let pks =
|
||||||
get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
|
get_participating_keys(sync_committee, &update.sync_aggregate.sync_committee_bits)?;
|
||||||
|
|
||||||
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
|
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
|
||||||
|
|
||||||
let committee_quorum = pks.len() > 1;
|
let header_root =
|
||||||
if !committee_quorum {
|
bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes());
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let header_root = bytes_to_bytes32(update.attested_header.hash_tree_root()?.as_bytes());
|
|
||||||
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
|
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
|
||||||
let sig = &update.sync_aggregate.sync_committee_signature;
|
let sig = &update.sync_aggregate.sync_committee_signature;
|
||||||
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
|
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
|
||||||
|
|
||||||
if !is_valid_sig {
|
if !is_valid_sig {
|
||||||
return Err(eyre!("Invalid Update"));
|
return Err(eyre!("Invalid Signature"));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn verify_update(&self, update: &Update) -> Result<()> {
|
||||||
|
let update = GenericUpdate::from(update);
|
||||||
|
self.verify_generic_update(&update)
|
||||||
|
}
|
||||||
|
|
||||||
fn verify_finality_update(&self, update: &FinalityUpdate) -> Result<()> {
|
fn verify_finality_update(&self, update: &FinalityUpdate) -> Result<()> {
|
||||||
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
let update = GenericUpdate::from(update);
|
||||||
let update_signature_period = calc_sync_period(update.signature_slot);
|
self.verify_generic_update(&update)
|
||||||
|
|
||||||
if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
|
|
||||||
{
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
if !(update.signature_slot > update.attested_header.slot
|
|
||||||
&& update.attested_header.slot > update.finalized_header.slot)
|
|
||||||
{
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let finality_branch_valid = is_finality_proof_valid(
|
|
||||||
&update.attested_header,
|
|
||||||
&mut update.finalized_header.clone(),
|
|
||||||
&update.finality_branch,
|
|
||||||
);
|
|
||||||
|
|
||||||
if !(finality_branch_valid) {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let sync_committee = &self.store.current_sync_committee;
|
|
||||||
|
|
||||||
let pks =
|
|
||||||
get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
|
|
||||||
|
|
||||||
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
|
|
||||||
|
|
||||||
let committee_quorum = pks.len() > 1;
|
|
||||||
if !committee_quorum {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let header_root =
|
|
||||||
bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes());
|
|
||||||
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
|
|
||||||
let sig = &update.sync_aggregate.sync_committee_signature;
|
|
||||||
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
|
|
||||||
|
|
||||||
if !is_valid_sig {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> {
|
fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> {
|
||||||
|
let update = GenericUpdate::from(update);
|
||||||
|
self.verify_generic_update(&update)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn apply_generic_update(&mut self, update: &GenericUpdate) {
|
||||||
|
let committee_bits = get_bits(&update.sync_aggregate.sync_committee_bits);
|
||||||
|
|
||||||
|
self.store.current_max_active_participants =
|
||||||
|
u64::max(self.store.current_max_active_participants, committee_bits);
|
||||||
|
|
||||||
|
let should_update_optimistic = committee_bits > self.safety_theshhold()
|
||||||
|
&& update.attested_header.slot > self.store.optimistic_header.slot;
|
||||||
|
|
||||||
|
if should_update_optimistic {
|
||||||
|
self.store.optimistic_header = update.attested_header.clone();
|
||||||
|
self.log_optimistic_update(update);
|
||||||
|
}
|
||||||
|
|
||||||
|
let update_attested_period = calc_sync_period(update.attested_header.slot);
|
||||||
|
|
||||||
|
let update_finalized_slot = update
|
||||||
|
.finalized_header
|
||||||
|
.as_ref()
|
||||||
|
.map(|h| h.slot)
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let update_finalized_period = calc_sync_period(update_finalized_slot);
|
||||||
|
|
||||||
|
let update_has_finalized_next_committee = self.store.next_sync_committee.is_none()
|
||||||
|
&& self.has_sync_update(update)
|
||||||
|
&& self.has_finality_update(update)
|
||||||
|
&& update_finalized_period == update_attested_period;
|
||||||
|
|
||||||
|
let should_apply_update = {
|
||||||
|
let has_majority = committee_bits * 3 >= 512 * 2;
|
||||||
|
let update_is_newer = update_finalized_slot > self.store.finalized_header.slot;
|
||||||
|
let good_update = update_is_newer || update_has_finalized_next_committee;
|
||||||
|
|
||||||
|
has_majority && good_update
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_apply_update {
|
||||||
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
||||||
let update_signature_period = calc_sync_period(update.signature_slot);
|
|
||||||
|
|
||||||
if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
|
|
||||||
{
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
if !(update.signature_slot > update.attested_header.slot) {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let sync_committee = &self.store.current_sync_committee;
|
|
||||||
|
|
||||||
let pks =
|
|
||||||
get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
|
|
||||||
|
|
||||||
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
|
|
||||||
|
|
||||||
let committee_quorum = pks.len() > 1;
|
|
||||||
if !committee_quorum {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let header_root =
|
|
||||||
bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes());
|
|
||||||
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
|
|
||||||
let sig = &update.sync_aggregate.sync_committee_signature;
|
|
||||||
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
|
|
||||||
|
|
||||||
if !is_valid_sig {
|
|
||||||
return Err(eyre!("Invalid Update"));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn apply_update(&mut self, update: &Update) {
|
|
||||||
let store_period = calc_sync_period(self.store.finalized_header.slot);
|
|
||||||
let update_signature_period = calc_sync_period(update.signature_slot);
|
|
||||||
|
|
||||||
self.store.finalized_header = update.finalized_header.clone();
|
|
||||||
|
|
||||||
if update.finalized_header.slot % 32 == 0 {
|
|
||||||
let n = update.finalized_header.clone().hash_tree_root().unwrap();
|
|
||||||
let checkpoint = n.as_bytes().to_vec();
|
|
||||||
self.last_checkpoint = Some(checkpoint);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.store.next_sync_committee.is_none() {
|
if self.store.next_sync_committee.is_none() {
|
||||||
self.store.next_sync_committee = Some(update.next_sync_committee.clone());
|
self.store.next_sync_committee = update.next_sync_committee.clone();
|
||||||
} else if update_signature_period == store_period + 1 {
|
} else if update_finalized_period == store_period + 1 {
|
||||||
self.store.current_sync_committee =
|
info!("sync committee updated");
|
||||||
self.store.next_sync_committee.as_ref().unwrap().clone();
|
self.store.current_sync_committee = self.store.next_sync_committee.clone().unwrap();
|
||||||
self.store.next_sync_committee = Some(update.next_sync_committee.clone());
|
self.store.next_sync_committee = update.next_sync_committee.clone();
|
||||||
}
|
|
||||||
|
|
||||||
let participation =
|
|
||||||
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
|
|
||||||
let delay = self.get_delay(self.store.finalized_header.slot);
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"applying update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}",
|
|
||||||
self.store.finalized_header.slot,
|
|
||||||
participation,
|
|
||||||
delay.num_hours(),
|
|
||||||
delay.num_minutes(),
|
|
||||||
delay.num_seconds(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn apply_finality_update(&mut self, update: &FinalityUpdate) {
|
|
||||||
if self.store.finalized_header.slot != update.finalized_header.slot {
|
|
||||||
self.store.finalized_header = update.finalized_header.clone();
|
|
||||||
self.store.previous_max_active_participants =
|
self.store.previous_max_active_participants =
|
||||||
self.store.current_max_active_participants;
|
self.store.current_max_active_participants;
|
||||||
self.store.current_max_active_participants =
|
self.store.current_max_active_participants = 0;
|
||||||
get_bits(&update.sync_aggregate.sync_committee_bits);
|
|
||||||
|
|
||||||
let participation =
|
|
||||||
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
|
|
||||||
let delay = self.get_delay(self.store.finalized_header.slot);
|
|
||||||
|
|
||||||
if update.finalized_header.slot % 32 == 0 {
|
|
||||||
let n = update.finalized_header.clone().hash_tree_root().unwrap();
|
|
||||||
let checkpoint = n.as_bytes().to_vec();
|
|
||||||
self.last_checkpoint = Some(checkpoint);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
if update_finalized_slot > self.store.finalized_header.slot {
|
||||||
"applying finality update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}",
|
self.store.finalized_header = update.finalized_header.clone().unwrap();
|
||||||
self.store.finalized_header.slot,
|
self.log_finality_update(update);
|
||||||
participation,
|
|
||||||
delay.num_hours(),
|
if self.store.finalized_header.slot % 32 == 0 {
|
||||||
delay.num_minutes(),
|
let checkpoint_res = self.store.finalized_header.hash_tree_root();
|
||||||
delay.num_seconds(),
|
if let Ok(checkpoint) = checkpoint_res {
|
||||||
);
|
self.last_checkpoint = Some(checkpoint.as_bytes().to_vec());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.store.finalized_header.slot > self.store.optimistic_header.slot {
|
if self.store.finalized_header.slot > self.store.optimistic_header.slot {
|
||||||
self.store.optimistic_header = self.store.finalized_header.clone();
|
self.store.optimistic_header = self.store.finalized_header.clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) {
|
|
||||||
let votes = get_bits(&update.sync_aggregate.sync_committee_bits);
|
|
||||||
if votes > self.store.current_max_active_participants {
|
|
||||||
self.store.current_max_active_participants = votes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let safety_theshhold = cmp::max(
|
fn apply_update(&mut self, update: &Update) {
|
||||||
self.store.current_max_active_participants,
|
let update = GenericUpdate::from(update);
|
||||||
self.store.previous_max_active_participants,
|
self.apply_generic_update(&update);
|
||||||
) / 2;
|
}
|
||||||
|
|
||||||
if votes > safety_theshhold
|
fn apply_finality_update(&mut self, update: &FinalityUpdate) {
|
||||||
&& update.attested_header.slot > self.store.optimistic_header.slot
|
let update = GenericUpdate::from(update);
|
||||||
{
|
self.apply_generic_update(&update);
|
||||||
self.store.optimistic_header = update.attested_header.clone();
|
}
|
||||||
|
|
||||||
|
fn log_finality_update(&self, update: &GenericUpdate) {
|
||||||
let participation =
|
let participation =
|
||||||
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
|
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
|
||||||
let delay = self.get_delay(update.attested_header.slot);
|
let decimals = if participation == 100.0 { 1 } else { 2 };
|
||||||
|
let age = self.age(self.store.finalized_header.slot);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"applying optimistic update slot={} confidence={:.2}% delay={:02}:{:02}:{:02}",
|
"finalized slot slot={} confidence={:.decimals$}% age={:02}:{:02}:{:02}:{:02}",
|
||||||
self.store.optimistic_header.slot,
|
self.store.finalized_header.slot,
|
||||||
participation,
|
participation,
|
||||||
delay.num_hours(),
|
age.num_days(),
|
||||||
delay.num_minutes(),
|
age.num_hours() % 24,
|
||||||
delay.num_seconds(),
|
age.num_minutes() % 60,
|
||||||
|
age.num_seconds() % 60,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) {
|
||||||
|
let update = GenericUpdate::from(update);
|
||||||
|
self.apply_generic_update(&update);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn log_optimistic_update(&self, update: &GenericUpdate) {
|
||||||
|
let participation =
|
||||||
|
get_bits(&update.sync_aggregate.sync_committee_bits) as f32 / 512_f32 * 100f32;
|
||||||
|
let decimals = if participation == 100.0 { 1 } else { 2 };
|
||||||
|
let age = self.age(self.store.optimistic_header.slot);
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"updated head slot={} confidence={:.decimals$}% age={:02}:{:02}:{:02}:{:02}",
|
||||||
|
self.store.optimistic_header.slot,
|
||||||
|
participation,
|
||||||
|
age.num_days(),
|
||||||
|
age.num_hours() % 24,
|
||||||
|
age.num_minutes() % 60,
|
||||||
|
age.num_seconds() % 60,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_finality_update(&self, update: &GenericUpdate) -> bool {
|
||||||
|
update.finalized_header.is_some() && update.finality_branch.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_sync_update(&self, update: &GenericUpdate) -> bool {
|
||||||
|
update.next_sync_committee.is_some() && update.next_sync_committee_branch.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn safety_theshhold(&self) -> u64 {
|
||||||
|
cmp::max(
|
||||||
|
self.store.current_max_active_participants,
|
||||||
|
self.store.previous_max_active_participants,
|
||||||
|
) / 2
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compute_committee_sign_root(&self, header: Bytes32, slot: u64) -> Result<Node> {
|
fn compute_committee_sign_root(&self, header: Bytes32, slot: u64) -> Result<Node> {
|
||||||
|
@ -401,14 +401,47 @@ impl<R: Rpc> ConsensusClient<R> {
|
||||||
compute_signing_root(header, domain)
|
compute_signing_root(header, domain)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_delay(&self, slot: u64) -> Duration {
|
fn age(&self, slot: u64) -> Duration {
|
||||||
let expected_time = slot * 12 + self.config.general.genesis_time;
|
let expected_time = self.slot_timestamp(slot);
|
||||||
let now = std::time::SystemTime::now()
|
let now = std::time::SystemTime::now()
|
||||||
.duration_since(UNIX_EPOCH)
|
.duration_since(UNIX_EPOCH)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let delay = now - std::time::Duration::from_secs(expected_time);
|
let delay = now - std::time::Duration::from_secs(expected_time);
|
||||||
chrono::Duration::from_std(delay).unwrap()
|
chrono::Duration::from_std(delay).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn current_slot(&self) -> u64 {
|
||||||
|
let now = std::time::SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let genesis_time = self.config.general.genesis_time;
|
||||||
|
let since_genesis = now - std::time::Duration::from_secs(genesis_time);
|
||||||
|
|
||||||
|
since_genesis.as_secs() / 12
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slot_timestamp(&self, slot: u64) -> u64 {
|
||||||
|
slot * 12 + self.config.general.genesis_time
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the duration until the next update
|
||||||
|
/// Updates are scheduled for 4 seconds into each slot
|
||||||
|
pub fn duration_until_next_update(&self) -> Duration {
|
||||||
|
let current_slot = self.current_slot();
|
||||||
|
let next_slot = current_slot + 1;
|
||||||
|
let next_slot_timestamp = self.slot_timestamp(next_slot);
|
||||||
|
|
||||||
|
let now = std::time::SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
let time_to_next_slot = next_slot_timestamp - now;
|
||||||
|
let next_update = time_to_next_slot + 4;
|
||||||
|
|
||||||
|
Duration::seconds(next_update as i64)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_participating_keys(
|
fn get_participating_keys(
|
||||||
|
|
|
@ -256,6 +256,58 @@ pub struct SyncAggregate {
|
||||||
pub sync_committee_signature: SignatureBytes,
|
pub sync_committee_signature: SignatureBytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct GenericUpdate {
|
||||||
|
pub attested_header: Header,
|
||||||
|
pub sync_aggregate: SyncAggregate,
|
||||||
|
pub signature_slot: u64,
|
||||||
|
pub next_sync_committee: Option<SyncCommittee>,
|
||||||
|
pub next_sync_committee_branch: Option<Vec<Bytes32>>,
|
||||||
|
pub finalized_header: Option<Header>,
|
||||||
|
pub finality_branch: Option<Vec<Bytes32>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&Update> for GenericUpdate {
|
||||||
|
fn from(update: &Update) -> Self {
|
||||||
|
Self {
|
||||||
|
attested_header: update.attested_header.clone(),
|
||||||
|
sync_aggregate: update.sync_aggregate.clone(),
|
||||||
|
signature_slot: update.signature_slot.clone(),
|
||||||
|
next_sync_committee: Some(update.next_sync_committee.clone()),
|
||||||
|
next_sync_committee_branch: Some(update.next_sync_committee_branch.clone()),
|
||||||
|
finalized_header: Some(update.finalized_header.clone()),
|
||||||
|
finality_branch: Some(update.finality_branch.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&FinalityUpdate> for GenericUpdate {
|
||||||
|
fn from(update: &FinalityUpdate) -> Self {
|
||||||
|
Self {
|
||||||
|
attested_header: update.attested_header.clone(),
|
||||||
|
sync_aggregate: update.sync_aggregate.clone(),
|
||||||
|
signature_slot: update.signature_slot.clone(),
|
||||||
|
next_sync_committee: None,
|
||||||
|
next_sync_committee_branch: None,
|
||||||
|
finalized_header: Some(update.finalized_header.clone()),
|
||||||
|
finality_branch: Some(update.finality_branch.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&OptimisticUpdate> for GenericUpdate {
|
||||||
|
fn from(update: &OptimisticUpdate) -> Self {
|
||||||
|
Self {
|
||||||
|
attested_header: update.attested_header.clone(),
|
||||||
|
sync_aggregate: update.sync_aggregate.clone(),
|
||||||
|
signature_slot: update.signature_slot.clone(),
|
||||||
|
next_sync_committee: None,
|
||||||
|
next_sync_committee_branch: None,
|
||||||
|
finalized_header: None,
|
||||||
|
finality_branch: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn pubkey_deserialize<'de, D>(deserializer: D) -> Result<BLSPubKey, D::Error>
|
fn pubkey_deserialize<'de, D>(deserializer: D) -> Result<BLSPubKey, D::Error>
|
||||||
where
|
where
|
||||||
D: serde::Deserializer<'de>,
|
D: serde::Deserializer<'de>,
|
||||||
|
|
Loading…
Reference in New Issue