From 55839e5479cbff29a502b19bdd43f542d3d648f7 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Wed, 30 Nov 2022 09:01:43 -0800 Subject: [PATCH] :building_construction: start websocket support --- client/src/client.rs | 96 ++++++++++++++++++++++---------- client/src/rpc.rs | 129 ++++++++++++++++++++++++++----------------- common/src/utils.rs | 15 ++++- config/src/lib.rs | 2 + 4 files changed, 160 insertions(+), 82 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index e482420..df15d1e 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -19,41 +19,22 @@ use crate::database::{Database, FileDB}; use crate::node::Node; use crate::rpc::Rpc; -pub struct Client { - node: Arc>, - rpc: Option, - db: Option, -} - -impl Client { - fn new(config: Config) -> Result { - let config = Arc::new(config); - let node = Node::new(config.clone())?; - let node = Arc::new(RwLock::new(node)); - - let rpc = config.rpc_port.map(|port| Rpc::new(node.clone(), port)); - - let data_dir = config.data_dir.clone(); - let db = data_dir.map(FileDB::new); - - Ok(Client { node, rpc, db }) - } -} - #[derive(Default)] pub struct ClientBuilder { - network: Option, - consensus_rpc: Option, - execution_rpc: Option, - checkpoint: Option>, - rpc_port: Option, - data_dir: Option, - config: Option, + pub network: Option, + pub consensus_rpc: Option, + pub execution_rpc: Option, + pub checkpoint: Option>, + pub rpc_port: Option, + pub data_dir: Option, + pub config: Option, + pub with_ws: bool, + pub with_http: bool, } impl ClientBuilder { pub fn new() -> Self { - Self::default() + Self::default().with_http(true) } pub fn network(mut self, network: Network) -> Self { @@ -77,6 +58,34 @@ impl ClientBuilder { self } + /// Enables the client to serve a websocket connection. + /// + /// # Example + /// ```rust + /// let mut client_builder = client::ClientBuilder::new().with_ws(true); + /// assert_eq!(client_builder.with_ws, true); + /// client_builder = client_builder.with_ws(false); + /// assert_eq!(client_builder.with_ws, false); + /// ``` + pub fn with_ws(mut self, option: bool) -> Self { + self.with_ws = option; + self + } + + /// Enables the client to serve an http connection (enabled by default). + /// + /// # Example + /// ```rust + /// let mut client_builder = client::ClientBuilder::new(); + /// assert_eq!(client_builder.with_http, true); + /// client_builder = client_builder.with_http(false); + /// assert_eq!(client_builder.with_http, false); + /// ``` + pub fn with_http(mut self, option: bool) -> Self { + self.with_http = option; + self + } + pub fn rpc_port(mut self, port: u16) -> Self { self.rpc_port = Some(port); self @@ -152,16 +161,43 @@ impl ClientBuilder { chain: base_config.chain, forks: base_config.forks, max_checkpoint_age: base_config.max_checkpoint_age, + with_ws: self.with_ws, + with_http: self.with_http, }; Client::new(config) } } +pub struct Client { + node: Arc>, + rpc: Option, + db: Option, +} + +impl Client { + fn new(config: Config) -> Result { + let config = Arc::new(config); + let node = Node::new(config.clone())?; + let node = Arc::new(RwLock::new(node)); + + let rpc = config + .rpc_port + .map(|port| Rpc::new(node.clone(), config.with_http, config.with_ws, port)); + + let data_dir = config.data_dir.clone(); + let db = data_dir.map(FileDB::new); + + Ok(Client { node, rpc, db }) + } +} + impl Client { pub async fn start(&mut self) -> Result<()> { if let Some(rpc) = &mut self.rpc { - rpc.start().await?; + // We can start both ws and http servers since they only run if enabled in the config. + rpc.start_ws().await?; + rpc.start_http().await?; } let res = self.node.write().await.sync().await; diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 12dc87a..c67ff7c 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -1,9 +1,5 @@ -use ethers::{ - abi::AbiEncode, - types::{Address, Filter, Log, Transaction, TransactionReceipt, H256, U256}, -}; +use ethers::types::{Address, Filter, Log, Transaction, TransactionReceipt, H256}; use eyre::Result; -use log::info; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use tokio::sync::RwLock; @@ -11,6 +7,7 @@ use jsonrpsee::{ core::{async_trait, server::rpc_module::Methods, Error}, http_server::{HttpServerBuilder, HttpServerHandle}, proc_macros::rpc, + ws_server::{WsServerBuilder, WsServerHandle}, }; use crate::{errors::NodeError, node::Node}; @@ -23,31 +20,43 @@ use execution::types::{CallOpts, ExecutionBlock}; pub struct Rpc { node: Arc>, - handle: Option, + http_handle: Option, + ws_handle: Option, + pub with_http: bool, + pub with_ws: bool, port: u16, } impl Rpc { - pub fn new(node: Arc>, port: u16) -> Self { + pub fn new(node: Arc>, with_http: bool, with_ws: bool, port: u16) -> Self { Rpc { node, - handle: None, + http_handle: None, + ws_handle: None, + with_http, + with_ws, port, } } - pub async fn start(&mut self) -> Result { - let rpc_inner = RpcInner { - node: self.node.clone(), - port: self.port, - }; + pub async fn start_http(&mut self) -> Result> { + if self.with_http { + let (handle, addr) = RpcInner::from(&*self).start_http().await?; + self.http_handle = Some(handle); + log::info!("http rpc server started at {}", addr); + return Ok(Some(addr)); + } + Ok(None) + } - let (handle, addr) = start(rpc_inner).await?; - self.handle = Some(handle); - - info!("rpc server started at {}", addr); - - Ok(addr) + pub async fn start_ws(&mut self) -> Result> { + if self.with_ws { + let (handle, addr) = RpcInner::from(&*self).start_ws().await?; + self.ws_handle = Some(handle); + log::info!("http rpc server started at {}", addr); + return Ok(Some(addr)); + } + Ok(None) } } @@ -108,6 +117,53 @@ struct RpcInner { port: u16, } +impl From<&Rpc> for RpcInner { + fn from(rpc: &Rpc) -> Self { + RpcInner { + node: Arc::clone(&rpc.node), + port: rpc.port, + } + } +} + +impl RpcInner { + pub async fn start_http(&self) -> Result<(HttpServerHandle, SocketAddr)> { + let addr = format!("127.0.0.1:{}", self.port); + let server = HttpServerBuilder::default().build(addr).await?; + + let addr = server.local_addr()?; + + let mut methods = Methods::new(); + let eth_methods: Methods = EthRpcServer::into_rpc(self.clone()).into(); + let net_methods: Methods = NetRpcServer::into_rpc(self.clone()).into(); + + methods.merge(eth_methods)?; + methods.merge(net_methods)?; + + let handle = server.start(methods)?; + + Ok((handle, addr)) + } + + pub async fn start_ws(&self) -> Result<(WsServerHandle, SocketAddr)> { + let addr = format!("127.0.0.1:{}", self.port); + let server = WsServerBuilder::default().build(addr).await?; + + let addr = server.local_addr()?; + + let mut methods = Methods::new(); + let eth_methods: Methods = EthRpcServer::into_rpc(self.clone()).into(); + let net_methods: Methods = NetRpcServer::into_rpc(self.clone()).into(); + + methods.merge(eth_methods)?; + methods.merge(net_methods)?; + + let handle = server.start(methods)?; + + Ok((handle, addr)) + } +} + #[async_trait] impl EthRpcServer for RpcInner { async fn get_balance(&self, address: &str, block: BlockTag) -> Result { @@ -115,7 +171,7 @@ impl EthRpcServer for RpcInner { let node = self.node.read().await; let balance = convert_err(node.get_balance(&address, block).await)?; - Ok(format_hex(&balance)) + Ok(common::utils::format_hex(&balance)) } async fn get_transaction_count(&self, address: &str, block: BlockTag) -> Result { @@ -164,13 +220,13 @@ impl EthRpcServer for RpcInner { async fn gas_price(&self) -> Result { let node = self.node.read().await; let gas_price = convert_err(node.get_gas_price())?; - Ok(format_hex(&gas_price)) + Ok(common::utils::format_hex(&gas_price)) } async fn max_priority_fee_per_gas(&self) -> Result { let node = self.node.read().await; let tip = convert_err(node.get_priority_fee())?; - Ok(format_hex(&tip)) + Ok(common::utils::format_hex(&tip)) } async fn block_number(&self) -> Result { @@ -237,35 +293,6 @@ impl NetRpcServer for RpcInner { } } -async fn start(rpc: RpcInner) -> Result<(HttpServerHandle, SocketAddr)> { - let addr = format!("127.0.0.1:{}", rpc.port); - let server = HttpServerBuilder::default().build(addr).await?; - - let addr = server.local_addr()?; - - let mut methods = Methods::new(); - let eth_methods: Methods = EthRpcServer::into_rpc(rpc.clone()).into(); - let net_methods: Methods = NetRpcServer::into_rpc(rpc).into(); - - methods.merge(eth_methods)?; - methods.merge(net_methods)?; - - let handle = server.start(methods)?; - - Ok((handle, addr)) -} - fn convert_err(res: Result) -> Result { res.map_err(|err| Error::Custom(err.to_string())) } - -fn format_hex(num: &U256) -> String { - let stripped = num - .encode_hex() - .strip_prefix("0x") - .unwrap() - .trim_start_matches('0') - .to_string(); - - format!("0x{}", stripped) -} diff --git a/common/src/utils.rs b/common/src/utils.rs index 97baf91..1e51a5e 100644 --- a/common/src/utils.rs +++ b/common/src/utils.rs @@ -1,9 +1,22 @@ -use ethers::prelude::Address; +use ethers::{ + abi::AbiEncode, + types::{Address, U256}, +}; use eyre::Result; use ssz_rs::{Node, Vector}; use super::types::Bytes32; +pub fn format_hex(num: &U256) -> String { + let stripped = num + .encode_hex() + .strip_prefix("0x") + .unwrap() + .trim_start_matches('0') + .to_string(); + format!("0x{}", stripped) +} + pub fn hex_str_to_bytes(s: &str) -> Result> { let stripped = s.strip_prefix("0x").unwrap_or(s); Ok(hex::decode(stripped)?) diff --git a/config/src/lib.rs b/config/src/lib.rs index 750481d..e5c2d1e 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -27,6 +27,8 @@ pub struct Config { pub chain: ChainConfig, pub forks: Forks, pub max_checkpoint_age: u64, + pub with_ws: bool, + pub with_http: bool, } impl Config {