From 897f679a2c9a09635ef63d1dd3172251997ec265 Mon Sep 17 00:00:00 2001 From: Noah Citron Date: Thu, 22 Sep 2022 12:40:06 -0700 Subject: [PATCH] feat: use access lists to batch fetch proofs (#44) * add create_access_list to rpc * batch fetch proofs with access lists * refactor * use caching for estimate_gas * cleanup * add rate limiting to bulk proof fetch --- Cargo.lock | 1 + client/src/rpc.rs | 5 +- execution/Cargo.toml | 1 + execution/src/evm.rs | 157 ++++++++++++++++++++++++---------- execution/src/execution.rs | 10 ++- execution/src/rpc/http_rpc.rs | 55 ++++++++++-- execution/src/rpc/mock_rpc.rs | 15 +++- execution/src/rpc/mod.rs | 8 +- execution/src/types.rs | 18 +++- 9 files changed, 212 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16455a1..4f288b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1033,6 +1033,7 @@ dependencies = [ "futures", "hex", "jsonrpsee", + "log", "openssl", "reqwest", "revm", diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 2eee486..ff35b0d 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -3,7 +3,7 @@ use ethers::{ types::{Address, Transaction, TransactionReceipt, H256}, }; use eyre::Result; -use log::{info, warn}; +use log::{debug, info, warn}; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use tokio::sync::Mutex; @@ -94,6 +94,7 @@ struct RpcInner { #[async_trait] impl EthRpcServer for RpcInner { async fn get_balance(&self, address: &str, block: &str) -> Result { + debug!("eth_getBalance"); let block = convert_err(decode_block(block))?; let address = convert_err(Address::from_str(address))?; let node = self.node.lock().await; @@ -121,6 +122,7 @@ impl EthRpcServer for RpcInner { } async fn call(&self, opts: CallOpts, block: &str) -> Result { + debug!("eth_call"); let block = convert_err(decode_block(block))?; let node = self.node.lock().await; let res = convert_err(node.call(&opts, &block))?; @@ -129,6 +131,7 @@ impl EthRpcServer for RpcInner { } async fn estimate_gas(&self, opts: CallOpts) -> Result { + debug!("eth_estimateGas"); let node = self.node.lock().await; let gas = convert_err(node.estimate_gas(&opts))?; diff --git a/execution/Cargo.toml b/execution/Cargo.toml index 64163bd..3badcb4 100644 --- a/execution/Cargo.toml +++ b/execution/Cargo.toml @@ -23,6 +23,7 @@ toml = "0.5.9" triehash-ethereum = { git = "https://github.com/openethereum/parity-ethereum" } async-trait = "0.1.57" openssl = { version = "0.10", features = ["vendored"] } +log = "0.4.17" common = { path = "../common" } consensus = { path = "../consensus" } diff --git a/execution/src/evm.rs b/execution/src/evm.rs index 0256d11..a282943 100644 --- a/execution/src/evm.rs +++ b/execution/src/evm.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, str::FromStr, thread}; +use std::{collections::HashMap, fmt::Display, str::FromStr, thread}; use bytes::Bytes; use ethers::{ @@ -6,12 +6,17 @@ use ethers::{ prelude::{Address, H160, H256, U256}, }; use eyre::Result; +use futures::future::join_all; +use log::trace; use revm::{AccountInfo, Bytecode, Database, Env, TransactOut, TransactTo, EVM}; use tokio::runtime::Runtime; use consensus::types::ExecutionPayload; -use crate::{rpc::Rpc, types::CallOpts}; +use crate::{ + rpc::Rpc, + types::{Account, CallOpts}, +}; use super::ExecutionClient; @@ -30,6 +35,9 @@ impl Evm { } pub fn call(&mut self, opts: &CallOpts) -> Result> { + let account_map = self.batch_fetch_accounts(opts); + self.evm.db.as_mut().unwrap().set_accounts(account_map); + self.evm.env = self.get_env(opts); let output = self.evm.transact().1; @@ -45,6 +53,9 @@ impl Evm { } pub fn estimate_gas(&mut self, opts: &CallOpts) -> Result { + let account_map = self.batch_fetch_accounts(opts); + self.evm.db.as_mut().unwrap().set_accounts(account_map); + self.evm.env = self.get_env(opts); let gas = self.evm.transact().2; @@ -56,6 +67,54 @@ impl Evm { Ok(gas_scaled) } + fn batch_fetch_accounts(&self, opts: &CallOpts) -> HashMap { + let db = self.evm.db.as_ref().unwrap(); + let rpc = db.execution.rpc.clone(); + let payload = db.payload.clone(); + let execution = db.execution.clone(); + let block = db.payload.block_number; + + let opts_moved = CallOpts { + from: opts.from, + to: opts.to, + value: opts.value, + data: opts.data.clone(), + gas: opts.gas, + gas_price: opts.gas_price, + }; + + let block_moved = block.clone(); + let handle = thread::spawn(move || { + let list_fut = rpc.create_access_list(&opts_moved, block_moved); + let runtime = Runtime::new()?; + let list = runtime.block_on(list_fut)?; + + let account_futs = list.0.iter().map(|account| { + let addr_fut = futures::future::ready(account.address); + let account_fut = execution.get_account( + &account.address, + Some(account.storage_keys.as_slice()), + &payload, + ); + async move { (addr_fut.await, account_fut.await) } + }); + + let accounts = runtime.block_on(join_all(account_futs)); + + Ok::<_, eyre::Error>(accounts) + }); + + let accounts = handle.join().unwrap().unwrap(); + let mut account_map = HashMap::new(); + accounts.iter().for_each(|account| { + let addr = account.0; + let account = account.1.as_ref().unwrap().clone(); + account_map.insert(addr, account); + }); + + account_map + } + fn get_env(&self, opts: &CallOpts) -> Env { let mut env = Env::default(); let payload = &self.evm.db.as_ref().unwrap().payload; @@ -81,6 +140,7 @@ impl Evm { struct ProofDB { execution: ExecutionClient, payload: ExecutionPayload, + accounts: HashMap, error: Option, } @@ -89,6 +149,7 @@ impl ProofDB { ProofDB { execution, payload, + accounts: HashMap::new(), error: None, } } @@ -102,6 +163,25 @@ impl ProofDB { } } } + + pub fn set_accounts(&mut self, accounts: HashMap) { + self.accounts = accounts; + } + + fn get_account(&mut self, address: Address, slots: &[H256]) -> Account { + let execution = self.execution.clone(); + let addr = address.clone(); + let payload = self.payload.clone(); + let slots = slots.to_owned(); + + let handle = thread::spawn(move || { + let account_fut = execution.get_account(&addr, Some(&slots), &payload); + let runtime = Runtime::new()?; + runtime.block_on(account_fut) + }); + + self.safe_unwrap(handle.join().unwrap()) + } } impl Database for ProofDB { @@ -110,31 +190,17 @@ impl Database for ProofDB { return AccountInfo::default(); } - let execution = self.execution.clone(); - let addr = address.clone(); - let payload = self.payload.clone(); + trace!( + "fetch basic evm state for addess=0x{}", + hex::encode(address.as_bytes()) + ); - let handle = thread::spawn(move || { - let account_fut = execution.get_account(&addr, None, &payload); - let runtime = Runtime::new()?; - runtime.block_on(account_fut) - }); - - let account = self.safe_unwrap(handle.join().unwrap()); - - let execution = self.execution.clone(); - let addr = address.clone(); - let payload = self.payload.clone(); - - let handle = thread::spawn(move || { - let code_fut = execution.get_code(&addr, &payload); - let runtime = Runtime::new()?; - runtime.block_on(code_fut) - }); - - let bytecode = self.safe_unwrap(handle.join().unwrap()); - let bytecode = Bytecode::new_raw(Bytes::from(bytecode)); + let account = match self.accounts.get(&address) { + Some(account) => account.clone(), + None => self.get_account(address, &[]), + }; + let bytecode = Bytecode::new_raw(Bytes::from(account.code.clone())); AccountInfo::new(account.balance, account.nonce, bytecode) } @@ -143,26 +209,30 @@ impl Database for ProofDB { } fn storage(&mut self, address: H160, slot: U256) -> U256 { - let execution = self.execution.clone(); - let addr = address.clone(); + trace!( + "fetch evm state for address=0x{}, slot={}", + hex::encode(address.as_bytes()), + slot + ); + let slot = H256::from_uint(&slot); - let slots = [slot]; - let payload = self.payload.clone(); - let handle = thread::spawn(move || { - let account_fut = execution.get_account(&addr, Some(&slots), &payload); - let runtime = Runtime::new()?; - runtime.block_on(account_fut) - }); - - let account = self.safe_unwrap(handle.join().unwrap()); - let value = account.slots.get(&slot); - match value { - Some(value) => *value, - None => { - self.error = Some("slot not found".to_string()); - U256::default() - } + match self.accounts.get(&address) { + Some(account) => match account.slots.get(&slot) { + Some(slot) => slot.clone(), + None => self + .get_account(address, &[slot]) + .slots + .get(&slot) + .unwrap() + .clone(), + }, + None => self + .get_account(address, &[slot]) + .slots + .get(&slot) + .unwrap() + .clone(), } } @@ -173,4 +243,5 @@ impl Database for ProofDB { fn is_precompile(address: &Address) -> bool { address.le(&Address::from_str("0x0000000000000000000000000000000000000009").unwrap()) + && address.gt(&Address::zero()) } diff --git a/execution/src/execution.rs b/execution/src/execution.rs index 0b2a891..5713a01 100644 --- a/execution/src/execution.rs +++ b/execution/src/execution.rs @@ -18,7 +18,7 @@ use super::types::{Account, ExecutionBlock}; #[derive(Clone)] pub struct ExecutionClient { - rpc: R, + pub rpc: R, } impl ExecutionClient { @@ -76,9 +76,17 @@ impl ExecutionClient { slot_map.insert(storage_proof.key, storage_proof.value); } + let code = self.rpc.get_code(address, payload.block_number).await?; + let code_hash = keccak256(&code).into(); + + if proof.code_hash != code_hash { + eyre::bail!("Invalid Proof"); + } + Ok(Account { balance: proof.balance, nonce: proof.nonce.as_u64(), + code, code_hash: proof.code_hash, storage_hash: proof.storage_hash, slots: slot_map, diff --git a/execution/src/rpc/http_rpc.rs b/execution/src/rpc/http_rpc.rs index e316236..1680955 100644 --- a/execution/src/rpc/http_rpc.rs +++ b/execution/src/rpc/http_rpc.rs @@ -1,21 +1,43 @@ +use std::str::FromStr; + use async_trait::async_trait; use ethers::prelude::{Address, Http}; -use ethers::providers::{Middleware, Provider}; -use ethers::types::{BlockId, Bytes, EIP1186ProofResponse, Transaction, TransactionReceipt, H256}; +use ethers::providers::{HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient}; +use ethers::types::transaction::eip2718::TypedTransaction; +use ethers::types::transaction::eip2930::AccessList; +use ethers::types::{ + BlockId, Bytes, EIP1186ProofResponse, Eip1559TransactionRequest, Transaction, + TransactionReceipt, H256, U256, +}; use eyre::Result; +use log::trace; + +use crate::types::CallOpts; use super::Rpc; -#[derive(Clone)] pub struct HttpRpc { - provider: Provider, + url: String, + provider: Provider>, +} + +impl Clone for HttpRpc { + fn clone(&self) -> Self { + Self::new(&self.url).unwrap() + } } #[async_trait] impl Rpc for HttpRpc { fn new(rpc: &str) -> Result { - let provider = Provider::try_from(rpc)?; - Ok(HttpRpc { provider }) + let http = Http::from_str(rpc)?; + let mut client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy), 10, 1); + client.set_compute_units(300); + let provider = Provider::new(client); + Ok(HttpRpc { + url: rpc.to_string(), + provider, + }) } async fn get_proof( @@ -24,6 +46,7 @@ impl Rpc for HttpRpc { slots: &[H256], block: u64, ) -> Result { + trace!("fetching proof"); let block = Some(BlockId::from(block)); let proof_response = self .provider @@ -32,6 +55,26 @@ impl Rpc for HttpRpc { Ok(proof_response) } + async fn create_access_list(&self, opts: &CallOpts, block: u64) -> Result { + let block = Some(BlockId::from(block)); + + let mut tx = Eip1559TransactionRequest::new(); + tx.to = Some(opts.to.into()); + tx.from = opts.from; + tx.value = opts.value; + // TODO: better way to set gas limit + tx.gas = Some(U256::from(10_000_000)); + tx.data = opts + .data + .as_ref() + .map(|data| Bytes::from(data.as_slice().to_owned())); + + let tx = TypedTransaction::Eip1559(tx); + let list = self.provider.create_access_list(&tx, block).await?; + + Ok(list.access_list) + } + async fn get_code(&self, address: &Address, block: u64) -> Result> { let block = Some(BlockId::from(block)); let code = self.provider.get_code(*address, block).await?; diff --git a/execution/src/rpc/mock_rpc.rs b/execution/src/rpc/mock_rpc.rs index 027d0b1..41d9dc2 100644 --- a/execution/src/rpc/mock_rpc.rs +++ b/execution/src/rpc/mock_rpc.rs @@ -2,8 +2,13 @@ use std::{fs::read_to_string, path::PathBuf}; use async_trait::async_trait; use common::utils::hex_str_to_bytes; -use ethers::types::{Address, EIP1186ProofResponse, Transaction, TransactionReceipt, H256}; -use eyre::Result; +use ethers::types::{ + transaction::eip2930::AccessList, Address, EIP1186ProofResponse, Transaction, + TransactionReceipt, H256, +}; +use eyre::{eyre, Result}; + +use crate::types::CallOpts; use super::Rpc; @@ -29,13 +34,17 @@ impl Rpc for MockRpc { Ok(serde_json::from_str(&proof)?) } + async fn create_access_list(&self, _opts: &CallOpts, _block: u64) -> Result { + Err(eyre!("not implemented")) + } + async fn get_code(&self, _address: &Address, _block: u64) -> Result> { let code = read_to_string(self.path.join("code.json"))?; hex_str_to_bytes(&code[0..code.len() - 1]) } async fn send_raw_transaction(&self, _bytes: &Vec) -> Result { - Err(eyre::eyre!("not implemented")) + Err(eyre!("not implemented")) } async fn get_transaction_receipt(&self, _tx_hash: &H256) -> Result> { diff --git a/execution/src/rpc/mod.rs b/execution/src/rpc/mod.rs index f0583ce..cf011b1 100644 --- a/execution/src/rpc/mod.rs +++ b/execution/src/rpc/mod.rs @@ -1,7 +1,12 @@ use async_trait::async_trait; -use ethers::types::{Address, EIP1186ProofResponse, Transaction, TransactionReceipt, H256}; +use ethers::types::{ + transaction::eip2930::AccessList, Address, EIP1186ProofResponse, Transaction, + TransactionReceipt, H256, +}; use eyre::Result; +use crate::types::CallOpts; + pub mod http_rpc; pub mod mock_rpc; @@ -18,6 +23,7 @@ pub trait Rpc: Send + Clone + 'static { block: u64, ) -> Result; + async fn create_access_list(&self, opts: &CallOpts, block: u64) -> Result; async fn get_code(&self, address: &Address, block: u64) -> Result>; async fn send_raw_transaction(&self, bytes: &Vec) -> Result; async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result>; diff --git a/execution/src/types.rs b/execution/src/types.rs index 0ed1d87..461e39b 100644 --- a/execution/src/types.rs +++ b/execution/src/types.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; use ethers::prelude::{Address, H256, U256}; use eyre::Result; @@ -6,11 +6,12 @@ use serde::{Deserialize, Serialize}; use common::utils::u64_to_hex_string; -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct Account { pub balance: U256, pub nonce: u64, pub code_hash: H256, + pub code: Vec, pub storage_hash: H256, pub slots: HashMap, } @@ -49,7 +50,7 @@ pub struct ExecutionBlock { pub uncles: Vec, } -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct CallOpts { pub from: Option
, @@ -61,6 +62,17 @@ pub struct CallOpts { pub data: Option>, } +impl fmt::Debug for CallOpts { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CallOpts") + .field("from", &self.from) + .field("to", &self.to) + .field("value", &self.value) + .field("data", &hex::encode(&self.data.clone().unwrap_or_default())) + .finish() + } +} + fn bytes_deserialize<'de, D>(deserializer: D) -> Result>, D::Error> where D: serde::Deserializer<'de>,