diff --git a/ethers-providers/src/pending_transaction.rs b/ethers-providers/src/pending_transaction.rs index 78db7ff5..2867bcfa 100644 --- a/ethers-providers/src/pending_transaction.rs +++ b/ethers-providers/src/pending_transaction.rs @@ -1,5 +1,10 @@ -use crate::{JsonRpcClient, Provider, ProviderError}; +use crate::{ + stream::{interval, DEFAULT_POLL_DURATION}, + JsonRpcClient, Provider, ProviderError, +}; use ethers_core::types::{TransactionReceipt, TxHash, U64}; +use futures_core::stream::Stream; +use futures_util::stream::StreamExt; use pin_project::pin_project; use std::{ fmt, @@ -7,6 +12,7 @@ use std::{ ops::Deref, pin::Pin, task::{Context, Poll}, + time::Duration, }; /// A pending transaction is a transaction which has been submitted but is not yet mined. @@ -20,6 +26,7 @@ pub struct PendingTransaction<'a, P> { confirmations: usize, provider: &'a Provider

, state: PendingTxState<'a>, + interval: Box + Send + Unpin>, } impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { @@ -31,6 +38,7 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { confirmations: 1, provider, state: PendingTxState::GettingReceipt(fut), + interval: Box::new(interval(DEFAULT_POLL_DURATION)), } } @@ -40,6 +48,12 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { self.confirmations = confs; self } + + /// Sets the polling interval + pub fn interval>(mut self, duration: T) -> Self { + self.interval = Box::new(interval(Duration::from_millis(duration.into()))); + self + } } impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { @@ -50,6 +64,10 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { match this.state { PendingTxState::GettingReceipt(fut) => { + // Wait the polling period so that we do not spam the chain when no + // new block has been mined + let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx)); + if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) { *this.state = PendingTxState::CheckingReceipt(Box::new(receipt)) } else { @@ -74,6 +92,11 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { } } PendingTxState::GettingBlockNumber(fut, receipt) => { + // Wait the polling period so that we do not spam the chain when no + // new block has been mined + let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx)); + + // Wait for the interval let inclusion_block = receipt .block_number .expect("Receipt did not have a block number. This should never happen"); diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 3517523c..6f5e3afb 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -16,11 +16,11 @@ use std::{ }; // https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20 -fn interval(duration: Duration) -> impl Stream + Send + Unpin { +pub fn interval(duration: Duration) -> impl Stream + Send + Unpin { stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop) } -const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000); +pub const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000); /// Trait for streaming filters. pub trait FilterStream: StreamExt + Stream diff --git a/ethers-providers/tests/provider.rs b/ethers-providers/tests/provider.rs index 7189b534..19c873f2 100644 --- a/ethers-providers/tests/provider.rs +++ b/ethers-providers/tests/provider.rs @@ -6,6 +6,7 @@ use std::convert::TryFrom; mod eth_tests { use super::*; use ethers::{ + providers::JsonRpcClient, types::TransactionRequest, utils::{parse_ether, Ganache}, }; @@ -65,12 +66,27 @@ mod eth_tests { async fn pending_txs_with_confirmations_ganache() { let _ganache = Ganache::new().block_time(2u64).spawn(); let provider = Provider::::try_from("http://localhost:8545").unwrap(); + generic_pending_txs_test(provider).await; + } + + #[tokio::test] + #[serial] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-tls"))] + async fn websocket_pending_txs_with_confirmations_ganache() { + use ethers::providers::Ws; + let _ganache = Ganache::new().block_time(2u64).port(8546u64).spawn(); + let ws = Ws::connect("ws://localhost:8546").await.unwrap(); + let provider = Provider::new(ws); + generic_pending_txs_test(provider).await; + } + + async fn generic_pending_txs_test(provider: Provider

) { let accounts = provider.get_accounts().await.unwrap(); - let tx = TransactionRequest::pay(accounts[1], parse_ether(1u64).unwrap()).from(accounts[0]); + let tx = TransactionRequest::pay(accounts[0], parse_ether(1u64).unwrap()).from(accounts[0]); let pending_tx = provider.send_transaction(tx).await.unwrap(); let hash = *pending_tx; - let receipt = pending_tx.confirmations(5).await.unwrap(); + let receipt = pending_tx.interval(500u64).confirmations(5).await.unwrap(); // got the correct receipt assert_eq!(receipt.transaction_hash, hash);