Nonce manager (#59)

* feat: first stab at a NonceManager

* test: adjust the test

* fix: reset nonce if nonce manager errors

* feat: make nonce manager opt in

* fix: add read-only nonce call

* feat: improve http provider errors

* feat: convert to Atomic datatypes

* refactor: move to own file

* chore: remove tokio dep

* fix: improve nonce retry logic readability

* fix: use other privkey to avoid nonce races with other tests
This commit is contained in:
Georgios Konstantopoulos 2020-09-07 13:26:42 +03:00 committed by GitHub
parent 6197d8bb12
commit fb8f5a8ec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 9 deletions

View File

@ -43,6 +43,13 @@ pub enum ClientError {
#[error(transparent)]
/// Thrown if the response could not be parsed
JsonRpcError(#[from] JsonRpcError),
#[error("Deserialization Error: {err}. Response: {text}")]
/// Serde JSON Error
SerdeJson {
err: serde_json::Error,
text: String,
},
}
impl From<ClientError> for ProviderError {
@ -73,7 +80,9 @@ impl JsonRpcClient for Provider {
.json(&payload)
.send()
.await?;
let res = res.json::<Response<R>>().await?;
let text = res.text().await?;
let res: Response<R> =
serde_json::from_str(&text).map_err(|err| ClientError::SerdeJson { err, text })?;
Ok(res.data.into_result()?)
}

View File

@ -1,7 +1,7 @@
use crate::Signer;
use crate::{NonceManager, Signer};
use ethers_core::types::{
Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest, TxHash,
Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest, TxHash, U256,
};
use ethers_providers::{
gas_oracle::{GasOracle, GasOracleError},
@ -9,7 +9,8 @@ use ethers_providers::{
};
use futures_util::{future::ok, join};
use std::{future::Future, ops::Deref, time::Duration};
use std::{future::Future, ops::Deref, sync::atomic::Ordering, time::Duration};
use thiserror::Error;
#[derive(Debug)]
@ -74,6 +75,7 @@ pub struct Client<P, S> {
pub(crate) signer: Option<S>,
pub(crate) address: Address,
pub(crate) gas_oracle: Option<Box<dyn GasOracle>>,
pub(crate) nonce_manager: Option<NonceManager>,
}
#[derive(Debug, Error)]
@ -110,6 +112,7 @@ where
signer: Some(signer),
address,
gas_oracle: None,
nonce_manager: None,
}
}
@ -141,7 +144,34 @@ where
// fill any missing fields
self.fill_transaction(&mut tx, block).await?;
// sign the transaction and broadcast it
// if we have a nonce manager set, we should try handling the result in
// case there was a nonce mismatch
let tx_hash = if let Some(ref nonce_manager) = self.nonce_manager {
let mut tx_clone = tx.clone();
match self.submit_transaction(tx).await {
Ok(tx_hash) => tx_hash,
Err(err) => {
let nonce = self.get_transaction_count(block).await?;
if nonce != nonce_manager.nonce.load(Ordering::SeqCst).into() {
// try re-submitting the transaction with the correct nonce if there
// was a nonce mismatch
nonce_manager.nonce.store(nonce.as_u64(), Ordering::SeqCst);
tx_clone.nonce = Some(nonce);
self.submit_transaction(tx_clone).await?
} else {
// propagate the error otherwise
return Err(err);
}
}
}
} else {
self.submit_transaction(tx).await?
};
Ok(tx_hash)
}
async fn submit_transaction(&self, tx: TransactionRequest) -> Result<TxHash, ClientError> {
Ok(if let Some(ref signer) = self.signer {
let signed_tx = signer.sign_transaction(tx).map_err(Into::into)?;
self.provider.send_raw_transaction(&signed_tx).await?
@ -171,10 +201,7 @@ where
let (gas_price, gas, nonce) = join!(
maybe(tx.gas_price, self.provider.get_gas_price()),
maybe(tx.gas, self.provider.estimate_gas(&tx)),
maybe(
tx.nonce,
self.provider.get_transaction_count(self.address(), block)
),
maybe(tx.nonce, self.get_transaction_count_with_manager(block)),
);
tx.gas_price = Some(gas_price?);
tx.gas = Some(gas?);
@ -183,6 +210,38 @@ where
Ok(())
}
async fn get_transaction_count_with_manager(
&self,
block: Option<BlockNumber>,
) -> Result<U256, ClientError> {
// If there's a nonce manager set, short circuit by just returning the next nonce
if let Some(ref nonce_manager) = self.nonce_manager {
// initialize the nonce the first time the manager is called
if !nonce_manager.initialized.load(Ordering::SeqCst) {
let nonce = self
.provider
.get_transaction_count(self.address(), block)
.await?;
nonce_manager.nonce.store(nonce.as_u64(), Ordering::SeqCst);
nonce_manager.initialized.store(true, Ordering::SeqCst);
}
return Ok(nonce_manager.next());
}
self.get_transaction_count(block).await
}
pub async fn get_transaction_count(
&self,
block: Option<BlockNumber>,
) -> Result<U256, ClientError> {
Ok(self
.provider
.get_transaction_count(self.address(), block)
.await?)
}
/// Returns the client's address
pub fn address(&self) -> Address {
self.address
@ -250,6 +309,11 @@ where
self.gas_oracle = Some(gas_oracle);
self
}
pub fn with_nonce_manager(mut self) -> Self {
self.nonce_manager = Some(NonceManager::new());
self
}
}
/// Calls the future if `item` is None, otherwise returns a `futures::ok`
@ -282,6 +346,7 @@ impl<P: JsonRpcClient, S> From<Provider<P>> for Client<P, S> {
signer: None,
address: Address::zero(),
gas_oracle: None,
nonce_manager: None,
}
}
}

