From d722c1a6af6b0fc4a2cbbb66fe9c83ef35e81bfd Mon Sep 17 00:00:00 2001 From: James Prestwich <10149425+prestwich@users.noreply.github.com> Date: Tue, 6 Jul 2021 01:06:18 -0700 Subject: [PATCH] feature: PendingTransaction returns Option (#327) * feature: PendingTransaction returns Option * chore: expand safety reasoning in pending tx expect * chore: clippy lints * bug: check readiness of future before taking receipt option --- ethers-contract/src/factory.rs | 3 +- ethers-contract/tests/contract.rs | 2 +- .../src/transformer/ds_proxy/mod.rs | 3 +- ethers-providers/src/pending_transaction.rs | 119 ++++++++++++++---- ethers-providers/src/provider.rs | 2 +- ethers-providers/src/stream.rs | 7 +- ethers-providers/tests/provider.rs | 2 +- ethers/examples/ens.rs | 6 +- ethers/examples/local_signer.rs | 4 +- 9 files changed, 116 insertions(+), 32 deletions(-) diff --git a/ethers-contract/src/factory.rs b/ethers-contract/src/factory.rs index 0ad62731..d0405ec8 100644 --- a/ethers-contract/src/factory.rs +++ b/ethers-contract/src/factory.rs @@ -45,7 +45,8 @@ impl Deployer { let receipt = pending_tx .confirmations(self.confs) .await - .map_err(|_| ContractError::ContractNotDeployed)?; + .map_err(|_| ContractError::ContractNotDeployed)? + .ok_or(ContractError::ContractNotDeployed)?; let address = receipt .contract_address .ok_or(ContractError::ContractNotDeployed)?; diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index e74058b2..059d0281 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -53,7 +53,7 @@ mod eth_tests { let gas_estimate = contract_call.estimate_gas().await.unwrap(); let pending_tx = contract_call.send().await.unwrap(); let tx = client.get_transaction(*pending_tx).await.unwrap().unwrap(); - let tx_receipt = pending_tx.await.unwrap(); + let tx_receipt = pending_tx.await.unwrap().unwrap(); assert_eq!(last_sender.clone().call().await.unwrap(), client2.address()); assert_eq!(get_value.clone().call().await.unwrap(), "hi"); assert_eq!(tx.input, calldata); diff --git a/ethers-middleware/src/transformer/ds_proxy/mod.rs b/ethers-middleware/src/transformer/ds_proxy/mod.rs index 4a866cc3..df75aa0c 100644 --- a/ethers-middleware/src/transformer/ds_proxy/mod.rs +++ b/ethers-middleware/src/transformer/ds_proxy/mod.rs @@ -110,7 +110,8 @@ impl DsProxy { .send() .await? .await - .map_err(ContractError::ProviderError)?; + .map_err(ContractError::ProviderError)? + .ok_or(ContractError::ContractNotDeployed)?; // decode the event log to get the address of the deployed contract. if tx_receipt.status == Some(U64::from(1u64)) { diff --git a/ethers-providers/src/pending_transaction.rs b/ethers-providers/src/pending_transaction.rs index 5c6c6096..f8fca735 100644 --- a/ethers-providers/src/pending_transaction.rs +++ b/ethers-providers/src/pending_transaction.rs @@ -3,7 +3,7 @@ use crate::{ stream::{interval, DEFAULT_POLL_INTERVAL}, JsonRpcClient, PinBoxFut, Provider, ProviderError, }; -use ethers_core::types::{TransactionReceipt, TxHash, U64}; +use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64}; use futures_core::stream::Stream; use futures_util::stream::StreamExt; use pin_project::pin_project; @@ -33,12 +33,12 @@ pub struct PendingTransaction<'a, P> { impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { /// Creates a new pending transaction poller from a hash and a provider pub fn new(tx_hash: TxHash, provider: &'a Provider

) -> Self { - let fut = Box::pin(provider.get_transaction_receipt(tx_hash)); + let fut = Box::pin(provider.get_transaction(tx_hash)); Self { tx_hash, confirmations: 1, provider, - state: PendingTxState::GettingReceipt(fut), + state: PendingTxState::GettingTx(fut), interval: Box::new(interval(DEFAULT_POLL_INTERVAL)), } } @@ -57,13 +57,68 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { } } +macro_rules! rewake_with_new_state { + ($ctx:ident, $this:ident, $new_state:expr) => { + *$this.state = $new_state; + $ctx.waker().wake_by_ref(); + return Poll::Pending; + }; +} + +macro_rules! rewake_with_new_state_if { + ($condition:expr, $ctx:ident, $this:ident, $new_state:expr) => { + if $condition { + rewake_with_new_state!($ctx, $this, $new_state); + } + }; +} + impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { - type Output = Result; + type Output = Result, ProviderError>; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { let this = self.project(); match this.state { + PendingTxState::PausedGettingTx => { + // Wait the polling period so that we do not spam the chain when no + // new block has been mined + let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx)); + let fut = Box::pin(this.provider.get_transaction(*this.tx_hash)); + *this.state = PendingTxState::GettingTx(fut); + ctx.waker().wake_by_ref(); + } + PendingTxState::GettingTx(fut) => { + let tx_res = futures_util::ready!(fut.as_mut().poll(ctx)); + // If the provider errors, just try again after the interval. + // nbd. + rewake_with_new_state_if!( + tx_res.is_err(), + ctx, + this, + PendingTxState::PausedGettingTx + ); + + let tx_opt = tx_res.unwrap(); + // If the tx is no longer in the mempool, return Ok(None) + if tx_opt.is_none() { + *this.state = PendingTxState::Completed; + return Poll::Ready(Ok(None)); + } + + // If it hasn't confirmed yet, poll again later + let tx = tx_opt.unwrap(); + rewake_with_new_state_if!( + tx.block_number.is_none(), + ctx, + this, + PendingTxState::PausedGettingTx + ); + + // Start polling for the receipt now + let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash)); + *this.state = PendingTxState::GettingReceipt(fut); + } PendingTxState::PausedGettingReceipt => { // Wait the polling period so that we do not spam the chain when no // new block has been mined @@ -73,25 +128,31 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { ctx.waker().wake_by_ref(); } PendingTxState::GettingReceipt(fut) => { - if let Ok(Some(receipt)) = futures_util::ready!(fut.as_mut().poll(ctx)) { - *this.state = PendingTxState::CheckingReceipt(Box::new(receipt)) + if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) { + *this.state = PendingTxState::CheckingReceipt(receipt) } else { *this.state = PendingTxState::PausedGettingReceipt } ctx.waker().wake_by_ref(); } PendingTxState::CheckingReceipt(receipt) => { + rewake_with_new_state_if!( + receipt.is_none(), + ctx, + this, + PendingTxState::PausedGettingReceipt + ); + // If we requested more than 1 confirmation, we need to compare the receipt's // block number and the current block if *this.confirmations > 1 { let fut = Box::pin(this.provider.get_block_number()); - *this.state = - PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone())); + *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take()); // Schedule the waker to poll again ctx.waker().wake_by_ref(); } else { - let receipt = *receipt.clone(); + let receipt = receipt.take(); *this.state = PendingTxState::Completed; return Poll::Ready(Ok(receipt)); } @@ -104,26 +165,30 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { // we need to re-instantiate the get_block_number future so that // we poll again let fut = Box::pin(this.provider.get_block_number()); - *this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone()); + *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take()); ctx.waker().wake_by_ref(); } PendingTxState::GettingBlockNumber(fut, receipt) => { + let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?; + + // This is safe so long as we only enter the `GettingBlock` + // loop from `CheckingReceipt`, which contains an explicit + // `is_none` check + let receipt = receipt.take().expect("GettingBlockNumber without receipt"); + // Wait for the interval let inclusion_block = receipt .block_number .expect("Receipt did not have a block number. This should never happen"); - - let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?; - // if the transaction has at least K confirmations, return the receipt // (subtract 1 since the tx already has 1 conf when it's mined) if current_block > inclusion_block + *this.confirmations - 1 { - let receipt = *receipt.clone(); + let receipt = Some(receipt); *this.state = PendingTxState::Completed; return Poll::Ready(Ok(receipt)); } else { tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations); - *this.state = PendingTxState::PausedGettingBlockNumber(receipt.clone()); + *this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt)); ctx.waker().wake_by_ref(); } } @@ -170,22 +235,28 @@ impl<'a, P> Deref for PendingTransaction<'a, P> { // We box the TransactionReceipts to keep the enum small. enum PendingTxState<'a> { + /// Waiting for interval to elapse before calling API again + PausedGettingTx, + + /// Polling The blockchain to see if the Tx has confirmed or dropped + GettingTx(PinBoxFut<'a, Option>), + /// Waiting for interval to elapse before calling API again PausedGettingReceipt, /// Polling the blockchain for the receipt GettingReceipt(PinBoxFut<'a, Option>), - /// Waiting for interval to elapse before calling API again - PausedGettingBlockNumber(Box), - - /// Polling the blockchain for the current block number - GettingBlockNumber(PinBoxFut<'a, U64>, Box), - /// If the pending tx required only 1 conf, it will return early. Otherwise it will /// proceed to the next state which will poll the block number until there have been /// enough confirmations - CheckingReceipt(Box), + CheckingReceipt(Option), + + /// Waiting for interval to elapse before calling API again + PausedGettingBlockNumber(Option), + + /// Polling the blockchain for the current block number + GettingBlockNumber(PinBoxFut<'a, U64>, Option), /// Future has completed and should panic if polled again Completed, @@ -194,8 +265,10 @@ enum PendingTxState<'a> { impl<'a> fmt::Debug for PendingTxState<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let state = match self { - PendingTxState::GettingReceipt(_) => "GettingReceipt", + PendingTxState::PausedGettingTx => "PausedGettingTx", + PendingTxState::GettingTx(_) => "GettingTx", PendingTxState::PausedGettingReceipt => "PausedGettingReceipt", + PendingTxState::GettingReceipt(_) => "GettingReceipt", PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber", PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber", PendingTxState::CheckingReceipt(_) => "CheckingReceipt", diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 1f1e795d..e9efcc0a 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -964,7 +964,7 @@ mod tests { .is_none()); let hash = *pending_tx; - let receipt = pending_tx.await.unwrap(); + let receipt = pending_tx.await.unwrap().unwrap(); assert_eq!(receipt.transaction_hash, hash); } diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 15c56951..ecda3110 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -289,6 +289,7 @@ mod tests { .unwrap() .await .unwrap() + .unwrap() }), ) .fuse(); @@ -361,7 +362,7 @@ mod tests { let stream = TransactionStream::new( &provider, - stream::iter(txs.iter().map(|tx| tx.transaction_hash)), + stream::iter(txs.iter().cloned().map(|tx| tx.unwrap().transaction_hash)), 10, ); let res = stream @@ -374,7 +375,9 @@ mod tests { assert_eq!(res.len(), txs.len()); assert_eq!( res.into_iter().map(|tx| tx.hash).collect::>(), - txs.into_iter().map(|tx| tx.transaction_hash).collect() + txs.into_iter() + .map(|tx| tx.unwrap().transaction_hash) + .collect() ); } } diff --git a/ethers-providers/tests/provider.rs b/ethers-providers/tests/provider.rs index 6b86a888..e37dcaca 100644 --- a/ethers-providers/tests/provider.rs +++ b/ethers-providers/tests/provider.rs @@ -125,7 +125,7 @@ mod eth_tests { let tx = TransactionRequest::new().to(who).from(who); let pending_tx = provider.send_transaction(tx, None).await.unwrap(); let tx_hash = *pending_tx; - let receipt = pending_tx.confirmations(3).await.unwrap(); + let receipt = pending_tx.confirmations(3).await.unwrap().unwrap(); // got the correct receipt assert_eq!(receipt.transaction_hash, tx_hash); } diff --git a/ethers/examples/ens.rs b/ethers/examples/ens.rs index 5f79585e..c6bf8056 100644 --- a/ethers/examples/ens.rs +++ b/ethers/examples/ens.rs @@ -18,7 +18,11 @@ async fn main() -> Result<()> { let tx = TransactionRequest::new().to("vitalik.eth").value(100_000); // send it! - let receipt = client.send_transaction(tx, None).await?.await?; + let receipt = client + .send_transaction(tx, None) + .await? + .await? + .ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?; let tx = client.get_transaction(receipt.transaction_hash).await?; println!("{}", serde_json::to_string(&tx)?); diff --git a/ethers/examples/local_signer.rs b/ethers/examples/local_signer.rs index edfe6cea..d4a4b5cc 100644 --- a/ethers/examples/local_signer.rs +++ b/ethers/examples/local_signer.rs @@ -22,7 +22,9 @@ async fn main() -> Result<()> { let pending_tx = client.send_transaction(tx, None).await?; // get the mined tx - let receipt = pending_tx.await?; + let receipt = pending_tx + .await? + .ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?; let tx = client.get_transaction(receipt.transaction_hash).await?; println!("Sent tx: {}\n", serde_json::to_string(&tx)?);