feature: PendingTransaction returns Option (#327)

* feature: PendingTransaction returns Option

* chore: expand safety reasoning in pending tx expect

* chore: clippy lints

* bug: check readiness of future before taking receipt option
This commit is contained in:
James Prestwich 2021-07-06 01:06:18 -07:00 committed by GitHub
parent dd98a593e2
commit d722c1a6af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 116 additions and 32 deletions

View File

@ -45,7 +45,8 @@ impl<M: Middleware> Deployer<M> {
let receipt = pending_tx
.confirmations(self.confs)
.await
.map_err(|_| ContractError::ContractNotDeployed)?;
.map_err(|_| ContractError::ContractNotDeployed)?
.ok_or(ContractError::ContractNotDeployed)?;
let address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;

View File

@ -53,7 +53,7 @@ mod eth_tests {
let gas_estimate = contract_call.estimate_gas().await.unwrap();
let pending_tx = contract_call.send().await.unwrap();
let tx = client.get_transaction(*pending_tx).await.unwrap().unwrap();
let tx_receipt = pending_tx.await.unwrap();
let tx_receipt = pending_tx.await.unwrap().unwrap();
assert_eq!(last_sender.clone().call().await.unwrap(), client2.address());
assert_eq!(get_value.clone().call().await.unwrap(), "hi");
assert_eq!(tx.input, calldata);

View File

@ -110,7 +110,8 @@ impl DsProxy {
.send()
.await?
.await
.map_err(ContractError::ProviderError)?;
.map_err(ContractError::ProviderError)?
.ok_or(ContractError::ContractNotDeployed)?;
// decode the event log to get the address of the deployed contract.
if tx_receipt.status == Some(U64::from(1u64)) {

View File

@ -3,7 +3,7 @@ use crate::{
stream::{interval, DEFAULT_POLL_INTERVAL},
JsonRpcClient, PinBoxFut, Provider, ProviderError,
};
use ethers_core::types::{TransactionReceipt, TxHash, U64};
use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
@ -33,12 +33,12 @@ pub struct PendingTransaction<'a, P> {
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction_receipt(tx_hash));
let fut = Box::pin(provider.get_transaction(tx_hash));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingReceipt(fut),
state: PendingTxState::GettingTx(fut),
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
}
}
@ -57,13 +57,68 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
}
}
macro_rules! rewake_with_new_state {
($ctx:ident, $this:ident, $new_state:expr) => {
*$this.state = $new_state;
$ctx.waker().wake_by_ref();
return Poll::Pending;
};
}
macro_rules! rewake_with_new_state_if {
($condition:expr, $ctx:ident, $this:ident, $new_state:expr) => {
if $condition {
rewake_with_new_state!($ctx, $this, $new_state);
}
};
}
impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
type Output = Result<TransactionReceipt, ProviderError>;
type Output = Result<Option<TransactionReceipt>, ProviderError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
match this.state {
PendingTxState::PausedGettingTx => {
// Wait the polling period so that we do not spam the chain when no
// new block has been mined
let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
*this.state = PendingTxState::GettingTx(fut);
ctx.waker().wake_by_ref();
}
PendingTxState::GettingTx(fut) => {
let tx_res = futures_util::ready!(fut.as_mut().poll(ctx));
// If the provider errors, just try again after the interval.
// nbd.
rewake_with_new_state_if!(
tx_res.is_err(),
ctx,
this,
PendingTxState::PausedGettingTx
);
let tx_opt = tx_res.unwrap();
// If the tx is no longer in the mempool, return Ok(None)
if tx_opt.is_none() {
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(None));
}
// If it hasn't confirmed yet, poll again later
let tx = tx_opt.unwrap();
rewake_with_new_state_if!(
tx.block_number.is_none(),
ctx,
this,
PendingTxState::PausedGettingTx
);
// Start polling for the receipt now
let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
*this.state = PendingTxState::GettingReceipt(fut);
}
PendingTxState::PausedGettingReceipt => {
// Wait the polling period so that we do not spam the chain when no
// new block has been mined
@ -73,25 +128,31 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
ctx.waker().wake_by_ref();
}
PendingTxState::GettingReceipt(fut) => {
if let Ok(Some(receipt)) = futures_util::ready!(fut.as_mut().poll(ctx)) {
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
*this.state = PendingTxState::CheckingReceipt(receipt)
} else {
*this.state = PendingTxState::PausedGettingReceipt
}
ctx.waker().wake_by_ref();
}
PendingTxState::CheckingReceipt(receipt) => {
rewake_with_new_state_if!(
receipt.is_none(),
ctx,
this,
PendingTxState::PausedGettingReceipt
);
// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
let fut = Box::pin(this.provider.get_block_number());
*this.state =
PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone()));
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
// Schedule the waker to poll again
ctx.waker().wake_by_ref();
} else {
let receipt = *receipt.clone();
let receipt = receipt.take();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
}
@ -104,26 +165,30 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
// we need to re-instantiate the get_block_number future so that
// we poll again
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
ctx.waker().wake_by_ref();
}
PendingTxState::GettingBlockNumber(fut, receipt) => {
let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;
// This is safe so long as we only enter the `GettingBlock`
// loop from `CheckingReceipt`, which contains an explicit
// `is_none` check
let receipt = receipt.take().expect("GettingBlockNumber without receipt");
// Wait for the interval
let inclusion_block = receipt
.block_number
.expect("Receipt did not have a block number. This should never happen");
let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;
// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block > inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
let receipt = Some(receipt);
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations);
*this.state = PendingTxState::PausedGettingBlockNumber(receipt.clone());
*this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt));
ctx.waker().wake_by_ref();
}
}
@ -170,22 +235,28 @@ impl<'a, P> Deref for PendingTransaction<'a, P> {
// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Waiting for interval to elapse before calling API again
PausedGettingTx,
/// Polling The blockchain to see if the Tx has confirmed or dropped
GettingTx(PinBoxFut<'a, Option<Transaction>>),
/// Waiting for interval to elapse before calling API again
PausedGettingReceipt,
/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),
/// Waiting for interval to elapse before calling API again
PausedGettingBlockNumber(Box<TransactionReceipt>),
/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Box<TransactionReceipt>),
/// If the pending tx required only 1 conf, it will return early. Otherwise it will
/// proceed to the next state which will poll the block number until there have been
/// enough confirmations
CheckingReceipt(Box<TransactionReceipt>),
CheckingReceipt(Option<TransactionReceipt>),
/// Waiting for interval to elapse before calling API again
PausedGettingBlockNumber(Option<TransactionReceipt>),
/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),
/// Future has completed and should panic if polled again
Completed,
@ -194,8 +265,10 @@ enum PendingTxState<'a> {
impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::PausedGettingTx => "PausedGettingTx",
PendingTxState::GettingTx(_) => "GettingTx",
PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",

