fix: node memory leak (#30)

* fix: node memory leak

* fix ci

* typo

* set override

* set uses

* fix

* remove duplicate name

* use nightly

* fix tests

* use BTreeMap for payload cache
This commit is contained in:
Noah Citron 2022-09-14 13:57:48 -04:00 committed by GitHub
parent 8e080faf4a
commit 4719717ddb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 71 deletions

View File

@ -6,19 +6,47 @@ on:
pull_request: pull_request:
branches: [ "master" ] branches: [ "master" ]
env:
CARGO_TERM_COLOR: always
jobs: jobs:
build: check:
name: Check
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v2
- name: Build - uses: actions-rs/toolchain@v1
run: cargo build --verbose with:
- name: Format profile: minimal
run: cargo fmt --check --verbose toolchain: nightly
- name: Run tests override: true
run: cargo test --verbose - uses: actions-rs/cargo@v1
with:
command: check
test:
name: Test Suite
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly
override: true
- uses: actions-rs/cargo@v1
with:
command: test
fmt:
name: Rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly
override: true
- run: rustup component add rustfmt
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check

View File

@ -1,3 +1,5 @@
#![feature(map_first_last)]
mod client; mod client;
pub use crate::client::*; pub use crate::client::*;

View File

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use ethers::prelude::{Address, U256}; use ethers::prelude::{Address, U256};
@ -18,10 +18,9 @@ pub struct Node {
consensus: ConsensusClient<NimbusRpc>, consensus: ConsensusClient<NimbusRpc>,
execution: ExecutionClient<HttpRpc>, execution: ExecutionClient<HttpRpc>,
config: Arc<Config>, config: Arc<Config>,
payloads: HashMap<u64, ExecutionPayload>, payloads: BTreeMap<u64, ExecutionPayload>,
block_hashes: HashMap<Vec<u8>, u64>, finalized_payloads: BTreeMap<u64, ExecutionPayload>,
latest_block: u64, history_size: usize,
finalized_block: u64,
} }
impl Node { impl Node {
@ -34,17 +33,16 @@ impl Node {
ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()).await?; ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()).await?;
let execution = ExecutionClient::new(execution_rpc)?; let execution = ExecutionClient::new(execution_rpc)?;
let payloads = HashMap::new(); let payloads = BTreeMap::new();
let block_hashes = HashMap::new(); let finalized_payloads = BTreeMap::new();
Ok(Node { Ok(Node {
consensus, consensus,
execution, execution,
config, config,
payloads, payloads,
block_hashes, finalized_payloads,
latest_block: 0, history_size: 64,
finalized_block: 0,
}) })
} }
@ -65,66 +63,66 @@ impl Node {
.get_execution_payload(&Some(latest_header.slot)) .get_execution_payload(&Some(latest_header.slot))
.await?; .await?;
self.latest_block = latest_payload.block_number;
self.block_hashes.insert(
latest_payload.block_hash.to_vec(),
latest_payload.block_number,
);
self.payloads
.insert(latest_payload.block_number, latest_payload);
let finalized_header = self.consensus.get_finalized_header(); let finalized_header = self.consensus.get_finalized_header();
let finalized_payload = self let finalized_payload = self
.consensus .consensus
.get_execution_payload(&Some(finalized_header.slot)) .get_execution_payload(&Some(finalized_header.slot))
.await?; .await?;
self.finalized_block = finalized_payload.block_number;
self.block_hashes.insert(
finalized_payload.block_hash.to_vec(),
finalized_payload.block_number,
);
self.payloads self.payloads
.insert(latest_payload.block_number, latest_payload);
self.payloads
.insert(finalized_payload.block_number, finalized_payload.clone());
self.finalized_payloads
.insert(finalized_payload.block_number, finalized_payload); .insert(finalized_payload.block_number, finalized_payload);
while self.payloads.len() > self.history_size {
self.payloads.pop_first();
}
while self.finalized_payloads.len() > usize::max(self.history_size / 32, 1) {
self.finalized_payloads.pop_first();
}
Ok(()) Ok(())
} }
pub fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> { pub fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> {
let payload = self.get_payload(block)?; let payload = self.get_payload(block)?;
let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); let mut evm = Evm::new(self.execution.clone(), payload.clone(), self.chain_id());
evm.call(opts) evm.call(opts)
} }
pub fn estimate_gas(&self, opts: &CallOpts) -> Result<u64> { pub fn estimate_gas(&self, opts: &CallOpts) -> Result<u64> {
let payload = self.get_payload(&BlockTag::Latest)?; let payload = self.get_payload(&BlockTag::Latest)?;
let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); let mut evm = Evm::new(self.execution.clone(), payload.clone(), self.chain_id());
evm.estimate_gas(opts) evm.estimate_gas(opts)
} }
pub async fn get_balance(&self, address: &Address, block: &BlockTag) -> Result<U256> { pub async fn get_balance(&self, address: &Address, block: &BlockTag) -> Result<U256> {
let payload = self.get_payload(block)?; let payload = self.get_payload(block)?;
let account = self.execution.get_account(&address, None, &payload).await?; let account = self.execution.get_account(&address, None, payload).await?;
Ok(account.balance) Ok(account.balance)
} }
pub async fn get_nonce(&self, address: &Address, block: &BlockTag) -> Result<u64> { pub async fn get_nonce(&self, address: &Address, block: &BlockTag) -> Result<u64> {
let payload = self.get_payload(block)?; let payload = self.get_payload(block)?;
let account = self.execution.get_account(&address, None, &payload).await?; let account = self.execution.get_account(&address, None, payload).await?;
Ok(account.nonce) Ok(account.nonce)
} }
pub async fn get_code(&self, address: &Address, block: &BlockTag) -> Result<Vec<u8>> { pub async fn get_code(&self, address: &Address, block: &BlockTag) -> Result<Vec<u8>> {
let payload = self.get_payload(block)?; let payload = self.get_payload(block)?;
self.execution.get_code(&address, &payload).await self.execution.get_code(&address, payload).await
} }
pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result<U256> { pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result<U256> {
let payload = self.get_payload(&BlockTag::Latest)?; let payload = self.get_payload(&BlockTag::Latest)?;
let account = self let account = self
.execution .execution
.get_account(address, Some(&[slot]), &payload) .get_account(address, Some(&[slot]), payload)
.await?; .await?;
let value = account.slots.get(&slot); let value = account.slots.get(&slot);
match value { match value {
Some(value) => Ok(*value), Some(value) => Ok(*value),
@ -170,16 +168,18 @@ impl Node {
pub fn get_block_by_number(&self, block: &BlockTag) -> Result<ExecutionBlock> { pub fn get_block_by_number(&self, block: &BlockTag) -> Result<ExecutionBlock> {
let payload = self.get_payload(block)?; let payload = self.get_payload(block)?;
self.execution.get_block(&payload) self.execution.get_block(payload)
} }
pub fn get_block_by_hash(&self, hash: &Vec<u8>) -> Result<ExecutionBlock> { pub fn get_block_by_hash(&self, hash: &Vec<u8>) -> Result<ExecutionBlock> {
let block = self let payloads = self
.block_hashes .payloads
.get(hash) .iter()
.ok_or(eyre!("Block Not Found"))?; .filter(|entry| &entry.1.block_hash.to_vec() == hash)
let payload = self.get_payload(&BlockTag::Number(*block))?; .collect::<Vec<(&u64, &ExecutionPayload)>>();
self.execution.get_block(&payload)
let payload = payloads.get(0).ok_or(eyre!("Block Not Found"))?.1;
self.execution.get_block(payload)
} }
pub fn chain_id(&self) -> u64 { pub fn chain_id(&self) -> u64 {
@ -190,19 +190,19 @@ impl Node {
self.consensus.get_header() self.consensus.get_header()
} }
fn get_payload(&self, block: &BlockTag) -> Result<ExecutionPayload> { fn get_payload(&self, block: &BlockTag) -> Result<&ExecutionPayload> {
match block { match block {
BlockTag::Latest => { BlockTag::Latest => {
let payload = self.payloads.get(&self.latest_block); let payload = self.payloads.last_key_value();
payload.cloned().ok_or(eyre!("Block Not Found")) Ok(payload.ok_or(eyre!("Block Not Found"))?.1)
} }
BlockTag::Finalized => { BlockTag::Finalized => {
let payload = self.payloads.get(&self.finalized_block); let payload = self.finalized_payloads.last_key_value();
payload.cloned().ok_or(eyre!("Block Not Found")) Ok(payload.ok_or(eyre!("Block Not Found"))?.1)
} }
BlockTag::Number(num) => { BlockTag::Number(num) => {
let payload = self.payloads.get(&num); let payload = self.payloads.get(num);
payload.cloned().ok_or(eyre!("Block Not Found")) payload.ok_or(eyre!("Block Not Found"))
} }
} }
} }

View File

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::{BTreeMap, HashMap};
use std::str::FromStr; use std::str::FromStr;
use ethers::abi::AbiEncode; use ethers::abi::AbiEncode;
@ -140,7 +140,7 @@ impl<R: Rpc> ExecutionClient<R> {
pub async fn get_transaction_receipt( pub async fn get_transaction_receipt(
&self, &self,
tx_hash: &H256, tx_hash: &H256,
payloads: &HashMap<u64, ExecutionPayload>, payloads: &BTreeMap<u64, ExecutionPayload>,
) -> Result<Option<TransactionReceipt>> { ) -> Result<Option<TransactionReceipt>> {
let receipt = self.rpc.get_transaction_receipt(tx_hash).await?; let receipt = self.rpc.get_transaction_receipt(tx_hash).await?;
if receipt.is_none() { if receipt.is_none() {
@ -148,7 +148,8 @@ impl<R: Rpc> ExecutionClient<R> {
} }
let receipt = receipt.unwrap(); let receipt = receipt.unwrap();
let payload = payloads.get(&receipt.block_number.unwrap().as_u64()); let block_number = receipt.block_number.unwrap().as_u64();
let payload = payloads.get(&block_number);
if payload.is_none() { if payload.is_none() {
return Ok(None); return Ok(None);
} }
@ -187,7 +188,7 @@ impl<R: Rpc> ExecutionClient<R> {
pub async fn get_transaction( pub async fn get_transaction(
&self, &self,
hash: &H256, hash: &H256,
payloads: &HashMap<u64, ExecutionPayload>, payloads: &BTreeMap<u64, ExecutionPayload>,
) -> Result<Option<Transaction>> { ) -> Result<Option<Transaction>> {
let tx = self.rpc.get_transaction(hash).await?; let tx = self.rpc.get_transaction(hash).await?;
if tx.is_none() { if tx.is_none() {
@ -201,9 +202,8 @@ impl<R: Rpc> ExecutionClient<R> {
return Ok(None); return Ok(None);
} }
let block_number = block_number.unwrap(); let block_number = block_number.unwrap().as_u64();
let payload = payloads.get(&block_number);
let payload = payloads.get(&block_number.as_u64());
if payload.is_none() { if payload.is_none() {
return Ok(None); return Ok(None);
} }

View File

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::BTreeMap;
use std::str::FromStr; use std::str::FromStr;
use ethers::types::{Address, H256, U256}; use ethers::types::{Address, H256, U256};
@ -89,7 +89,7 @@ async fn test_get_tx() {
let mut payload = ExecutionPayload::default(); let mut payload = ExecutionPayload::default();
payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap())); payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap()));
let mut payloads = HashMap::new(); let mut payloads = BTreeMap::new();
payloads.insert(7530933, payload); payloads.insert(7530933, payload);
let tx = execution let tx = execution
@ -108,7 +108,8 @@ async fn test_get_tx_bad_proof() {
H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap(); H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap();
let payload = ExecutionPayload::default(); let payload = ExecutionPayload::default();
let mut payloads = HashMap::new();
let mut payloads = BTreeMap::new();
payloads.insert(7530933, payload); payloads.insert(7530933, payload);
let tx_res = execution.get_transaction(&tx_hash, &payloads).await; let tx_res = execution.get_transaction(&tx_hash, &payloads).await;
@ -122,7 +123,7 @@ async fn test_get_tx_not_included() {
let tx_hash = let tx_hash =
H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap(); H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap();
let payloads = HashMap::new(); let payloads = BTreeMap::new();
let tx_opt = execution let tx_opt = execution
.get_transaction(&tx_hash, &payloads) .get_transaction(&tx_hash, &payloads)
@ -139,7 +140,6 @@ async fn test_get_receipt() {
H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap(); H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap();
let mut payload = ExecutionPayload::default(); let mut payload = ExecutionPayload::default();
payload.receipts_root = Vector::from_iter( payload.receipts_root = Vector::from_iter(
hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f") hex_str_to_bytes("dd82a78eccb333854f0c99e5632906e092d8a49c27a21c25cae12b82ec2a113f")
.unwrap(), .unwrap(),
@ -147,7 +147,7 @@ async fn test_get_receipt() {
payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap())); payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap()));
let mut payloads = HashMap::new(); let mut payloads = BTreeMap::new();
payloads.insert(7530933, payload); payloads.insert(7530933, payload);
let receipt = execution let receipt = execution
@ -168,7 +168,7 @@ async fn test_get_receipt_bad_proof() {
let mut payload = ExecutionPayload::default(); let mut payload = ExecutionPayload::default();
payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap())); payload.transactions.push(List::from_iter(hex_str_to_bytes("0x02f8b20583623355849502f900849502f91082ea6094326c977e6efc84e512bb9c30f76e30c160ed06fb80b844a9059cbb0000000000000000000000007daccf9b3c1ae2fa5c55f1c978aeef700bc83be0000000000000000000000000000000000000000000000001158e460913d00000c080a0e1445466b058b6f883c0222f1b1f3e2ad9bee7b5f688813d86e3fa8f93aa868ca0786d6e7f3aefa8fe73857c65c32e4884d8ba38d0ecfb947fbffb82e8ee80c167").unwrap()));
let mut payloads = HashMap::new(); let mut payloads = BTreeMap::new();
payloads.insert(7530933, payload); payloads.insert(7530933, payload);
let receipt_res = execution.get_transaction_receipt(&tx_hash, &payloads).await; let receipt_res = execution.get_transaction_receipt(&tx_hash, &payloads).await;
@ -182,7 +182,7 @@ async fn test_get_receipt_not_included() {
let tx_hash = let tx_hash =
H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap(); H256::from_str("2dac1b27ab58b493f902dda8b63979a112398d747f1761c0891777c0983e591f").unwrap();
let payloads = HashMap::new(); let payloads = BTreeMap::new();
let receipt_opt = execution let receipt_opt = execution
.get_transaction_receipt(&tx_hash, &payloads) .get_transaction_receipt(&tx_hash, &payloads)
.await .await