diff --git a/cli/src/main.rs b/cli/src/main.rs index f0c356d..e431937 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,19 +1,24 @@ -use std::{sync::Arc, time::Duration}; - use clap::Parser; use common::utils::hex_str_to_bytes; use dirs::home_dir; use env_logger::Env; use eyre::Result; -use tokio::{sync::Mutex, time::sleep}; -use client::{rpc::Rpc, Client}; +use client::Client; use config::{networks, Config}; #[tokio::main] async fn main() -> Result<()> { env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + let config = get_config()?; + let mut client = Client::new(config).await?; + client.start().await?; + + std::future::pending().await +} + +fn get_config() -> Result { let cli = Cli::parse(); let mut config = match cli.network.as_str() { "goerli" => networks::goerli(), @@ -28,26 +33,19 @@ async fn main() -> Result<()> { config.general.checkpoint = hex_str_to_bytes(&checkpoint)?; } - let mut client = Client::new(Arc::new(config)).await?; - client.sync().await?; - - let client = Arc::new(Mutex::new(client)); - - let mut rpc = Rpc::new(client.clone(), cli.port); - rpc.start().await?; - - loop { - sleep(Duration::from_secs(10)).await; - client.lock().await.advance().await? + if let Some(port) = cli.port { + config.general.rpc_port = Some(port); } + + Ok(config) } #[derive(Parser)] struct Cli { #[clap(short, long, default_value = "goerli")] network: String, - #[clap(short, long, default_value = "8545")] - port: u16, + #[clap(short, long)] + port: Option, #[clap(short, long)] checkpoint: Option, } diff --git a/client/src/client.rs b/client/src/client.rs index b919e1e..fe0cdbd 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -1,198 +1,132 @@ -use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{Transaction, TransactionReceipt, H256}; use eyre::Result; use config::Config; -use consensus::rpc::nimbus_rpc::NimbusRpc; -use consensus::types::{ExecutionPayload, Header}; -use consensus::ConsensusClient; -use execution::evm::Evm; -use execution::rpc::http_rpc::HttpRpc; +use consensus::types::Header; use execution::types::{CallOpts, ExecutionBlock}; -use execution::ExecutionClient; +use log::warn; +use tokio::spawn; +use tokio::sync::Mutex; +use tokio::time::sleep; + +use crate::node::Node; +use crate::rpc::Rpc; pub struct Client { - consensus: ConsensusClient, - execution: ExecutionClient, - config: Arc, - payloads: HashMap, - block_hashes: HashMap, u64>, - block_head: u64, + node: Arc>, + rpc: Option, } impl Client { - pub async fn new(config: Arc) -> Result { - let consensus_rpc = &config.general.consensus_rpc; - let checkpoint_hash = &config.general.checkpoint; - let execution_rpc = &config.general.execution_rpc; + pub async fn new(config: Config) -> Result { + let config = Arc::new(config); + let node = Node::new(config.clone()).await?; + let node = Arc::new(Mutex::new(node)); - let consensus = - ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()).await?; - let execution = ExecutionClient::new(execution_rpc)?; + let rpc = if let Some(port) = config.general.rpc_port { + Some(Rpc::new(node.clone(), port)) + } else { + None + }; - let payloads = HashMap::new(); - let block_hashes = HashMap::new(); - - Ok(Client { - consensus, - execution, - config, - payloads, - block_hashes, - block_head: 0, - }) + Ok(Client { node, rpc }) } - pub async fn sync(&mut self) -> Result<()> { - self.consensus.sync().await?; + pub async fn start(&mut self) -> Result<()> { + self.rpc.as_mut().unwrap().start().await?; + self.node.lock().await.sync().await?; - let head = self.consensus.get_header(); - let payload = self - .consensus - .get_execution_payload(&Some(head.slot)) - .await?; + let node = self.node.clone(); + spawn(async move { + loop { + let res = node.lock().await.advance().await; + if let Err(err) = res { + warn!("{}", err); + } - self.block_head = payload.block_number; - self.block_hashes - .insert(payload.block_hash.to_vec(), payload.block_number); - self.payloads.insert(payload.block_number, payload); + sleep(Duration::from_secs(10)).await; + } + }); Ok(()) } - pub async fn advance(&mut self) -> Result<()> { - self.consensus.advance().await?; - - let head = self.consensus.get_header(); - let payload = self - .consensus - .get_execution_payload(&Some(head.slot)) - .await?; - - self.block_head = payload.block_number; - self.block_hashes - .insert(payload.block_hash.to_vec(), payload.block_number); - self.payloads.insert(payload.block_number, payload); - - Ok(()) + pub async fn call(&self, opts: &CallOpts, block: &Option) -> Result> { + self.node.lock().await.call(opts, block) } - pub fn call(&self, opts: &CallOpts, block: &Option) -> Result> { - let payload = self.get_payload(block)?; - let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); - evm.call(opts) - } - - pub fn estimate_gas(&self, opts: &CallOpts) -> Result { - let payload = self.get_payload(&None)?; - let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); - evm.estimate_gas(opts) + pub async fn estimate_gas(&self, opts: &CallOpts) -> Result { + self.node.lock().await.estimate_gas(opts) } pub async fn get_balance(&self, address: &Address, block: &Option) -> Result { - let payload = self.get_payload(block)?; - let account = self.execution.get_account(&address, None, &payload).await?; - Ok(account.balance) + self.node.lock().await.get_balance(address, block).await } pub async fn get_nonce(&self, address: &Address, block: &Option) -> Result { - let payload = self.get_payload(block)?; - let account = self.execution.get_account(&address, None, &payload).await?; - Ok(account.nonce) + self.node.lock().await.get_nonce(address, block).await } pub async fn get_code(&self, address: &Address, block: &Option) -> Result> { - let payload = self.get_payload(block)?; - self.execution.get_code(&address, &payload).await + self.node.lock().await.get_code(address, block).await } pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result { - let payload = self.get_payload(&None)?; - let account = self - .execution - .get_account(address, Some(&[slot]), &payload) - .await?; - let value = account.slots.get(&slot); - match value { - Some(value) => Ok(*value), - None => Err(eyre::eyre!("Slot Not Found")), - } + self.node.lock().await.get_storage_at(address, slot).await } pub async fn send_raw_transaction(&self, bytes: &Vec) -> Result { - self.execution.send_raw_transaction(bytes).await + self.node.lock().await.send_raw_transaction(bytes).await } pub async fn get_transaction_receipt( &self, tx_hash: &H256, ) -> Result> { - self.execution - .get_transaction_receipt(tx_hash, &self.payloads) + self.node + .lock() + .await + .get_transaction_receipt(tx_hash) .await } pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> Result> { - self.execution - .get_transaction(tx_hash, &self.payloads) + self.node + .lock() + .await + .get_transaction_by_hash(tx_hash) .await } - pub fn get_gas_price(&self) -> Result { - let payload = self.get_payload(&None)?; - let base_fee = U256::from_little_endian(&payload.base_fee_per_gas.to_bytes_le()); - let tip = U256::from(10_u64.pow(9)); - Ok(base_fee + tip) + pub async fn get_gas_price(&self) -> Result { + self.node.lock().await.get_gas_price() } - pub fn get_priority_fee(&self) -> Result { - let tip = U256::from(10_u64.pow(9)); - Ok(tip) + pub async fn get_priority_fee(&self) -> Result { + self.node.lock().await.get_priority_fee() } - pub fn get_block_number(&self) -> Result { - let payload = self.get_payload(&None)?; - Ok(payload.block_number) + pub async fn get_block_number(&self) -> Result { + self.node.lock().await.get_block_number() } - pub fn get_block_by_number(&self, block: &Option) -> Result { - let payload = self.get_payload(block)?; - self.execution.get_block(&payload) + pub async fn get_block_by_number(&self, block: &Option) -> Result { + self.node.lock().await.get_block_by_number(block) } - pub fn get_block_by_hash(&self, hash: &Vec) -> Result { - let block = self.block_hashes.get(hash); - let payload = self.get_payload(&block.cloned())?; - self.execution.get_block(&payload) + pub async fn get_block_by_hash(&self, hash: &Vec) -> Result { + self.node.lock().await.get_block_by_hash(hash) } - pub fn chain_id(&self) -> u64 { - self.config.general.chain_id + pub async fn chain_id(&self) -> u64 { + self.node.lock().await.chain_id() } - pub fn get_header(&self) -> &Header { - self.consensus.get_header() - } - - fn get_payload(&self, block: &Option) -> Result { - match block { - Some(block) => { - let payload = self.payloads.get(block); - match payload { - Some(payload) => Ok(payload.clone()), - None => Err(eyre::eyre!("Block Not Found")), - } - } - None => { - let payload = self.payloads.get(&self.block_head); - match payload { - Some(payload) => Ok(payload.clone()), - None => Err(eyre::eyre!("Block Not Found")), - } - } - } + pub async fn get_header(&self) -> Header { + self.node.lock().await.get_header().clone() } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 074f9e3..1c4ab3a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -2,3 +2,5 @@ mod client; pub use crate::client::*; pub mod rpc; + +mod node; diff --git a/client/src/node.rs b/client/src/node.rs new file mode 100644 index 0000000..d25a3fe --- /dev/null +++ b/client/src/node.rs @@ -0,0 +1,198 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use ethers::prelude::{Address, U256}; +use ethers::types::{Transaction, TransactionReceipt, H256}; +use eyre::Result; + +use config::Config; +use consensus::rpc::nimbus_rpc::NimbusRpc; +use consensus::types::{ExecutionPayload, Header}; +use consensus::ConsensusClient; +use execution::evm::Evm; +use execution::rpc::http_rpc::HttpRpc; +use execution::types::{CallOpts, ExecutionBlock}; +use execution::ExecutionClient; + +pub struct Node { + consensus: ConsensusClient, + execution: ExecutionClient, + config: Arc, + payloads: HashMap, + block_hashes: HashMap, u64>, + block_head: u64, +} + +impl Node { + pub async fn new(config: Arc) -> Result { + let consensus_rpc = &config.general.consensus_rpc; + let checkpoint_hash = &config.general.checkpoint; + let execution_rpc = &config.general.execution_rpc; + + let consensus = + ConsensusClient::new(consensus_rpc, checkpoint_hash, config.clone()).await?; + let execution = ExecutionClient::new(execution_rpc)?; + + let payloads = HashMap::new(); + let block_hashes = HashMap::new(); + + Ok(Node { + consensus, + execution, + config, + payloads, + block_hashes, + block_head: 0, + }) + } + + pub async fn sync(&mut self) -> Result<()> { + self.consensus.sync().await?; + + let head = self.consensus.get_header(); + let payload = self + .consensus + .get_execution_payload(&Some(head.slot)) + .await?; + + self.block_head = payload.block_number; + self.block_hashes + .insert(payload.block_hash.to_vec(), payload.block_number); + self.payloads.insert(payload.block_number, payload); + + Ok(()) + } + + pub async fn advance(&mut self) -> Result<()> { + self.consensus.advance().await?; + + let head = self.consensus.get_header(); + let payload = self + .consensus + .get_execution_payload(&Some(head.slot)) + .await?; + + self.block_head = payload.block_number; + self.block_hashes + .insert(payload.block_hash.to_vec(), payload.block_number); + self.payloads.insert(payload.block_number, payload); + + Ok(()) + } + + pub fn call(&self, opts: &CallOpts, block: &Option) -> Result> { + let payload = self.get_payload(block)?; + let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); + evm.call(opts) + } + + pub fn estimate_gas(&self, opts: &CallOpts) -> Result { + let payload = self.get_payload(&None)?; + let mut evm = Evm::new(self.execution.clone(), payload, self.chain_id()); + evm.estimate_gas(opts) + } + + pub async fn get_balance(&self, address: &Address, block: &Option) -> Result { + let payload = self.get_payload(block)?; + let account = self.execution.get_account(&address, None, &payload).await?; + Ok(account.balance) + } + + pub async fn get_nonce(&self, address: &Address, block: &Option) -> Result { + let payload = self.get_payload(block)?; + let account = self.execution.get_account(&address, None, &payload).await?; + Ok(account.nonce) + } + + pub async fn get_code(&self, address: &Address, block: &Option) -> Result> { + let payload = self.get_payload(block)?; + self.execution.get_code(&address, &payload).await + } + + pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result { + let payload = self.get_payload(&None)?; + let account = self + .execution + .get_account(address, Some(&[slot]), &payload) + .await?; + let value = account.slots.get(&slot); + match value { + Some(value) => Ok(*value), + None => Err(eyre::eyre!("Slot Not Found")), + } + } + + pub async fn send_raw_transaction(&self, bytes: &Vec) -> Result { + self.execution.send_raw_transaction(bytes).await + } + + pub async fn get_transaction_receipt( + &self, + tx_hash: &H256, + ) -> Result> { + self.execution + .get_transaction_receipt(tx_hash, &self.payloads) + .await + } + + pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> Result> { + self.execution + .get_transaction(tx_hash, &self.payloads) + .await + } + + pub fn get_gas_price(&self) -> Result { + let payload = self.get_payload(&None)?; + let base_fee = U256::from_little_endian(&payload.base_fee_per_gas.to_bytes_le()); + let tip = U256::from(10_u64.pow(9)); + Ok(base_fee + tip) + } + + pub fn get_priority_fee(&self) -> Result { + let tip = U256::from(10_u64.pow(9)); + Ok(tip) + } + + pub fn get_block_number(&self) -> Result { + let payload = self.get_payload(&None)?; + Ok(payload.block_number) + } + + pub fn get_block_by_number(&self, block: &Option) -> Result { + let payload = self.get_payload(block)?; + self.execution.get_block(&payload) + } + + pub fn get_block_by_hash(&self, hash: &Vec) -> Result { + let block = self.block_hashes.get(hash); + let payload = self.get_payload(&block.cloned())?; + self.execution.get_block(&payload) + } + + pub fn chain_id(&self) -> u64 { + self.config.general.chain_id + } + + pub fn get_header(&self) -> &Header { + self.consensus.get_header() + } + + fn get_payload(&self, block: &Option) -> Result { + match block { + Some(block) => { + let payload = self.payloads.get(block); + match payload { + Some(payload) => Ok(payload.clone()), + None => Err(eyre::eyre!("Block Not Found")), + } + } + None => { + let payload = self.payloads.get(&self.block_head); + match payload { + Some(payload) => Ok(payload.clone()), + None => Err(eyre::eyre!("Block Not Found")), + } + } + } + } +} diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 9bdf608..c3a1f53 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -3,7 +3,7 @@ use ethers::{ types::{Address, Transaction, TransactionReceipt, H256}, }; use eyre::Result; -use log::{error, info}; +use log::{info, warn}; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use tokio::sync::Mutex; @@ -13,20 +13,21 @@ use jsonrpsee::{ proc_macros::rpc, }; -use super::Client; +use crate::node::Node; + use common::utils::{hex_str_to_bytes, u64_to_hex_string}; use execution::types::{CallOpts, ExecutionBlock}; pub struct Rpc { - client: Arc>, + node: Arc>, handle: Option, port: u16, } impl Rpc { - pub fn new(client: Arc>, port: u16) -> Self { + pub fn new(node: Arc>, port: u16) -> Self { Rpc { - client, + node, handle: None, port, } @@ -34,7 +35,7 @@ impl Rpc { pub async fn start(&mut self) -> Result { let rpc_inner = RpcInner { - client: self.client.clone(), + node: self.node.clone(), port: self.port, }; let (handle, addr) = start(rpc_inner).await?; @@ -86,7 +87,7 @@ trait NetRpc { #[derive(Clone)] struct RpcInner { - client: Arc>, + node: Arc>, port: u16, } @@ -95,8 +96,8 @@ impl EthRpcServer for RpcInner { async fn get_balance(&self, address: &str, block: &str) -> Result { let block = convert_err(decode_block(block))?; let address = convert_err(Address::from_str(address))?; - let client = self.client.lock().await; - let balance = convert_err(client.get_balance(&address, &block).await)?; + let node = self.node.lock().await; + let balance = convert_err(node.get_balance(&address, &block).await)?; Ok(balance.encode_hex()) } @@ -104,8 +105,8 @@ impl EthRpcServer for RpcInner { async fn get_transaction_count(&self, address: &str, block: &str) -> Result { let block = convert_err(decode_block(block))?; let address = convert_err(Address::from_str(address))?; - let client = self.client.lock().await; - let nonce = convert_err(client.get_nonce(&address, &block).await)?; + let node = self.node.lock().await; + let nonce = convert_err(node.get_nonce(&address, &block).await)?; Ok(nonce.encode_hex()) } @@ -113,48 +114,48 @@ impl EthRpcServer for RpcInner { async fn get_code(&self, address: &str, block: &str) -> Result { let block = convert_err(decode_block(block))?; let address = convert_err(Address::from_str(address))?; - let client = self.client.lock().await; - let code = convert_err(client.get_code(&address, &block).await)?; + let node = self.node.lock().await; + let code = convert_err(node.get_code(&address, &block).await)?; Ok(hex::encode(code)) } async fn call(&self, opts: CallOpts, block: &str) -> Result { let block = convert_err(decode_block(block))?; - let client = self.client.lock().await; - let res = convert_err(client.call(&opts, &block))?; + let node = self.node.lock().await; + let res = convert_err(node.call(&opts, &block))?; Ok(format!("0x{}", hex::encode(res))) } async fn estimate_gas(&self, opts: CallOpts) -> Result { - let client = self.client.lock().await; - let gas = convert_err(client.estimate_gas(&opts))?; + let node = self.node.lock().await; + let gas = convert_err(node.estimate_gas(&opts))?; Ok(u64_to_hex_string(gas)) } async fn chain_id(&self) -> Result { - let client = self.client.lock().await; - let id = client.chain_id(); + let node = self.node.lock().await; + let id = node.chain_id(); Ok(u64_to_hex_string(id)) } async fn gas_price(&self) -> Result { - let client = self.client.lock().await; - let gas_price = convert_err(client.get_gas_price())?; + let node = self.node.lock().await; + let gas_price = convert_err(node.get_gas_price())?; Ok(gas_price.encode_hex()) } async fn max_priority_fee_per_gas(&self) -> Result { - let client = self.client.lock().await; - let tip = convert_err(client.get_priority_fee())?; + let node = self.node.lock().await; + let tip = convert_err(node.get_priority_fee())?; Ok(tip.encode_hex()) } async fn block_number(&self) -> Result { - let client = self.client.lock().await; - let num = convert_err(client.get_block_number())?; + let node = self.node.lock().await; + let num = convert_err(node.get_block_number())?; Ok(u64_to_hex_string(num)) } @@ -164,31 +165,31 @@ impl EthRpcServer for RpcInner { _full_tx: bool, ) -> Result { let block = convert_err(decode_block(block))?; - let client = self.client.lock().await; - let block = convert_err(client.get_block_by_number(&block))?; + let node = self.node.lock().await; + let block = convert_err(node.get_block_by_number(&block))?; Ok(block) } async fn get_block_by_hash(&self, hash: &str, _full_tx: bool) -> Result { let hash = convert_err(hex_str_to_bytes(hash))?; - let client = self.client.lock().await; - let block = convert_err(client.get_block_by_hash(&hash))?; + let node = self.node.lock().await; + let block = convert_err(node.get_block_by_hash(&hash))?; Ok(block) } async fn send_raw_transaction(&self, bytes: &str) -> Result { - let client = self.client.lock().await; + let node = self.node.lock().await; let bytes = convert_err(hex_str_to_bytes(bytes))?; - let tx_hash = convert_err(client.send_raw_transaction(&bytes).await)?; + let tx_hash = convert_err(node.send_raw_transaction(&bytes).await)?; Ok(hex::encode(tx_hash)) } async fn get_transaction_receipt(&self, hash: &str) -> Result { - let client = self.client.lock().await; + let node = self.node.lock().await; let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?); - let receipt = convert_err(client.get_transaction_receipt(&hash).await)?; + let receipt = convert_err(node.get_transaction_receipt(&hash).await)?; match receipt { Some(receipt) => Ok(receipt), @@ -197,9 +198,9 @@ impl EthRpcServer for RpcInner { } async fn get_transaction_by_hash(&self, hash: &str) -> Result { - let client = self.client.lock().await; + let node = self.node.lock().await; let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?); - let tx = convert_err(client.get_transaction_by_hash(&hash).await)?; + let tx = convert_err(node.get_transaction_by_hash(&hash).await)?; match tx { Some(tx) => Ok(tx), @@ -211,8 +212,8 @@ impl EthRpcServer for RpcInner { #[async_trait] impl NetRpcServer for RpcInner { async fn version(&self) -> Result { - let client = self.client.lock().await; - Ok(client.chain_id().to_string()) + let node = self.node.lock().await; + Ok(node.chain_id().to_string()) } } @@ -236,7 +237,7 @@ async fn start(rpc: RpcInner) -> Result<(HttpServerHandle, SocketAddr)> { fn convert_err(res: Result) -> Result { res.map_err(|err| { - error!("{}", err); + warn!("{}", err); Error::Custom(err.to_string()) }) } diff --git a/config/configs/goerli.toml b/config/configs/goerli.toml index 4d058b8..6e833f9 100644 --- a/config/configs/goerli.toml +++ b/config/configs/goerli.toml @@ -5,6 +5,7 @@ genesis_root = "0x043db0d9a83813551ee2f33450d23797757d430911a9320530ad8a0eabc43e checkpoint = "0x172128eadf1da46467f4d6a822206698e2d3f957af117dd650954780d680dc99" consensus_rpc = "http://testing.prater.beacon-api.nimbus.team" execution_rpc = "https://eth-goerli.g.alchemy.com:443/v2/o_8Qa9kgwDPf9G8sroyQ-uQtyhyWa3ao" +rpc_port = 8545 [forks] diff --git a/config/src/lib.rs b/config/src/lib.rs index 359f96b..18bdddd 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -23,6 +23,7 @@ pub struct General { pub checkpoint: Vec, pub consensus_rpc: String, pub execution_rpc: String, + pub rpc_port: Option, } #[derive(Deserialize, Debug)] diff --git a/config/src/networks.rs b/config/src/networks.rs index f04876f..d836c3b 100644 --- a/config/src/networks.rs +++ b/config/src/networks.rs @@ -19,6 +19,7 @@ pub fn goerli() -> Config { execution_rpc: "https://eth-goerli.g.alchemy.com:443/v2/o_8Qa9kgwDPf9G8sroyQ-uQtyhyWa3ao" .to_string(), + rpc_port: Some(8545), }, forks: Forks { genesis: Fork {