From d90b03da0602e808bb3b3b941d89893631b61e7f Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 15 Jun 2020 11:46:07 +0300 Subject: [PATCH] Add streamed logs to the provider (#9) * feat(provider): implement Streamed logs This utilizes eth_getFilterChanges. The stream struct must be instantiated with a factory that yields logs/hashes. Consumers are expected to use the `FilterStream` trait in order to simplify their type definitions * feat(provider): expose streaming methods * test(provider): add new blocks/pending txs test * feat(contract): allow events to be streamed * test(contract): add integration test for streaming event logs * perf(contract-factory): take abi and bytecode by value instead of reference The abi, bytecode and the factory's deploy method now consume the structs instead of being passed by reference. While this means that consumers might need to clone before using them, this gives us some more flexiblity around factories inside helper functions * refactor(contract): use test helpers to reduce code dup * chore: make clippy happy --- .circleci/config.yml | 47 +---- Cargo.lock | 49 ++++- ethers-contract/Cargo.toml | 1 + .../ethers-contract-abigen/src/contract.rs | 2 +- ethers-contract/src/contract.rs | 8 +- ethers-contract/src/event.rs | 20 +- ethers-contract/src/factory.rs | 14 +- ethers-contract/tests/common/mod.rs | 71 +++++++ ethers-contract/tests/contract.rs | 96 +-------- ethers-contract/tests/get_past_events.rs | 32 +++ ethers-contract/tests/watch_events.rs | 38 ++++ ethers-core/src/utils/mod.rs | 2 +- ethers-providers/Cargo.toml | 6 + ethers-providers/src/lib.rs | 6 + ethers-providers/src/provider.rs | 179 +++++++++++++++- ethers-providers/src/stream.rs | 198 ++++++++++++++++++ ethers/examples/contract.rs | 2 +- 17 files changed, 620 insertions(+), 151 deletions(-) create mode 100644 ethers-contract/tests/common/mod.rs create mode 100644 ethers-contract/tests/get_past_events.rs create mode 100644 ethers-contract/tests/watch_events.rs create mode 100644 ethers-providers/src/stream.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index d9291546..a8972554 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,43 +3,10 @@ version: 2.1 commands: setup-lints: steps: - - run: - name: Install cargo-audit - command: cargo install cargo-audit - run: name: Install clippy command: rustup component add clippy - # https://medium.com/@edouard.oger/rust-caching-on-circleci-using-sccache-c996344f0115 - setup-sccache: - steps: - - run: - name: Install sccache - command: | - cargo install sccache - # This configures Rust to use sccache. - echo 'export "RUSTC_WRAPPER"="sccache"' >> $BASH_ENV - # This is the maximum space sccache cache will use on disk. - echo 'export "SCCACHE_CACHE_SIZE"="1G"' >> $BASH_ENV - sccache --version - - restore-sccache-cache: - steps: - - restore_cache: - name: Restore sccache cache - key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }} - save-sccache-cache: - steps: - - save_cache: - name: Save sccache cache - # We use {{ epoch }} to always upload a fresh cache: - # Of course, restore_cache will not find this exact key, - # but it will fall back to the closest key (aka the most recent). - # See https://discuss.circleci.com/t/add-mechanism-to-update-existing-cache-key/9014/13 - key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }}-{{ epoch }} - paths: - - "~/.cache/sccache" - jobs: build: docker: @@ -47,18 +14,18 @@ jobs: steps: - checkout - setup-lints - - setup-sccache - - restore-sccache-cache + - run: + name: Reduce codegen Units + # If we don't include this, the linker runs out of memory when building + # the project on CI. We don't include this normally though because + # it should be able to build with more units on other machines + command: printf "[profile.dev]\ncodegen-units = 1\n" >> Cargo.toml - run: name: tests # skip these temporarily until we get ganache-cli and solc on CI - command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth + command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events - run: name: Check style command: | cargo fmt --all -- --check cargo clippy --all-targets --all-features -- -D warnings - - run: - name: Audit Dependencies - command: cargo audit - - save-sccache-cache diff --git a/Cargo.lock b/Cargo.lock index 2ff32515..39a85304 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ dependencies = [ "ethers-core", "ethers-providers", "ethers-signers", + "futures", "once_cell", "rustc-hex", "serde", @@ -358,6 +359,9 @@ version = "0.1.0" dependencies = [ "async-trait", "ethers-core", + "futures-core", + "futures-util", + "pin-project", "reqwest", "rustc-hex", "serde", @@ -418,6 +422,21 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.5" @@ -425,6 +444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -433,6 +453,23 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + [[package]] name = "futures-macro" version = "0.3.5" @@ -466,9 +503,13 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project", "pin-utils", "proc-macro-hack", @@ -860,18 +901,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" +checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" +checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" dependencies = [ "proc-macro2", "quote", diff --git a/ethers-contract/Cargo.toml b/ethers-contract/Cargo.toml index 231f0957..cd4e2102 100644 --- a/ethers-contract/Cargo.toml +++ b/ethers-contract/Cargo.toml @@ -17,6 +17,7 @@ rustc-hex = { version = "2.1.0", default-features = false } thiserror = { version = "1.0.19", default-features = false } once_cell = { version = "1.4.0", default-features = false } tokio = { version = "0.2.21", default-features = false } +futures = "0.3.5" [dev-dependencies] tokio = { version = "0.2.21", default-features = false, features = ["macros"] } diff --git a/ethers-contract/ethers-contract-abigen/src/contract.rs b/ethers-contract/ethers-contract-abigen/src/contract.rs index f344ece1..5ad5f828 100644 --- a/ethers-contract/ethers-contract-abigen/src/contract.rs +++ b/ethers-contract/ethers-contract-abigen/src/contract.rs @@ -72,7 +72,7 @@ impl Context { /// client at the given `Address`. The contract derefs to a `ethers::Contract` /// object pub fn new>(address: T, client: &'a Client) -> Self { - let contract = Contract::new(address.into(), &#abi_name, client); + let contract = Contract::new(address.into(), #abi_name.clone(), client); Self(contract) } diff --git a/ethers-contract/src/contract.rs b/ethers-contract/src/contract.rs index a7ff34f7..8f4aeda0 100644 --- a/ethers-contract/src/contract.rs +++ b/ethers-contract/src/contract.rs @@ -75,7 +75,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; /// .parse::()?.connect(provider); /// /// // create the contract object at the address -/// let contract = Contract::new(address, &abi, &client); +/// let contract = Contract::new(address, abi, &client); /// /// // Calling constant methods is done by calling `call()` on the method builder. /// // (if the function takes no arguments, then you must use `()` as the argument) @@ -112,7 +112,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; /// # let abi: Abi = serde_json::from_str(r#"[]"#)?; /// # let provider = Provider::::try_from("http://localhost:8545").unwrap(); /// # let client = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc".parse::()?.connect(provider); -/// # let contract = Contract::new(address, &abi, &client); +/// # let contract = Contract::new(address, abi, &client); /// /// #[derive(Clone, Debug)] /// struct ValueChanged { @@ -160,7 +160,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; #[derive(Debug, Clone)] pub struct Contract<'a, P, S> { client: &'a Client, - abi: &'a Abi, + abi: Abi, address: Address, /// A mapping from method signature to a name-index pair for accessing @@ -176,7 +176,7 @@ where P: JsonRpcClient, { /// Creates a new contract from the provided client, abi and address - pub fn new(address: Address, abi: &'a Abi, client: &'a Client) -> Self { + pub fn new(address: Address, abi: Abi, client: &'a Client) -> Self { let methods = create_mapping(&abi.functions, |function| function.selector()); Self { diff --git a/ethers-contract/src/event.rs b/ethers-contract/src/event.rs index d13bbee6..053a474f 100644 --- a/ethers-contract/src/event.rs +++ b/ethers-contract/src/event.rs @@ -1,12 +1,13 @@ use crate::ContractError; -use ethers_providers::{JsonRpcClient, Provider}; +use ethers_providers::{FilterStream, JsonRpcClient, Provider}; use ethers_core::{ abi::{Detokenize, Event as AbiEvent, RawLog}, types::{BlockNumber, Filter, Log, ValueOrArray, H256}, }; +use futures::stream::{Stream, StreamExt}; use std::{collections::HashMap, marker::PhantomData}; /// Helper for managing the event filter before querying or streaming its logs @@ -60,6 +61,21 @@ impl Event<'_, '_, P, D> { } } +impl<'a, 'b, P, D> Event<'a, 'b, P, D> +where + P: JsonRpcClient, + D: 'b + Detokenize + Clone, + 'a: 'b, +{ + /// Returns a stream for the event + pub async fn stream( + self, + ) -> Result> + 'b, ContractError> { + let filter = self.provider.watch(&self.filter).await?; + Ok(filter.stream().map(move |log| self.parse_log(log))) + } +} + impl Event<'_, '_, P, D> where P: JsonRpcClient, @@ -107,6 +123,4 @@ where // convert the tokens to the requested datatype Ok(D::from_tokens(tokens)?) } - - // TODO: Add filter watchers } diff --git a/ethers-contract/src/factory.rs b/ethers-contract/src/factory.rs index 24667373..76d9ed8b 100644 --- a/ethers-contract/src/factory.rs +++ b/ethers-contract/src/factory.rs @@ -17,7 +17,7 @@ const POLL_INTERVAL: u64 = 7000; #[derive(Debug, Clone)] /// Helper which manages the deployment transaction of a smart contract pub struct Deployer<'a, P, S> { - abi: &'a Abi, + abi: Abi, client: &'a Client, tx: TransactionRequest, confs: usize, @@ -61,7 +61,7 @@ where time::delay_for(Duration::from_millis(POLL_INTERVAL)).await; } - let contract = Contract::new(address, self.abi, self.client); + let contract = Contract::new(address, self.abi.clone(), self.client); Ok(contract) } @@ -107,7 +107,7 @@ where /// .parse::()?.connect(provider); /// /// // create a factory which will be used to deploy instances of the contract -/// let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); +/// let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client); /// /// // The deployer created by the `deploy` call exposes a builder which gets consumed /// // by the async `send` call @@ -121,8 +121,8 @@ where /// # } pub struct ContractFactory<'a, P, S> { client: &'a Client, - abi: &'a Abi, - bytecode: &'a Bytes, + abi: Abi, + bytecode: Bytes, } impl<'a, P, S> ContractFactory<'a, P, S> @@ -133,7 +133,7 @@ where /// Creates a factory for deployment of the Contract with bytecode, and the /// constructor defined in the abi. The client will be used to send any deployment /// transaction. - pub fn new(abi: &'a Abi, bytecode: &'a Bytes, client: &'a Client) -> Self { + pub fn new(abi: Abi, bytecode: Bytes, client: &'a Client) -> Self { Self { client, abi, @@ -150,7 +150,7 @@ where /// 1. The default poll duration is 7 seconds. /// 1. The default number of confirmations is 1 block. pub fn deploy( - &self, + self, constructor_args: T, ) -> Result, ContractError> { // Encode the constructor args & concatenate with the bytecode if necessary diff --git a/ethers-contract/tests/common/mod.rs b/ethers-contract/tests/common/mod.rs new file mode 100644 index 00000000..3e9c7890 --- /dev/null +++ b/ethers-contract/tests/common/mod.rs @@ -0,0 +1,71 @@ +use ethers_core::{ + abi::{Abi, Detokenize, InvalidOutputType, Token}, + types::{Address, Bytes}, +}; + +use ethers_contract::{Contract, ContractFactory}; +use ethers_core::utils::{Ganache, GanacheInstance, Solc}; +use ethers_providers::{Http, Provider}; +use ethers_signers::{Client, Wallet}; +use std::convert::TryFrom; + +// Note: We also provide the `abigen` macro for generating these bindings automatically +#[derive(Clone, Debug)] +pub struct ValueChanged { + pub old_author: Address, + pub new_author: Address, + pub old_value: String, + pub new_value: String, +} + +impl Detokenize for ValueChanged { + fn from_tokens(tokens: Vec) -> Result { + let old_author: Address = tokens[1].clone().to_address().unwrap(); + let new_author: Address = tokens[1].clone().to_address().unwrap(); + let old_value = tokens[2].clone().to_string().unwrap(); + let new_value = tokens[3].clone().to_string().unwrap(); + + Ok(Self { + old_author, + new_author, + old_value, + new_value, + }) + } +} + +/// compiles the test contract +pub fn compile() -> (Abi, Bytes) { + let compiled = Solc::new("./tests/contract.sol").build().unwrap(); + let contract = compiled + .get("SimpleStorage") + .expect("could not find contract"); + (contract.abi.clone(), contract.bytecode.clone()) +} + +/// connects the private key to http://localhost:8545 +pub fn connect(private_key: &str) -> Client { + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + private_key.parse::().unwrap().connect(provider) +} + +/// Launches a ganache instance and deploys the SimpleStorage contract +pub async fn deploy<'a>( + client: &'a Client, + abi: Abi, + bytecode: Bytes, +) -> (GanacheInstance, Contract<'a, Http, Wallet>) { + let ganache = Ganache::new() + .mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle") + .spawn(); + + let factory = ContractFactory::new(abi, bytecode, client); + let contract = factory + .deploy("initial value".to_string()) + .unwrap() + .send() + .await + .unwrap(); + + (ganache, contract) +} diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index 05c40188..8e97e5b5 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -1,48 +1,28 @@ use ethers_contract::ContractFactory; use ethers_core::{ - abi::{Detokenize, InvalidOutputType, Token}, types::{Address, H256}, - utils::{Ganache, Solc}, + utils::Ganache, }; -use ethers_providers::{Http, Provider}; -use ethers_signers::Wallet; -use std::convert::TryFrom; + +mod common; +pub use common::*; #[tokio::test] async fn deploy_and_call_contract() { - // compile the contract - let compiled = Solc::new("./tests/contract.sol").build().unwrap(); - let contract = compiled - .get("SimpleStorage") - .expect("could not find contract"); + let (abi, bytecode) = compile(); // launch ganache - let port = 8546u64; - let url = format!("http://localhost:{}", port).to_string(); - let _ganache = Ganache::new().port(port) + let _ganache = Ganache::new() .mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle") .spawn(); - // connect to the network - let provider = Provider::::try_from(url.as_str()).unwrap(); - - // instantiate our wallets - let [wallet1, wallet2]: [Wallet; 2] = [ - "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc" - .parse() - .unwrap(), - "cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e" - .parse() - .unwrap(), - ]; - // Instantiate the clients. We assume that clients consume the provider and the wallet // (which makes sense), so for multi-client tests, you must clone the provider. - let client = wallet1.connect(provider.clone()); - let client2 = wallet2.connect(provider); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let client2 = connect("cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e"); // create a factory which will be used to deploy instances of the contract - let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); + let factory = ContractFactory::new(abi, bytecode, &client); // `send` consumes the deployer so it must be cloned for later re-use // (practically it's not expected that you'll need to deploy multiple instances of @@ -86,62 +66,4 @@ async fn deploy_and_call_contract() { .unwrap(); assert_eq!(init_address, Address::zero()); assert_eq!(init_value, "initial value"); - - // we can still interact with the old contract instance - let _tx_hash = contract - .method::<_, H256>("setValue", "hi2".to_owned()) - .unwrap() - .send() - .await - .unwrap(); - assert_eq!(last_sender.clone().call().await.unwrap(), client.address()); - assert_eq!(get_value.clone().call().await.unwrap(), "hi2"); - - // and we can fetch the events - let logs: Vec = contract - .event("ValueChanged") - .unwrap() - .from_block(0u64) - .topic1(client.address()) // Corresponds to the first indexed parameter - .query() - .await - .unwrap(); - assert_eq!(logs[0].new_value, "initial value"); - assert_eq!(logs[1].new_value, "hi2"); - assert_eq!(logs.len(), 2); - - let logs: Vec = contract2 - .event("ValueChanged") - .unwrap() - .from_block(0u64) - .query() - .await - .unwrap(); - assert_eq!(logs[0].new_value, "initial value"); - assert_eq!(logs.len(), 1); -} - -// Note: We also provide the `abigen` macro for generating these bindings automatically -#[derive(Clone, Debug)] -struct ValueChanged { - old_author: Address, - new_author: Address, - old_value: String, - new_value: String, -} - -impl Detokenize for ValueChanged { - fn from_tokens(tokens: Vec) -> Result { - let old_author: Address = tokens[1].clone().to_address().unwrap(); - let new_author: Address = tokens[1].clone().to_address().unwrap(); - let old_value = tokens[2].clone().to_string().unwrap(); - let new_value = tokens[3].clone().to_string().unwrap(); - - Ok(Self { - old_author, - new_author, - old_value, - new_value, - }) - } } diff --git a/ethers-contract/tests/get_past_events.rs b/ethers-contract/tests/get_past_events.rs new file mode 100644 index 00000000..dcd51ed1 --- /dev/null +++ b/ethers-contract/tests/get_past_events.rs @@ -0,0 +1,32 @@ +use ethers_core::types::H256; + +mod common; +use common::{compile, connect, deploy, ValueChanged}; + +#[tokio::test] +async fn get_past_events() { + let (abi, bytecode) = compile(); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let (_ganache, contract) = deploy(&client, abi, bytecode).await; + + // make a call with `client2` + let _tx_hash = contract + .method::<_, H256>("setValue", "hi".to_owned()) + .unwrap() + .send() + .await + .unwrap(); + + // and we can fetch the events + let logs: Vec = contract + .event("ValueChanged") + .unwrap() + .from_block(0u64) + .topic1(client.address()) // Corresponds to the first indexed parameter + .query() + .await + .unwrap(); + assert_eq!(logs[0].new_value, "initial value"); + assert_eq!(logs[1].new_value, "hi"); + assert_eq!(logs.len(), 2); +} diff --git a/ethers-contract/tests/watch_events.rs b/ethers-contract/tests/watch_events.rs new file mode 100644 index 00000000..d8b9ccaa --- /dev/null +++ b/ethers-contract/tests/watch_events.rs @@ -0,0 +1,38 @@ +use ethers_core::types::H256; +use ethers_providers::StreamExt; + +mod common; +use common::{compile, connect, deploy, ValueChanged}; + +#[tokio::test] +async fn watch_events() { + let (abi, bytecode) = compile(); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let (_ganache, contract) = deploy(&client, abi, bytecode).await; + + // We spawn the event listener: + let mut stream = contract + .event::("ValueChanged") + .unwrap() + .stream() + .await + .unwrap(); + + let num_calls = 3u64; + + // and we make a few calls + for i in 0..num_calls { + let _tx_hash = contract + .method::<_, H256>("setValue", i.to_string()) + .unwrap() + .send() + .await + .unwrap(); + } + + for i in 0..num_calls { + // unwrap the option of the stream, then unwrap the decoding result + let log = stream.next().await.unwrap().unwrap(); + assert_eq!(log.new_value, i.to_string()); + } +} diff --git a/ethers-core/src/utils/mod.rs b/ethers-core/src/utils/mod.rs index 13069f5b..3d0ffc73 100644 --- a/ethers-core/src/utils/mod.rs +++ b/ethers-core/src/utils/mod.rs @@ -1,6 +1,6 @@ /// Utilities for launching a ganache-cli testnet instance mod ganache; -pub use ganache::Ganache; +pub use ganache::{Ganache, GanacheInstance}; /// Solidity compiler bindings mod solc; diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 5f55ec14..40c61fa3 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -14,6 +14,12 @@ serde_json = { version = "1.0.53", default-features = false } thiserror = { version = "1.0.19", default-features = false } url = { version = "2.1.1", default-features = false } +# required for implementing stream on the filters +futures-core = { version = "0.3.5", default-features = false } +futures-util = { version = "0.3.5", default-features = false } +pin-project = { version = "0.4.20", default-features = false } +tokio = { version = "0.2.21", default-features = false, features = ["time"] } + [dev-dependencies] rustc-hex = "2.1.0" tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] } diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 0dafd68c..82bfc083 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -6,6 +6,12 @@ mod provider; // ENS support mod ens; +mod stream; +pub use stream::FilterStream; +// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream` +// without having to import futures themselves +pub use futures_util::StreamExt; + use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::{error::Error, fmt::Debug}; diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 567fb06e..c90254d6 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -1,10 +1,15 @@ -use crate::{ens, http::Provider as HttpProvider, JsonRpcClient}; +use crate::{ + ens, + http::Provider as HttpProvider, + stream::{FilterStream, FilterWatcher}, + JsonRpcClient, +}; use ethers_core::{ abi::{self, Detokenize, ParamType}, types::{ Address, Block, BlockId, BlockNumber, Bytes, Filter, Log, NameOrAddress, Selector, - Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, U256, U64, + Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, H256, U256, U64, }, utils, }; @@ -52,6 +57,19 @@ pub enum ProviderError { EnsError(String), } +/// Types of filters supported by the JSON-RPC. +#[derive(Clone, Debug)] +pub enum FilterKind<'a> { + /// `eth_newBlockFilter` + Logs(&'a Filter), + + /// `eth_newBlockFilter` filter + NewBlocks, + + /// `eth_newPendingTransactionFilter` filter + PendingTransactions, +} + // JSON RPC bindings impl Provider

{ /// Instantiate a new provider with a backend. @@ -263,7 +281,7 @@ impl Provider

{ Ok(self .0 - .request("eth_sendTransaction", Some(tx)) + .request("eth_sendTransaction", Some(vec![tx])) .await .map_err(Into::into)?) } @@ -305,6 +323,83 @@ impl Provider

{ .map_err(Into::into)?) } + /// Streams matching filter logs + pub async fn watch( + &self, + filter: &Filter, + ) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::Logs(filter)).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Streams new block hashes + pub async fn watch_blocks(&self) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::NewBlocks).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Streams pending transactions + pub async fn watch_pending_transactions( + &self, + ) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::PendingTransactions).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Creates a filter object, based on filter options, to notify when the state changes (logs). + /// To check if the state has changed, call `get_filter_changes` with the filter id. + pub async fn new_filter(&self, filter: FilterKind<'_>) -> Result { + let (method, args) = match filter { + FilterKind::NewBlocks => ("eth_newBlockFilter", utils::serialize(&())), + FilterKind::PendingTransactions => { + ("eth_newPendingTransactionFilter", utils::serialize(&())) + } + FilterKind::Logs(filter) => ("eth_newFilter", utils::serialize(&filter)), + }; + + Ok(self + .0 + .request(method, Some(vec![args])) + .await + .map_err(Into::into)?) + } + + /// Uninstalls a filter + pub async fn uninstall_filter>(&self, id: T) -> Result { + let id = utils::serialize(&id.into()); + Ok(self + .0 + .request("eth_uninstallFilter", Some(vec![id])) + .await + .map_err(Into::into)?) + } + + /// Polling method for a filter, which returns an array of logs which occurred since last poll. + /// + /// This method must be called with one of the following return types, depending on the filter + /// type: + /// - `eth_newBlockFilter`: `H256`, returns block hashes + /// - `eth_newPendingTransactionFilter`: `H256`, returns transaction hashes + /// - `eth_newFilter`: `Log`, returns raw logs + /// + /// If one of these types is not used, decoding will fail and the method will + /// return an error. + pub async fn get_filter_changes(&self, id: T) -> Result, ProviderError> + where + T: Into, + R: for<'a> Deserialize<'a>, + { + let id = utils::serialize(&id.into()); + Ok(self + .0 + .request("eth_getFilterChanges", Some(vec![id])) + .await + .map_err(Into::into)?) + } + // TODO: get_code, get_storage_at ////// Ethereum Naming Service @@ -364,6 +459,18 @@ impl Provider

{ Ok(decode_bytes(param, data)) } + #[cfg(test)] + /// ganache-only function for mining empty blocks + pub async fn mine(&self, num_blocks: usize) -> Result<(), ProviderError> { + for _ in 0..num_blocks { + self.0 + .request::<_, U256>("evm_mine", None::<()>) + .await + .map_err(Into::into)?; + } + Ok(()) + } + /// Sets the ENS Address (default: mainnet) pub fn ens>(mut self, ens: T) -> Self { self.1 = Some(ens.into()); @@ -438,3 +545,69 @@ mod ens_tests { .unwrap_err(); } } + +#[cfg(test)] +mod tests { + use super::*; + use ethers_core::types::H256; + use futures_util::StreamExt; + + #[tokio::test] + #[ignore] + // Ganache new block filters are super buggy! This test must be run with + // geth or parity running e.g. `geth --dev --rpc --dev.period 1` + async fn test_new_block_filter() { + let num_blocks = 3; + + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let start_block = provider.get_block_number().await.unwrap(); + + let stream = provider + .watch_blocks() + .await + .unwrap() + .interval(1000u64) + .stream(); + + let hashes: Vec = stream.take(num_blocks).collect::>().await; + for (i, hash) in hashes.iter().enumerate() { + let block = provider + .get_block(start_block + i as u64 + 1) + .await + .unwrap(); + assert_eq!(*hash, block.hash.unwrap()); + } + } + + // this must be run with geth or parity since ganache-core still does not support + // eth_pendingTransactions, https://github.com/trufflesuite/ganache-core/issues/405 + // example command: `geth --dev --rpc --dev.period 1` + #[tokio::test] + #[ignore] + async fn test_new_pending_txs_filter() { + let num_txs = 5; + + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let accounts = provider.get_accounts().await.unwrap(); + + let stream = provider + .watch_pending_transactions() + .await + .unwrap() + .interval(1000u64) + .stream(); + + let mut tx_hashes = Vec::new(); + let tx = TransactionRequest::new() + .from(accounts[0]) + .to(accounts[0]) + .value(1e18 as u64); + + for _ in 0..num_txs { + tx_hashes.push(provider.send_transaction(tx.clone()).await.unwrap()); + } + + let hashes: Vec = stream.take(num_txs).collect::>().await; + assert_eq!(tx_hashes, hashes); + } +} diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs new file mode 100644 index 00000000..613567ea --- /dev/null +++ b/ethers-providers/src/stream.rs @@ -0,0 +1,198 @@ +use crate::ProviderError; + +use ethers_core::types::U256; + +use futures_core::{stream::Stream, TryFuture}; +use futures_util::StreamExt; +use pin_project::pin_project; +use serde::Deserialize; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, + vec::IntoIter, +}; +use tokio::time::{interval, Interval}; + +const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000); + +/// Trait for streaming filters. You can get the id. +pub trait FilterStream: StreamExt + Stream +where + R: for<'de> Deserialize<'de>, +{ + /// Returns the filter's ID for it to be uninstalled + fn id(&self) -> U256; + + /// Sets the stream's polling interval + fn interval>(self, duration: T) -> Self; + + /// Alias for Box::pin, must be called in order to pin the stream and be able + /// to call `next` on it. + fn stream(self) -> Pin> + where + Self: Sized, + { + Box::pin(self) + } +} + +enum FilterWatcherState { + WaitForInterval, + GetFilterChanges(F), + NextItem(IntoIter), +} + +#[must_use = "filters do nothing unless you stream them"] +#[pin_project] +pub(crate) struct FilterWatcher { + id: U256, + + #[pin] + // Future factory for generating new calls on each loop + factory: F, + + // The polling interval + interval: Interval, + + state: FilterWatcherState, +} + +impl FilterWatcher +where + F: FutureFactory, + R: for<'de> Deserialize<'de>, +{ + /// Creates a new watcher with the provided factory and filter id. + pub fn new>(id: T, factory: F) -> Self { + Self { + id: id.into(), + interval: interval(DEFAULT_POLL_DURATION), + state: FilterWatcherState::WaitForInterval, + factory, + } + } +} + +impl FilterStream for FilterWatcher +where + F: FutureFactory, + F::FutureItem: Future, ProviderError>>, + R: for<'de> Deserialize<'de>, +{ + fn id(&self) -> U256 { + self.id + } + + fn interval>(mut self, duration: T) -> Self { + self.interval = interval(Duration::from_millis(duration.into())); + self + } +} + +// Pattern for flattening the returned Vec of filter changes taken from +// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67 +impl Stream for FilterWatcher +where + F: FutureFactory, + F::FutureItem: Future, ProviderError>>, + R: for<'de> Deserialize<'de>, +{ + type Item = R; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + *this.state = match this.state { + FilterWatcherState::WaitForInterval => { + // Wait the polling period + let mut interval = Box::pin(this.interval.tick()); + let _ready = futures_util::ready!(interval.as_mut().poll(cx)); + + // create a new instance of the future + FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) + } + FilterWatcherState::GetFilterChanges(fut) => { + // wait for the future to be ready + let mut fut = Box::pin(fut); + + // NOTE: If the provider returns an error, this will return an empty + // vector. Should we make this return a Result instead? Ideally if we're + // in a streamed loop we wouldn't want the loop to terminate if an error + // is encountered (since it might be a temporary error). + let items: Vec = + futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); + FilterWatcherState::NextItem(items.into_iter()) + } + // Consume 1 element from the vector. If more elements are in the vector, + // the next call will immediately go to this branch instead of trying to get + // filter changes again. Once the whole vector is consumed, it will poll again + // for new logs + FilterWatcherState::NextItem(iter) => match iter.next() { + Some(item) => return Poll::Ready(Some(item)), + None => FilterWatcherState::WaitForInterval, + }, + }; + } + } +} + +// Do not leak private trait +// Pattern for re-usable futures from: https://gitlab.com/Ploppz/futures-retry/-/blob/std-futures/src/future.rs#L13 +use factory::FutureFactory; +mod factory { + use super::*; + + /// A factory trait used to create futures. + /// + /// We need a factory for the stream logic because when (and if) a future + /// is polled to completion, it can't be polled again. Hence we need to + /// create a new one. + /// + /// This trait is implemented for any closure that returns a `Future`, so you don't + /// have to write your own type and implement it to handle some simple cases. + pub trait FutureFactory { + /// A future type that is created by the `new` method. + type FutureItem: TryFuture + Unpin; + + /// Creates a new future. We don't need the factory to be immutable so we + /// pass `self` as a mutable reference. + fn new(self: Pin<&mut Self>) -> Self::FutureItem; + } + + impl FutureFactory for T + where + T: Unpin + FnMut() -> F, + F: TryFuture + Unpin, + { + type FutureItem = F; + + #[allow(clippy::new_ret_no_self)] + fn new(self: Pin<&mut Self>) -> F { + (*self.get_mut())() + } + } +} + +#[cfg(test)] +mod watch { + use super::*; + use futures_util::StreamExt; + + #[tokio::test] + async fn stream() { + let factory = || Box::pin(async { Ok::, ProviderError>(vec![1, 2, 3]) }); + let filter = FilterWatcher::<_, u64>::new(1, factory); + let mut stream = filter.interval(1u64).stream(); + assert_eq!(stream.next().await.unwrap(), 1); + assert_eq!(stream.next().await.unwrap(), 2); + assert_eq!(stream.next().await.unwrap(), 3); + // this will poll the factory function again since it consumed the entire + // vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds + // until new blocks are mined, until the call to the factory returns a non-empty + // vector of logs + assert_eq!(stream.next().await.unwrap(), 1); + } +} diff --git a/ethers/examples/contract.rs b/ethers/examples/contract.rs index 2203e3e8..ebc24af1 100644 --- a/ethers/examples/contract.rs +++ b/ethers/examples/contract.rs @@ -38,7 +38,7 @@ async fn main() -> Result<()> { let client = wallet.connect(provider); // 6. create a factory which will be used to deploy instances of the contract - let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); + let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client); // 7. deploy it with the constructor arguments let contract = factory.deploy("initial value".to_string())?.send().await?;