feature: initial delay in PendingTransaction (#339)

* feature: initial delay in PendingTransaction

* bug: missing wake and add tracing::debug

* chore: fmt

* feature: set initial delay from PendingTransaction interval
This commit is contained in:
James Prestwich 2021-07-13 12:34:11 -07:00 committed by GitHub
parent 04bbd1b424
commit d31d19c3b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 30 additions and 4 deletions

View File

@ -5,6 +5,7 @@ use crate::{
};
use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
use futures_core::stream::Stream;
use futures_timer::Delay;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use std::{
@ -33,12 +34,12 @@ pub struct PendingTransaction<'a, P> {
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction(tx_hash));
let delay = Box::pin(Delay::new(DEFAULT_POLL_INTERVAL));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingTx(fut),
state: PendingTxState::InitialDelay(delay),
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
}
}
@ -52,7 +53,14 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Sets the polling interval
pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
self.interval = Box::new(interval(duration.into()));
let duration = duration.into();
self.interval = Box::new(interval(duration));
if matches!(self.state, PendingTxState::InitialDelay(_)) {
self.state = PendingTxState::InitialDelay(Box::pin(Delay::new(duration)))
}
self
}
}
@ -80,6 +88,12 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
let this = self.project();
match this.state {
PendingTxState::InitialDelay(fut) => {
let _ready = futures_util::ready!(fut.as_mut().poll(ctx));
tracing::debug!("Starting to poll pending tx {:?}", *this.tx_hash);
let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
rewake_with_new_state!(ctx, this, PendingTxState::GettingTx(fut));
}
PendingTxState::PausedGettingTx => {
// Wait the polling period so that we do not spam the chain when no
// new block has been mined
@ -102,6 +116,7 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
let tx_opt = tx_res.unwrap();
// If the tx is no longer in the mempool, return Ok(None)
if tx_opt.is_none() {
tracing::debug!("Dropped from mempool, pending tx {:?}", *this.tx_hash);
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(None));
}
@ -116,8 +131,9 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
);
// Start polling for the receipt now
tracing::debug!("Getting receipt for pending tx {:?}", *this.tx_hash);
let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
*this.state = PendingTxState::GettingReceipt(fut);
rewake_with_new_state!(ctx, this, PendingTxState::GettingReceipt(fut));
}
PendingTxState::PausedGettingReceipt => {
// Wait the polling period so that we do not spam the chain when no
@ -129,6 +145,7 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
}
PendingTxState::GettingReceipt(fut) => {
if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
tracing::debug!("Checking receipt for pending tx {:?}", *this.tx_hash);
*this.state = PendingTxState::CheckingReceipt(receipt)
} else {
*this.state = PendingTxState::PausedGettingReceipt
@ -146,6 +163,11 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
tracing::debug!(
"Waiting on confirmations for pending tx {:?}",
*this.tx_hash
);
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
@ -235,6 +257,9 @@ impl<'a, P> Deref for PendingTransaction<'a, P> {
// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Initial delay to ensure the GettingTx loop doesn't immediately fail
InitialDelay(Pin<Box<futures_timer::Delay>>),
/// Waiting for interval to elapse before calling API again
PausedGettingTx,
@ -265,6 +290,7 @@ enum PendingTxState<'a> {
impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::InitialDelay(_) => "InitialDelay",
PendingTxState::PausedGettingTx => "PausedGettingTx",
PendingTxState::GettingTx(_) => "GettingTx",
PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",