add pending tx type to wait for tx confirmations (#11)

* feat: add pending tx type

* feat(pending-txs): implement the full state machine

* tests(ethers): fix transfer eth example

* feat: use the pending transaction struct when deploying a contract

* ci: skip the pending tx test

* chore: fix doctests
This commit is contained in:
Georgios Konstantopoulos 2020-06-15 15:40:06 +03:00 committed by GitHub
parent 20da946aa2
commit 79b21b9ea0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 253 additions and 70 deletions

View File

@ -23,7 +23,7 @@ jobs:
- run:
name: tests
# skip these temporarily until we get ganache-cli and solc on CI
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events --skip test_pending_tx
- run:
name: Check style
command: |

View File

@ -16,7 +16,6 @@ serde = { version = "1.0.110", default-features = false }
rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
once_cell = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.21", default-features = false }
futures = "0.3.5"
[dev-dependencies]

View File

@ -1,8 +1,8 @@
use ethers_core::{
abi::{Detokenize, Error as AbiError, Function, InvalidOutputType},
types::{Address, BlockNumber, TransactionRequest, H256, U256},
types::{Address, BlockNumber, TransactionRequest, U256},
};
use ethers_providers::{JsonRpcClient, ProviderError};
use ethers_providers::{JsonRpcClient, PendingTransaction, ProviderError};
use ethers_signers::{Client, ClientError, Signer};
use std::{fmt::Debug, marker::PhantomData};
@ -110,7 +110,7 @@ where
}
/// Signs and broadcasts the provided transaction
pub async fn send(self) -> Result<H256, ContractError> {
pub async fn send(self) -> Result<PendingTransaction<'a, P>, ContractError> {
Ok(self.client.send_transaction(self.tx, self.block).await?)
}
}

View File

@ -7,13 +7,6 @@ use ethers_core::{
use ethers_providers::JsonRpcClient;
use ethers_signers::{Client, Signer};
use std::time::Duration;
use tokio::time;
/// Poll for tx confirmation once every 7 seconds.
// TODO: Can this be improved by replacing polling with an "on new block" subscription?
const POLL_INTERVAL: u64 = 7000;
#[derive(Debug, Clone)]
/// Helper which manages the deployment transaction of a smart contract
pub struct Deployer<'a, P, S> {
@ -21,7 +14,6 @@ pub struct Deployer<'a, P, S> {
client: &'a Client<P, S>,
tx: TransactionRequest,
confs: usize,
poll_interval: Duration,
}
impl<'a, P, S> Deployer<'a, P, S>
@ -29,13 +21,6 @@ where
S: Signer,
P: JsonRpcClient,
{
/// Sets the poll frequency for checking the number of confirmations for
/// the contract deployment transaction
pub fn poll_interval<T: Into<Duration>>(mut self, interval: T) -> Self {
self.poll_interval = interval.into();
self
}
/// Sets the number of confirmations to wait for the contract deployment transaction
pub fn confirmations<T: Into<usize>>(mut self, confirmations: T) -> Self {
self.confs = confirmations.into();
@ -46,20 +31,13 @@ where
/// be sufficiently confirmed (default: 1), it returns a [`Contract`](./struct.Contract.html)
/// struct at the deployed contract's address.
pub async fn send(self) -> Result<Contract<'a, P, S>, ContractError> {
let tx_hash = self.client.send_transaction(self.tx, None).await?;
let pending_tx = self.client.send_transaction(self.tx, None).await?;
// poll for the receipt
let address;
loop {
if let Ok(receipt) = self.client.get_transaction_receipt(tx_hash).await {
address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;
break;
}
let receipt = pending_tx.confirmations(self.confs).await?;
time::delay_for(Duration::from_millis(POLL_INTERVAL)).await;
}
let address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;
let contract = Contract::new(address, self.abi.clone(), self.client);
Ok(contract)
@ -177,7 +155,6 @@ where
abi: self.abi,
tx,
confs: 1,
poll_interval: Duration::from_millis(POLL_INTERVAL),
})
}
}

View File

@ -6,6 +6,9 @@ mod provider;
// ENS support
mod ens;
mod pending_transaction;
pub use pending_transaction::PendingTransaction;
mod stream;
pub use stream::FilterStream;
// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream`
@ -28,6 +31,6 @@ pub trait JsonRpcClient: Debug + Clone {
/// Sends a request with the provided JSON-RPC and parameters serialized as JSON
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Serialize + Send + Sync,
T: Debug + Serialize + Send + Sync,
R: for<'a> Deserialize<'a>;
}

View File

