add optimistic updates (#1)

This commit is contained in:
Noah Citron 2022-08-30 20:31:58 -04:00 committed by GitHub
parent ad43cf7668
commit 374dd1f38f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 256 additions and 66 deletions

View File

@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use clap::Parser; use clap::Parser;
use dirs::home_dir; use dirs::home_dir;
use eyre::Result; use eyre::Result;
use tokio::time::sleep; use tokio::{sync::Mutex, time::sleep};
use client::{rpc::Rpc, Client}; use client::{rpc::Rpc, Client};
use config::{networks, Config}; use config::{networks, Config};
@ -23,13 +23,16 @@ async fn main() -> Result<()> {
let mut client = Client::new(Arc::new(config)).await?; let mut client = Client::new(Arc::new(config)).await?;
client.sync().await?; client.sync().await?;
let mut rpc = Rpc::new(Arc::new(client), cli.port.unwrap_or(8545)); let client = Arc::new(Mutex::new(client));
let mut rpc = Rpc::new(client.clone(), cli.port.unwrap_or(8545));
let addr = rpc.start().await?; let addr = rpc.start().await?;
println!("started rpc at: {}", addr); println!("started rpc at: {}", addr);
sleep(Duration::from_secs(300)).await; loop {
sleep(Duration::from_secs(10)).await;
Ok(()) client.lock().await.advance().await?
}
} }
#[derive(Parser)] #[derive(Parser)]

View File

