From 79b21b9ea0a1643a0c933d836e8b9c0c29a0a617 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 15 Jun 2020 15:40:06 +0300 Subject: [PATCH] add pending tx type to wait for tx confirmations (#11) * feat: add pending tx type * feat(pending-txs): implement the full state machine * tests(ethers): fix transfer eth example * feat: use the pending transaction struct when deploying a contract * ci: skip the pending tx test * chore: fix doctests --- .circleci/config.yml | 2 +- ethers-contract/Cargo.toml | 1 - ethers-contract/src/call.rs | 6 +- ethers-contract/src/factory.rs | 33 +--- ethers-providers/src/lib.rs | 5 +- ethers-providers/src/pending_transaction.rs | 191 ++++++++++++++++++++ ethers-providers/src/provider.rs | 19 +- ethers-signers/src/client.rs | 37 ++-- ethers/examples/ens.rs | 8 +- ethers/examples/local_signer.rs | 7 +- ethers/examples/transfer_eth.rs | 4 +- ethers/src/lib.rs | 10 +- 12 files changed, 253 insertions(+), 70 deletions(-) create mode 100644 ethers-providers/src/pending_transaction.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index a8972554..80519ba9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,7 +23,7 @@ jobs: - run: name: tests # skip these temporarily until we get ganache-cli and solc on CI - command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events + command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events --skip test_pending_tx - run: name: Check style command: | diff --git a/ethers-contract/Cargo.toml b/ethers-contract/Cargo.toml index cd4e2102..9f7e50b1 100644 --- a/ethers-contract/Cargo.toml +++ b/ethers-contract/Cargo.toml @@ -16,7 +16,6 @@ serde = { version = "1.0.110", default-features = false } rustc-hex = { version = "2.1.0", default-features = false } thiserror = { version = "1.0.19", default-features = false } once_cell = { version = "1.4.0", default-features = false } -tokio = { version = "0.2.21", default-features = false } futures = "0.3.5" [dev-dependencies] diff --git a/ethers-contract/src/call.rs b/ethers-contract/src/call.rs index 8c6d0b43..c76c6194 100644 --- a/ethers-contract/src/call.rs +++ b/ethers-contract/src/call.rs @@ -1,8 +1,8 @@ use ethers_core::{ abi::{Detokenize, Error as AbiError, Function, InvalidOutputType}, - types::{Address, BlockNumber, TransactionRequest, H256, U256}, + types::{Address, BlockNumber, TransactionRequest, U256}, }; -use ethers_providers::{JsonRpcClient, ProviderError}; +use ethers_providers::{JsonRpcClient, PendingTransaction, ProviderError}; use ethers_signers::{Client, ClientError, Signer}; use std::{fmt::Debug, marker::PhantomData}; @@ -110,7 +110,7 @@ where } /// Signs and broadcasts the provided transaction - pub async fn send(self) -> Result { + pub async fn send(self) -> Result, ContractError> { Ok(self.client.send_transaction(self.tx, self.block).await?) } } diff --git a/ethers-contract/src/factory.rs b/ethers-contract/src/factory.rs index 76d9ed8b..9ac36269 100644 --- a/ethers-contract/src/factory.rs +++ b/ethers-contract/src/factory.rs @@ -7,13 +7,6 @@ use ethers_core::{ use ethers_providers::JsonRpcClient; use ethers_signers::{Client, Signer}; -use std::time::Duration; -use tokio::time; - -/// Poll for tx confirmation once every 7 seconds. -// TODO: Can this be improved by replacing polling with an "on new block" subscription? -const POLL_INTERVAL: u64 = 7000; - #[derive(Debug, Clone)] /// Helper which manages the deployment transaction of a smart contract pub struct Deployer<'a, P, S> { @@ -21,7 +14,6 @@ pub struct Deployer<'a, P, S> { client: &'a Client, tx: TransactionRequest, confs: usize, - poll_interval: Duration, } impl<'a, P, S> Deployer<'a, P, S> @@ -29,13 +21,6 @@ where S: Signer, P: JsonRpcClient, { - /// Sets the poll frequency for checking the number of confirmations for - /// the contract deployment transaction - pub fn poll_interval>(mut self, interval: T) -> Self { - self.poll_interval = interval.into(); - self - } - /// Sets the number of confirmations to wait for the contract deployment transaction pub fn confirmations>(mut self, confirmations: T) -> Self { self.confs = confirmations.into(); @@ -46,20 +31,13 @@ where /// be sufficiently confirmed (default: 1), it returns a [`Contract`](./struct.Contract.html) /// struct at the deployed contract's address. pub async fn send(self) -> Result, ContractError> { - let tx_hash = self.client.send_transaction(self.tx, None).await?; + let pending_tx = self.client.send_transaction(self.tx, None).await?; - // poll for the receipt - let address; - loop { - if let Ok(receipt) = self.client.get_transaction_receipt(tx_hash).await { - address = receipt - .contract_address - .ok_or(ContractError::ContractNotDeployed)?; - break; - } + let receipt = pending_tx.confirmations(self.confs).await?; - time::delay_for(Duration::from_millis(POLL_INTERVAL)).await; - } + let address = receipt + .contract_address + .ok_or(ContractError::ContractNotDeployed)?; let contract = Contract::new(address, self.abi.clone(), self.client); Ok(contract) @@ -177,7 +155,6 @@ where abi: self.abi, tx, confs: 1, - poll_interval: Duration::from_millis(POLL_INTERVAL), }) } } diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index ace6151d..2f9a7c56 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -6,6 +6,9 @@ mod provider; // ENS support mod ens; +mod pending_transaction; +pub use pending_transaction::PendingTransaction; + mod stream; pub use stream::FilterStream; // re-export `StreamExt` so that consumers can call `next()` on the `FilterStream` @@ -28,6 +31,6 @@ pub trait JsonRpcClient: Debug + Clone { /// Sends a request with the provided JSON-RPC and parameters serialized as JSON async fn request(&self, method: &str, params: T) -> Result where - T: Serialize + Send + Sync, + T: Debug + Serialize + Send + Sync, R: for<'a> Deserialize<'a>; } diff --git a/ethers-providers/src/pending_transaction.rs b/ethers-providers/src/pending_transaction.rs new file mode 100644 index 00000000..23d3d622 --- /dev/null +++ b/ethers-providers/src/pending_transaction.rs @@ -0,0 +1,191 @@ +use crate::{JsonRpcClient, Provider, ProviderError}; +use ethers_core::types::{TransactionReceipt, TxHash, U64}; +use pin_project::pin_project; +use std::{ + fmt, + future::Future, + ops::Deref, + pin::Pin, + task::{Context, Poll}, +}; + +/// A pending transaction is a transaction which has been submitted but is not yet mined. +/// `await`'ing on a pending transaction will resolve to a transaction receipt +/// once the transaction has enough `confirmations`. The default number of confirmations +/// is 1, but may be adjusted with the `confirmations` method. If the transaction does not +/// have enough confirmations or is not mined, the future will stay in the pending state. +#[pin_project] +pub struct PendingTransaction<'a, P> { + tx_hash: TxHash, + confirmations: usize, + provider: &'a Provider

, + state: PendingTxState<'a>, +} + +impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> { + /// Creates a new pending transaction poller from a hash and a provider + pub fn new(tx_hash: TxHash, provider: &'a Provider

) -> Self { + let fut = Box::pin(provider.get_transaction_receipt(tx_hash)); + Self { + tx_hash, + confirmations: 1, + provider, + state: PendingTxState::GettingReceipt(fut), + } + } + + /// Sets the number of confirmations for the pending transaction to resolve + /// to a receipt + pub fn confirmations(mut self, confs: usize) -> Self { + self.confirmations = confs; + self + } +} + +impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + let this = self.project(); + + match this.state { + PendingTxState::GettingReceipt(fut) => { + let receipt = futures_util::ready!(fut.as_mut().poll(ctx))?; + *this.state = PendingTxState::CheckingReceipt(Box::new(receipt)) + } + PendingTxState::CheckingReceipt(receipt) => { + // If we requested more than 1 confirmation, we need to compare the receipt's + // block number and the current block + if *this.confirmations > 1 { + let fut = Box::pin(this.provider.get_block_number()); + *this.state = + PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone())) + } else { + let receipt = *receipt.clone(); + *this.state = PendingTxState::Completed; + return Poll::Ready(Ok(receipt)); + } + } + PendingTxState::GettingBlockNumber(fut, receipt) => { + let inclusion_block = receipt + .block_number + .expect("Receipt did not have a block number. This should never happen"); + + let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?; + + // if the transaction has at least K confirmations, return the receipt + // (subtract 1 since the tx already has 1 conf when it's mined) + if current_block >= inclusion_block + *this.confirmations - 1 { + let receipt = *receipt.clone(); + *this.state = PendingTxState::Completed; + return Poll::Ready(Ok(receipt)); + } else { + // we need to re-instantiate the get_block_number future so that + // we poll again + let fut = Box::pin(this.provider.get_block_number()); + *this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone()); + return Poll::Pending; + } + } + PendingTxState::Completed => { + panic!("polled pending transaction future after completion") + } + }; + + Poll::Pending + } +} + +impl<'a, P> fmt::Debug for PendingTransaction<'a, P> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PendingTransaction") + .field("tx_hash", &self.tx_hash) + .field("confirmations", &self.confirmations) + .field("state", &self.state) + .finish() + } +} + +impl<'a, P> PartialEq for PendingTransaction<'a, P> { + fn eq(&self, other: &Self) -> bool { + self.tx_hash == other.tx_hash + } +} + +impl<'a, P> PartialEq for PendingTransaction<'a, P> { + fn eq(&self, other: &TxHash) -> bool { + &self.tx_hash == other + } +} + +impl<'a, P> Eq for PendingTransaction<'a, P> {} + +impl<'a, P> Deref for PendingTransaction<'a, P> { + type Target = TxHash; + + fn deref(&self) -> &Self::Target { + &self.tx_hash + } +} + +// Helper type alias +type PinBoxFut<'a, T> = Pin> + 'a>>; + +// We box the TransactionReceipts to keep the enum small. +enum PendingTxState<'a> { + /// Polling the blockchain for the receipt + GettingReceipt(PinBoxFut<'a, TransactionReceipt>), + + /// Polling the blockchain for the current block number + GettingBlockNumber(PinBoxFut<'a, U64>, Box), + + /// If the pending tx required only 1 conf, it will return early. Otherwise it will + /// proceed to the next state which will poll the block number until there have been + /// enough confirmations + CheckingReceipt(Box), + + /// Future has completed and should panic if polled again + Completed, +} + +impl<'a> fmt::Debug for PendingTxState<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = match self { + PendingTxState::GettingReceipt(_) => "GettingReceipt", + PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber", + PendingTxState::CheckingReceipt(_) => "CheckingReceipt", + PendingTxState::Completed => "Completed", + }; + + f.debug_struct("PendingTxState") + .field("state", &state) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Http; + use ethers_core::{types::TransactionRequest, utils::Ganache}; + use std::convert::TryFrom; + + #[tokio::test] + async fn test_pending_tx() { + let _ganache = Ganache::new().spawn(); + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let accounts = provider.get_accounts().await.unwrap(); + let tx = TransactionRequest::pay(accounts[0], 1000).from(accounts[0]); + + let pending_tx = provider.send_transaction(tx).await.unwrap(); + + let receipt = provider + .get_transaction_receipt(pending_tx.tx_hash) + .await + .unwrap(); + + // the pending tx resolves to the same receipt + let tx_receipt = pending_tx.confirmations(1).await.unwrap(); + assert_eq!(receipt, tx_receipt); + } +} diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index e34ec711..0510466e 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -2,7 +2,7 @@ use crate::{ ens, http::Provider as HttpProvider, stream::{FilterStream, FilterWatcher}, - JsonRpcClient, + JsonRpcClient, PendingTransaction, }; use ethers_core::{ @@ -266,7 +266,7 @@ impl Provider

