From d31d19c3b79aa7dac356ac158db5148197187ae6 Mon Sep 17 00:00:00 2001 From: James Prestwich <10149425+prestwich@users.noreply.github.com> Date: Tue, 13 Jul 2021 12:34:11 -0700 Subject: [PATCH] feature: initial delay in PendingTransaction (#339) * feature: initial delay in PendingTransaction * bug: missing wake and add tracing::debug * chore: fmt * feature: set initial delay from PendingTransaction interval --- ethers-providers/src/pending_transaction.rs | 34 ++++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/ethers-providers/src/pending_transaction.rs b/ethers-providers/src/pending_transaction.rs index f8fca735..2247de40 100644 --- a/ethers-providers/src/pending_transaction.rs +++ b/ethers-providers/src/pending_transaction.rs @@ -5,6 +5,7 @@ use crate::{ }; use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64}; use futures_core::stream::Stream; +use futures_timer::Delay; use futures_util::stream::StreamExt; use pin_project::pin_project; use std::{ @@ -33,12 +34,12 @@ pub struct PendingTransaction<'a, P> { impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { /// Creates a new pending transaction poller from a hash and a provider pub fn new(tx_hash: TxHash, provider: &'a Provider

) -> Self { - let fut = Box::pin(provider.get_transaction(tx_hash)); + let delay = Box::pin(Delay::new(DEFAULT_POLL_INTERVAL)); Self { tx_hash, confirmations: 1, provider, - state: PendingTxState::GettingTx(fut), + state: PendingTxState::InitialDelay(delay), interval: Box::new(interval(DEFAULT_POLL_INTERVAL)), } } @@ -52,7 +53,14 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { /// Sets the polling interval pub fn interval>(mut self, duration: T) -> Self { - self.interval = Box::new(interval(duration.into())); + let duration = duration.into(); + + self.interval = Box::new(interval(duration)); + + if matches!(self.state, PendingTxState::InitialDelay(_)) { + self.state = PendingTxState::InitialDelay(Box::pin(Delay::new(duration))) + } + self } } @@ -80,6 +88,12 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { let this = self.project(); match this.state { + PendingTxState::InitialDelay(fut) => { + let _ready = futures_util::ready!(fut.as_mut().poll(ctx)); + tracing::debug!("Starting to poll pending tx {:?}", *this.tx_hash); + let fut = Box::pin(this.provider.get_transaction(*this.tx_hash)); + rewake_with_new_state!(ctx, this, PendingTxState::GettingTx(fut)); + } PendingTxState::PausedGettingTx => { // Wait the polling period so that we do not spam the chain when no // new block has been mined @@ -102,6 +116,7 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { let tx_opt = tx_res.unwrap(); // If the tx is no longer in the mempool, return Ok(None) if tx_opt.is_none() { + tracing::debug!("Dropped from mempool, pending tx {:?}", *this.tx_hash); *this.state = PendingTxState::Completed; return Poll::Ready(Ok(None)); } @@ -116,8 +131,9 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { ); // Start polling for the receipt now + tracing::debug!("Getting receipt for pending tx {:?}", *this.tx_hash); let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash)); - *this.state = PendingTxState::GettingReceipt(fut); + rewake_with_new_state!(ctx, this, PendingTxState::GettingReceipt(fut)); } PendingTxState::PausedGettingReceipt => { // Wait the polling period so that we do not spam the chain when no @@ -129,6 +145,7 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { } PendingTxState::GettingReceipt(fut) => { if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) { + tracing::debug!("Checking receipt for pending tx {:?}", *this.tx_hash); *this.state = PendingTxState::CheckingReceipt(receipt) } else { *this.state = PendingTxState::PausedGettingReceipt @@ -146,6 +163,11 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { // If we requested more than 1 confirmation, we need to compare the receipt's // block number and the current block if *this.confirmations > 1 { + tracing::debug!( + "Waiting on confirmations for pending tx {:?}", + *this.tx_hash + ); + let fut = Box::pin(this.provider.get_block_number()); *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take()); @@ -235,6 +257,9 @@ impl<'a, P> Deref for PendingTransaction<'a, P> { // We box the TransactionReceipts to keep the enum small. enum PendingTxState<'a> { + /// Initial delay to ensure the GettingTx loop doesn't immediately fail + InitialDelay(Pin>), + /// Waiting for interval to elapse before calling API again PausedGettingTx, @@ -265,6 +290,7 @@ enum PendingTxState<'a> { impl<'a> fmt::Debug for PendingTxState<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let state = match self { + PendingTxState::InitialDelay(_) => "InitialDelay", PendingTxState::PausedGettingTx => "PausedGettingTx", PendingTxState::GettingTx(_) => "GettingTx", PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",