diff --git a/.circleci/config.yml b/.circleci/config.yml index d9291546..a8972554 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,43 +3,10 @@ version: 2.1 commands: setup-lints: steps: - - run: - name: Install cargo-audit - command: cargo install cargo-audit - run: name: Install clippy command: rustup component add clippy - # https://medium.com/@edouard.oger/rust-caching-on-circleci-using-sccache-c996344f0115 - setup-sccache: - steps: - - run: - name: Install sccache - command: | - cargo install sccache - # This configures Rust to use sccache. - echo 'export "RUSTC_WRAPPER"="sccache"' >> $BASH_ENV - # This is the maximum space sccache cache will use on disk. - echo 'export "SCCACHE_CACHE_SIZE"="1G"' >> $BASH_ENV - sccache --version - - restore-sccache-cache: - steps: - - restore_cache: - name: Restore sccache cache - key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }} - save-sccache-cache: - steps: - - save_cache: - name: Save sccache cache - # We use {{ epoch }} to always upload a fresh cache: - # Of course, restore_cache will not find this exact key, - # but it will fall back to the closest key (aka the most recent). - # See https://discuss.circleci.com/t/add-mechanism-to-update-existing-cache-key/9014/13 - key: sccache-cache-stable-{{ arch }}-{{ .Environment.CIRCLE_JOB }}-{{ epoch }} - paths: - - "~/.cache/sccache" - jobs: build: docker: @@ -47,18 +14,18 @@ jobs: steps: - checkout - setup-lints - - setup-sccache - - restore-sccache-cache + - run: + name: Reduce codegen Units + # If we don't include this, the linker runs out of memory when building + # the project on CI. We don't include this normally though because + # it should be able to build with more units on other machines + command: printf "[profile.dev]\ncodegen-units = 1\n" >> Cargo.toml - run: name: tests # skip these temporarily until we get ganache-cli and solc on CI - command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth + command: cargo test --all -- --skip deploy_and_call_contract --skip send_eth --skip watch_events --skip get_past_events - run: name: Check style command: | cargo fmt --all -- --check cargo clippy --all-targets --all-features -- -D warnings - - run: - name: Audit Dependencies - command: cargo audit - - save-sccache-cache diff --git a/Cargo.lock b/Cargo.lock index 2ff32515..39a85304 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ dependencies = [ "ethers-core", "ethers-providers", "ethers-signers", + "futures", "once_cell", "rustc-hex", "serde", @@ -358,6 +359,9 @@ version = "0.1.0" dependencies = [ "async-trait", "ethers-core", + "futures-core", + "futures-util", + "pin-project", "reqwest", "rustc-hex", "serde", @@ -418,6 +422,21 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.5" @@ -425,6 +444,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -433,6 +453,23 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + [[package]] name = "futures-macro" version = "0.3.5" @@ -466,9 +503,13 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project", "pin-utils", "proc-macro-hack", @@ -860,18 +901,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc93aeee735e60ecb40cf740eb319ff23eab1c5748abfdb5c180e4ce49f7791" +checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e58db2081ba5b4c93bd6be09c40fd36cb9193a8336c384f3b40012e531aa7e40" +checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" dependencies = [ "proc-macro2", "quote", diff --git a/ethers-contract/Cargo.toml b/ethers-contract/Cargo.toml index 231f0957..cd4e2102 100644 --- a/ethers-contract/Cargo.toml +++ b/ethers-contract/Cargo.toml @@ -17,6 +17,7 @@ rustc-hex = { version = "2.1.0", default-features = false } thiserror = { version = "1.0.19", default-features = false } once_cell = { version = "1.4.0", default-features = false } tokio = { version = "0.2.21", default-features = false } +futures = "0.3.5" [dev-dependencies] tokio = { version = "0.2.21", default-features = false, features = ["macros"] } diff --git a/ethers-contract/ethers-contract-abigen/src/contract.rs b/ethers-contract/ethers-contract-abigen/src/contract.rs index f344ece1..5ad5f828 100644 --- a/ethers-contract/ethers-contract-abigen/src/contract.rs +++ b/ethers-contract/ethers-contract-abigen/src/contract.rs @@ -72,7 +72,7 @@ impl Context { /// client at the given `Address`. The contract derefs to a `ethers::Contract` /// object pub fn new>(address: T, client: &'a Client) -> Self { - let contract = Contract::new(address.into(), &#abi_name, client); + let contract = Contract::new(address.into(), #abi_name.clone(), client); Self(contract) } diff --git a/ethers-contract/src/contract.rs b/ethers-contract/src/contract.rs index a7ff34f7..8f4aeda0 100644 --- a/ethers-contract/src/contract.rs +++ b/ethers-contract/src/contract.rs @@ -75,7 +75,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; /// .parse::()?.connect(provider); /// /// // create the contract object at the address -/// let contract = Contract::new(address, &abi, &client); +/// let contract = Contract::new(address, abi, &client); /// /// // Calling constant methods is done by calling `call()` on the method builder. /// // (if the function takes no arguments, then you must use `()` as the argument) @@ -112,7 +112,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; /// # let abi: Abi = serde_json::from_str(r#"[]"#)?; /// # let provider = Provider::::try_from("http://localhost:8545").unwrap(); /// # let client = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc".parse::()?.connect(provider); -/// # let contract = Contract::new(address, &abi, &client); +/// # let contract = Contract::new(address, abi, &client); /// /// #[derive(Clone, Debug)] /// struct ValueChanged { @@ -160,7 +160,7 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData}; #[derive(Debug, Clone)] pub struct Contract<'a, P, S> { client: &'a Client, - abi: &'a Abi, + abi: Abi, address: Address, /// A mapping from method signature to a name-index pair for accessing @@ -176,7 +176,7 @@ where P: JsonRpcClient, { /// Creates a new contract from the provided client, abi and address - pub fn new(address: Address, abi: &'a Abi, client: &'a Client) -> Self { + pub fn new(address: Address, abi: Abi, client: &'a Client) -> Self { let methods = create_mapping(&abi.functions, |function| function.selector()); Self { diff --git a/ethers-contract/src/event.rs b/ethers-contract/src/event.rs index d13bbee6..053a474f 100644 --- a/ethers-contract/src/event.rs +++ b/ethers-contract/src/event.rs @@ -1,12 +1,13 @@ use crate::ContractError; -use ethers_providers::{JsonRpcClient, Provider}; +use ethers_providers::{FilterStream, JsonRpcClient, Provider}; use ethers_core::{ abi::{Detokenize, Event as AbiEvent, RawLog}, types::{BlockNumber, Filter, Log, ValueOrArray, H256}, }; +use futures::stream::{Stream, StreamExt}; use std::{collections::HashMap, marker::PhantomData}; /// Helper for managing the event filter before querying or streaming its logs @@ -60,6 +61,21 @@ impl Event<'_, '_, P, D> { } } +impl<'a, 'b, P, D> Event<'a, 'b, P, D> +where + P: JsonRpcClient, + D: 'b + Detokenize + Clone, + 'a: 'b, +{ + /// Returns a stream for the event + pub async fn stream( + self, + ) -> Result> + 'b, ContractError> { + let filter = self.provider.watch(&self.filter).await?; + Ok(filter.stream().map(move |log| self.parse_log(log))) + } +} + impl Event<'_, '_, P, D> where P: JsonRpcClient, @@ -107,6 +123,4 @@ where // convert the tokens to the requested datatype Ok(D::from_tokens(tokens)?) } - - // TODO: Add filter watchers } diff --git a/ethers-contract/src/factory.rs b/ethers-contract/src/factory.rs index 24667373..76d9ed8b 100644 --- a/ethers-contract/src/factory.rs +++ b/ethers-contract/src/factory.rs @@ -17,7 +17,7 @@ const POLL_INTERVAL: u64 = 7000; #[derive(Debug, Clone)] /// Helper which manages the deployment transaction of a smart contract pub struct Deployer<'a, P, S> { - abi: &'a Abi, + abi: Abi, client: &'a Client, tx: TransactionRequest, confs: usize, @@ -61,7 +61,7 @@ where time::delay_for(Duration::from_millis(POLL_INTERVAL)).await; } - let contract = Contract::new(address, self.abi, self.client); + let contract = Contract::new(address, self.abi.clone(), self.client); Ok(contract) } @@ -107,7 +107,7 @@ where /// .parse::()?.connect(provider); /// /// // create a factory which will be used to deploy instances of the contract -/// let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); +/// let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client); /// /// // The deployer created by the `deploy` call exposes a builder which gets consumed /// // by the async `send` call @@ -121,8 +121,8 @@ where /// # } pub struct ContractFactory<'a, P, S> { client: &'a Client, - abi: &'a Abi, - bytecode: &'a Bytes, + abi: Abi, + bytecode: Bytes, } impl<'a, P, S> ContractFactory<'a, P, S> @@ -133,7 +133,7 @@ where /// Creates a factory for deployment of the Contract with bytecode, and the /// constructor defined in the abi. The client will be used to send any deployment /// transaction. - pub fn new(abi: &'a Abi, bytecode: &'a Bytes, client: &'a Client) -> Self { + pub fn new(abi: Abi, bytecode: Bytes, client: &'a Client) -> Self { Self { client, abi, @@ -150,7 +150,7 @@ where /// 1. The default poll duration is 7 seconds. /// 1. The default number of confirmations is 1 block. pub fn deploy( - &self, + self, constructor_args: T, ) -> Result, ContractError> { // Encode the constructor args & concatenate with the bytecode if necessary diff --git a/ethers-contract/tests/common/mod.rs b/ethers-contract/tests/common/mod.rs new file mode 100644 index 00000000..3e9c7890 --- /dev/null +++ b/ethers-contract/tests/common/mod.rs @@ -0,0 +1,71 @@ +use ethers_core::{ + abi::{Abi, Detokenize, InvalidOutputType, Token}, + types::{Address, Bytes}, +}; + +use ethers_contract::{Contract, ContractFactory}; +use ethers_core::utils::{Ganache, GanacheInstance, Solc}; +use ethers_providers::{Http, Provider}; +use ethers_signers::{Client, Wallet}; +use std::convert::TryFrom; + +// Note: We also provide the `abigen` macro for generating these bindings automatically +#[derive(Clone, Debug)] +pub struct ValueChanged { + pub old_author: Address, + pub new_author: Address, + pub old_value: String, + pub new_value: String, +} + +impl Detokenize for ValueChanged { + fn from_tokens(tokens: Vec) -> Result { + let old_author: Address = tokens[1].clone().to_address().unwrap(); + let new_author: Address = tokens[1].clone().to_address().unwrap(); + let old_value = tokens[2].clone().to_string().unwrap(); + let new_value = tokens[3].clone().to_string().unwrap(); + + Ok(Self { + old_author, + new_author, + old_value, + new_value, + }) + } +} + +/// compiles the test contract +pub fn compile() -> (Abi, Bytes) { + let compiled = Solc::new("./tests/contract.sol").build().unwrap(); + let contract = compiled + .get("SimpleStorage") + .expect("could not find contract"); + (contract.abi.clone(), contract.bytecode.clone()) +} + +/// connects the private key to http://localhost:8545 +pub fn connect(private_key: &str) -> Client { + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + private_key.parse::().unwrap().connect(provider) +} + +/// Launches a ganache instance and deploys the SimpleStorage contract +pub async fn deploy<'a>( + client: &'a Client, + abi: Abi, + bytecode: Bytes, +) -> (GanacheInstance, Contract<'a, Http, Wallet>) { + let ganache = Ganache::new() + .mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle") + .spawn(); + + let factory = ContractFactory::new(abi, bytecode, client); + let contract = factory + .deploy("initial value".to_string()) + .unwrap() + .send() + .await + .unwrap(); + + (ganache, contract) +} diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index 05c40188..8e97e5b5 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -1,48 +1,28 @@ use ethers_contract::ContractFactory; use ethers_core::{ - abi::{Detokenize, InvalidOutputType, Token}, types::{Address, H256}, - utils::{Ganache, Solc}, + utils::Ganache, }; -use ethers_providers::{Http, Provider}; -use ethers_signers::Wallet; -use std::convert::TryFrom; + +mod common; +pub use common::*; #[tokio::test] async fn deploy_and_call_contract() { - // compile the contract - let compiled = Solc::new("./tests/contract.sol").build().unwrap(); - let contract = compiled - .get("SimpleStorage") - .expect("could not find contract"); + let (abi, bytecode) = compile(); // launch ganache - let port = 8546u64; - let url = format!("http://localhost:{}", port).to_string(); - let _ganache = Ganache::new().port(port) + let _ganache = Ganache::new() .mnemonic("abstract vacuum mammal awkward pudding scene penalty purchase dinner depart evoke puzzle") .spawn(); - // connect to the network - let provider = Provider::::try_from(url.as_str()).unwrap(); - - // instantiate our wallets - let [wallet1, wallet2]: [Wallet; 2] = [ - "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc" - .parse() - .unwrap(), - "cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e" - .parse() - .unwrap(), - ]; - // Instantiate the clients. We assume that clients consume the provider and the wallet // (which makes sense), so for multi-client tests, you must clone the provider. - let client = wallet1.connect(provider.clone()); - let client2 = wallet2.connect(provider); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let client2 = connect("cc96601bc52293b53c4736a12af9130abf347669b3813f9ec4cafdf6991b087e"); // create a factory which will be used to deploy instances of the contract - let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); + let factory = ContractFactory::new(abi, bytecode, &client); // `send` consumes the deployer so it must be cloned for later re-use // (practically it's not expected that you'll need to deploy multiple instances of @@ -86,62 +66,4 @@ async fn deploy_and_call_contract() { .unwrap(); assert_eq!(init_address, Address::zero()); assert_eq!(init_value, "initial value"); - - // we can still interact with the old contract instance - let _tx_hash = contract - .method::<_, H256>("setValue", "hi2".to_owned()) - .unwrap() - .send() - .await - .unwrap(); - assert_eq!(last_sender.clone().call().await.unwrap(), client.address()); - assert_eq!(get_value.clone().call().await.unwrap(), "hi2"); - - // and we can fetch the events - let logs: Vec = contract - .event("ValueChanged") - .unwrap() - .from_block(0u64) - .topic1(client.address()) // Corresponds to the first indexed parameter - .query() - .await - .unwrap(); - assert_eq!(logs[0].new_value, "initial value"); - assert_eq!(logs[1].new_value, "hi2"); - assert_eq!(logs.len(), 2); - - let logs: Vec = contract2 - .event("ValueChanged") - .unwrap() - .from_block(0u64) - .query() - .await - .unwrap(); - assert_eq!(logs[0].new_value, "initial value"); - assert_eq!(logs.len(), 1); -} - -// Note: We also provide the `abigen` macro for generating these bindings automatically -#[derive(Clone, Debug)] -struct ValueChanged { - old_author: Address, - new_author: Address, - old_value: String, - new_value: String, -} - -impl Detokenize for ValueChanged { - fn from_tokens(tokens: Vec) -> Result { - let old_author: Address = tokens[1].clone().to_address().unwrap(); - let new_author: Address = tokens[1].clone().to_address().unwrap(); - let old_value = tokens[2].clone().to_string().unwrap(); - let new_value = tokens[3].clone().to_string().unwrap(); - - Ok(Self { - old_author, - new_author, - old_value, - new_value, - }) - } } diff --git a/ethers-contract/tests/get_past_events.rs b/ethers-contract/tests/get_past_events.rs new file mode 100644 index 00000000..dcd51ed1 --- /dev/null +++ b/ethers-contract/tests/get_past_events.rs @@ -0,0 +1,32 @@ +use ethers_core::types::H256; + +mod common; +use common::{compile, connect, deploy, ValueChanged}; + +#[tokio::test] +async fn get_past_events() { + let (abi, bytecode) = compile(); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let (_ganache, contract) = deploy(&client, abi, bytecode).await; + + // make a call with `client2` + let _tx_hash = contract + .method::<_, H256>("setValue", "hi".to_owned()) + .unwrap() + .send() + .await + .unwrap(); + + // and we can fetch the events + let logs: Vec = contract + .event("ValueChanged") + .unwrap() + .from_block(0u64) + .topic1(client.address()) // Corresponds to the first indexed parameter + .query() + .await + .unwrap(); + assert_eq!(logs[0].new_value, "initial value"); + assert_eq!(logs[1].new_value, "hi"); + assert_eq!(logs.len(), 2); +} diff --git a/ethers-contract/tests/watch_events.rs b/ethers-contract/tests/watch_events.rs new file mode 100644 index 00000000..d8b9ccaa --- /dev/null +++ b/ethers-contract/tests/watch_events.rs @@ -0,0 +1,38 @@ +use ethers_core::types::H256; +use ethers_providers::StreamExt; + +mod common; +use common::{compile, connect, deploy, ValueChanged}; + +#[tokio::test] +async fn watch_events() { + let (abi, bytecode) = compile(); + let client = connect("380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"); + let (_ganache, contract) = deploy(&client, abi, bytecode).await; + + // We spawn the event listener: + let mut stream = contract + .event::("ValueChanged") + .unwrap() + .stream() + .await + .unwrap(); + + let num_calls = 3u64; + + // and we make a few calls + for i in 0..num_calls { + let _tx_hash = contract + .method::<_, H256>("setValue", i.to_string()) + .unwrap() + .send() + .await + .unwrap(); + } + + for i in 0..num_calls { + // unwrap the option of the stream, then unwrap the decoding result + let log = stream.next().await.unwrap().unwrap(); + assert_eq!(log.new_value, i.to_string()); + } +} diff --git a/ethers-core/src/utils/mod.rs b/ethers-core/src/utils/mod.rs index 13069f5b..3d0ffc73 100644 --- a/ethers-core/src/utils/mod.rs +++ b/ethers-core/src/utils/mod.rs @@ -1,6 +1,6 @@ /// Utilities for launching a ganache-cli testnet instance mod ganache; -pub use ganache::Ganache; +pub use ganache::{Ganache, GanacheInstance}; /// Solidity compiler bindings mod solc; diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 5f55ec14..40c61fa3 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -14,6 +14,12 @@ serde_json = { version = "1.0.53", default-features = false } thiserror = { version = "1.0.19", default-features = false } url = { version = "2.1.1", default-features = false } +# required for implementing stream on the filters +futures-core = { version = "0.3.5", default-features = false } +futures-util = { version = "0.3.5", default-features = false } +pin-project = { version = "0.4.20", default-features = false } +tokio = { version = "0.2.21", default-features = false, features = ["time"] } + [dev-dependencies] rustc-hex = "2.1.0" tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] } diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 0dafd68c..82bfc083 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -6,6 +6,12 @@ mod provider; // ENS support mod ens; +mod stream; +pub use stream::FilterStream; +// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream` +// without having to import futures themselves +pub use futures_util::StreamExt; + use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::{error::Error, fmt::Debug}; diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 567fb06e..c90254d6 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -1,10 +1,15 @@ -use crate::{ens, http::Provider as HttpProvider, JsonRpcClient}; +use crate::{ + ens, + http::Provider as HttpProvider, + stream::{FilterStream, FilterWatcher}, + JsonRpcClient, +}; use ethers_core::{ abi::{self, Detokenize, ParamType}, types::{ Address, Block, BlockId, BlockNumber, Bytes, Filter, Log, NameOrAddress, Selector, - Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, U256, U64, + Signature, Transaction, TransactionReceipt, TransactionRequest, TxHash, H256, U256, U64, }, utils, }; @@ -52,6 +57,19 @@ pub enum ProviderError { EnsError(String), } +/// Types of filters supported by the JSON-RPC. +#[derive(Clone, Debug)] +pub enum FilterKind<'a> { + /// `eth_newBlockFilter` + Logs(&'a Filter), + + /// `eth_newBlockFilter` filter + NewBlocks, + + /// `eth_newPendingTransactionFilter` filter + PendingTransactions, +} + // JSON RPC bindings impl Provider

