feat: use RwLock for concurrent reads (#46)
* feat: use RwLock for concurrent reads * use reads for most client actions
This commit is contained in:
parent
9c3e5f4833
commit
abfed6a8fe
|
@ -10,7 +10,7 @@ use consensus::types::Header;
|
||||||
use execution::types::{CallOpts, ExecutionBlock};
|
use execution::types::{CallOpts, ExecutionBlock};
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use tokio::spawn;
|
use tokio::spawn;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::RwLock;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
use crate::database::{Database, FileDB};
|
use crate::database::{Database, FileDB};
|
||||||
|
@ -18,7 +18,7 @@ use crate::node::{BlockTag, Node};
|
||||||
use crate::rpc::Rpc;
|
use crate::rpc::Rpc;
|
||||||
|
|
||||||
pub struct Client<DB: Database> {
|
pub struct Client<DB: Database> {
|
||||||
node: Arc<Mutex<Node>>,
|
node: Arc<RwLock<Node>>,
|
||||||
rpc: Option<Rpc>,
|
rpc: Option<Rpc>,
|
||||||
db: DB,
|
db: DB,
|
||||||
}
|
}
|
||||||
|
@ -27,7 +27,7 @@ impl Client<FileDB> {
|
||||||
pub async fn new(config: Config) -> Result<Self> {
|
pub async fn new(config: Config) -> Result<Self> {
|
||||||
let config = Arc::new(config);
|
let config = Arc::new(config);
|
||||||
let node = Node::new(config.clone()).await?;
|
let node = Node::new(config.clone()).await?;
|
||||||
let node = Arc::new(Mutex::new(node));
|
let node = Arc::new(RwLock::new(node));
|
||||||
|
|
||||||
let rpc = if let Some(port) = config.general.rpc_port {
|
let rpc = if let Some(port) = config.general.rpc_port {
|
||||||
Some(Rpc::new(node.clone(), port))
|
Some(Rpc::new(node.clone(), port))
|
||||||
|
@ -48,13 +48,13 @@ impl<DB: Database> Client<DB> {
|
||||||
|
|
||||||
let node = self.node.clone();
|
let node = self.node.clone();
|
||||||
spawn(async move {
|
spawn(async move {
|
||||||
let res = node.lock().await.sync().await;
|
let res = node.write().await.sync().await;
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
warn!("{}", err);
|
warn!("{}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let res = node.lock().await.advance().await;
|
let res = node.write().await.advance().await;
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
warn!("{}", err);
|
warn!("{}", err);
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,7 @@ impl<DB: Database> Client<DB> {
|
||||||
println!();
|
println!();
|
||||||
info!("shutting down");
|
info!("shutting down");
|
||||||
|
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let checkpoint = if let Some(checkpoint) = node.get_last_checkpoint() {
|
let checkpoint = if let Some(checkpoint) = node.get_last_checkpoint() {
|
||||||
checkpoint
|
checkpoint
|
||||||
} else {
|
} else {
|
||||||
|
@ -85,31 +85,31 @@ impl<DB: Database> Client<DB> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> {
|
pub async fn call(&self, opts: &CallOpts, block: &BlockTag) -> Result<Vec<u8>> {
|
||||||
self.node.lock().await.call(opts, block)
|
self.node.read().await.call(opts, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn estimate_gas(&self, opts: &CallOpts) -> Result<u64> {
|
pub async fn estimate_gas(&self, opts: &CallOpts) -> Result<u64> {
|
||||||
self.node.lock().await.estimate_gas(opts)
|
self.node.read().await.estimate_gas(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_balance(&self, address: &Address, block: &BlockTag) -> Result<U256> {
|
pub async fn get_balance(&self, address: &Address, block: &BlockTag) -> Result<U256> {
|
||||||
self.node.lock().await.get_balance(address, block).await
|
self.node.read().await.get_balance(address, block).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_nonce(&self, address: &Address, block: &BlockTag) -> Result<u64> {
|
pub async fn get_nonce(&self, address: &Address, block: &BlockTag) -> Result<u64> {
|
||||||
self.node.lock().await.get_nonce(address, block).await
|
self.node.read().await.get_nonce(address, block).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_code(&self, address: &Address, block: &BlockTag) -> Result<Vec<u8>> {
|
pub async fn get_code(&self, address: &Address, block: &BlockTag) -> Result<Vec<u8>> {
|
||||||
self.node.lock().await.get_code(address, block).await
|
self.node.read().await.get_code(address, block).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result<U256> {
|
pub async fn get_storage_at(&self, address: &Address, slot: H256) -> Result<U256> {
|
||||||
self.node.lock().await.get_storage_at(address, slot).await
|
self.node.read().await.get_storage_at(address, slot).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_raw_transaction(&self, bytes: &Vec<u8>) -> Result<H256> {
|
pub async fn send_raw_transaction(&self, bytes: &Vec<u8>) -> Result<H256> {
|
||||||
self.node.lock().await.send_raw_transaction(bytes).await
|
self.node.read().await.send_raw_transaction(bytes).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_transaction_receipt(
|
pub async fn get_transaction_receipt(
|
||||||
|
@ -117,7 +117,7 @@ impl<DB: Database> Client<DB> {
|
||||||
tx_hash: &H256,
|
tx_hash: &H256,
|
||||||
) -> Result<Option<TransactionReceipt>> {
|
) -> Result<Option<TransactionReceipt>> {
|
||||||
self.node
|
self.node
|
||||||
.lock()
|
.read()
|
||||||
.await
|
.await
|
||||||
.get_transaction_receipt(tx_hash)
|
.get_transaction_receipt(tx_hash)
|
||||||
.await
|
.await
|
||||||
|
@ -125,37 +125,37 @@ impl<DB: Database> Client<DB> {
|
||||||
|
|
||||||
pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
|
pub async fn get_transaction_by_hash(&self, tx_hash: &H256) -> Result<Option<Transaction>> {
|
||||||
self.node
|
self.node
|
||||||
.lock()
|
.read()
|
||||||
.await
|
.await
|
||||||
.get_transaction_by_hash(tx_hash)
|
.get_transaction_by_hash(tx_hash)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_gas_price(&self) -> Result<U256> {
|
pub async fn get_gas_price(&self) -> Result<U256> {
|
||||||
self.node.lock().await.get_gas_price()
|
self.node.read().await.get_gas_price()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_priority_fee(&self) -> Result<U256> {
|
pub async fn get_priority_fee(&self) -> Result<U256> {
|
||||||
self.node.lock().await.get_priority_fee()
|
self.node.read().await.get_priority_fee()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_block_number(&self) -> Result<u64> {
|
pub async fn get_block_number(&self) -> Result<u64> {
|
||||||
self.node.lock().await.get_block_number()
|
self.node.read().await.get_block_number()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_block_by_number(&self, block: &BlockTag) -> Result<ExecutionBlock> {
|
pub async fn get_block_by_number(&self, block: &BlockTag) -> Result<ExecutionBlock> {
|
||||||
self.node.lock().await.get_block_by_number(block)
|
self.node.read().await.get_block_by_number(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_block_by_hash(&self, hash: &Vec<u8>) -> Result<ExecutionBlock> {
|
pub async fn get_block_by_hash(&self, hash: &Vec<u8>) -> Result<ExecutionBlock> {
|
||||||
self.node.lock().await.get_block_by_hash(hash)
|
self.node.read().await.get_block_by_hash(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn chain_id(&self) -> u64 {
|
pub async fn chain_id(&self) -> u64 {
|
||||||
self.node.lock().await.chain_id()
|
self.node.read().await.chain_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_header(&self) -> Header {
|
pub async fn get_header(&self) -> Header {
|
||||||
self.node.lock().await.get_header().clone()
|
self.node.read().await.get_header().clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ use ethers::{
|
||||||
use eyre::Result;
|
use eyre::Result;
|
||||||
use log::{debug, info, warn};
|
use log::{debug, info, warn};
|
||||||
use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
|
use std::{fmt::Display, net::SocketAddr, str::FromStr, sync::Arc};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use jsonrpsee::{
|
use jsonrpsee::{
|
||||||
core::{async_trait, server::rpc_module::Methods, Error},
|
core::{async_trait, server::rpc_module::Methods, Error},
|
||||||
|
@ -19,13 +19,13 @@ use common::utils::{hex_str_to_bytes, u64_to_hex_string};
|
||||||
use execution::types::{CallOpts, ExecutionBlock};
|
use execution::types::{CallOpts, ExecutionBlock};
|
||||||
|
|
||||||
pub struct Rpc {
|
pub struct Rpc {
|
||||||
node: Arc<Mutex<Node>>,
|
node: Arc<RwLock<Node>>,
|
||||||
handle: Option<HttpServerHandle>,
|
handle: Option<HttpServerHandle>,
|
||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Rpc {
|
impl Rpc {
|
||||||
pub fn new(node: Arc<Mutex<Node>>, port: u16) -> Self {
|
pub fn new(node: Arc<RwLock<Node>>, port: u16) -> Self {
|
||||||
Rpc {
|
Rpc {
|
||||||
node,
|
node,
|
||||||
handle: None,
|
handle: None,
|
||||||
|
@ -87,7 +87,7 @@ trait NetRpc {
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct RpcInner {
|
struct RpcInner {
|
||||||
node: Arc<Mutex<Node>>,
|
node: Arc<RwLock<Node>>,
|
||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ impl EthRpcServer for RpcInner {
|
||||||
debug!("eth_getBalance");
|
debug!("eth_getBalance");
|
||||||
let block = convert_err(decode_block(block))?;
|
let block = convert_err(decode_block(block))?;
|
||||||
let address = convert_err(Address::from_str(address))?;
|
let address = convert_err(Address::from_str(address))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let balance = convert_err(node.get_balance(&address, &block).await)?;
|
let balance = convert_err(node.get_balance(&address, &block).await)?;
|
||||||
|
|
||||||
Ok(balance.encode_hex())
|
Ok(balance.encode_hex())
|
||||||
|
@ -106,7 +106,7 @@ impl EthRpcServer for RpcInner {
|
||||||
async fn get_transaction_count(&self, address: &str, block: &str) -> Result<String, Error> {
|
async fn get_transaction_count(&self, address: &str, block: &str) -> Result<String, Error> {
|
||||||
let block = convert_err(decode_block(block))?;
|
let block = convert_err(decode_block(block))?;
|
||||||
let address = convert_err(Address::from_str(address))?;
|
let address = convert_err(Address::from_str(address))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let nonce = convert_err(node.get_nonce(&address, &block).await)?;
|
let nonce = convert_err(node.get_nonce(&address, &block).await)?;
|
||||||
|
|
||||||
Ok(nonce.encode_hex())
|
Ok(nonce.encode_hex())
|
||||||
|
@ -115,7 +115,7 @@ impl EthRpcServer for RpcInner {
|
||||||
async fn get_code(&self, address: &str, block: &str) -> Result<String, Error> {
|
async fn get_code(&self, address: &str, block: &str) -> Result<String, Error> {
|
||||||
let block = convert_err(decode_block(block))?;
|
let block = convert_err(decode_block(block))?;
|
||||||
let address = convert_err(Address::from_str(address))?;
|
let address = convert_err(Address::from_str(address))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let code = convert_err(node.get_code(&address, &block).await)?;
|
let code = convert_err(node.get_code(&address, &block).await)?;
|
||||||
|
|
||||||
Ok(hex::encode(code))
|
Ok(hex::encode(code))
|
||||||
|
@ -124,7 +124,7 @@ impl EthRpcServer for RpcInner {
|
||||||
async fn call(&self, opts: CallOpts, block: &str) -> Result<String, Error> {
|
async fn call(&self, opts: CallOpts, block: &str) -> Result<String, Error> {
|
||||||
debug!("eth_call");
|
debug!("eth_call");
|
||||||
let block = convert_err(decode_block(block))?;
|
let block = convert_err(decode_block(block))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let res = convert_err(node.call(&opts, &block))?;
|
let res = convert_err(node.call(&opts, &block))?;
|
||||||
|
|
||||||
Ok(format!("0x{}", hex::encode(res)))
|
Ok(format!("0x{}", hex::encode(res)))
|
||||||
|
@ -132,32 +132,32 @@ impl EthRpcServer for RpcInner {
|
||||||
|
|
||||||
async fn estimate_gas(&self, opts: CallOpts) -> Result<String, Error> {
|
async fn estimate_gas(&self, opts: CallOpts) -> Result<String, Error> {
|
||||||
debug!("eth_estimateGas");
|
debug!("eth_estimateGas");
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let gas = convert_err(node.estimate_gas(&opts))?;
|
let gas = convert_err(node.estimate_gas(&opts))?;
|
||||||
|
|
||||||
Ok(u64_to_hex_string(gas))
|
Ok(u64_to_hex_string(gas))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn chain_id(&self) -> Result<String, Error> {
|
async fn chain_id(&self) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let id = node.chain_id();
|
let id = node.chain_id();
|
||||||
Ok(u64_to_hex_string(id))
|
Ok(u64_to_hex_string(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn gas_price(&self) -> Result<String, Error> {
|
async fn gas_price(&self) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let gas_price = convert_err(node.get_gas_price())?;
|
let gas_price = convert_err(node.get_gas_price())?;
|
||||||
Ok(gas_price.encode_hex())
|
Ok(gas_price.encode_hex())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn max_priority_fee_per_gas(&self) -> Result<String, Error> {
|
async fn max_priority_fee_per_gas(&self) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let tip = convert_err(node.get_priority_fee())?;
|
let tip = convert_err(node.get_priority_fee())?;
|
||||||
Ok(tip.encode_hex())
|
Ok(tip.encode_hex())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn block_number(&self) -> Result<String, Error> {
|
async fn block_number(&self) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let num = convert_err(node.get_block_number())?;
|
let num = convert_err(node.get_block_number())?;
|
||||||
Ok(u64_to_hex_string(num))
|
Ok(u64_to_hex_string(num))
|
||||||
}
|
}
|
||||||
|
@ -168,7 +168,7 @@ impl EthRpcServer for RpcInner {
|
||||||
_full_tx: bool,
|
_full_tx: bool,
|
||||||
) -> Result<ExecutionBlock, Error> {
|
) -> Result<ExecutionBlock, Error> {
|
||||||
let block = convert_err(decode_block(block))?;
|
let block = convert_err(decode_block(block))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let block = convert_err(node.get_block_by_number(&block))?;
|
let block = convert_err(node.get_block_by_number(&block))?;
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
|
@ -176,21 +176,21 @@ impl EthRpcServer for RpcInner {
|
||||||
|
|
||||||
async fn get_block_by_hash(&self, hash: &str, _full_tx: bool) -> Result<ExecutionBlock, Error> {
|
async fn get_block_by_hash(&self, hash: &str, _full_tx: bool) -> Result<ExecutionBlock, Error> {
|
||||||
let hash = convert_err(hex_str_to_bytes(hash))?;
|
let hash = convert_err(hex_str_to_bytes(hash))?;
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let block = convert_err(node.get_block_by_hash(&hash))?;
|
let block = convert_err(node.get_block_by_hash(&hash))?;
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_raw_transaction(&self, bytes: &str) -> Result<String, Error> {
|
async fn send_raw_transaction(&self, bytes: &str) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let bytes = convert_err(hex_str_to_bytes(bytes))?;
|
let bytes = convert_err(hex_str_to_bytes(bytes))?;
|
||||||
let tx_hash = convert_err(node.send_raw_transaction(&bytes).await)?;
|
let tx_hash = convert_err(node.send_raw_transaction(&bytes).await)?;
|
||||||
Ok(hex::encode(tx_hash))
|
Ok(hex::encode(tx_hash))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_transaction_receipt(&self, hash: &str) -> Result<TransactionReceipt, Error> {
|
async fn get_transaction_receipt(&self, hash: &str) -> Result<TransactionReceipt, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?);
|
let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?);
|
||||||
let receipt = convert_err(node.get_transaction_receipt(&hash).await)?;
|
let receipt = convert_err(node.get_transaction_receipt(&hash).await)?;
|
||||||
|
|
||||||
|
@ -201,7 +201,7 @@ impl EthRpcServer for RpcInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_transaction_by_hash(&self, hash: &str) -> Result<Transaction, Error> {
|
async fn get_transaction_by_hash(&self, hash: &str) -> Result<Transaction, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?);
|
let hash = H256::from_slice(&convert_err(hex_str_to_bytes(hash))?);
|
||||||
let tx = convert_err(node.get_transaction_by_hash(&hash).await)?;
|
let tx = convert_err(node.get_transaction_by_hash(&hash).await)?;
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ impl EthRpcServer for RpcInner {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl NetRpcServer for RpcInner {
|
impl NetRpcServer for RpcInner {
|
||||||
async fn version(&self) -> Result<String, Error> {
|
async fn version(&self) -> Result<String, Error> {
|
||||||
let node = self.node.lock().await;
|
let node = self.node.read().await;
|
||||||
Ok(node.chain_id().to_string())
|
Ok(node.chain_id().to_string())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue