feat: use access lists to batch fetch proofs (#44)

* add create_access_list to rpc

* batch fetch proofs with access lists

* refactor

* use caching for estimate_gas

* cleanup

* add rate limiting to bulk proof fetch
This commit is contained in:
Noah Citron 2022-09-22 12:40:06 -07:00 committed by GitHub
parent 0579855141
commit 897f679a2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 212 additions and 58 deletions

1
Cargo.lock generated
View File

@ -1033,6 +1033,7 @@ dependencies = [
"futures",
"hex",
"jsonrpsee",
"log",
"openssl",
"reqwest",
"revm",

View File

@ -3,7 +3,7 @@ use ethers::{
types::{Address, Transaction, TransactionReceipt, H256},
};
use eyre::Result;
use log::{info, warn};
use log::{debug, info, warn};
use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::Mutex;
@ -94,6 +94,7 @@ struct RpcInner {
#[async_trait]
impl EthRpcServer for RpcInner {
async fn get_balance(&self, address: &str, block: &str) -> Result<String, Error> {
debug!("eth_getBalance");
let block = convert_err(decode_block(block))?;
let address = convert_err(Address::from_str(address))?;
let node = self.node.lock().await;
@ -121,6 +122,7 @@ impl EthRpcServer for RpcInner {
}
async fn call(&self, opts: CallOpts, block: &str) -> Result<String, Error> {
debug!("eth_call");
let block = convert_err(decode_block(block))?;
let node = self.node.lock().await;
let res = convert_err(node.call(&opts, &block))?;
@ -129,6 +131,7 @@ impl EthRpcServer for RpcInner {
}
async fn estimate_gas(&self, opts: CallOpts) -> Result<String, Error> {
debug!("eth_estimateGas");
let node = self.node.lock().await;
let gas = convert_err(node.estimate_gas(&opts))?;

View File

@ -23,6 +23,7 @@ toml = "0.5.9"
triehash-ethereum = { git = "https://github.com/openethereum/parity-ethereum" }
async-trait = "0.1.57"
openssl = { version = "0.10", features = ["vendored"] }
log = "0.4.17"
common = { path = "../common" }
consensus = { path = "../consensus" }

View File

@ -1,4 +1,4 @@
use std::{fmt::Display, str::FromStr, thread};
use std::{collections::HashMap, fmt::Display, str::FromStr, thread};
use bytes::Bytes;
use ethers::{
@ -6,12 +6,17 @@ use ethers::{
prelude::{Address, H160, H256, U256},
};
use eyre::Result;
use futures::future::join_all;
use log::trace;
use revm::{AccountInfo, Bytecode, Database, Env, TransactOut, TransactTo, EVM};
use tokio::runtime::Runtime;
use consensus::types::ExecutionPayload;
use crate::{rpc::Rpc, types::CallOpts};
use crate::{
rpc::Rpc,
types::{Account, CallOpts},
};
use super::ExecutionClient;
@ -30,6 +35,9 @@ impl<R: Rpc> Evm<R> {
}
pub fn call(&mut self, opts: &CallOpts) -> Result<Vec<u8>> {
let account_map = self.batch_fetch_accounts(opts);
self.evm.db.as_mut().unwrap().set_accounts(account_map);
self.evm.env = self.get_env(opts);
let output = self.evm.transact().1;
@ -45,6 +53,9 @@ impl<R: Rpc> Evm<R> {
}
pub fn estimate_gas(&mut self, opts: &CallOpts) -> Result<u64> {
let account_map = self.batch_fetch_accounts(opts);
self.evm.db.as_mut().unwrap().set_accounts(account_map);
self.evm.env = self.get_env(opts);
let gas = self.evm.transact().2;
@ -56,6 +67,54 @@ impl<R: Rpc> Evm<R> {
Ok(gas_scaled)
}
fn batch_fetch_accounts(&self, opts: &CallOpts) -> HashMap<Address, Account> {
let db = self.evm.db.as_ref().unwrap();
let rpc = db.execution.rpc.clone();
let payload = db.payload.clone();
let execution = db.execution.clone();
let block = db.payload.block_number;
let opts_moved = CallOpts {
from: opts.from,
to: opts.to,
value: opts.value,
data: opts.data.clone(),
gas: opts.gas,
gas_price: opts.gas_price,
};
let block_moved = block.clone();
let handle = thread::spawn(move || {
let list_fut = rpc.create_access_list(&opts_moved, block_moved);
let runtime = Runtime::new()?;
let list = runtime.block_on(list_fut)?;
let account_futs = list.0.iter().map(|account| {
let addr_fut = futures::future::ready(account.address);
let account_fut = execution.get_account(
&account.address,
Some(account.storage_keys.as_slice()),
&payload,
);
async move { (addr_fut.await, account_fut.await) }
});
let accounts = runtime.block_on(join_all(account_futs));
Ok::<_, eyre::Error>(accounts)
});
let accounts = handle.join().unwrap().unwrap();
let mut account_map = HashMap::new();
accounts.iter().for_each(|account| {
let addr = account.0;
let account = account.1.as_ref().unwrap().clone();
account_map.insert(addr, account);
});
account_map
}
fn get_env(&self, opts: &CallOpts) -> Env {
let mut env = Env::default();
let payload = &self.evm.db.as_ref().unwrap().payload;
@ -81,6 +140,7 @@ impl<R: Rpc> Evm<R> {
struct ProofDB<R: Rpc> {
execution: ExecutionClient<R>,
payload: ExecutionPayload,
accounts: HashMap<Address, Account>,
error: Option<String>,
}
@ -89,6 +149,7 @@ impl<R: Rpc> ProofDB<R> {
ProofDB {
execution,
payload,
accounts: HashMap::new(),
error: None,
}
}
@ -102,6 +163,25 @@ impl<R: Rpc> ProofDB<R> {
}
}
}
pub fn set_accounts(&mut self, accounts: HashMap<Address, Account>) {
self.accounts = accounts;
}
fn get_account(&mut self, address: Address, slots: &[H256]) -> Account {
let execution = self.execution.clone();
let addr = address.clone();
let payload = self.payload.clone();
let slots = slots.to_owned();
let handle = thread::spawn(move || {
let account_fut = execution.get_account(&addr, Some(&slots), &payload);
let runtime = Runtime::new()?;
runtime.block_on(account_fut)
});
self.safe_unwrap(handle.join().unwrap())
}
}
impl<R: Rpc> Database for ProofDB<R> {
@ -110,31 +190,17 @@ impl<R: Rpc> Database for ProofDB<R> {
return AccountInfo::default();
}
let execution = self.execution.clone();
let addr = address.clone();
let payload = self.payload.clone();
trace!(
"fetch basic evm state for addess=0x{}",
hex::encode(address.as_bytes())
);
let handle = thread::spawn(move || {
let account_fut = execution.get_account(&addr, None, &payload);
let runtime = Runtime::new()?;
runtime.block_on(account_fut)
});
let account = self.safe_unwrap(handle.join().unwrap());
let execution = self.execution.clone();
let addr = address.clone();
let payload = self.payload.clone();
let handle = thread::spawn(move || {
let code_fut = execution.get_code(&addr, &payload);
let runtime = Runtime::new()?;
runtime.block_on(code_fut)
});
let bytecode = self.safe_unwrap(handle.join().unwrap());
let bytecode = Bytecode::new_raw(Bytes::from(bytecode));
let account = match self.accounts.get(&address) {
Some(account) => account.clone(),
None => self.get_account(address, &[]),
};
let bytecode = Bytecode::new_raw(Bytes::from(account.code.clone()));
AccountInfo::new(account.balance, account.nonce, bytecode)
}
@ -143,26 +209,30 @@ impl<R: Rpc> Database for ProofDB<R> {
}
fn storage(&mut self, address: H160, slot: U256) -> U256 {
let execution = self.execution.clone();
let addr = address.clone();
trace!(
"fetch evm state for address=0x{}, slot={}",
hex::encode(address.as_bytes()),
slot
);
let slot = H256::from_uint(&slot);
let slots = [slot];
let payload = self.payload.clone();
let handle = thread::spawn(move || {
let account_fut = execution.get_account(&addr, Some(&slots), &payload);
let runtime = Runtime::new()?;
runtime.block_on(account_fut)
});
let account = self.safe_unwrap(handle.join().unwrap());
let value = account.slots.get(&slot);
match value {
Some(value) => *value,
None => {
self.error = Some("slot not found".to_string());
U256::default()
}
match self.accounts.get(&address) {
Some(account) => match account.slots.get(&slot) {
Some(slot) => slot.clone(),
None => self
.get_account(address, &[slot])
.slots
.get(&slot)
.unwrap()
.clone(),
},
None => self
.get_account(address, &[slot])
.slots
.get(&slot)
.unwrap()
.clone(),
}
}
@ -173,4 +243,5 @@ impl<R: Rpc> Database for ProofDB<R> {
fn is_precompile(address: &Address) -> bool {
address.le(&Address::from_str("0x0000000000000000000000000000000000000009").unwrap())
&& address.gt(&Address::zero())
}

View File

@ -18,7 +18,7 @@ use super::types::{Account, ExecutionBlock};
#[derive(Clone)]
pub struct ExecutionClient<R: Rpc> {
rpc: R,
pub rpc: R,
}
impl<R: Rpc> ExecutionClient<R> {
@ -76,9 +76,17 @@ impl<R: Rpc> ExecutionClient<R> {
slot_map.insert(storage_proof.key, storage_proof.value);
}
let code = self.rpc.get_code(address, payload.block_number).await?;
let code_hash = keccak256(&code).into();
if proof.code_hash != code_hash {
eyre::bail!("Invalid Proof");
}
Ok(Account {
balance: proof.balance,
nonce: proof.nonce.as_u64(),
code,
code_hash: proof.code_hash,
storage_hash: proof.storage_hash,
slots: slot_map,

View File

@ -1,21 +1,43 @@
use std::str::FromStr;
use async_trait::async_trait;
use ethers::prelude::{Address, Http};
use ethers::providers::{Middleware, Provider};
use ethers::types::{BlockId, Bytes, EIP1186ProofResponse, Transaction, TransactionReceipt, H256};
use ethers::providers::{HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient};
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{
BlockId, Bytes, EIP1186ProofResponse, Eip1559TransactionRequest, Transaction,
TransactionReceipt, H256, U256,
};
use eyre::Result;
use log::trace;
use crate::types::CallOpts;
use super::Rpc;
#[derive(Clone)]
pub struct HttpRpc {
provider: Provider<Http>,
url: String,
provider: Provider<RetryClient<Http>>,
}
impl Clone for HttpRpc {
fn clone(&self) -> Self {
Self::new(&self.url).unwrap()
}
}
#[async_trait]
impl Rpc for HttpRpc {
fn new(rpc: &str) -> Result<Self> {
let provider = Provider::try_from(rpc)?;
Ok(HttpRpc { provider })
let http = Http::from_str(rpc)?;
let mut client = RetryClient::new(http, Box::new(HttpRateLimitRetryPolicy), 10, 1);
client.set_compute_units(300);
let provider = Provider::new(client);
Ok(HttpRpc {
url: rpc.to_string(),
provider,
})
}
async fn get_proof(
@ -24,6 +46,7 @@ impl Rpc for HttpRpc {
slots: &[H256],
block: u64,
) -> Result<EIP1186ProofResponse> {
trace!("fetching proof");
let block = Some(BlockId::from(block));
let proof_response = self
.provider
@ -32,6 +55,26 @@ impl Rpc for HttpRpc {
Ok(proof_response)
}
async fn create_access_list(&self, opts: &CallOpts, block: u64) -> Result<AccessList> {
let block = Some(BlockId::from(block));
let mut tx = Eip1559TransactionRequest::new();
tx.to = Some(opts.to.into());
tx.from = opts.from;
tx.value = opts.value;
// TODO: better way to set gas limit
tx.gas = Some(U256::from(10_000_000));
tx.data = opts
.data
.as_ref()
.map(|data| Bytes::from(data.as_slice().to_owned()));
let tx = TypedTransaction::Eip1559(tx);
let list = self.provider.create_access_list(&tx, block).await?;
Ok(list.access_list)
}
async fn get_code(&self, address: &Address, block: u64) -> Result<Vec<u8>> {
let block = Some(BlockId::from(block));
let code = self.provider.get_code(*address, block).await?;

View File

@ -2,8 +2,13 @@ use std::{fs::read_to_string, path::PathBuf};
use async_trait::async_trait;
use common::utils::hex_str_to_bytes;
use ethers::types::{Address, EIP1186ProofResponse, Transaction, TransactionReceipt, H256};
use eyre::Result;
use ethers::types::{
transaction::eip2930::AccessList, Address, EIP1186ProofResponse, Transaction,
TransactionReceipt, H256,
};
use eyre::{eyre, Result};
use crate::types::CallOpts;
use super::Rpc;
@ -29,13 +34,17 @@ impl Rpc for MockRpc {
Ok(serde_json::from_str(&proof)?)
}
async fn create_access_list(&self, _opts: &CallOpts, _block: u64) -> Result<AccessList> {
Err(eyre!("not implemented"))
}
async fn get_code(&self, _address: &Address, _block: u64) -> Result<Vec<u8>> {
let code = read_to_string(self.path.join("code.json"))?;
hex_str_to_bytes(&code[0..code.len() - 1])
}
async fn send_raw_transaction(&self, _bytes: &Vec<u8>) -> Result<H256> {
Err(eyre::eyre!("not implemented"))
Err(eyre!("not implemented"))
}
async fn get_transaction_receipt(&self, _tx_hash: &H256) -> Result<Option<TransactionReceipt>> {

View File

@ -1,7 +1,12 @@
use async_trait::async_trait;
use ethers::types::{Address, EIP1186ProofResponse, Transaction, TransactionReceipt, H256};
use ethers::types::{
transaction::eip2930::AccessList, Address, EIP1186ProofResponse, Transaction,
TransactionReceipt, H256,
};
use eyre::Result;
use crate::types::CallOpts;
pub mod http_rpc;
pub mod mock_rpc;
@ -18,6 +23,7 @@ pub trait Rpc: Send + Clone + 'static {
block: u64,
) -> Result<EIP1186ProofResponse>;
async fn create_access_list(&self, opts: &CallOpts, block: u64) -> Result<AccessList>;
async fn get_code(&self, address: &Address, block: u64) -> Result<Vec<u8>>;
async fn send_raw_transaction(&self, bytes: &Vec<u8>) -> Result<H256>;
async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result<Option<TransactionReceipt>>;

View File

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};
use ethers::prelude::{Address, H256, U256};
use eyre::Result;
@ -6,11 +6,12 @@ use serde::{Deserialize, Serialize};
use common::utils::u64_to_hex_string;
#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct Account {
pub balance: U256,
pub nonce: u64,
pub code_hash: H256,
pub code: Vec<u8>,
pub storage_hash: H256,
pub slots: HashMap<H256, U256>,
}
@ -49,7 +50,7 @@ pub struct ExecutionBlock {
pub uncles: Vec<H256>,
}
#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CallOpts {
pub from: Option<Address>,
@ -61,6 +62,17 @@ pub struct CallOpts {
pub data: Option<Vec<u8>>,
}
impl fmt::Debug for CallOpts {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CallOpts")
.field("from", &self.from)
.field("to", &self.to)
.field("value", &self.value)
.field("data", &hex::encode(&self.data.clone().unwrap_or_default()))
.finish()
}
}
fn bytes_deserialize<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
where
D: serde::Deserializer<'de>,