{ /// Instantiate a new provider with a backend. @@ -263,7 +281,7 @@ impl Provider

{ Ok(self .0 - .request("eth_sendTransaction", Some(tx)) + .request("eth_sendTransaction", Some(vec![tx])) .await .map_err(Into::into)?) } @@ -305,6 +323,83 @@ impl Provider

{ .map_err(Into::into)?) } + /// Streams matching filter logs + pub async fn watch( + &self, + filter: &Filter, + ) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::Logs(filter)).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Streams new block hashes + pub async fn watch_blocks(&self) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::NewBlocks).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Streams pending transactions + pub async fn watch_pending_transactions( + &self, + ) -> Result + '_, ProviderError> { + let id = self.new_filter(FilterKind::PendingTransactions).await?; + let fut = move || Box::pin(self.get_filter_changes(id)); + Ok(FilterWatcher::new(id, fut)) + } + + /// Creates a filter object, based on filter options, to notify when the state changes (logs). + /// To check if the state has changed, call `get_filter_changes` with the filter id. + pub async fn new_filter(&self, filter: FilterKind<'_>) -> Result { + let (method, args) = match filter { + FilterKind::NewBlocks => ("eth_newBlockFilter", utils::serialize(&())), + FilterKind::PendingTransactions => { + ("eth_newPendingTransactionFilter", utils::serialize(&())) + } + FilterKind::Logs(filter) => ("eth_newFilter", utils::serialize(&filter)), + }; + + Ok(self + .0 + .request(method, Some(vec![args])) + .await + .map_err(Into::into)?) + } + + /// Uninstalls a filter + pub async fn uninstall_filter>(&self, id: T) -> Result { + let id = utils::serialize(&id.into()); + Ok(self + .0 + .request("eth_uninstallFilter", Some(vec![id])) + .await + .map_err(Into::into)?) + } + + /// Polling method for a filter, which returns an array of logs which occurred since last poll. + /// + /// This method must be called with one of the following return types, depending on the filter + /// type: + /// - `eth_newBlockFilter`: `H256`, returns block hashes + /// - `eth_newPendingTransactionFilter`: `H256`, returns transaction hashes + /// - `eth_newFilter`: `Log`, returns raw logs + /// + /// If one of these types is not used, decoding will fail and the method will + /// return an error. + pub async fn get_filter_changes(&self, id: T) -> Result, ProviderError> + where + T: Into, + R: for<'a> Deserialize<'a>, + { + let id = utils::serialize(&id.into()); + Ok(self + .0 + .request("eth_getFilterChanges", Some(vec![id])) + .await + .map_err(Into::into)?) + } + // TODO: get_code, get_storage_at ////// Ethereum Naming Service @@ -364,6 +459,18 @@ impl Provider