View File

@ -964,7 +964,7 @@ mod tests {
.is_none());
let hash = *pending_tx;
let receipt = pending_tx.await.unwrap();
let receipt = pending_tx.await.unwrap().unwrap();
assert_eq!(receipt.transaction_hash, hash);
}

View File

@ -289,6 +289,7 @@ mod tests {
.unwrap()
.await
.unwrap()
.unwrap()
}),
)
.fuse();
@ -361,7 +362,7 @@ mod tests {
let stream = TransactionStream::new(
&provider,
stream::iter(txs.iter().map(|tx| tx.transaction_hash)),
stream::iter(txs.iter().cloned().map(|tx| tx.unwrap().transaction_hash)),
10,
);
let res = stream
@ -374,7 +375,9 @@ mod tests {
assert_eq!(res.len(), txs.len());
assert_eq!(
res.into_iter().map(|tx| tx.hash).collect::<HashSet<_>>(),
txs.into_iter().map(|tx| tx.transaction_hash).collect()
txs.into_iter()
.map(|tx| tx.unwrap().transaction_hash)
.collect()
);
}
}

View File

@ -125,7 +125,7 @@ mod eth_tests {
let tx = TransactionRequest::new().to(who).from(who);
let pending_tx = provider.send_transaction(tx, None).await.unwrap();
let tx_hash = *pending_tx;
let receipt = pending_tx.confirmations(3).await.unwrap();
let receipt = pending_tx.confirmations(3).await.unwrap().unwrap();
// got the correct receipt
assert_eq!(receipt.transaction_hash, tx_hash);
}

View File

@ -18,7 +18,11 @@ async fn main() -> Result<()> {
let tx = TransactionRequest::new().to("vitalik.eth").value(100_000);
// send it!
let receipt = client.send_transaction(tx, None).await?.await?;
let receipt = client
.send_transaction(tx, None)
.await?
.await?
.ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?;
let tx = client.get_transaction(receipt.transaction_hash).await?;
println!("{}", serde_json::to_string(&tx)?);

View File

@ -22,7 +22,9 @@ async fn main() -> Result<()> {
let pending_tx = client.send_transaction(tx, None).await?;
// get the mined tx
let receipt = pending_tx.await?;
let receipt = pending_tx
.await?
.ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?;
let tx = client.get_transaction(receipt.transaction_hash).await?;
println!("Sent tx: {}\n", serde_json::to_string(&tx)?);