diff --git a/client/src/builder.rs b/client/src/builder.rs new file mode 100644 index 0000000..fe992ad --- /dev/null +++ b/client/src/builder.rs @@ -0,0 +1,197 @@ +use std::path::PathBuf; + +use config::{Network, Config}; +use execution::rpc::ExecutionRpc; + +use crate::{database::FileDB, Client}; + +#[derive(Default)] +pub struct ClientBuilder { + pub network: Option, + pub consensus_rpc: Option, + pub execution_rpc: Option, + pub checkpoint: Option>, + pub rpc_port: Option, + pub data_dir: Option, + pub config: Option, + pub fallback: Option, + pub load_external_fallback: bool, + pub with_ws: bool, + pub with_http: bool, +} + +impl ClientBuilder { + pub fn new() -> Self { + Self::default().with_http(true) + } + + pub fn network(mut self, network: Network) -> Self { + self.network = Some(network); + self + } + + pub fn consensus_rpc(mut self, consensus_rpc: &str) -> Self { + self.consensus_rpc = Some(consensus_rpc.to_string()); + self + } + + pub fn execution_rpc(mut self, execution_rpc: &str) -> Self { + self.execution_rpc = Some(execution_rpc.to_string()); + self + } + + pub fn checkpoint(mut self, checkpoint: &str) -> Self { + let checkpoint = hex::decode(checkpoint.strip_prefix("0x").unwrap_or(checkpoint)) + .expect("cannot parse checkpoint"); + self.checkpoint = Some(checkpoint); + self + } + + /// Enables the client to serve a websocket connection. + /// + /// # Example + /// ```rust + /// let mut client_builder = client::ClientBuilder::new().with_ws(true); + /// assert_eq!(client_builder.with_ws, true); + /// client_builder = client_builder.with_ws(false); + /// assert_eq!(client_builder.with_ws, false); + /// ``` + pub fn with_ws(mut self, option: bool) -> Self { + self.with_ws = option; + self + } + + /// Enables the client to serve an http connection (enabled by default). + /// + /// # Example + /// ```rust + /// let mut client_builder = client::ClientBuilder::new(); + /// assert_eq!(client_builder.with_http, true); + /// client_builder = client_builder.with_http(false); + /// assert_eq!(client_builder.with_http, false); + /// ``` + pub fn with_http(mut self, option: bool) -> Self { + self.with_http = option; + self + } + + pub fn rpc_port(mut self, port: u16) -> Self { + self.rpc_port = Some(port); + self + } + + pub fn data_dir(mut self, data_dir: PathBuf) -> Self { + self.data_dir = Some(data_dir); + self + } + + pub fn config(mut self, config: Config) -> Self { + self.config = Some(config); + self + } + + pub fn fallback(mut self, fallback: &str) -> Self { + self.fallback = Some(fallback.to_string()); + self + } + + pub fn load_external_fallback(mut self) -> Self { + self.load_external_fallback = true; + self + } + + pub fn build(self) -> eyre::Result> { + let base_config = if let Some(network) = self.network { + network.to_base_config() + } else { + let config = self + .config + .as_ref() + .ok_or(eyre::eyre!("missing network config"))?; + config.to_base_config() + }; + + let consensus_rpc = self.consensus_rpc.unwrap_or_else(|| { + self.config + .as_ref() + .expect("missing consensus rpc") + .consensus_rpc + .clone() + }); + + let execution_rpc = self.execution_rpc.unwrap_or_else(|| { + self.config + .as_ref() + .expect("missing execution rpc") + .execution_rpc + .clone() + }); + + let checkpoint = if let Some(checkpoint) = self.checkpoint { + checkpoint + } else if let Some(config) = &self.config { + config.checkpoint.clone() + } else { + base_config.checkpoint + }; + + let rpc_port = if self.rpc_port.is_some() { + self.rpc_port + } else if let Some(config) = &self.config { + config.rpc_port + } else { + None + }; + + let data_dir = if self.data_dir.is_some() { + self.data_dir + } else if let Some(config) = &self.config { + config.data_dir.clone() + } else { + None + }; + + let fallback = if self.fallback.is_some() { + self.fallback + } else if let Some(config) = &self.config { + config.fallback.clone() + } else { + None + }; + + let load_external_fallback = if let Some(config) = &self.config { + self.load_external_fallback || config.load_external_fallback + } else { + self.load_external_fallback + }; + + let with_ws = if let Some(config) = &self.config { + self.with_ws || config.with_ws + } else { + self.with_ws + }; + + let with_http = if let Some(config) = &self.config { + self.with_http || config.with_http + } else { + self.with_http + }; + + let config = Config { + consensus_rpc, + execution_rpc, + checkpoint, + rpc_port, + data_dir, + chain: base_config.chain, + forks: base_config.forks, + max_checkpoint_age: base_config.max_checkpoint_age, + fallback, + load_external_fallback, + with_ws, + with_http, + }; + + Client::new(config) + } +} \ No newline at end of file diff --git a/client/src/client.rs b/client/src/client.rs index 534864f..108b323 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -1,14 +1,13 @@ -use std::path::PathBuf; use std::sync::Arc; use config::networks::Network; use ethers::prelude::{Address, U256}; use ethers::types::{Filter, Log, Transaction, TransactionReceipt, H256}; -use eyre::{eyre, Result}; use common::types::BlockTag; use config::{CheckpointFallback, Config}; use consensus::{types::Header, ConsensusClient}; +use execution::rpc::ExecutionRpc; use execution::types::{CallOpts, ExecutionBlock}; use log::{info, warn}; use tokio::spawn; @@ -19,207 +18,16 @@ use crate::database::{Database, FileDB}; use crate::node::Node; use crate::rpc::Rpc; -#[derive(Default)] -pub struct ClientBuilder { - pub network: Option, - pub consensus_rpc: Option, - pub execution_rpc: Option, - pub checkpoint: Option>, - pub rpc_port: Option, - pub data_dir: Option, - pub config: Option, - pub fallback: Option, - pub load_external_fallback: bool, - pub with_ws: bool, - pub with_http: bool, -} - -impl ClientBuilder { - pub fn new() -> Self { - Self::default().with_http(true) - } - - pub fn network(mut self, network: Network) -> Self { - self.network = Some(network); - self - } - - pub fn consensus_rpc(mut self, consensus_rpc: &str) -> Self { - self.consensus_rpc = Some(consensus_rpc.to_string()); - self - } - - pub fn execution_rpc(mut self, execution_rpc: &str) -> Self { - self.execution_rpc = Some(execution_rpc.to_string()); - self - } - - pub fn checkpoint(mut self, checkpoint: &str) -> Self { - let checkpoint = hex::decode(checkpoint.strip_prefix("0x").unwrap_or(checkpoint)) - .expect("cannot parse checkpoint"); - self.checkpoint = Some(checkpoint); - self - } - - /// Enables the client to serve a websocket connection. - /// - /// # Example - /// ```rust - /// let mut client_builder = client::ClientBuilder::new().with_ws(true); - /// assert_eq!(client_builder.with_ws, true); - /// client_builder = client_builder.with_ws(false); - /// assert_eq!(client_builder.with_ws, false); - /// ``` - pub fn with_ws(mut self, option: bool) -> Self { - self.with_ws = option; - self - } - - /// Enables the client to serve an http connection (enabled by default). - /// - /// # Example - /// ```rust - /// let mut client_builder = client::ClientBuilder::new(); - /// assert_eq!(client_builder.with_http, true); - /// client_builder = client_builder.with_http(false); - /// assert_eq!(client_builder.with_http, false); - /// ``` - pub fn with_http(mut self, option: bool) -> Self { - self.with_http = option; - self - } - - pub fn rpc_port(mut self, port: u16) -> Self { - self.rpc_port = Some(port); - self - } - - pub fn data_dir(mut self, data_dir: PathBuf) -> Self { - self.data_dir = Some(data_dir); - self - } - - pub fn config(mut self, config: Config) -> Self { - self.config = Some(config); - self - } - - pub fn fallback(mut self, fallback: &str) -> Self { - self.fallback = Some(fallback.to_string()); - self - } - - pub fn load_external_fallback(mut self) -> Self { - self.load_external_fallback = true; - self - } - - pub fn build(self) -> Result> { - let base_config = if let Some(network) = self.network { - network.to_base_config() - } else { - let config = self - .config - .as_ref() - .ok_or(eyre!("missing network config"))?; - config.to_base_config() - }; - - let consensus_rpc = self.consensus_rpc.unwrap_or_else(|| { - self.config - .as_ref() - .expect("missing consensus rpc") - .consensus_rpc - .clone() - }); - - let execution_rpc = self.execution_rpc.unwrap_or_else(|| { - self.config - .as_ref() - .expect("missing execution rpc") - .execution_rpc - .clone() - }); - - let checkpoint = if let Some(checkpoint) = self.checkpoint { - checkpoint - } else if let Some(config) = &self.config { - config.checkpoint.clone() - } else { - base_config.checkpoint - }; - - let rpc_port = if self.rpc_port.is_some() { - self.rpc_port - } else if let Some(config) = &self.config { - config.rpc_port - } else { - None - }; - - let data_dir = if self.data_dir.is_some() { - self.data_dir - } else if let Some(config) = &self.config { - config.data_dir.clone() - } else { - None - }; - - let fallback = if self.fallback.is_some() { - self.fallback - } else if let Some(config) = &self.config { - config.fallback.clone() - } else { - None - }; - - let load_external_fallback = if let Some(config) = &self.config { - self.load_external_fallback || config.load_external_fallback - } else { - self.load_external_fallback - }; - - let with_ws = if let Some(config) = &self.config { - self.with_ws || config.with_ws - } else { - self.with_ws - }; - - let with_http = if let Some(config) = &self.config { - self.with_http || config.with_http - } else { - self.with_http - }; - - let config = Config { - consensus_rpc, - execution_rpc, - checkpoint, - rpc_port, - data_dir, - chain: base_config.chain, - forks: base_config.forks, - max_checkpoint_age: base_config.max_checkpoint_age, - fallback, - load_external_fallback, - with_ws, - with_http, - }; - - Client::new(config) - } -} - -pub struct Client { - node: Arc>, - rpc: Option, +pub struct Client { + node: Arc>>, + rpc: Option>, db: Option, fallback: Option, load_external_fallback: bool, } -impl Client { - fn new(config: Config) -> Result { +impl Client where R: ExecutionRpc { + pub fn new(config: Config) -> eyre::Result { let config = Arc::new(config); let node = Node::new(config.clone())?; let node = Arc::new(RwLock::new(node)); @@ -241,8 +49,8 @@ impl Client { } } -impl Client { - pub async fn start(&mut self) -> Result<()> { +impl Client { + pub async fn start(&mut self) -> eyre::Result<()> { if let Some(rpc) = &mut self.rpc { // We can start both ws and http servers since they only run if enabled in the config. rpc.start_ws().await?; @@ -362,7 +170,7 @@ impl Client { } } - pub async fn call(&self, opts: &CallOpts, block: BlockTag) -> Result> { + pub async fn call(&self, opts: &CallOpts, block: BlockTag) -> eyre::Result> { self.node .read() .await @@ -371,7 +179,7 @@ impl Client { .map_err(|err| err.into()) } - pub async fn estimate_gas(&self, opts: &CallOpts) -> Result { + pub async fn estimate_gas(&self, opts: &CallOpts) -> eyre::Result { self.node .read() .await @@ -380,30 +188,30 @@ impl Client { .map_err(|err| err.into()) } - pub async fn get_balance(&self, address: &Address, block: BlockTag) -> Result { + pub async fn get_balance(&self, address: &Address, block: BlockTag) -> eyre::Result { self.node.read().await.get_balance(address, block).await } - pub async fn get_nonce(&self, address: &Address, block: BlockTag) -> Result { + pub async fn get_nonce(&self, address: &Address, block: BlockTag) -> eyre::Result { self.node.read().await.get_nonce(address, block).await } - pub async fn get_code(&self, address: &Address, block: BlockTag) -> Result> { + pub async fn get_code(&self, address: &Address, block: BlockTag) -> eyre::Result> { self.node.read().await.get_code(address, block).await } - pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result { + pub async fn get_storage_at(&self, address: &Address, slot: H256) -> eyre::Result { self.node.read().await.get_storage_at(address, slot).await } - pub async fn send_raw_transaction(&self, bytes: &[u8]) -> Result { + pub async fn send_raw_transaction(&self, bytes: &[u8]) -> eyre::Result { self.node.read().await.send_raw_transaction(bytes).await } pub async fn get_transaction_receipt( &self, tx_hash: &H256, - ) -> Result> { + ) -> eyre::Result> { self.node .read() .await @@ -411,7 +219,7 @@ impl Client { .await } - pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> Result> { + pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> eyre::Result> { self.node .read() .await @@ -419,19 +227,19 @@ impl Client { .await } - pub async fn get_logs(&self, filter: &Filter) -> Result> { + pub async fn get_logs(&self, filter: &Filter) -> eyre::Result> { self.node.read().await.get_logs(filter).await } - pub async fn get_gas_price(&self) -> Result { + pub async fn get_gas_price(&self) -> eyre::Result { self.node.read().await.get_gas_price() } - pub async fn get_priority_fee(&self) -> Result { + pub async fn get_priority_fee(&self) -> eyre::Result { self.node.read().await.get_priority_fee() } - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> eyre::Result { self.node.read().await.get_block_number() } @@ -439,7 +247,7 @@ impl Client { &self, block: BlockTag, full_tx: bool, - ) -> Result> { + ) -> eyre::Result> { self.node .read() .await @@ -451,7 +259,7 @@ impl Client { &self, hash: &Vec, full_tx: bool, - ) -> Result> { + ) -> eyre::Result> { self.node .read() .await @@ -463,7 +271,7 @@ impl Client { self.node.read().await.chain_id() } - pub async fn get_header(&self) -> Result
{ + pub async fn get_header(&self) -> eyre::Result
{ self.node.read().await.get_header() } } diff --git a/client/src/lib.rs b/client/src/lib.rs index 99729fc..abc747c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,8 +1,20 @@ + +/// Re-export builder logic +mod builder; +pub use crate::builder::*; + +/// Re-export client logic mod client; pub use crate::client::*; +/// Expose database module pub mod database; + +/// Expose errors module pub mod errors; + +/// Expose rpc module pub mod rpc; +/// Node module is internal to the client crate mod node; diff --git a/client/src/node.rs b/client/src/node.rs index fb98292..4bf3489 100644 --- a/client/src/node.rs +++ b/client/src/node.rs @@ -4,6 +4,7 @@ use std::time::Duration; use ethers::prelude::{Address, U256}; use ethers::types::{Filter, Log, Transaction, TransactionReceipt, H256}; +use execution::rpc::ExecutionRpc; use eyre::{eyre, Result}; use common::errors::BlockNotFoundError; @@ -13,22 +14,22 @@ use consensus::rpc::nimbus_rpc::NimbusRpc; use consensus::types::{ExecutionPayload, Header}; use consensus::ConsensusClient; use execution::evm::Evm; -use execution::rpc::http_rpc::HttpRpc; +// use execution::rpc::http_rpc::HttpRpc; use execution::types::{CallOpts, ExecutionBlock}; use execution::ExecutionClient; use crate::errors::NodeError; -pub struct Node { +pub struct Node where R: ExecutionRpc, { pub consensus: ConsensusClient, - pub execution: Arc>, + pub execution: Arc>, pub config: Arc, payloads: BTreeMap, finalized_payloads: BTreeMap, pub history_size: usize, } -impl Node { +impl Node where R: ExecutionRpc { pub fn new(config: Arc) -> Result { let consensus_rpc = &config.consensus_rpc; let checkpoint_hash = &config.checkpoint; diff --git a/client/src/rpc.rs b/client/src/rpc.rs index c67ff7c..fb1fb3b 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -16,10 +16,10 @@ use common::{ types::BlockTag, utils::{hex_str_to_bytes, u64_to_hex_string}, }; -use execution::types::{CallOpts, ExecutionBlock}; +use execution::{types::{CallOpts, ExecutionBlock}, rpc::ExecutionRpc}; -pub struct Rpc { - node: Arc>, +pub struct Rpc where R: ExecutionRpc { + node: Arc>>, http_handle: Option, ws_handle: Option, pub with_http: bool, @@ -27,8 +27,8 @@ pub struct Rpc { port: u16, } -impl Rpc { - pub fn new(node: Arc>, with_http: bool, with_ws: bool, port: u16) -> Self { +impl Rpc where R: ExecutionRpc { + pub fn new(node: Arc>>, with_http: bool, with_ws: bool, port: u16) -> Self { Rpc { node, http_handle: None, @@ -112,13 +112,13 @@ trait NetRpc { } #[derive(Clone)] -struct RpcInner { - node: Arc>, +struct RpcInner where R: ExecutionRpc { + node: Arc>>, port: u16, } -impl From<&Rpc> for RpcInner { - fn from(rpc: &Rpc) -> Self { +impl From<&Rpc> for RpcInner where R: ExecutionRpc { + fn from(rpc: &Rpc) -> Self { RpcInner { node: Arc::clone(&rpc.node), port: rpc.port, @@ -126,7 +126,7 @@ impl From<&Rpc> for RpcInner { } } -impl RpcInner { +impl RpcInner where R: ExecutionRpc { pub async fn start_http(&self) -> Result<(HttpServerHandle, SocketAddr)> { let addr = format!("127.0.0.1:{}", self.port); let server = HttpServerBuilder::default().build(addr).await?; @@ -165,7 +165,7 @@ impl RpcInner { } #[async_trait] -impl EthRpcServer for RpcInner { +impl EthRpcServer for RpcInner where R: ExecutionRpc { async fn get_balance(&self, address: &str, block: BlockTag) -> Result { let address = convert_err(Address::from_str(address))?; let node = self.node.read().await; @@ -286,7 +286,7 @@ impl EthRpcServer for RpcInner { } #[async_trait] -impl NetRpcServer for RpcInner { +impl NetRpcServer for RpcInner where R: ExecutionRpc { async fn version(&self) -> Result { let node = self.node.read().await; Ok(node.chain_id().to_string()) diff --git a/execution/src/rpc/http_rpc.rs b/execution/src/rpc/http_rpc.rs index 4b02ee4..8624fa1 100644 --- a/execution/src/rpc/http_rpc.rs +++ b/execution/src/rpc/http_rpc.rs @@ -40,6 +40,10 @@ impl ExecutionRpc for HttpRpc { }) } + async fn connect(&mut self) -> Result<()> { + Ok(()) + } + async fn get_proof( &self, address: &Address, diff --git a/execution/src/rpc/mock_rpc.rs b/execution/src/rpc/mock_rpc.rs index 4527c68..5b06a0a 100644 --- a/execution/src/rpc/mock_rpc.rs +++ b/execution/src/rpc/mock_rpc.rs @@ -24,6 +24,10 @@ impl ExecutionRpc for MockRpc { Ok(MockRpc { path }) } + async fn connect(&mut self) -> Result<()> { + Ok(()) + } + async fn get_proof( &self, _address: &Address, diff --git a/execution/src/rpc/mod.rs b/execution/src/rpc/mod.rs index 1a47c16..e5448fe 100644 --- a/execution/src/rpc/mod.rs +++ b/execution/src/rpc/mod.rs @@ -16,6 +16,9 @@ pub trait ExecutionRpc: Send + Clone + Sync + 'static { where Self: Sized; + /// Connect allows the rpc to connect if asynchronous connection is required (eg websockets). + async fn connect(&mut self) -> Result<()>; + async fn get_proof( &self, address: &Address, diff --git a/execution/src/rpc/ws_rpc.rs b/execution/src/rpc/ws_rpc.rs new file mode 100644 index 0000000..2f8e2d3 --- /dev/null +++ b/execution/src/rpc/ws_rpc.rs @@ -0,0 +1,133 @@ +use std::str::FromStr; + +use async_trait::async_trait; +use common::errors::RpcError; +use ethers::prelude::{Address, Ws}; +use ethers::providers::{HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient}; +use ethers::types::transaction::eip2718::TypedTransaction; +use ethers::types::transaction::eip2930::AccessList; +use ethers::types::{ + BlockId, Bytes, EIP1186ProofResponse, Eip1559TransactionRequest, Filter, Log, Transaction, + TransactionReceipt, H256, U256, +}; +use eyre::Result; + +use crate::types::CallOpts; + +use super::ExecutionRpc; + +pub struct WsRpc { + pub url: String, + pub provider: Option>, +} + +impl Clone for WsRpc { + fn clone(&self) -> Self { + Self::new(&self.url).unwrap() + } +} + +#[async_trait] +impl ExecutionRpc for WsRpc { + fn new(rpc: &str) -> Result { + Ok(WsRpc { + url: rpc.to_string(), + provider: None, + }) + } + + async fn connect(&mut self) -> Result<()> { + let provider = Provider::::connect(&self.url).await?; + self.provider = Some(provider); + Ok(()) + } + + async fn get_proof( + &self, + address: &Address, + slots: &[H256], + block: u64, + ) -> Result { + let block = Some(BlockId::from(block)); + let proof_response = self + .provider + .get_proof(*address, slots.to_vec(), block) + .await + .map_err(|e| RpcError::new("get_proof", e))?; + + Ok(proof_response) + } + + async fn create_access_list(&self, opts: &CallOpts, block: u64) -> Result { + let block = Some(BlockId::from(block)); + + let mut raw_tx = Eip1559TransactionRequest::new(); + raw_tx.to = Some(opts.to.into()); + raw_tx.from = opts.from; + raw_tx.value = opts.value; + raw_tx.gas = Some(opts.gas.unwrap_or(U256::from(100_000_000))); + raw_tx.max_fee_per_gas = Some(U256::zero()); + raw_tx.max_priority_fee_per_gas = Some(U256::zero()); + raw_tx.data = opts + .data + .as_ref() + .map(|data| Bytes::from(data.as_slice().to_owned())); + + let tx = TypedTransaction::Eip1559(raw_tx); + let list = self + .provider + .create_access_list(&tx, block) + .await + .map_err(|e| RpcError::new("create_access_list", e))?; + + Ok(list.access_list) + } + + async fn get_code(&self, address: &Address, block: u64) -> Result> { + let block = Some(BlockId::from(block)); + let code = self + .provider + .get_code(*address, block) + .await + .map_err(|e| RpcError::new("get_code", e))?; + + Ok(code.to_vec()) + } + + async fn send_raw_transaction(&self, bytes: &[u8]) -> Result { + let bytes = Bytes::from(bytes.to_owned()); + let tx = self + .provider + .send_raw_transaction(bytes) + .await + .map_err(|e| RpcError::new("send_raw_transaction", e))?; + + Ok(tx.tx_hash()) + } + + async fn get_transaction_receipt(&self, tx_hash: &H256) -> Result> { + let receipt = self + .provider + .get_transaction_receipt(*tx_hash) + .await + .map_err(|e| RpcError::new("get_transaction_receipt", e))?; + + Ok(receipt) + } + + async fn get_transaction(&self, tx_hash: &H256) -> Result> { + Ok(self + .provider + .get_transaction(*tx_hash) + .await + .map_err(|e| RpcError::new("get_transaction", e))?) + } + + async fn get_logs(&self, filter: &Filter) -> Result> { + Ok(self + .provider + .get_logs(filter) + .await + .map_err(|e| RpcError::new("get_logs", e))?) + } +}