{ Ok(decode_bytes(param, data)) } + #[cfg(test)] + /// ganache-only function for mining empty blocks + pub async fn mine(&self, num_blocks: usize) -> Result<(), ProviderError> { + for _ in 0..num_blocks { + self.0 + .request::<_, U256>("evm_mine", None::<()>) + .await + .map_err(Into::into)?; + } + Ok(()) + } + /// Sets the ENS Address (default: mainnet) pub fn ens>(mut self, ens: T) -> Self { self.1 = Some(ens.into()); @@ -438,3 +545,69 @@ mod ens_tests { .unwrap_err(); } } + +#[cfg(test)] +mod tests { + use super::*; + use ethers_core::types::H256; + use futures_util::StreamExt; + + #[tokio::test] + #[ignore] + // Ganache new block filters are super buggy! This test must be run with + // geth or parity running e.g. `geth --dev --rpc --dev.period 1` + async fn test_new_block_filter() { + let num_blocks = 3; + + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let start_block = provider.get_block_number().await.unwrap(); + + let stream = provider + .watch_blocks() + .await + .unwrap() + .interval(1000u64) + .stream(); + + let hashes: Vec = stream.take(num_blocks).collect::>().await; + for (i, hash) in hashes.iter().enumerate() { + let block = provider + .get_block(start_block + i as u64 + 1) + .await + .unwrap(); + assert_eq!(*hash, block.hash.unwrap()); + } + } + + // this must be run with geth or parity since ganache-core still does not support + // eth_pendingTransactions, https://github.com/trufflesuite/ganache-core/issues/405 + // example command: `geth --dev --rpc --dev.period 1` + #[tokio::test] + #[ignore] + async fn test_new_pending_txs_filter() { + let num_txs = 5; + + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let accounts = provider.get_accounts().await.unwrap(); + + let stream = provider + .watch_pending_transactions() + .await + .unwrap() + .interval(1000u64) + .stream(); + + let mut tx_hashes = Vec::new(); + let tx = TransactionRequest::new() + .from(accounts[0]) + .to(accounts[0]) + .value(1e18 as u64); + + for _ in 0..num_txs { + tx_hashes.push(provider.send_transaction(tx.clone()).await.unwrap()); + } + + let hashes: Vec = stream.take(num_txs).collect::>().await; + assert_eq!(tx_hashes, hashes); + } +} diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs new file mode 100644 index 00000000..613567ea --- /dev/null +++ b/ethers-providers/src/stream.rs @@ -0,0 +1,198 @@ +use crate::ProviderError; + +use ethers_core::types::U256; + +use futures_core::{stream::Stream, TryFuture}; +use futures_util::StreamExt; +use pin_project::pin_project; +use serde::Deserialize; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, + vec::IntoIter, +}; +use tokio::time::{interval, Interval}; + +const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000); + +/// Trait for streaming filters. You can get the id. +pub trait FilterStream: StreamExt + Stream +where + R: for<'de> Deserialize<'de>, +{ + /// Returns the filter's ID for it to be uninstalled + fn id(&self) -> U256; + + /// Sets the stream's polling interval + fn interval>(self, duration: T) -> Self; + + /// Alias for Box::pin, must be called in order to pin the stream and be able + /// to call `next` on it. + fn stream(self) -> Pin> + where + Self: Sized, + { + Box::pin(self) + } +} + +enum FilterWatcherState { + WaitForInterval, + GetFilterChanges(F), + NextItem(IntoIter), +} + +#[must_use = "filters do nothing unless you stream them"] +#[pin_project] +pub(crate) struct FilterWatcher { + id: U256, + + #[pin] + // Future factory for generating new calls on each loop + factory: F, + + // The polling interval + interval: Interval, + + state: FilterWatcherState, +} + +impl FilterWatcher +where + F: FutureFactory, + R: for<'de> Deserialize<'de>, +{ + /// Creates a new watcher with the provided factory and filter id. + pub fn new>(id: T, factory: F) -> Self { + Self { + id: id.into(), + interval: interval(DEFAULT_POLL_DURATION), + state: FilterWatcherState::WaitForInterval, + factory, + } + } +} + +impl FilterStream for FilterWatcher +where + F: FutureFactory, + F::FutureItem: Future, ProviderError>>, + R: for<'de> Deserialize<'de>, +{ + fn id(&self) -> U256 { + self.id + } + + fn interval>(mut self, duration: T) -> Self { + self.interval = interval(Duration::from_millis(duration.into())); + self + } +} + +// Pattern for flattening the returned Vec of filter changes taken from +// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67 +impl Stream for FilterWatcher +where + F: FutureFactory, + F::FutureItem: Future, ProviderError>>, + R: for<'de> Deserialize<'de>, +{ + type Item = R; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + *this.state = match this.state { + FilterWatcherState::WaitForInterval => { + // Wait the polling period + let mut interval = Box::pin(this.interval.tick()); + let _ready = futures_util::ready!(interval.as_mut().poll(cx)); + + // create a new instance of the future + FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) + } + FilterWatcherState::GetFilterChanges(fut) => { + // wait for the future to be ready + let mut fut = Box::pin(fut); + + // NOTE: If the provider returns an error, this will return an empty + // vector. Should we make this return a Result instead? Ideally if we're + // in a streamed loop we wouldn't want the loop to terminate if an error + // is encountered (since it might be a temporary error). + let items: Vec = + futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); + FilterWatcherState::NextItem(items.into_iter()) + } + // Consume 1 element from the vector. If more elements are in the vector, + // the next call will immediately go to this branch instead of trying to get + // filter changes again. Once the whole vector is consumed, it will poll again + // for new logs + FilterWatcherState::NextItem(iter) => match iter.next() { + Some(item) => return Poll::Ready(Some(item)), + None => FilterWatcherState::WaitForInterval, + }, + }; + } + } +} + +// Do not leak private trait +// Pattern for re-usable futures from: https://gitlab.com/Ploppz/futures-retry/-/blob/std-futures/src/future.rs#L13 +use factory::FutureFactory; +mod factory { + use super::*; + + /// A factory trait used to create futures. + /// + /// We need a factory for the stream logic because when (and if) a future + /// is polled to completion, it can't be polled again. Hence we need to + /// create a new one. + /// + /// This trait is implemented for any closure that returns a `Future`, so you don't + /// have to write your own type and implement it to handle some simple cases. + pub trait FutureFactory { + /// A future type that is created by the `new` method. + type FutureItem: TryFuture + Unpin; + + /// Creates a new future. We don't need the factory to be immutable so we + /// pass `self` as a mutable reference. + fn new(self: Pin<&mut Self>) -> Self::FutureItem; + } + + impl FutureFactory for T + where + T: Unpin + FnMut() -> F, + F: TryFuture + Unpin, + { + type FutureItem = F; + + #[allow(clippy::new_ret_no_self)] + fn new(self: Pin<&mut Self>) -> F { + (*self.get_mut())() + } + } +} + +#[cfg(test)] +mod watch { + use super::*; + use futures_util::StreamExt; + + #[tokio::test] + async fn stream() { + let factory = || Box::pin(async { Ok::, ProviderError>(vec![1, 2, 3]) }); + let filter = FilterWatcher::<_, u64>::new(1, factory); + let mut stream = filter.interval(1u64).stream(); + assert_eq!(stream.next().await.unwrap(), 1); + assert_eq!(stream.next().await.unwrap(), 2); + assert_eq!(stream.next().await.unwrap(), 3); + // this will poll the factory function again since it consumed the entire + // vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds + // until new blocks are mined, until the call to the factory returns a non-empty + // vector of logs + assert_eq!(stream.next().await.unwrap(), 1); + } +} diff --git a/ethers/examples/contract.rs b/ethers/examples/contract.rs index 2203e3e8..ebc24af1 100644 --- a/ethers/examples/contract.rs +++ b/ethers/examples/contract.rs @@ -38,7 +38,7 @@ async fn main() -> Result<()> { let client = wallet.connect(provider); // 6. create a factory which will be used to deploy instances of the contract - let factory = ContractFactory::new(&contract.abi, &contract.bytecode, &client); + let factory = ContractFactory::new(contract.abi.clone(), contract.bytecode.clone(), &client); // 7. deploy it with the constructor arguments let contract = factory.deploy("initial value".to_string())?.send().await?;