@ -0,0 +1,191 @@
use crate::{JsonRpcClient, Provider, ProviderError};
use ethers_core::types::{TransactionReceipt, TxHash, U64};
use pin_project::pin_project;
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};
/// A pending transaction is a transaction which has been submitted but is not yet mined.
/// `await`'ing on a pending transaction will resolve to a transaction receipt
/// once the transaction has enough `confirmations`. The default number of confirmations
/// is 1, but may be adjusted with the `confirmations` method. If the transaction does not
/// have enough confirmations or is not mined, the future will stay in the pending state.
#[pin_project]
pub struct PendingTransaction<'a, P> {
tx_hash: TxHash,
confirmations: usize,
provider: &'a Provider<P>,
state: PendingTxState<'a>,
}
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction_receipt(tx_hash));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingReceipt(fut),
}
}
/// Sets the number of confirmations for the pending transaction to resolve
/// to a receipt
pub fn confirmations(mut self, confs: usize) -> Self {
self.confirmations = confs;
self
}
}
impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
type Output = Result<TransactionReceipt, ProviderError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
match this.state {
PendingTxState::GettingReceipt(fut) => {
let receipt = futures_util::ready!(fut.as_mut().poll(ctx))?;
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
}
PendingTxState::CheckingReceipt(receipt) => {
// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
let fut = Box::pin(this.provider.get_block_number());
*this.state =
PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone()))
} else {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
}
}
PendingTxState::GettingBlockNumber(fut, receipt) => {
let inclusion_block = receipt
.block_number
.expect("Receipt did not have a block number. This should never happen");
let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;
// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block >= inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
// we need to re-instantiate the get_block_number future so that
// we poll again
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone());
return Poll::Pending;
}
}
PendingTxState::Completed => {
panic!("polled pending transaction future after completion")
}
};
Poll::Pending
}
}
impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PendingTransaction")
.field("tx_hash", &self.tx_hash)
.field("confirmations", &self.confirmations)
.field("state", &self.state)
.finish()
}
}
impl<'a, P> PartialEq for PendingTransaction<'a, P> {
fn eq(&self, other: &Self) -> bool {
self.tx_hash == other.tx_hash
}
}
impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
fn eq(&self, other: &TxHash) -> bool {
&self.tx_hash == other
}
}
impl<'a, P> Eq for PendingTransaction<'a, P> {}
impl<'a, P> Deref for PendingTransaction<'a, P> {
type Target = TxHash;
fn deref(&self) -> &Self::Target {
&self.tx_hash
}
}
// Helper type alias
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, TransactionReceipt>),
/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Box<TransactionReceipt>),
/// If the pending tx required only 1 conf, it will return early. Otherwise it will
/// proceed to the next state which will poll the block number until there have been
/// enough confirmations
CheckingReceipt(Box<TransactionReceipt>),
/// Future has completed and should panic if polled again
Completed,
}
impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
PendingTxState::Completed => "Completed",
};
f.debug_struct("PendingTxState")
.field("state", &state)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Http;
use ethers_core::{types::TransactionRequest, utils::Ganache};
use std::convert::TryFrom;
#[tokio::test]
async fn test_pending_tx() {
let _ganache = Ganache::new().spawn();
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
let accounts = provider.get_accounts().await.unwrap();
let tx = TransactionRequest::pay(accounts[0], 1000).from(accounts[0]);
let pending_tx = provider.send_transaction(tx).await.unwrap();
let receipt = provider
.get_transaction_receipt(pending_tx.tx_hash)
.await
.unwrap();
// the pending tx resolves to the same receipt
let tx_receipt = pending_tx.confirmations(1).await.unwrap();
assert_eq!(receipt, tx_receipt);
}
}

View File