@ -36,6 +36,10 @@ impl Client {
self.consensus.sync().await self.consensus.sync().await
} }
pub async fn advance(&mut self) -> Result<()> {
self.consensus.advance().await
}
pub async fn call(&self, to: &Address, calldata: &Vec<u8>, value: U256) -> Result<Vec<u8>> { pub async fn call(&self, to: &Address, calldata: &Vec<u8>, value: U256) -> Result<Vec<u8>> {
let payload = self.consensus.get_execution_payload().await?; let payload = self.consensus.get_execution_payload().await?;
let mut evm = Evm::new(self.execution.clone(), payload); let mut evm = Evm::new(self.execution.clone(), payload);

View File

@ -5,6 +5,7 @@ use ethers::{
use eyre::Result; use eyre::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::Mutex;
use jsonrpsee::{ use jsonrpsee::{
core::{async_trait, Error}, core::{async_trait, Error},
@ -17,13 +18,13 @@ use common::utils::{hex_str_to_bytes, u64_to_hex_string};
use super::Client; use super::Client;
pub struct Rpc { pub struct Rpc {
client: Arc<Client>, client: Arc<Mutex<Client>>,
handle: Option<HttpServerHandle>, handle: Option<HttpServerHandle>,
port: u16, port: u16,
} }
impl Rpc { impl Rpc {
pub fn new(client: Arc<Client>, port: u16) -> Self { pub fn new(client: Arc<Mutex<Client>>, port: u16) -> Self {
Rpc { Rpc {
client, client,
handle: None, handle: None,
@ -55,7 +56,7 @@ trait EthRpc {
#[method(name = "estimateGas")] #[method(name = "estimateGas")]
async fn estimate_gas(&self, opts: CallOpts) -> Result<String, Error>; async fn estimate_gas(&self, opts: CallOpts) -> Result<String, Error>;
#[method(name = "chainId")] #[method(name = "chainId")]
fn chain_id(&self) -> Result<String, Error>; async fn chain_id(&self) -> Result<String, Error>;
#[method(name = "gasPrice")] #[method(name = "gasPrice")]
async fn gas_price(&self) -> Result<String, Error>; async fn gas_price(&self) -> Result<String, Error>;
#[method(name = "maxPriorityFeePerGas")] #[method(name = "maxPriorityFeePerGas")]
@ -65,7 +66,7 @@ trait EthRpc {
} }
struct RpcInner { struct RpcInner {
client: Arc<Client>, client: Arc<Mutex<Client>>,
port: u16, port: u16,
} }
@ -75,7 +76,8 @@ impl EthRpcServer for RpcInner {
match block { match block {
"latest" => { "latest" => {
let address = convert_err(Address::from_str(address))?; let address = convert_err(Address::from_str(address))?;
let balance = convert_err(self.client.get_balance(&address).await)?; let client = self.client.lock().await;
let balance = convert_err(client.get_balance(&address).await)?;
Ok(balance.encode_hex()) Ok(balance.encode_hex())
} }
@ -87,7 +89,8 @@ impl EthRpcServer for RpcInner {
match block { match block {
"latest" => { "latest" => {
let address = convert_err(Address::from_str(address))?; let address = convert_err(Address::from_str(address))?;
let nonce = convert_err(self.client.get_nonce(&address).await)?; let client = self.client.lock().await;
let nonce = convert_err(client.get_nonce(&address).await)?;
Ok(nonce.encode_hex()) Ok(nonce.encode_hex())
} }
@ -99,7 +102,8 @@ impl EthRpcServer for RpcInner {
match block { match block {
"latest" => { "latest" => {
let address = convert_err(Address::from_str(address))?; let address = convert_err(Address::from_str(address))?;
let code = convert_err(self.client.get_code(&address).await)?; let client = self.client.lock().await;
let code = convert_err(client.get_code(&address).await)?;
Ok(hex::encode(code)) Ok(hex::encode(code))
} }
@ -117,7 +121,8 @@ impl EthRpcServer for RpcInner {
16, 16,
))?; ))?;
let res = convert_err(self.client.call(&to, &data, value).await)?; let client = self.client.lock().await;
let res = convert_err(client.call(&to, &data, value).await)?;
Ok(hex::encode(res)) Ok(hex::encode(res))
} }
_ => Err(Error::Custom("Invalid Block Number".to_string())), _ => Err(Error::Custom("Invalid Block Number".to_string())),
@ -132,27 +137,32 @@ impl EthRpcServer for RpcInner {
16, 16,
))?; ))?;
let gas = convert_err(self.client.estimate_gas(&to, &data, value).await)?; let client = self.client.lock().await;
let gas = convert_err(client.estimate_gas(&to, &data, value).await)?;
Ok(u64_to_hex_string(gas)) Ok(u64_to_hex_string(gas))
} }
fn chain_id(&self) -> Result<String, Error> { async fn chain_id(&self) -> Result<String, Error> {
let id = self.client.chain_id(); let client = self.client.lock().await;
let id = client.chain_id();
Ok(u64_to_hex_string(id)) Ok(u64_to_hex_string(id))
} }
async fn gas_price(&self) -> Result<String, Error> { async fn gas_price(&self) -> Result<String, Error> {
let gas_price = convert_err(self.client.get_gas_price().await)?; let client = self.client.lock().await;
let gas_price = convert_err(client.get_gas_price().await)?;
Ok(gas_price.encode_hex()) Ok(gas_price.encode_hex())
} }
async fn max_priority_fee_per_gas(&self) -> Result<String, Error> { async fn max_priority_fee_per_gas(&self) -> Result<String, Error> {
let tip = convert_err(self.client.get_priority_fee().await)?; let client = self.client.lock().await;
let tip = convert_err(client.get_priority_fee().await)?;
Ok(tip.encode_hex()) Ok(tip.encode_hex())
} }
async fn block_number(&self) -> Result<String, Error> { async fn block_number(&self) -> Result<String, Error> {
let num = convert_err(self.client.get_block_number().await)?; let client = self.client.lock().await;
let num = convert_err(client.get_block_number().await)?;
Ok(u64_to_hex_string(num)) Ok(u64_to_hex_string(num))
} }
} }

View File

@ -11,10 +11,10 @@ pub fn goerli() -> Config {
) )
.unwrap(), .unwrap(),
checkpoint: hex_str_to_bytes( checkpoint: hex_str_to_bytes(
"0x172128eadf1da46467f4d6a822206698e2d3f957af117dd650954780d680dc99", "0x1e591af1e90f2db918b2a132991c7c2ee9a4ab26da496bd6e71e4f0bd65ea870",
) )
.unwrap(), .unwrap(),
consensus_rpc: "http://testing.prater.beacon-api.nimbus.team".to_string(), consensus_rpc: "http://34.207.158.131:5052".to_string(),
execution_rpc: execution_rpc:
"https://eth-goerli.g.alchemy.com:443/v2/o_8Qa9kgwDPf9G8sroyQ-uQtyhyWa3ao" "https://eth-goerli.g.alchemy.com:443/v2/o_8Qa9kgwDPf9G8sroyQ-uQtyhyWa3ao"
.to_string(), .to_string(),

View File

@ -1,3 +1,4 @@
use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use blst::min_pk::{PublicKey, Signature}; use blst::min_pk::{PublicKey, Signature};
@ -20,9 +21,12 @@ pub struct ConsensusClient {
#[derive(Debug)] #[derive(Debug)]
struct Store { struct Store {
header: Header, finalized_header: Header,
current_sync_committee: SyncCommittee, current_sync_committee: SyncCommittee,
next_sync_committee: Option<SyncCommittee>, next_sync_committee: Option<SyncCommittee>,
optimistic_header: Header,
previous_max_active_participants: u64,
current_max_active_participants: u64,
} }
impl ConsensusClient { impl ConsensusClient {
@ -50,19 +54,22 @@ impl ConsensusClient {
} }
let store = Store { let store = Store {
header: bootstrap.header, finalized_header: bootstrap.header.clone(),
current_sync_committee: bootstrap.current_sync_committee, current_sync_committee: bootstrap.current_sync_committee,
next_sync_committee: None, next_sync_committee: None,
optimistic_header: bootstrap.header,
previous_max_active_participants: 0,
current_max_active_participants: 0,
}; };
Ok(ConsensusClient { rpc, store, config }) Ok(ConsensusClient { rpc, store, config })
} }
pub async fn get_execution_payload(&self) -> Result<ExecutionPayload> { pub async fn get_execution_payload(&self) -> Result<ExecutionPayload> {
let slot = self.store.header.slot; let slot = self.store.optimistic_header.slot;
let mut block = self.rpc.get_block(slot).await?.clone(); let mut block = self.rpc.get_block(slot).await?.clone();
let block_hash = block.hash_tree_root()?; let block_hash = block.hash_tree_root()?;
let verified_block_hash = self.store.header.clone().hash_tree_root()?; let verified_block_hash = self.store.optimistic_header.clone().hash_tree_root()?;
if verified_block_hash != block_hash { if verified_block_hash != block_hash {
Err(eyre::eyre!("Block Root Mismatch")) Err(eyre::eyre!("Block Root Mismatch"))
@ -72,11 +79,11 @@ impl ConsensusClient {
} }
pub fn get_head(&self) -> &Header { pub fn get_head(&self) -> &Header {
&self.store.header &self.store.optimistic_header
} }
pub async fn sync(&mut self) -> Result<()> { pub async fn sync(&mut self) -> Result<()> {
let current_period = calc_sync_period(self.store.header.slot); let current_period = calc_sync_period(self.store.finalized_header.slot);
let updates = self.rpc.get_updates(current_period).await?; let updates = self.rpc.get_updates(current_period).await?;
for mut update in updates { for mut update in updates {
@ -85,32 +92,34 @@ impl ConsensusClient {
} }
let finality_update = self.rpc.get_finality_update().await?; let finality_update = self.rpc.get_finality_update().await?;
let mut finality_update_generic = Update { self.verify_finality_update(&finality_update)?;
attested_header: finality_update.attested_header, self.apply_finality_update(&finality_update);
next_sync_committee: None,
next_sync_committee_branch: Vec::new(),
finalized_header: finality_update.finalized_header,
finality_branch: finality_update.finality_branch,
sync_aggregate: finality_update.sync_aggregate,
signature_slot: finality_update.signature_slot,
};
self.verify_update(&mut finality_update_generic)?; let optimistic_update = self.rpc.get_optimistic_update().await?;
self.apply_update(&finality_update_generic); self.verify_optimistic_update(&optimistic_update)?;
self.apply_optimistic_update(&optimistic_update);
self.rpc.get_block(self.store.header.slot).await?; Ok(())
}
pub async fn advance(&mut self) -> Result<()> {
let finality_update = self.rpc.get_finality_update().await?;
self.verify_finality_update(&finality_update)?;
self.apply_finality_update(&finality_update);
let optimistic_update = self.rpc.get_optimistic_update().await?;
self.verify_optimistic_update(&optimistic_update)?;
self.apply_optimistic_update(&optimistic_update);
Ok(()) Ok(())
} }
fn verify_update(&mut self, update: &mut Update) -> Result<()> { fn verify_update(&mut self, update: &mut Update) -> Result<()> {
let current_slot = self.store.header.slot; let store_period = calc_sync_period(self.store.finalized_header.slot);
let update_slot = update.finalized_header.slot; let update_signature_period = calc_sync_period(update.signature_slot);
let current_period = calc_sync_period(current_slot); if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
let update_period = calc_sync_period(update_slot); {
if !(update_period == current_period + 1 || update_period == current_period) {
return Err(eyre::eyre!("Invalid Update")); return Err(eyre::eyre!("Invalid Update"));
} }
@ -130,26 +139,25 @@ impl ConsensusClient {
return Err(eyre::eyre!("Invalid Update")); return Err(eyre::eyre!("Invalid Update"));
} }
if update.next_sync_committee.is_some() {
let next_committee_branch_valid = is_next_committee_proof_valid( let next_committee_branch_valid = is_next_committee_proof_valid(
&update.attested_header, &update.attested_header,
&mut update.next_sync_committee.clone().unwrap(), &mut update.next_sync_committee,
&update.next_sync_committee_branch, &update.next_sync_committee_branch,
); );
if !next_committee_branch_valid { if !next_committee_branch_valid {
return Err(eyre::eyre!("Invalid Update")); return Err(eyre::eyre!("Invalid Update"));
} }
}
let sync_committee = if current_period == update_period { let sync_committee = if store_period == update_signature_period {
&self.store.current_sync_committee &self.store.current_sync_committee
} else { } else {
self.store.next_sync_committee.as_ref().unwrap() self.store.next_sync_committee.as_ref().unwrap()
}; };
let pks = let pks =
get_participating_keys(sync_committee, &update.sync_aggregate.sync_committee_bits)?; get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect(); let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
let committee_quorum = pks.len() > 1; let committee_quorum = pks.len() > 1;
@ -169,20 +177,152 @@ impl ConsensusClient {
Ok(()) Ok(())
} }
fn apply_update(&mut self, update: &Update) { fn verify_finality_update(&self, update: &FinalityUpdate) -> Result<()> {
let current_period = calc_sync_period(self.store.header.slot); let store_period = calc_sync_period(self.store.finalized_header.slot);
let update_period = calc_sync_period(update.finalized_header.slot); let update_signature_period = calc_sync_period(update.signature_slot);
self.store.header = update.finalized_header.clone(); if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
{
return Err(eyre::eyre!("Invalid Update"));
}
if !(update.signature_slot > update.attested_header.slot
&& update.attested_header.slot > update.finalized_header.slot)
{
return Err(eyre::eyre!("Invalid Update"));
}
let finality_branch_valid = is_finality_proof_valid(
&update.attested_header,
&mut update.finalized_header.clone(),
&update.finality_branch,
);
if !(finality_branch_valid) {
return Err(eyre::eyre!("Invalid Update"));
}
let sync_committee = &self.store.current_sync_committee;
let pks =
get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
let committee_quorum = pks.len() > 1;
if !committee_quorum {
return Err(eyre::eyre!("Invalid Update"));
}
let header_root =
bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes());
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
let sig = &update.sync_aggregate.sync_committee_signature;
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
if !is_valid_sig {
return Err(eyre::eyre!("Invalid Update"));
}
Ok(())
}
fn verify_optimistic_update(&self, update: &OptimisticUpdate) -> Result<()> {
let store_period = calc_sync_period(self.store.finalized_header.slot);
let update_signature_period = calc_sync_period(update.signature_slot);
if !(update_signature_period == store_period + 1 || update_signature_period == store_period)
{
return Err(eyre::eyre!("Invalid Update"));
}
if !(update.signature_slot > update.attested_header.slot) {
return Err(eyre::eyre!("Invalid Update"));
}
let sync_committee = &self.store.current_sync_committee;
let pks =
get_participating_keys(&sync_committee, &update.sync_aggregate.sync_committee_bits)?;
let pks: Vec<&PublicKey> = pks.iter().map(|pk| pk).collect();
let committee_quorum = pks.len() > 1;
if !committee_quorum {
return Err(eyre::eyre!("Invalid Update"));
}
let header_root =
bytes_to_bytes32(update.attested_header.clone().hash_tree_root()?.as_bytes());
let signing_root = self.compute_committee_sign_root(header_root, update.signature_slot)?;
let sig = &update.sync_aggregate.sync_committee_signature;
let is_valid_sig = is_aggregate_valid(sig, signing_root.as_bytes(), &pks);
if !is_valid_sig {
return Err(eyre::eyre!("Invalid Update"));
}
Ok(())
}
fn apply_update(&mut self, update: &Update) {
let store_period = calc_sync_period(self.store.finalized_header.slot);
let update_signature_period = calc_sync_period(update.signature_slot);
self.store.finalized_header = update.finalized_header.clone();
if self.store.next_sync_committee.is_none() { if self.store.next_sync_committee.is_none() {
self.store.next_sync_committee = self.store.next_sync_committee = Some(update.next_sync_committee.clone());
Some(update.next_sync_committee.as_ref().unwrap().clone()); } else if update_signature_period == store_period + 1 {
} else if update_period == current_period + 1 {
self.store.current_sync_committee = self.store.current_sync_committee =
self.store.next_sync_committee.as_ref().unwrap().clone(); self.store.next_sync_committee.as_ref().unwrap().clone();
self.store.next_sync_committee = self.store.next_sync_committee = Some(update.next_sync_committee.clone());
Some(update.next_sync_committee.as_ref().unwrap().clone()); }
println!(
"applying update for slot: {}",
self.store.finalized_header.slot
);
}
fn apply_finality_update(&mut self, update: &FinalityUpdate) {
if self.store.finalized_header.slot != update.finalized_header.slot {
self.store.finalized_header = update.finalized_header.clone();
self.store.previous_max_active_participants =
self.store.current_max_active_participants;
self.store.current_max_active_participants =
get_bits(&update.sync_aggregate.sync_committee_bits);
println!(
"applying finality update for slot: {}",
self.store.finalized_header.slot
);
}
if self.store.finalized_header.slot > self.store.optimistic_header.slot {
self.store.optimistic_header = self.store.finalized_header.clone();
}
}
fn apply_optimistic_update(&mut self, update: &OptimisticUpdate) {
let votes = get_bits(&update.sync_aggregate.sync_committee_bits);
if votes > self.store.current_max_active_participants {
self.store.current_max_active_participants = votes;
}
let safety_theshhold = cmp::max(
self.store.current_max_active_participants,
self.store.previous_max_active_participants,
) / 2;
if votes > safety_theshhold
&& update.attested_header.slot > self.store.optimistic_header.slot
{
self.store.optimistic_header = update.attested_header.clone();
println!(
"applying optimistic update for slot: {}",
self.store.optimistic_header.slot
);
} }
} }
@ -218,6 +358,17 @@ fn get_participating_keys(
Ok(pks) Ok(pks)
} }
fn get_bits(bitfield: &Bitvector<512>) -> u64 {
let mut count = 0;
bitfield.iter().for_each(|bit| {
if bit == true {
count += 1;
}
});
count
}
fn is_aggregate_valid(sig_bytes: &SignatureBytes, msg: &[u8], pks: &[&PublicKey]) -> bool { fn is_aggregate_valid(sig_bytes: &SignatureBytes, msg: &[u8], pks: &[&PublicKey]) -> bool {
let dst: &[u8] = b"BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_"; let dst: &[u8] = b"BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_";
let sig_res = Signature::from_bytes(&sig_bytes); let sig_res = Signature::from_bytes(&sig_bytes);

View File

@ -41,6 +41,15 @@ impl Rpc {
Ok(res.data) Ok(res.data)
} }
pub async fn get_optimistic_update(&self) -> Result<OptimisticUpdate> {
let req = format!("{}/eth/v0/beacon/light_client/optimistic_update", self.rpc);
let res = reqwest::get(req)
.await?
.json::<OptimisticUpdateResponse>()
.await?;
Ok(res.data)
}
pub async fn get_block(&self, slot: u64) -> Result<BeaconBlock> { pub async fn get_block(&self, slot: u64) -> Result<BeaconBlock> {
let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot); let req = format!("{}/eth/v2/beacon/blocks/{}", self.rpc, slot);
let res = reqwest::get(req) let res = reqwest::get(req)
@ -71,6 +80,11 @@ struct FinalityUpdateResponse {
data: FinalityUpdate, data: FinalityUpdate,
} }
#[derive(serde::Deserialize, Debug)]
struct OptimisticUpdateResponse {
data: OptimisticUpdate,
}
#[derive(serde::Deserialize, Debug)] #[derive(serde::Deserialize, Debug)]
struct BootstrapResponse { struct BootstrapResponse {
data: BootstrapData, data: BootstrapData,

View File

@ -130,7 +130,7 @@ pub struct Bootstrap {
#[derive(serde::Deserialize, Debug, Clone)] #[derive(serde::Deserialize, Debug, Clone)]
pub struct Update { pub struct Update {
pub attested_header: Header, pub attested_header: Header,
pub next_sync_committee: Option<SyncCommittee>, pub next_sync_committee: SyncCommittee,
#[serde(deserialize_with = "branch_deserialize")] #[serde(deserialize_with = "branch_deserialize")]
pub next_sync_committee_branch: Vec<Bytes32>, pub next_sync_committee_branch: Vec<Bytes32>,
pub finalized_header: Header, pub finalized_header: Header,
@ -152,6 +152,14 @@ pub struct FinalityUpdate {
pub signature_slot: u64, pub signature_slot: u64,
} }
#[derive(serde::Deserialize, Debug)]
pub struct OptimisticUpdate {
pub attested_header: Header,
pub sync_aggregate: SyncAggregate,
#[serde(deserialize_with = "u64_deserialize")]
pub signature_slot: u64,
}
#[derive(serde::Deserialize, Debug, Clone, Default, SimpleSerialize)] #[derive(serde::Deserialize, Debug, Clone, Default, SimpleSerialize)]
pub struct Header { pub struct Header {
#[serde(deserialize_with = "u64_deserialize")] #[serde(deserialize_with = "u64_deserialize")]