View File

@ -40,6 +40,9 @@
mod wallet;
pub use wallet::Wallet;
mod nonce_manager;
pub(crate) use nonce_manager::NonceManager;
mod client;
pub use client::{Client, ClientError};

View File

@ -0,0 +1,24 @@
use ethers_core::types::U256;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[derive(Debug)]
pub(crate) struct NonceManager {
pub initialized: AtomicBool,
pub nonce: AtomicU64,
}
impl NonceManager {
/// Instantiates the nonce manager with a 0 nonce.
pub fn new() -> Self {
NonceManager {
initialized: false.into(),
nonce: 0.into(),
}
}
/// Returns the next nonce to be used
pub fn next(&self) -> U256 {
let nonce = self.nonce.fetch_add(1, Ordering::SeqCst);
nonce.into()
}
}

View File

@ -122,6 +122,7 @@ impl Wallet {
signer: Some(self),
provider,
gas_oracle: None,
nonce_manager: None,
}
}

View File

@ -75,6 +75,53 @@ mod eth_tests {
assert!(balance_before > balance_after);
}
#[tokio::test]
async fn nonce_manager() {
let provider = Provider::<Http>::try_from(
"https://rinkeby.infura.io/v3/fd8b88b56aa84f6da87b60f5441d6778",
)
.unwrap()
.interval(Duration::from_millis(2000u64));
let client = "59c37cb6b16fa2de30675f034c8008f890f4b2696c729d6267946d29736d73e4"
.parse::<Wallet>()
.unwrap()
.connect(provider)
.with_nonce_manager();
let nonce = client
.get_transaction_count(Some(BlockNumber::Pending))
.await
.unwrap()
.as_u64();
let mut tx_hashes = Vec::new();
for _ in 0..10 {
let tx = client
.send_transaction(
TransactionRequest::pay(client.address(), 100u64),
Some(BlockNumber::Pending),
)
.await
.unwrap();
tx_hashes.push(tx);
}
let mut nonces = Vec::new();
for tx_hash in tx_hashes {
nonces.push(
client
.get_transaction(tx_hash)
.await
.unwrap()
.nonce
.as_u64(),
);
}
assert_eq!(nonces, (nonce..nonce + 10).collect::<Vec<_>>())
}
#[tokio::test]
async fn using_gas_oracle() {
let ganache = Ganache::new().spawn();