feat(provider): introduce a pending tx polling delay so that we do not spam the chain (#31)
This commit is contained in:
parent
0cfeadadf4
commit
9a0c97286b
|
@ -1,5 +1,10 @@
|
||||||
use crate::{JsonRpcClient, Provider, ProviderError};
|
use crate::{
|
||||||
|
stream::{interval, DEFAULT_POLL_DURATION},
|
||||||
|
JsonRpcClient, Provider, ProviderError,
|
||||||
|
};
|
||||||
use ethers_core::types::{TransactionReceipt, TxHash, U64};
|
use ethers_core::types::{TransactionReceipt, TxHash, U64};
|
||||||
|
use futures_core::stream::Stream;
|
||||||
|
use futures_util::stream::StreamExt;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
|
@ -7,6 +12,7 @@ use std::{
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A pending transaction is a transaction which has been submitted but is not yet mined.
|
/// A pending transaction is a transaction which has been submitted but is not yet mined.
|
||||||
|
@ -20,6 +26,7 @@ pub struct PendingTransaction<'a, P> {
|
||||||
confirmations: usize,
|
confirmations: usize,
|
||||||
provider: &'a Provider<P>,
|
provider: &'a Provider<P>,
|
||||||
state: PendingTxState<'a>,
|
state: PendingTxState<'a>,
|
||||||
|
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
|
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
|
||||||
|
@ -31,6 +38,7 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
|
||||||
confirmations: 1,
|
confirmations: 1,
|
||||||
provider,
|
provider,
|
||||||
state: PendingTxState::GettingReceipt(fut),
|
state: PendingTxState::GettingReceipt(fut),
|
||||||
|
interval: Box::new(interval(DEFAULT_POLL_DURATION)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +48,12 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
|
||||||
self.confirmations = confs;
|
self.confirmations = confs;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the polling interval
|
||||||
|
pub fn interval<T: Into<u64>>(mut self, duration: T) -> Self {
|
||||||
|
self.interval = Box::new(interval(Duration::from_millis(duration.into())));
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
|
impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
|
||||||
|
@ -50,6 +64,10 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
|
||||||
|
|
||||||
match this.state {
|
match this.state {
|
||||||
PendingTxState::GettingReceipt(fut) => {
|
PendingTxState::GettingReceipt(fut) => {
|
||||||
|
// Wait the polling period so that we do not spam the chain when no
|
||||||
|
// new block has been mined
|
||||||
|
let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
|
||||||
|
|
||||||
if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
|
if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
|
||||||
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
|
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
|
||||||
} else {
|
} else {
|
||||||
|
@ -74,6 +92,11 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PendingTxState::GettingBlockNumber(fut, receipt) => {
|
PendingTxState::GettingBlockNumber(fut, receipt) => {
|
||||||
|
// Wait the polling period so that we do not spam the chain when no
|
||||||
|
// new block has been mined
|
||||||
|
let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
|
||||||
|
|
||||||
|
// Wait for the interval
|
||||||
let inclusion_block = receipt
|
let inclusion_block = receipt
|
||||||
.block_number
|
.block_number
|
||||||
.expect("Receipt did not have a block number. This should never happen");
|
.expect("Receipt did not have a block number. This should never happen");
|
||||||
|
|
|
@ -16,11 +16,11 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20
|
// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20
|
||||||
fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
|
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
|
||||||
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
|
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
|
||||||
}
|
}
|
||||||
|
|
||||||
const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);
|
pub const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);
|
||||||
|
|
||||||
/// Trait for streaming filters.
|
/// Trait for streaming filters.
|
||||||
pub trait FilterStream<R>: StreamExt + Stream<Item = R>
|
pub trait FilterStream<R>: StreamExt + Stream<Item = R>
|
||||||
|
|
|
@ -6,6 +6,7 @@ use std::convert::TryFrom;
|
||||||
mod eth_tests {
|
mod eth_tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use ethers::{
|
use ethers::{
|
||||||
|
providers::JsonRpcClient,
|
||||||
types::TransactionRequest,
|
types::TransactionRequest,
|
||||||
utils::{parse_ether, Ganache},
|
utils::{parse_ether, Ganache},
|
||||||
};
|
};
|
||||||
|
@ -65,12 +66,27 @@ mod eth_tests {
|
||||||
async fn pending_txs_with_confirmations_ganache() {
|
async fn pending_txs_with_confirmations_ganache() {
|
||||||
let _ganache = Ganache::new().block_time(2u64).spawn();
|
let _ganache = Ganache::new().block_time(2u64).spawn();
|
||||||
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
|
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
|
||||||
|
generic_pending_txs_test(provider).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial]
|
||||||
|
#[cfg(any(feature = "tokio-runtime", feature = "tokio-tls"))]
|
||||||
|
async fn websocket_pending_txs_with_confirmations_ganache() {
|
||||||
|
use ethers::providers::Ws;
|
||||||
|
let _ganache = Ganache::new().block_time(2u64).port(8546u64).spawn();
|
||||||
|
let ws = Ws::connect("ws://localhost:8546").await.unwrap();
|
||||||
|
let provider = Provider::new(ws);
|
||||||
|
generic_pending_txs_test(provider).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn generic_pending_txs_test<P: JsonRpcClient>(provider: Provider<P>) {
|
||||||
let accounts = provider.get_accounts().await.unwrap();
|
let accounts = provider.get_accounts().await.unwrap();
|
||||||
|
|
||||||
let tx = TransactionRequest::pay(accounts[1], parse_ether(1u64).unwrap()).from(accounts[0]);
|
let tx = TransactionRequest::pay(accounts[0], parse_ether(1u64).unwrap()).from(accounts[0]);
|
||||||
let pending_tx = provider.send_transaction(tx).await.unwrap();
|
let pending_tx = provider.send_transaction(tx).await.unwrap();
|
||||||
let hash = *pending_tx;
|
let hash = *pending_tx;
|
||||||
let receipt = pending_tx.confirmations(5).await.unwrap();
|
let receipt = pending_tx.interval(500u64).confirmations(5).await.unwrap();
|
||||||
|
|
||||||
// got the correct receipt
|
// got the correct receipt
|
||||||
assert_eq!(receipt.transaction_hash, hash);
|
assert_eq!(receipt.transaction_hash, hash);
|
||||||
|
|
Loading…
Reference in New Issue