⚙️ upstream sync

This commit is contained in:
Andreas Bigger 2022-12-02 11:45:53 -08:00
commit f28483dfd7
6 changed files with 166 additions and 64 deletions

View File

@ -89,6 +89,10 @@ struct Cli {
fallback: Option<String>, fallback: Option<String>,
#[clap(short = 'l', long, env)] #[clap(short = 'l', long, env)]
load_external_fallback: bool, load_external_fallback: bool,
#[clap(short = 's', long, env)]
with_ws: bool,
#[clap(short = 'h', long, env)]
with_http: bool,
} }
impl Cli { impl Cli {
@ -106,6 +110,8 @@ impl Cli {
rpc_port: self.rpc_port, rpc_port: self.rpc_port,
fallback: self.fallback.clone(), fallback: self.fallback.clone(),
load_external_fallback: self.load_external_fallback, load_external_fallback: self.load_external_fallback,
with_ws: self.with_ws,
with_http: self.with_http,
} }
} }

View File

@ -21,20 +21,22 @@ use crate::rpc::Rpc;
#[derive(Default)] #[derive(Default)]
pub struct ClientBuilder { pub struct ClientBuilder {
network: Option<Network>, pub network: Option<Network>,
consensus_rpc: Option<String>, pub consensus_rpc: Option<String>,
execution_rpc: Option<String>, pub execution_rpc: Option<String>,
checkpoint: Option<Vec<u8>>, pub checkpoint: Option<Vec<u8>>,
rpc_port: Option<u16>, pub rpc_port: Option<u16>,
data_dir: Option<PathBuf>, pub data_dir: Option<PathBuf>,
config: Option<Config>, pub config: Option<Config>,
fallback: Option<String>, pub fallback: Option<String>,
load_external_fallback: bool, pub load_external_fallback: bool,
pub with_ws: bool,
pub with_http: bool,
} }
impl ClientBuilder { impl ClientBuilder {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default().with_http(true)
} }
pub fn network(mut self, network: Network) -> Self { pub fn network(mut self, network: Network) -> Self {
@ -59,6 +61,34 @@ impl ClientBuilder {
self self
} }
/// Enables the client to serve a websocket connection.
///
/// # Example
/// ```rust
/// let mut client_builder = client::ClientBuilder::new().with_ws(true);
/// assert_eq!(client_builder.with_ws, true);
/// client_builder = client_builder.with_ws(false);
/// assert_eq!(client_builder.with_ws, false);
/// ```
pub fn with_ws(mut self, option: bool) -> Self {
self.with_ws = option;
self
}
/// Enables the client to serve an http connection (enabled by default).
///
/// # Example
/// ```rust
/// let mut client_builder = client::ClientBuilder::new();
/// assert_eq!(client_builder.with_http, true);
/// client_builder = client_builder.with_http(false);
/// assert_eq!(client_builder.with_http, false);
/// ```
pub fn with_http(mut self, option: bool) -> Self {
self.with_http = option;
self
}
pub fn rpc_port(mut self, port: u16) -> Self { pub fn rpc_port(mut self, port: u16) -> Self {
self.rpc_port = Some(port); self.rpc_port = Some(port);
self self
@ -149,6 +179,18 @@ impl ClientBuilder {
self.load_external_fallback self.load_external_fallback
}; };
let with_ws = if let Some(config) = &self.config {
self.with_ws || config.with_ws
} else {
self.with_ws
};
let with_http = if let Some(config) = &self.config {
self.with_http || config.with_http
} else {
self.with_http
};
let config = Config { let config = Config {
consensus_rpc, consensus_rpc,
execution_rpc, execution_rpc,
@ -160,6 +202,8 @@ impl ClientBuilder {
max_checkpoint_age: base_config.max_checkpoint_age, max_checkpoint_age: base_config.max_checkpoint_age,
fallback, fallback,
load_external_fallback, load_external_fallback,
with_ws,
with_http,
}; };
Client::new(config) Client::new(config)
@ -180,7 +224,9 @@ impl Client<FileDB> {
let node = Node::new(config.clone())?; let node = Node::new(config.clone())?;
let node = Arc::new(RwLock::new(node)); let node = Arc::new(RwLock::new(node));
let rpc = config.rpc_port.map(|port| Rpc::new(node.clone(), port)); let rpc = config
.rpc_port
.map(|port| Rpc::new(node.clone(), config.with_http, config.with_ws, port));
let data_dir = config.data_dir.clone(); let data_dir = config.data_dir.clone();
let db = data_dir.map(FileDB::new); let db = data_dir.map(FileDB::new);
@ -198,7 +244,9 @@ impl Client<FileDB> {
impl<DB: Database> Client<DB> { impl<DB: Database> Client<DB> {
pub async fn start(&mut self) -> Result<()> { pub async fn start(&mut self) -> Result<()> {
if let Some(rpc) = &mut self.rpc { if let Some(rpc) = &mut self.rpc {
rpc.start().await?; // We can start both ws and http servers since they only run if enabled in the config.
rpc.start_ws().await?;
rpc.start_http().await?;
} }
if self.node.write().await.sync().await.is_err() { if self.node.write().await.sync().await.is_err() {

View File

@ -1,9 +1,5 @@
use ethers::{ use ethers::types::{Address, Filter, Log, Transaction, TransactionReceipt, H256};
abi::AbiEncode,
types::{Address, Filter, Log, Transaction, TransactionReceipt, H256, U256},
};
use eyre::Result; use eyre::Result;
use log::info;
use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc}; use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -11,6 +7,7 @@ use jsonrpsee::{
core::{async_trait, server::rpc_module::Methods, Error}, core::{async_trait, server::rpc_module::Methods, Error},
http_server::{HttpServerBuilder, HttpServerHandle}, http_server::{HttpServerBuilder, HttpServerHandle},
proc_macros::rpc, proc_macros::rpc,
ws_server::{WsServerBuilder, WsServerHandle},
}; };
use crate::{errors::NodeError, node::Node}; use crate::{errors::NodeError, node::Node};
@ -23,31 +20,43 @@ use execution::types::{CallOpts, ExecutionBlock};
pub struct Rpc { pub struct Rpc {
node: Arc<RwLock<Node>>, node: Arc<RwLock<Node>>,
handle: Option<HttpServerHandle>, http_handle: Option<HttpServerHandle>,
ws_handle: Option<WsServerHandle>,
pub with_http: bool,
pub with_ws: bool,
port: u16, port: u16,
} }
impl Rpc { impl Rpc {
pub fn new(node: Arc<RwLock<Node>>, port: u16) -> Self { pub fn new(node: Arc<RwLock<Node>>, with_http: bool, with_ws: bool, port: u16) -> Self {
Rpc { Rpc {
node, node,
handle: None, http_handle: None,
ws_handle: None,
with_http,
with_ws,
port, port,
} }
} }
pub async fn start(&mut self) -> Result<SocketAddr> { pub async fn start_http(&mut self) -> Result<Option<SocketAddr>> {
let rpc_inner = RpcInner { if self.with_http {
node: self.node.clone(), let (handle, addr) = RpcInner::from(&*self).start_http().await?;
port: self.port, self.http_handle = Some(handle);
}; log::info!("http rpc server started at {}", addr);
return Ok(Some(addr));
}
Ok(None)
}
let (handle, addr) = start(rpc_inner).await?; pub async fn start_ws(&mut self) -> Result<Option<SocketAddr>> {
self.handle = Some(handle); if self.with_ws {
let (handle, addr) = RpcInner::from(&*self).start_ws().await?;
info!("rpc server started at {}", addr); self.ws_handle = Some(handle);
log::info!("http rpc server started at {}", addr);
Ok(addr) return Ok(Some(addr));
}
Ok(None)
} }
} }
@ -108,6 +117,53 @@ struct RpcInner {
port: u16, port: u16,
} }
impl From<&Rpc> for RpcInner {
fn from(rpc: &Rpc) -> Self {
RpcInner {
node: Arc::clone(&rpc.node),
port: rpc.port,
}
}
}
impl RpcInner {
pub async fn start_http(&self) -> Result<(HttpServerHandle, SocketAddr)> {
let addr = format!("127.0.0.1:{}", self.port);
let server = HttpServerBuilder::default().build(addr).await?;
let addr = server.local_addr()?;
let mut methods = Methods::new();
let eth_methods: Methods = EthRpcServer::into_rpc(self.clone()).into();
let net_methods: Methods = NetRpcServer::into_rpc(self.clone()).into();
methods.merge(eth_methods)?;
methods.merge(net_methods)?;
let handle = server.start(methods)?;
Ok((handle, addr))
}
pub async fn start_ws(&self) -> Result<(WsServerHandle, SocketAddr)> {
let addr = format!("127.0.0.1:{}", self.port);
let server = WsServerBuilder::default().build(addr).await?;
let addr = server.local_addr()?;
let mut methods = Methods::new();
let eth_methods: Methods = EthRpcServer::into_rpc(self.clone()).into();
let net_methods: Methods = NetRpcServer::into_rpc(self.clone()).into();
methods.merge(eth_methods)?;
methods.merge(net_methods)?;
let handle = server.start(methods)?;
Ok((handle, addr))
}
}
#[async_trait] #[async_trait]
impl EthRpcServer for RpcInner { impl EthRpcServer for RpcInner {
async fn get_balance(&self, address: &str, block: BlockTag) -> Result<String, Error> { async fn get_balance(&self, address: &str, block: BlockTag) -> Result<String, Error> {
@ -115,7 +171,7 @@ impl EthRpcServer for RpcInner {
let node = self.node.read().await; let node = self.node.read().await;
let balance = convert_err(node.get_balance(&address, block).await)?; let balance = convert_err(node.get_balance(&address, block).await)?;
Ok(format_hex(&balance)) Ok(common::utils::format_hex(&balance))
} }
async fn get_transaction_count(&self, address: &str, block: BlockTag) -> Result<String, Error> { async fn get_transaction_count(&self, address: &str, block: BlockTag) -> Result<String, Error> {
@ -164,13 +220,13 @@ impl EthRpcServer for RpcInner {
async fn gas_price(&self) -> Result<String, Error> { async fn gas_price(&self) -> Result<String, Error> {
let node = self.node.read().await; let node = self.node.read().await;
let gas_price = convert_err(node.get_gas_price())?; let gas_price = convert_err(node.get_gas_price())?;
Ok(format_hex(&gas_price)) Ok(common::utils::format_hex(&gas_price))
} }
async fn max_priority_fee_per_gas(&self) -> Result<String, Error> { async fn max_priority_fee_per_gas(&self) -> Result<String, Error> {
let node = self.node.read().await; let node = self.node.read().await;
let tip = convert_err(node.get_priority_fee())?; let tip = convert_err(node.get_priority_fee())?;
Ok(format_hex(&tip)) Ok(common::utils::format_hex(&tip))
} }
async fn block_number(&self) -> Result<String, Error> { async fn block_number(&self) -> Result<String, Error> {
@ -237,35 +293,6 @@ impl NetRpcServer for RpcInner {
} }
} }
async fn start(rpc: RpcInner) -> Result<(HttpServerHandle, SocketAddr)> {
let addr = format!("127.0.0.1:{}", rpc.port);
let server = HttpServerBuilder::default().build(addr).await?;
let addr = server.local_addr()?;
let mut methods = Methods::new();
let eth_methods: Methods = EthRpcServer::into_rpc(rpc.clone()).into();
let net_methods: Methods = NetRpcServer::into_rpc(rpc).into();
methods.merge(eth_methods)?;
methods.merge(net_methods)?;
let handle = server.start(methods)?;
Ok((handle, addr))
}
fn convert_err<T, E: Display>(res: Result<T, E>) -> Result<T, Error> { fn convert_err<T, E: Display>(res: Result<T, E>) -> Result<T, Error> {
res.map_err(|err| Error::Custom(err.to_string())) res.map_err(|err| Error::Custom(err.to_string()))
} }
fn format_hex(num: &U256) -> String {
let stripped = num
.encode_hex()
.strip_prefix("0x")
.unwrap()
.trim_start_matches('0')
.to_string();
format!("0x{}", stripped)
}

View File

@ -1,9 +1,22 @@
use ethers::prelude::Address; use ethers::{
abi::AbiEncode,
types::{Address, U256},
};
use eyre::Result; use eyre::Result;
use ssz_rs::{Node, Vector}; use ssz_rs::{Node, Vector};
use super::types::Bytes32; use super::types::Bytes32;
pub fn format_hex(num: &U256) -> String {
let stripped = num
.encode_hex()
.strip_prefix("0x")
.unwrap()
.trim_start_matches('0')
.to_string();
format!("0x{}", stripped)
}
pub fn hex_str_to_bytes(s: &str) -> Result<Vec<u8>> { pub fn hex_str_to_bytes(s: &str) -> Result<Vec<u8>> {
let stripped = s.strip_prefix("0x").unwrap_or(s); let stripped = s.strip_prefix("0x").unwrap_or(s);
Ok(hex::decode(stripped)?) Ok(hex::decode(stripped)?)

View File

@ -13,6 +13,8 @@ pub struct CliConfig {
pub data_dir: PathBuf, pub data_dir: PathBuf,
pub fallback: Option<String>, pub fallback: Option<String>,
pub load_external_fallback: bool, pub load_external_fallback: bool,
pub with_ws: bool,
pub with_http: bool,
} }
impl CliConfig { impl CliConfig {
@ -46,6 +48,10 @@ impl CliConfig {
Value::from(self.load_external_fallback), Value::from(self.load_external_fallback),
); );
user_dict.insert("with_ws", Value::from(self.with_ws));
user_dict.insert("with_http", Value::from(self.with_http));
Serialized::from(user_dict, network) Serialized::from(user_dict, network)
} }
} }

View File

@ -27,6 +27,8 @@ pub struct Config {
pub max_checkpoint_age: u64, pub max_checkpoint_age: u64,
pub fallback: Option<String>, pub fallback: Option<String>,
pub load_external_fallback: bool, pub load_external_fallback: bool,
pub with_ws: bool,
pub with_http: bool,
} }
impl Config { impl Config {