Add streamed logs to the provider (#9)

* feat(provider): implement Streamed logs

This utilizes `eth_getFilterChanges`. The stream struct must be instantiated with a factory that yields logs/hashes.
Consumers are expected to use the `FilterStream` trait to simplify their type definitions.
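
For example (a minimal sketch, assuming a `filter: Filter` has already been built and an HTTP node is reachable at localhost:8545):

    let provider = Provider::<Http>::try_from("http://localhost:8545")?;
    let mut stream = provider.watch(&filter).await?.stream();
    while let Some(log) = stream.next().await {
        // each item is a `Log` yielded by polling eth_getFilterChanges
    }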

* feat(provider): expose streaming methods

* test(provider): add new blocks/pending txs test

* feat(contract): allow events to be streamed

* test(contract): add integration test for streaming event logs

* perf(contract-factory): take abi and bytecode by value instead of reference

The abi, the bytecode, and the factory's `deploy` method now consume the structs instead of taking them by reference. While this means that
consumers might need to clone before using them, it gives us more flexibility around factories inside helper functions.
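
For example, a hypothetical helper that deploys several instances would clone the owned artifacts at each call site:

    let factory = ContractFactory::new(abi.clone(), bytecode.clone(), &client);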

* refactor(contract): use test helpers to reduce code dup

* chore: make clippy happy
Georgios Konstantopoulos 2020-06-15 11:46:07 +03:00 committed by GitHub
parent beb480f22b
commit d90b03da06
17 changed files with 620 additions and 151 deletions


@@ -3,43 +3,10 @@ version: 2.1
commands:
setup-lints:
steps:
- run:
name: Install cargo-audit
command: cargo install cargo-audit
- run:
name: Install clippy
command: rustup component add clippy
# https://medium.com/@edouard.oger/rust-caching-on-circleci-using-sccache-c996344f0115
setup-sccache:
steps:
- run:
name: Install sccache
command: |
cargo install sccache
# This configures Rust to use sccache.
echo 'export "RUSTC_WRAPPER"="sccache"' >> $BASH_ENV
# This is the maximum space sccache cache will use on disk.
echo 'export "SCCACHE_CACHE_SIZE"="1G"' >> $BASH_ENV
sccache --version
restore-sccache-cache:
steps:
- restore_cache:
name: Restore sccache cache
key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }}
save-sccache-cache:
steps:
- save_cache:
name: Save sccache cache
# We use {{ epoch }} to always upload a fresh cache:
# Of course, restore_cache will not find this exact key,
# but it will fall back to the closest key (aka the most recent).
# See https://discuss.circleci.com/t/add-mechanism-to-update-existing-cache-key/9014/13
key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }}-{{ epoch }}
paths:
- "~/.cache/sccache"
jobs:
build:
docker:
@@ -47,18 +14,18 @@ jobs:
steps:
- checkout
- setup-lints
- setup-sccache
- restore-sccache-cache
- run:
name: Reduce codegen Units
# If we don't include this, the linker runs out of memory when building
# the project on CI. We don't include this normally though because
# it should be able to build with more units on other machines
command: printf "[profile.dev]\ncodegen-units = 1\n" >> Cargo.toml
- run:
name: tests
# skip these temporarily until we get ganache-cli and solc on CI
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth
command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events
- run:
name: Check style
command: |
cargo fmt --all -- --check
cargo clippy --all-targets --all-features -- -D warnings
- run:
name: Audit Dependencies
command: cargo audit
- save-sccache-cache

Cargo.lock (generated)

@@ -296,6 +296,7 @@ dependencies = [
"ethers-core",
"ethers-providers",
"ethers-signers",
"futures",
"once_cell",
"rustc-hex",
"serde",
@@ -358,6 +359,9 @@ version = "0.1.0"
dependencies = [
"async-trait",
"ethers-core",
"futures-core",
"futures-util",
"pin-project",
"reqwest",
"rustc-hex",
"serde",
@@ -418,6 +422,21 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.5"
@@ -425,6 +444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -433,6 +453,23 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
name = "futures-executor"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
[[package]]
name = "futures-macro"
version = "0.3.5"
@@ -466,9 +503,13 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-utils",
"proc-macro-hack",
@@ -860,18 +901,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "0.4.17"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791"
checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "0.4.17"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40"
checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d"
dependencies = [
"proc-macro2",
"quote",


@@ -17,6 +17,7 @@ rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
once_cell = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.21", default-features = false }
futures = "0.3.5"
[dev-dependencies]
tokio = { version = "0.2.21", default-features = false, features = ["macros"] }


@@ -72,7 +72,7 @@ impl Context {
/// client at the given `Address`. The contract derefs to a `ethers::Contract`
/// object
pub fn new<T: Into<Address>>(address: T, client: &'a Client<P, S>) -> Self {
let contract = Contract::new(address.into(), &#abi_name, client);
let contract = Contract::new(address.into(), #abi_name.clone(), client);
Self(contract)
}


@@ -75,7 +75,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData};
/// .parse::<Wallet>()?.connect(provider);
///
/// // create the contract object at the address
/// let contract = Contract::new(address, &abi, &client);
/// let contract = Contract::new(address, abi, &client);
///
/// // Calling constant methods is done by calling `call()` on the method builder.
/// // (if the function takes no arguments, then you must use `()` as the argument)
@@ -112,7 +112,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData};
/// # let abi: Abi = serde_json::from_str(r#"[]"#)?;
/// # let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
/// # let client = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc".parse::<Wallet>()?.connect(provider);
/// # let contract = Contract::new(address, &abi, &client);
/// # let contract = Contract::new(address, abi, &client);
///
/// #[derive(Clone, Debug)]
/// struct ValueChanged {
@@ -160,7 +160,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData};
#[derive(Debug, Clone)]
pub struct Contract<'a, P, S> {
client: &'a Client<P, S>,
abi: &'a Abi,
abi: Abi,
address: Address,
/// A mapping from method signature to a name-index pair for accessing
@@ -176,7 +176,7 @@ where
P: JsonRpcClient,
{
/// Creates a new contract from the provided client, abi and address
pub fn new(address: Address, abi: &'a Abi, client: &'a Client<P, S>) -> Self {
pub fn new(address: Address, abi: Abi, client: &'a Client<P, S>) -> Self {
let methods = create_mapping(&abi.functions, |function| function.selector());
Self {


@@ -1,12 +1,13 @@
use crate::ContractError;
use ethers_providers::{JsonRpcClient, Provider};
use ethers_providers::{FilterStream, JsonRpcClient, Provider};
use ethers_core::{
abi::{Detokenize, Event as AbiEvent, RawLog},
types::{BlockNumber, Filter, Log, ValueOrArray, H256},
};
use futures::stream::{Stream, StreamExt};
use std::{collections::HashMap, marker::PhantomData};
/// Helper for managing the event filter before querying or streaming its logs
@@ -60,6 +61,21 @@ impl<P, D: Detokenize> Event<'_, '_, P, D> {
}
}
impl<'a, 'b, P, D> Event<'a, 'b, P, D>
where
P: JsonRpcClient,
D: 'b + Detokenize + Clone,
'a: 'b,
{
/// Returns a stream for the event
pub async fn stream(
self,
) -> Result<impl Stream<Item = Result<D, ContractError>> + 'b, ContractError> {
let filter = self.provider.watch(&self.filter).await?;
Ok(filter.stream().map(move |log| self.parse_log(log)))
}
}
impl<P, D> Event<'_, '_, P, D>
where
P: JsonRpcClient,
@@ -107,6 +123,4 @@ where
// convert the tokens to the requested datatype
Ok(D::from_tokens(tokens)?)
}
// TODO: Add filter watchers
}
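
A minimal usage sketch of the new `stream` method, mirroring the `watch_events` integration test added in this PR (items are `Result<D, ContractError>`, hence the double unwrap):

    let mut stream = contract
        .event::<ValueChanged>("ValueChanged")
        .unwrap()
        .stream()
        .await
        .unwrap();
    let log = stream.next().await.unwrap().unwrap();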


@@ -17,7 +17,7 @@ const POLL_INTERVAL: u64 = 7000;
#[derive(Debug, Clone)]
/// Helper which manages the deployment transaction of a smart contract
pub struct Deployer<'a, P, S> {
abi: &'a Abi,
abi: Abi,
client: &'a Client<P, S>,
tx: TransactionRequest,
confs: usize,
@@ -61,7 +61,7 @@ where
time::delay_for(Duration::from_millis(POLL_INTERVAL)).await;
}
let contract = Contract::new(address, self.abi, self.client);
let contract = Contract::new(address, self.abi.clone(), self.client);
Ok(contract)
}
@@ -107,7 +107,7 @@ where
/// .parse::<Wallet>()?.connect(provider);
///
/// // create a factory which will be used to deploy instances of the contract
/// let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client);
/// let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client);
///
/// // The deployer created by the `deploy` call exposes a builder which gets consumed
/// // by the async `send` call
@@ -121,8 +121,8 @@ where
/// # }
pub struct ContractFactory<'a, P, S> {
client: &'a Client<P, S>,
abi: &'a Abi,
bytecode: &'a Bytes,
abi: Abi,
bytecode: Bytes,
}
impl<'a, P, S> ContractFactory<'a, P, S>
@@ -133,7 +133,7 @@ where
/// Creates a factory for deployment of the Contract with bytecode, and the
/// constructor defined in the abi. The client will be used to send any deployment
/// transaction.
pub fn new(abi: &'a Abi, bytecode: &'a Bytes, client: &'a Client<P, S>) -> Self {
pub fn new(abi: Abi, bytecode: Bytes, client: &'a Client<P, S>) -> Self {
Self {
client,
abi,
@@ -150,7 +150,7 @@ where
/// 1. The default poll duration is 7 seconds.
/// 1. The default number of confirmations is 1 block.
pub fn deploy<T: Tokenize>(
&self,
self,
constructor_args: T,
) -> Result<Deployer<'a, P, S>, ContractError> {
// Encode the constructor args & concatenate with the bytecode if necessary


@@ -0,0 +1,71 @@
use ethers_core::{
abi::{Abi, Detokenize, InvalidOutputType, Token},
types::{Address, Bytes},
};
use ethers_contract::{Contract, ContractFactory};
use ethers_core::utils::{Ganache, GanacheInstance, Solc};
use ethers_providers::{Http, Provider};
use ethers_signers::{Client, Wallet};
use std::convert::TryFrom;
// Note: We also provide the `abigen` macro for generating these bindings automatically
#[derive(Clone, Debug)]
pub struct ValueChanged {
pub old_author: Address,
pub new_author: Address,
pub old_value: String,
pub new_value: String,
}
impl Detokenize for ValueChanged {
fn from_tokens(tokens: Vec<Token>) -> Result<ValueChanged, InvalidOutputType> {
let old_author: Address = tokens[0].clone().to_address().unwrap();
let new_author: Address = tokens[1].clone().to_address().unwrap();
let old_value = tokens[2].clone().to_string().unwrap();
let new_value = tokens[3].clone().to_string().unwrap();
Ok(Self {
old_author,
new_author,
old_value,
new_value,
})
}
}
/// compiles the test contract
pub fn compile() -> (Abi, Bytes) {
let compiled = Solc::new("./tests/contract.sol").build().unwrap();
let contract = compiled
.get("SimpleStorage")
.expect("could not find contract");
(contract.abi.clone(), contract.bytecode.clone())
}
/// connects the private key to http://localhost:8545
pub fn connect(private_key: &str) -> Client<Http, Wallet> {
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
private_key.parse::<Wallet>().unwrap().connect(provider)
}
/// Launches a ganache instance and deploys the SimpleStorage contract
pub async fn deploy<'a>(
client: &'a Client<Http, Wallet>,
abi: Abi,
bytecode: Bytes,
) -> (GanacheInstance, Contract<'a, Http, Wallet>) {
let ganache = Ganache::new()
.mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle")
.spawn();
let factory = ContractFactory::new(abi, bytecode, client);
let contract = factory
.deploy("initial value".to_string())
.unwrap()
.send()
.await
.unwrap();
(ganache, contract)
}


@@ -1,48 +1,28 @@
use ethers_contract::ContractFactory;
use ethers_core::{
abi::{Detokenize, InvalidOutputType, Token},
types::{Address, H256},
utils::{Ganache, Solc},
utils::Ganache,
};
use ethers_providers::{Http, Provider};
use ethers_signers::Wallet;
use std::convert::TryFrom;
mod common;
pub use common::*;
#[tokio::test]
async fn deploy_and_call_contract() {
// compile the contract
let compiled = Solc::new("./tests/contract.sol").build().unwrap();
let contract = compiled
.get("SimpleStorage")
.expect("could not find contract");
let (abi, bytecode) = compile();
// launch ganache
let port = 8546u64;
let url = format!("http://localhost:{}", port).to_string();
let _ganache = Ganache::new().port(port)
let _ganache = Ganache::new()
.mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle")
.spawn();
// connect to the network
let provider = Provider::<Http>::try_from(url.as_str()).unwrap();
// instantiate our wallets
let [wallet1, wallet2]: [Wallet; 2] = [
"380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"
.parse()
.unwrap(),
"cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e"
.parse()
.unwrap(),
];
// Instantiate the clients. We assume that clients consume the provider and the wallet
// (which makes sense), so for multi-client tests, you must clone the provider.
let client = wallet1.connect(provider.clone());
let client2 = wallet2.connect(provider);
let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc");
let client2 = connect("cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e");
// create a factory which will be used to deploy instances of the contract
let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client);
let factory = ContractFactory::new(abi, bytecode, &client);
// `send` consumes the deployer so it must be cloned for later re-use
// (practically it's not expected that you'll need to deploy multiple instances of
@@ -86,62 +66,4 @@ async fn deploy_and_call_contract() {
.unwrap();
assert_eq!(init_address, Address::zero());
assert_eq!(init_value, "initial value");
// we can still interact with the old contract instance
let _tx_hash = contract
.method::<_, H256>("setValue", "hi2".to_owned())
.unwrap()
.send()
.await
.unwrap();
assert_eq!(last_sender.clone().call().await.unwrap(), client.address());
assert_eq!(get_value.clone().call().await.unwrap(), "hi2");
// and we can fetch the events
let logs: Vec<ValueChanged> = contract
.event("ValueChanged")
.unwrap()
.from_block(0u64)
.topic1(client.address()) // Corresponds to the first indexed parameter
.query()
.await
.unwrap();
assert_eq!(logs[0].new_value, "initial value");
assert_eq!(logs[1].new_value, "hi2");
assert_eq!(logs.len(), 2);
let logs: Vec<ValueChanged> = contract2
.event("ValueChanged")
.unwrap()
.from_block(0u64)
.query()
.await
.unwrap();
assert_eq!(logs[0].new_value, "initial value");
assert_eq!(logs.len(), 1);
}
// Note: We also provide the `abigen` macro for generating these bindings automatically
#[derive(Clone, Debug)]
struct ValueChanged {
old_author: Address,
new_author: Address,
old_value: String,
new_value: String,
}
impl Detokenize for ValueChanged {
fn from_tokens(tokens: Vec<Token>) -> Result<ValueChanged, InvalidOutputType> {
let old_author: Address = tokens[1].clone().to_address().unwrap();
let new_author: Address = tokens[1].clone().to_address().unwrap();
let old_value = tokens[2].clone().to_string().unwrap();
let new_value = tokens[3].clone().to_string().unwrap();
Ok(Self {
old_author,
new_author,
old_value,
new_value,
})
}
}


@@ -0,0 +1,32 @@
use ethers_core::types::H256;
mod common;
use common::{compile, connect, deploy, ValueChanged};
#[tokio::test]
async fn get_past_events() {
let (abi, bytecode) = compile();
let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc");
let (_ganache, contract) = deploy(&client, abi, bytecode).await;
// make a call to `setValue`
let _tx_hash = contract
.method::<_, H256>("setValue", "hi".to_owned())
.unwrap()
.send()
.await
.unwrap();
// and we can fetch the events
let logs: Vec<ValueChanged> = contract
.event("ValueChanged")
.unwrap()
.from_block(0u64)
.topic1(client.address()) // Corresponds to the first indexed parameter
.query()
.await
.unwrap();
assert_eq!(logs[0].new_value, "initial value");
assert_eq!(logs[1].new_value, "hi");
assert_eq!(logs.len(), 2);
}


@@ -0,0 +1,38 @@
use ethers_core::types::H256;
use ethers_providers::StreamExt;
mod common;
use common::{compile, connect, deploy, ValueChanged};
#[tokio::test]
async fn watch_events() {
let (abi, bytecode) = compile();
let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc");
let (_ganache, contract) = deploy(&client, abi, bytecode).await;
// We spawn the event listener:
let mut stream = contract
.event::<ValueChanged>("ValueChanged")
.unwrap()
.stream()
.await
.unwrap();
let num_calls = 3u64;
// and we make a few calls
for i in 0..num_calls {
let _tx_hash = contract
.method::<_, H256>("setValue", i.to_string())
.unwrap()
.send()
.await
.unwrap();
}
for i in 0..num_calls {
// unwrap the option of the stream, then unwrap the decoding result
let log = stream.next().await.unwrap().unwrap();
assert_eq!(log.new_value, i.to_string());
}
}


@@ -1,6 +1,6 @@
/// Utilities for launching a ganache-cli testnet instance
mod ganache;
pub use ganache::Ganache;
pub use ganache::{Ganache, GanacheInstance};
/// Solidity compiler bindings
mod solc;


@@ -14,6 +14,12 @@ serde_json = { version = "1.0.53", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
url = { version = "2.1.1", default-features = false }
# required for implementing stream on the filters
futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false }
pin-project = { version = "0.4.20", default-features = false }
tokio = { version = "0.2.21", default-features = false, features = ["time"] }
[dev-dependencies]
rustc-hex = "2.1.0"
tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] }


@@ -6,6 +6,12 @@ mod provider;
// ENS support
mod ens;
mod stream;
pub use stream::FilterStream;
// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream`
// without having to import futures themselves
pub use futures_util::StreamExt;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{error::Error, fmt::Debug};


@@ -1,10 +1,15 @@
use crate::{ens, http::Provider as HttpProvider, JsonRpcClient};
use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient,
};
use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Address, Block, BlockId, BlockNumber, Bytes, Filter, Log, NameOrAddress, Selector,
Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, U256, U64,
Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, H256, U256, U64,
},
utils,
};
@@ -52,6 +57,19 @@ pub enum ProviderError {
EnsError(String),
}
/// Types of filters supported by the JSON-RPC.
#[derive(Clone, Debug)]
pub enum FilterKind<'a> {
/// `eth_newFilter`
Logs(&'a Filter),
/// `eth_newBlockFilter` filter
NewBlocks,
/// `eth_newPendingTransactionFilter` filter
PendingTransactions,
}
// JSON RPC bindings
impl<P: JsonRpcClient> Provider<P> {
/// Instantiate a new provider with a backend.
@@ -263,7 +281,7 @@ impl<P: JsonRpcClient> Provider<P> {
Ok(self
.0
.request("eth_sendTransaction", Some(tx))
.request("eth_sendTransaction", Some(vec![tx]))
.await
.map_err(Into::into)?)
}
@@ -305,6 +323,83 @@ impl<P: JsonRpcClient> Provider<P> {
.map_err(Into::into)?)
}
/// Streams matching filter logs
pub async fn watch(
&self,
filter: &Filter,
) -> Result<impl FilterStream<Log> + '_, ProviderError> {
let id = self.new_filter(FilterKind::Logs(filter)).await?;
let fut = move || Box::pin(self.get_filter_changes(id));
Ok(FilterWatcher::new(id, fut))
}
/// Streams new block hashes
pub async fn watch_blocks(&self) -> Result<impl FilterStream<H256> + '_, ProviderError> {
let id = self.new_filter(FilterKind::NewBlocks).await?;
let fut = move || Box::pin(self.get_filter_changes(id));
Ok(FilterWatcher::new(id, fut))
}
/// Streams pending transactions
pub async fn watch_pending_transactions(
&self,
) -> Result<impl FilterStream<H256> + '_, ProviderError> {
let id = self.new_filter(FilterKind::PendingTransactions).await?;
let fut = move || Box::pin(self.get_filter_changes(id));
Ok(FilterWatcher::new(id, fut))
}
/// Creates a filter object, based on filter options, to notify when the state changes (logs).
/// To check if the state has changed, call `get_filter_changes` with the filter id.
pub async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, ProviderError> {
let (method, args) = match filter {
FilterKind::NewBlocks => ("eth_newBlockFilter", utils::serialize(&())),
FilterKind::PendingTransactions => {
("eth_newPendingTransactionFilter", utils::serialize(&()))
}
FilterKind::Logs(filter) => ("eth_newFilter", utils::serialize(&filter)),
};
Ok(self
.0
.request(method, Some(vec![args]))
.await
.map_err(Into::into)?)
}
/// Uninstalls a filter
pub async fn uninstall_filter<T: Into<U256>>(&self, id: T) -> Result<bool, ProviderError> {
let id = utils::serialize(&id.into());
Ok(self
.0
.request("eth_uninstallFilter", Some(vec![id]))
.await
.map_err(Into::into)?)
}
/// Polling method for a filter, which returns an array of logs which occurred since last poll.
///
/// This method must be called with one of the following return types, depending on the filter
/// type:
/// - `eth_newBlockFilter`: `H256`, returns block hashes
/// - `eth_newPendingTransactionFilter`: `H256`, returns transaction hashes
/// - `eth_newFilter`: `Log`, returns raw logs
///
/// If one of these types is not used, decoding will fail and the method will
/// return an error.
pub async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, ProviderError>
where
T: Into<U256>,
R: for<'a> Deserialize<'a>,
{
let id = utils::serialize(&id.into());
Ok(self
.0
.request("eth_getFilterChanges", Some(vec![id]))
.await
.map_err(Into::into)?)
}
// TODO: get_code, get_storage_at
////// Ethereum Naming Service
@@ -364,6 +459,18 @@ impl<P: JsonRpcClient> Provider<P> {
Ok(decode_bytes(param, data))
}
#[cfg(test)]
/// ganache-only function for mining empty blocks
pub async fn mine(&self, num_blocks: usize) -> Result<(), ProviderError> {
for _ in 0..num_blocks {
self.0
.request::<_, U256>("evm_mine", None::<()>)
.await
.map_err(Into::into)?;
}
Ok(())
}
/// Sets the ENS Address (default: mainnet)
pub fn ens<T: Into<Address>>(mut self, ens: T) -> Self {
self.1 = Some(ens.into());
@@ -438,3 +545,69 @@ mod ens_tests {
.unwrap_err();
}
}
#[cfg(test)]
mod tests {
use super::*;
use ethers_core::types::H256;
use futures_util::StreamExt;
#[tokio::test]
#[ignore]
// Ganache new block filters are super buggy! This test must be run with
// geth or parity running e.g. `geth --dev --rpc --dev.period 1`
async fn test_new_block_filter() {
let num_blocks = 3;
let provider = Provider::<HttpProvider>::try_from("http://localhost:8545").unwrap();
let start_block = provider.get_block_number().await.unwrap();
let stream = provider
.watch_blocks()
.await
.unwrap()
.interval(1000u64)
.stream();
let hashes: Vec<H256> = stream.take(num_blocks).collect::<Vec<H256>>().await;
for (i, hash) in hashes.iter().enumerate() {
let block = provider
.get_block(start_block + i as u64 + 1)
.await
.unwrap();
assert_eq!(*hash, block.hash.unwrap());
}
}
// this must be run with geth or parity since ganache-core still does not support
// eth_pendingTransactions, https://github.com/trufflesuite/ganache-core/issues/405
// example command: `geth --dev --rpc --dev.period 1`
#[tokio::test]
#[ignore]
async fn test_new_pending_txs_filter() {
let num_txs = 5;
let provider = Provider::<HttpProvider>::try_from("http://localhost:8545").unwrap();
let accounts = provider.get_accounts().await.unwrap();
let stream = provider
.watch_pending_transactions()
.await
.unwrap()
.interval(1000u64)
.stream();
let mut tx_hashes = Vec::new();
let tx = TransactionRequest::new()
.from(accounts[0])
.to(accounts[0])
.value(1e18 as u64);
for _ in 0..num_txs {
tx_hashes.push(provider.send_transaction(tx.clone()).await.unwrap());
}
let hashes: Vec<H256> = stream.take(num_txs).collect::<Vec<H256>>().await;
assert_eq!(tx_hashes, hashes);
}
}
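
For reference, a sketch of the lower-level filter lifecycle that the `watch_*` helpers build on, using the methods added above (the caller picks the result type `R`, here `H256` for transaction hashes):

    let id = provider.new_filter(FilterKind::PendingTransactions).await?; // eth_newPendingTransactionFilter
    let hashes: Vec<H256> = provider.get_filter_changes(id).await?; // eth_getFilterChanges
    provider.uninstall_filter(id).await?; // eth_uninstallFilter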


@@ -0,0 +1,198 @@
use crate::ProviderError;
use ethers_core::types::U256;
use futures_core::{stream::Stream, TryFuture};
use futures_util::StreamExt;
use pin_project::pin_project;
use serde::Deserialize;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
vec::IntoIter,
};
use tokio::time::{interval, Interval};
const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);
/// Trait for streaming filters, which also exposes the filter's id.
pub trait FilterStream<R>: StreamExt + Stream<Item = R>
where
R: for<'de> Deserialize<'de>,
{
/// Returns the filter's ID for it to be uninstalled
fn id(&self) -> U256;
/// Sets the stream's polling interval
fn interval<T: Into<u64>>(self, duration: T) -> Self;
/// Alias for Box::pin, must be called in order to pin the stream and be able
/// to call `next` on it.
fn stream(self) -> Pin<Box<Self>>
where
Self: Sized,
{
Box::pin(self)
}
}
enum FilterWatcherState<F, R> {
WaitForInterval,
GetFilterChanges(F),
NextItem(IntoIter<R>),
}
#[must_use = "filters do nothing unless you stream them"]
#[pin_project]
pub(crate) struct FilterWatcher<F: FutureFactory, R> {
id: U256,
#[pin]
// Future factory for generating new calls on each loop
factory: F,
// The polling interval
interval: Interval,
state: FilterWatcherState<F::FutureItem, R>,
}
impl<F, R> FilterWatcher<F, R>
where
F: FutureFactory,
R: for<'de> Deserialize<'de>,
{
/// Creates a new watcher with the provided factory and filter id.
pub fn new<T: Into<U256>>(id: T, factory: F) -> Self {
Self {
id: id.into(),
interval: interval(DEFAULT_POLL_DURATION),
state: FilterWatcherState::WaitForInterval,
factory,
}
}
}
impl<F, R> FilterStream<R> for FilterWatcher<F, R>
where
F: FutureFactory,
F::FutureItem: Future<Output = Result<Vec<R>, ProviderError>>,
R: for<'de> Deserialize<'de>,
{
fn id(&self) -> U256 {
self.id
}
fn interval<T: Into<u64>>(mut self, duration: T) -> Self {
self.interval = interval(Duration::from_millis(duration.into()));
self
}
}
// Pattern for flattening the returned Vec of filter changes taken from
// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67
impl<F, R> Stream for FilterWatcher<F, R>
where
F: FutureFactory,
F::FutureItem: Future<Output = Result<Vec<R>, ProviderError>>,
R: for<'de> Deserialize<'de>,
{
type Item = R;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
*this.state = match this.state {
FilterWatcherState::WaitForInterval => {
// Wait the polling period
let mut interval = Box::pin(this.interval.tick());
let _ready = futures_util::ready!(interval.as_mut().poll(cx));
// create a new instance of the future
FilterWatcherState::GetFilterChanges(this.factory.as_mut().new())
}
FilterWatcherState::GetFilterChanges(fut) => {
// wait for the future to be ready
let mut fut = Box::pin(fut);
// NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> =
futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
// the next call will immediately go to this branch instead of trying to get
// filter changes again. Once the whole vector is consumed, it will poll again
// for new logs
FilterWatcherState::NextItem(iter) => match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
},
};
}
}
}
// Do not leak private trait
// Pattern for re-usable futures from: https://gitlab.com/Ploppz/futures-retry/-/blob/std-futures/src/future.rs#L13
use factory::FutureFactory;
mod factory {
use super::*;
/// A factory trait used to create futures.
///
/// We need a factory for the stream logic because when (and if) a future
/// is polled to completion, it can't be polled again. Hence we need to
/// create a new one.
///
/// This trait is implemented for any closure that returns a `Future`, so you don't
/// have to write your own type and implement it to handle some simple cases.
pub trait FutureFactory {
/// A future type that is created by the `new` method.
type FutureItem: TryFuture + Unpin;
/// Creates a new future. We don't need the factory to be immutable so we
/// pass `self` as a mutable reference.
fn new(self: Pin<&mut Self>) -> Self::FutureItem;
}
impl<T, F> FutureFactory for T
where
T: Unpin + FnMut() -> F,
F: TryFuture + Unpin,
{
type FutureItem = F;
#[allow(clippy::new_ret_no_self)]
fn new(self: Pin<&mut Self>) -> F {
(*self.get_mut())()
}
}
}
#[cfg(test)]
mod watch {
use super::*;
use futures_util::StreamExt;
#[tokio::test]
async fn stream() {
let factory = || Box::pin(async { Ok::<Vec<u64>, ProviderError>(vec![1, 2, 3]) });
let filter = FilterWatcher::<_, u64>::new(1, factory);
let mut stream = filter.interval(1u64).stream();
assert_eq!(stream.next().await.unwrap(), 1);
assert_eq!(stream.next().await.unwrap(), 2);
assert_eq!(stream.next().await.unwrap(), 3);
// this will poll the factory function again since it consumed the entire
// vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds
// until new blocks are mined, until the call to the factory returns a non-empty
// vector of logs
assert_eq!(stream.next().await.unwrap(), 1);
}
}


@@ -38,7 +38,7 @@ async fn main() -> Result<()> {
let client = wallet.connect(provider);
// 6. create a factory which will be used to deploy instances of the contract
let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client);
let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client);
// 7. deploy it with the constructor arguments
let contract = factory.deploy("initial value".to_string())?.send().await?;