{ pub async fn send_transaction( &self, mut tx: TransactionRequest, - ) -> Result { + ) -> Result, ProviderError> { if let Some(ref to) = tx.to { if let NameOrAddress::Name(ens_name) = to { // resolve to an address @@ -277,22 +277,27 @@ impl Provider

{ } } - Ok(self + let tx_hash = self .0 .request("eth_sendTransaction", [tx]) .await - .map_err(Into::into)?) + .map_err(Into::into)?; + Ok(PendingTransaction::new(tx_hash, self)) } /// Send the raw RLP encoded transaction to the entire Ethereum network and returns the transaction's hash /// This will consume gas from the account that signed the transaction. - pub async fn send_raw_transaction(&self, tx: &Transaction) -> Result { + pub async fn send_raw_transaction( + &self, + tx: &Transaction, + ) -> Result, ProviderError> { let rlp = utils::serialize(&tx.rlp()); - Ok(self + let tx_hash = self .0 .request("eth_sendRawTransaction", [rlp]) .await - .map_err(Into::into)?) + .map_err(Into::into)?; + Ok(PendingTransaction::new(tx_hash, self)) } /// Signs data using a specific account. This account needs to be unlocked. diff --git a/ethers-signers/src/client.rs b/ethers-signers/src/client.rs index 91a0cb34..56849107 100644 --- a/ethers-signers/src/client.rs +++ b/ethers-signers/src/client.rs @@ -1,9 +1,9 @@ use crate::Signer; use ethers_core::types::{ - Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest, TxHash, + Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest, }; -use ethers_providers::{JsonRpcClient, Provider, ProviderError}; +use ethers_providers::{JsonRpcClient, PendingTransaction, Provider, ProviderError}; use futures_util::{future::ok, join}; use std::{future::Future, ops::Deref}; @@ -42,7 +42,14 @@ use thiserror::Error; /// let signed_msg = client.provider().sign(b"hello".to_vec(), &client.address()).await?; /// /// let tx = TransactionRequest::pay("vitalik.eth", 100); -/// let tx_hash = client.send_transaction(tx, None).await?; +/// let pending_tx = client.send_transaction(tx, None).await?; +/// +/// // You can get the transaction hash by dereferencing it +/// let tx_hash = *pending_tx; +/// +/// // Or you can `await` on the pending transaction to get the receipt with a pre-specified +/// // number of confirmations +/// let receipt = pending_tx.confirmations(6).await?; /// /// // You can connect with other wallets at runtime via the `with_signer` function /// let wallet2: Wallet = "cd8c407233c0560f6de24bb2dc60a8b02335c959a1a17f749ce6c1ccf63d74a7" @@ -113,7 +120,7 @@ where &self, mut tx: TransactionRequest, block: Option, - ) -> Result { + ) -> Result, ClientError> { if let Some(ref to) = tx.to { if let NameOrAddress::Name(ens_name) = to { let addr = self.resolve_name(&ens_name).await?; @@ -128,9 +135,7 @@ where let signed_tx = self.signer.sign_transaction(tx).map_err(Into::into)?; // broadcast it - self.provider.send_raw_transaction(&signed_tx).await?; - - Ok(signed_tx.hash) + Ok(self.provider.send_raw_transaction(&signed_tx).await?) } async fn fill_transaction( @@ -176,16 +181,22 @@ where /// Sets the signer and returns a mutable reference to self so that it can be used in chained /// calls. - pub fn with_signer(&mut self, signer: S) -> &mut Self { - self.signer = signer; - self + /// + /// Clones internally. + pub fn with_signer(&self, signer: S) -> Self { + let mut this = self.clone(); + this.signer = signer; + this } /// Sets the provider and returns a mutable reference to self so that it can be used in chained /// calls. - pub fn with_provider(&mut self, provider: Provider

) -> &mut Self { - self.provider = provider; - self + /// + /// Clones internally. + pub fn with_provider(&self, provider: Provider

) -> Self { + let mut this = self.clone(); + this.provider = provider; + this } } diff --git a/ethers/examples/ens.rs b/ethers/examples/ens.rs index 0a342944..f721c520 100644 --- a/ethers/examples/ens.rs +++ b/ethers/examples/ens.rs @@ -18,12 +18,10 @@ async fn main() -> Result<()> { let tx = TransactionRequest::new().to("vitalik.eth").value(100_000); // send it! - let hash = client.send_transaction(tx, None).await?; + let pending_tx = client.send_transaction(tx, None).await?; - // get the mined tx - let tx = client.get_transaction(hash).await?; - - let receipt = client.get_transaction_receipt(tx.hash).await?; + let receipt = pending_tx.await?; + let tx = client.get_transaction(receipt.transaction_hash).await?; println!("{}", serde_json::to_string(&tx)?); println!("{}", serde_json::to_string(&receipt)?); diff --git a/ethers/examples/local_signer.rs b/ethers/examples/local_signer.rs index 2699c1f2..43ce7b7b 100644 --- a/ethers/examples/local_signer.rs +++ b/ethers/examples/local_signer.rs @@ -27,12 +27,11 @@ async fn main() -> Result<()> { .value(10000); // send it! - let hash = client.send_transaction(tx, None).await?; + let pending_tx = client.send_transaction(tx, None).await?; // get the mined tx - let tx = client.get_transaction(hash).await?; - - let receipt = client.get_transaction_receipt(tx.hash).await?; + let receipt = pending_tx.await?; + let tx = client.get_transaction(receipt.transaction_hash).await?; println!("Sent tx: {}\n", serde_json::to_string(&tx)?); println!("Tx receipt: {}", serde_json::to_string(&receipt)?); diff --git a/ethers/examples/transfer_eth.rs b/ethers/examples/transfer_eth.rs index 985be945..e1316dc3 100644 --- a/ethers/examples/transfer_eth.rs +++ b/ethers/examples/transfer_eth.rs @@ -22,9 +22,9 @@ async fn main() -> Result<()> { let balance_before = provider.get_balance(from, None).await?; // broadcast it via the eth_sendTransaction API - let tx_hash = provider.send_transaction(tx).await?; + let pending_tx = provider.send_transaction(tx).await?; - let tx = provider.get_transaction(tx_hash).await?; + let tx = pending_tx.await?; println!("{}", serde_json::to_string(&tx)?); diff --git a/ethers/src/lib.rs b/ethers/src/lib.rs index dbb0ee75..096a656e 100644 --- a/ethers/src/lib.rs +++ b/ethers/src/lib.rs @@ -190,13 +190,13 @@ pub mod providers { /// .value(10000); /// /// // send it! (this will resolve the ENS name to an address under the hood) -/// let hash = client.send_transaction(tx, None).await?; -/// -/// // get the mined tx -/// let tx = client.get_transaction(hash).await?; +/// let pending_tx = client.send_transaction(tx, None).await?; /// /// // get the receipt -/// let receipt = client.get_transaction_receipt(tx.hash).await?; +/// let receipt = pending_tx.await?; +/// +/// // get the mined tx +/// let tx = client.get_transaction(receipt.transaction_hash).await?; /// /// println!("{}", serde_json::to_string(&tx)?); /// println!("{}", serde_json::to_string(&receipt)?);