@ -2,7 +2,7 @@ use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient,
JsonRpcClient, PendingTransaction,
};
use ethers_core::{
@ -266,7 +266,7 @@ impl<P: JsonRpcClient> Provider<P> {
pub async fn send_transaction(
&self,
mut tx: TransactionRequest,
) -> Result<TxHash, ProviderError> {
) -> Result<PendingTransaction<'_, P>, ProviderError> {
if let Some(ref to) = tx.to {
if let NameOrAddress::Name(ens_name) = to {
// resolve to an address
@ -277,22 +277,27 @@ impl<P: JsonRpcClient> Provider<P> {
}
}
Ok(self
let tx_hash = self
.0
.request("eth_sendTransaction", [tx])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}
/// Send the raw RLP encoded transaction to the entire Ethereum network and returns the transaction's hash
/// This will consume gas from the account that signed the transaction.
pub async fn send_raw_transaction(&self, tx: &Transaction) -> Result<TxHash, ProviderError> {
pub async fn send_raw_transaction(
&self,
tx: &Transaction,
) -> Result<PendingTransaction<'_, P>, ProviderError> {
let rlp = utils::serialize(&tx.rlp());
Ok(self
let tx_hash = self
.0
.request("eth_sendRawTransaction", [rlp])
.await
.map_err(Into::into)?)
.map_err(Into::into)?;
Ok(PendingTransaction::new(tx_hash, self))
}
/// Signs data using a specific account. This account needs to be unlocked.

View File

@ -1,9 +1,9 @@
use crate::Signer;
use ethers_core::types::{
Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest, TxHash,
Address, BlockNumber, Bytes, NameOrAddress, Signature, TransactionRequest,
};
use ethers_providers::{JsonRpcClient, Provider, ProviderError};
use ethers_providers::{JsonRpcClient, PendingTransaction, Provider, ProviderError};
use futures_util::{future::ok, join};
use std::{future::Future, ops::Deref};
@ -42,7 +42,14 @@ use thiserror::Error;
/// let signed_msg = client.provider().sign(b"hello".to_vec(), &client.address()).await?;
///
/// let tx = TransactionRequest::pay("vitalik.eth", 100);
/// let tx_hash = client.send_transaction(tx, None).await?;
/// let pending_tx = client.send_transaction(tx, None).await?;
///
/// // You can get the transaction hash by dereferencing it
/// let tx_hash = *pending_tx;
///
/// // Or you can `await` on the pending transaction to get the receipt with a pre-specified
/// // number of confirmations
/// let receipt = pending_tx.confirmations(6).await?;
///
/// // You can connect with other wallets at runtime via the `with_signer` function
/// let wallet2: Wallet = "cd8c407233c0560f6de24bb2dc60a8b02335c959a1a17f749ce6c1ccf63d74a7"
@ -113,7 +120,7 @@ where
&self,
mut tx: TransactionRequest,
block: Option<BlockNumber>,
) -> Result<TxHash, ClientError> {
) -> Result<PendingTransaction<'_, P>, ClientError> {
if let Some(ref to) = tx.to {
if let NameOrAddress::Name(ens_name) = to {
let addr = self.resolve_name(&ens_name).await?;
@ -128,9 +135,7 @@ where
let signed_tx = self.signer.sign_transaction(tx).map_err(Into::into)?;
// broadcast it
self.provider.send_raw_transaction(&signed_tx).await?;
Ok(signed_tx.hash)
Ok(self.provider.send_raw_transaction(&signed_tx).await?)
}
async fn fill_transaction(
@ -176,16 +181,22 @@ where
/// Sets the signer and returns a mutable reference to self so that it can be used in chained
/// calls.
pub fn with_signer(&mut self, signer: S) -> &mut Self {
self.signer = signer;
self
///
/// Clones internally.
pub fn with_signer(&self, signer: S) -> Self {
let mut this = self.clone();
this.signer = signer;
this
}
/// Sets the provider and returns a mutable reference to self so that it can be used in chained
/// calls.
pub fn with_provider(&mut self, provider: Provider<P>) -> &mut Self {
self.provider = provider;
self
///
/// Clones internally.
pub fn with_provider(&self, provider: Provider<P>) -> Self {
let mut this = self.clone();
this.provider = provider;
this
}
}

View File

@ -18,12 +18,10 @@ async fn main() -> Result<()> {
let tx = TransactionRequest::new().to("vitalik.eth").value(100_000);
// send it!
let hash = client.send_transaction(tx, None).await?;
let pending_tx = client.send_transaction(tx, None).await?;
// get the mined tx
let tx = client.get_transaction(hash).await?;
let receipt = client.get_transaction_receipt(tx.hash).await?;
let receipt = pending_tx.await?;
let tx = client.get_transaction(receipt.transaction_hash).await?;
println!("{}", serde_json::to_string(&tx)?);
println!("{}", serde_json::to_string(&receipt)?);

View File

@ -27,12 +27,11 @@ async fn main() -> Result<()> {
.value(10000);
// send it!
let hash = client.send_transaction(tx, None).await?;
let pending_tx = client.send_transaction(tx, None).await?;
// get the mined tx
let tx = client.get_transaction(hash).await?;
let receipt = client.get_transaction_receipt(tx.hash).await?;
let receipt = pending_tx.await?;
let tx = client.get_transaction(receipt.transaction_hash).await?;
println!("Sent tx: {}\n", serde_json::to_string(&tx)?);
println!("Tx receipt: {}", serde_json::to_string(&receipt)?);

View File

@ -22,9 +22,9 @@ async fn main() -> Result<()> {
let balance_before = provider.get_balance(from, None).await?;
// broadcast it via the eth_sendTransaction API
let tx_hash = provider.send_transaction(tx).await?;
let pending_tx = provider.send_transaction(tx).await?;
let tx = provider.get_transaction(tx_hash).await?;
let tx = pending_tx.await?;
println!("{}", serde_json::to_string(&tx)?);

View File

@ -190,13 +190,13 @@ pub mod providers {
/// .value(10000);
///
/// // send it! (this will resolve the ENS name to an address under the hood)
/// let hash = client.send_transaction(tx, None).await?;
///
/// // get the mined tx
/// let tx = client.get_transaction(hash).await?;
/// let pending_tx = client.send_transaction(tx, None).await?;
///
/// // get the receipt
/// let receipt = client.get_transaction_receipt(tx.hash).await?;
/// let receipt = pending_tx.await?;
///
/// // get the mined tx
/// let tx = client.get_transaction(receipt.transaction_hash).await?;
///
/// println!("{}", serde_json::to_string(&tx)?);
/// println!("{}", serde_json::to_string(&receipt)?);