From 1ece5d2020f9704bd5514e6af1255f8da453bbb7 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 30 Nov 2020 11:33:06 +0200 Subject: [PATCH] Contract & Provider eth_subscribe support (#100) * fix(block): fix block decoding from ws * feat(pubsub): add pubsub traits and sub stream Also use DeserializeOwned alias * feat(transports): add notification type * feat(ws): rewrite Ws for subscription support * feat(provider): add eth_subscribe * fix(celo): disable some celo tests due to ganache incompatibilities * test(rinkeby): fix flaky test * feat(contract): WS subscription bindings (#101) * feat(middleware): add subscriptions to middleware methods * feat(contract): add subscribe method to contracts --- Cargo.lock | 77 +++-- ethers-contract/Cargo.toml | 2 + ethers-contract/src/event.rs | 49 ++- ethers-contract/src/lib.rs | 3 + ethers-contract/src/stream.rs | 41 +++ ethers-contract/tests/contract.rs | 20 +- ethers-core/src/types/block.rs | 22 +- ethers-core/src/types/transaction.rs | 2 +- ethers-middleware/tests/nonce_manager.rs | 3 + ethers-providers/Cargo.toml | 3 +- ethers-providers/src/lib.rs | 63 +++- ethers-providers/src/provider.rs | 96 +++++- ethers-providers/src/pubsub.rs | 98 ++++++ ethers-providers/src/stream.rs | 6 +- ethers-providers/src/transports/common.rs | 17 +- ethers-providers/src/transports/http.rs | 4 +- ethers-providers/src/transports/mock.rs | 4 +- ethers-providers/src/transports/mod.rs | 2 +- ethers-providers/src/transports/ws.rs | 378 ++++++++++++++++------ 19 files changed, 723 insertions(+), 167 deletions(-) create mode 100644 ethers-contract/src/stream.rs create mode 100644 ethers-providers/src/pubsub.rs diff --git a/Cargo.lock b/Cargo.lock index 77be0279..c7ce4b46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,7 +252,7 @@ dependencies = [ "futures-util", "log", "native-tls", - "pin-project", + "pin-project 0.4.25", "tokio", "tokio-native-tls", "tungstenite", @@ -775,7 +775,9 @@ dependencies = [ "ethers-providers", "ethers-signers", "futures", + "futures-util", "once_cell", + "pin-project 1.0.2", "rustc-hex", "serde", "serde_json", @@ -866,10 +868,11 @@ dependencies = [ "async-tungstenite", "ethers", "ethers-core", + "futures-channel", "futures-core", "futures-timer", "futures-util", - "pin-project", + "pin-project 1.0.2", "reqwest", "rustc-hex", "serde", @@ -994,9 +997,9 @@ checksum = "0ba62103ce691c2fd80fbae2213dfdda9ce60804973ac6b6e97de818ea7f52c8" [[package]] name = "futures" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0" dependencies = [ "futures-channel", "futures-core", @@ -1009,9 +1012,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" dependencies = [ "futures-core", "futures-sink", @@ -1019,15 +1022,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" [[package]] name = "futures-executor" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65" dependencies = [ "futures-core", "futures-task", @@ -1037,9 +1040,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" [[package]] name = "futures-lite" @@ -1058,9 +1061,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -1070,15 +1073,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" [[package]] name = "futures-task" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" dependencies = [ "once_cell", ] @@ -1091,9 +1094,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" dependencies = [ "futures-channel", "futures-core", @@ -1102,7 +1105,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project", + "pin-project 1.0.2", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1276,7 +1279,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project", + "pin-project 0.4.25", "socket2", "tokio", "tower-service", @@ -1751,7 +1754,16 @@ version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b9e280448854bd91559252582173b3bd1f8e094a0e644791c0628ca9b1f144f" dependencies = [ - "pin-project-internal", + "pin-project-internal 0.4.25", +] + +[[package]] +name = "pin-project" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" +dependencies = [ + "pin-project-internal 1.0.2", ] [[package]] @@ -1765,6 +1777,17 @@ dependencies = [ "syn", ] +[[package]] +name = "pin-project-internal" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.10" @@ -1817,9 +1840,9 @@ dependencies = [ [[package]] name = "proc-macro-hack" -version = "0.5.18" +version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro-nested" @@ -2249,9 +2272,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd" [[package]] name = "syn" -version = "1.0.42" +version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c51d92969d209b54a98397e1b91c8ae82d8c87a7bb87df0b29aa2ad81454228" +checksum = "3b4f34193997d92804d359ed09953e25d5138df6bcc055a71bf68ee89fdf9223" dependencies = [ "proc-macro2", "quote", diff --git a/ethers-contract/Cargo.toml b/ethers-contract/Cargo.toml index f19cd253..2d1e01dd 100644 --- a/ethers-contract/Cargo.toml +++ b/ethers-contract/Cargo.toml @@ -22,6 +22,8 @@ rustc-hex = { version = "2.1.0", default-features = false } thiserror = { version = "1.0.15", default-features = false } once_cell = { version = "1.3.1", default-features = false } futures = "0.3.5" +futures-util = "0.3.8" +pin-project = "1.0.2" [dev-dependencies] ethers = { version = "0.1.3", path = "../ethers" } diff --git a/ethers-contract/src/event.rs b/ethers-contract/src/event.rs index f5c6432e..3e53b60b 100644 --- a/ethers-contract/src/event.rs +++ b/ethers-contract/src/event.rs @@ -1,13 +1,10 @@ -use crate::{base::decode_event, ContractError}; - -use ethers_providers::Middleware; +use crate::{base::decode_event, ContractError, EventStream}; use ethers_core::{ abi::{Detokenize, Event as AbiEvent}, types::{BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U64}, }; - -use futures::stream::{Stream, StreamExt}; +use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream}; use std::marker::PhantomData; /// Helper for managing the event filter before querying or streaming its logs @@ -67,18 +64,52 @@ impl<'a, 'b, M, D> Event<'a, 'b, M, D> where M: Middleware, D: 'b + Detokenize + Clone, - 'a: 'b, { /// Returns a stream for the event pub async fn stream( - self, - ) -> Result>> + 'b, ContractError> { + &'a self, + ) -> Result< + // Wraps the FilterWatcher with a mapping to the event + EventStream<'a, FilterWatcher<'a, M::Provider, Log>, D, ContractError>, + ContractError, + > { let filter = self .provider .watch(&self.filter) .await .map_err(ContractError::MiddlewareError)?; - Ok(filter.stream().map(move |log| self.parse_log(log))) + Ok(EventStream::new( + filter.id, + filter, + Box::new(move |log| self.parse_log(log)), + )) + } +} + +impl<'a, 'b, M, D> Event<'a, 'b, M, D> +where + M: Middleware, + ::Provider: PubsubClient, + D: 'b + Detokenize + Clone, +{ + /// Returns a subscription for the event + pub async fn subscribe( + &'a self, + ) -> Result< + // Wraps the SubscriptionStream with a mapping to the event + EventStream<'a, SubscriptionStream<'a, M::Provider, Log>, D, ContractError>, + ContractError, + > { + let filter = self + .provider + .subscribe_logs(&self.filter) + .await + .map_err(ContractError::MiddlewareError)?; + Ok(EventStream::new( + filter.id, + filter, + Box::new(move |log| self.parse_log(log)), + )) } } diff --git a/ethers-contract/src/lib.rs b/ethers-contract/src/lib.rs index b36e267e..423d32a5 100644 --- a/ethers-contract/src/lib.rs +++ b/ethers-contract/src/lib.rs @@ -27,6 +27,9 @@ pub use factory::ContractFactory; mod event; +mod stream; +pub use stream::EventStream; + mod multicall; pub use multicall::Multicall; diff --git a/ethers-contract/src/stream.rs b/ethers-contract/src/stream.rs new file mode 100644 index 00000000..938c796f --- /dev/null +++ b/ethers-contract/src/stream.rs @@ -0,0 +1,41 @@ +use ethers_core::types::{Log, U256}; +use futures::stream::{Stream, StreamExt}; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +type MapEvent<'a, R, E> = Box Result + 'a>; + +#[pin_project] +/// Generic wrapper around Log streams, mapping their content to a specific +/// deserialized log struct. +/// +/// We use this wrapper type instead of `StreamExt::map` in order to preserve +/// information about the filter/subscription's id. +pub struct EventStream<'a, T, R, E> { + pub id: U256, + #[pin] + stream: T, + parse: MapEvent<'a, R, E>, +} + +impl<'a, T, R, E> EventStream<'a, T, R, E> { + pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self { + Self { id, stream, parse } + } +} + +impl<'a, T, R, E> Stream for EventStream<'a, T, R, E> +where + T: Stream + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + let mut this = self.project(); + match futures_util::ready!(this.stream.poll_next_unpin(ctx)) { + Some(item) => Poll::Ready(Some((this.parse)(item))), + None => Poll::Pending, + } + } +} diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index 4d342353..84833f42 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -125,15 +125,19 @@ mod eth_tests { let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol"); let ganache = Ganache::new().spawn(); let client = connect(&ganache, 0); - let contract = deploy(client, abi, bytecode).await; + let contract = deploy(client, abi.clone(), bytecode).await; // We spawn the event listener: - let mut stream = contract - .event::("ValueChanged") - .unwrap() - .stream() - .await - .unwrap(); + let event = contract.event::("ValueChanged").unwrap(); + let mut stream = event.stream().await.unwrap(); + assert_eq!(stream.id, 1.into()); + + // Also set up a subscription for the same thing + let ws = Provider::connect(ganache.ws_endpoint()).await.unwrap(); + let contract2 = ethers_contract::Contract::new(contract.address(), abi, ws); + let event2 = contract2.event::("ValueChanged").unwrap(); + let mut subscription = event2.subscribe().await.unwrap(); + assert_eq!(subscription.id, 2.into()); let num_calls = 3u64; @@ -151,6 +155,8 @@ mod eth_tests { for i in 0..num_calls { // unwrap the option of the stream, then unwrap the decoding result let log = stream.next().await.unwrap().unwrap(); + let log2 = subscription.next().await.unwrap().unwrap(); + assert_eq!(log.new_value, log2.new_value); assert_eq!(log.new_value, i.to_string()); } } diff --git a/ethers-core/src/types/block.rs b/ethers-core/src/types/block.rs index 5c54df3d..7e734fe6 100644 --- a/ethers-core/src/types/block.rs +++ b/ethers-core/src/types/block.rs @@ -9,43 +9,45 @@ pub struct Block { /// Hash of the block pub hash: Option, /// Hash of the parent - #[serde(rename = "parentHash")] + #[serde(default, rename = "parentHash")] pub parent_hash: H256, /// Hash of the uncles #[cfg(not(feature = "celo"))] - #[serde(rename = "sha3Uncles")] + #[serde(default, rename = "sha3Uncles")] pub uncles_hash: H256, /// Miner/author's address. - #[serde(rename = "miner")] + #[serde(default, rename = "miner")] pub author: Address, /// State root hash - #[serde(rename = "stateRoot")] + #[serde(default, rename = "stateRoot")] pub state_root: H256, /// Transactions root hash - #[serde(rename = "transactionsRoot")] + #[serde(default, rename = "transactionsRoot")] pub transactions_root: H256, /// Transactions receipts root hash - #[serde(rename = "receiptsRoot")] + #[serde(default, rename = "receiptsRoot")] pub receipts_root: H256, /// Block number. None if pending. pub number: Option, /// Gas Used - #[serde(rename = "gasUsed")] + #[serde(default, rename = "gasUsed")] pub gas_used: U256, /// Gas Limit #[cfg(not(feature = "celo"))] - #[serde(rename = "gasLimit")] + #[serde(default, rename = "gasLimit")] pub gas_limit: U256, /// Extra data - #[serde(rename = "extraData")] + #[serde(default, rename = "extraData")] pub extra_data: Bytes, /// Logs bloom #[serde(rename = "logsBloom")] pub logs_bloom: Option, /// Timestamp + #[serde(default)] pub timestamp: U256, /// Difficulty #[cfg(not(feature = "celo"))] + #[serde(default)] pub difficulty: U256, /// Total difficulty #[serde(rename = "totalDifficulty")] @@ -55,8 +57,10 @@ pub struct Block { pub seal_fields: Vec, /// Uncles' hashes #[cfg(not(feature = "celo"))] + #[serde(default)] pub uncles: Vec, /// Transactions + #[serde(bound = "TX: Serialize + serde::de::DeserializeOwned", default)] pub transactions: Vec, /// Size in bytes pub size: Option, diff --git a/ethers-core/src/types/transaction.rs b/ethers-core/src/types/transaction.rs index 154006e1..208188fc 100644 --- a/ethers-core/src/types/transaction.rs +++ b/ethers-core/src/types/transaction.rs @@ -241,7 +241,7 @@ fn rlp_opt(rlp: &mut RlpStream, opt: Option) { } /// Details of a signed transaction -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] pub struct Transaction { /// The transaction's hash pub hash: H256, diff --git a/ethers-middleware/tests/nonce_manager.rs b/ethers-middleware/tests/nonce_manager.rs index ef422530..189bc8f4 100644 --- a/ethers-middleware/tests/nonce_manager.rs +++ b/ethers-middleware/tests/nonce_manager.rs @@ -39,6 +39,9 @@ async fn nonce_manager() { tx_hashes.push(tx); } + // sleep a bit to ensure there's no flakiness in the test + std::thread::sleep(std::time::Duration::new(3, 0)); + let mut nonces = Vec::new(); for tx_hash in tx_hashes { nonces.push( diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 5be4e37d..0bd2dc46 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -27,7 +27,8 @@ url = { version = "2.1.1", default-features = false } futures-core = { version = "0.3.5", default-features = false } futures-util = { version = "0.3.5", default-features = false } futures-timer = { version = "3.0.2", default-features = false } -pin-project = { version = "0.4.20", default-features = false } +futures-channel = { version = "0.3.8", default-features = false } +pin-project = { version = "1.0.2", default-features = false } # ws support async-std and tokio runtimes for the convenience methods async-tungstenite = { version = "0.6.0", default-features = false, optional = true } diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index 2d2d4427..2a00bb9f 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -113,8 +113,11 @@ mod stream; pub use futures_util::StreamExt; pub use stream::{interval, FilterWatcher, DEFAULT_POLL_INTERVAL}; +mod pubsub; +pub use pubsub::{PubsubClient, SubscriptionStream}; + use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use std::{error::Error, fmt::Debug, future::Future, pin::Pin}; pub use provider::{FilterKind, Provider, ProviderError}; @@ -134,7 +137,7 @@ pub trait JsonRpcClient: Debug + Send + Sync { async fn request(&self, method: &str, params: T) -> Result where T: Debug + Serialize + Send + Sync, - R: for<'a> Deserialize<'a>; + R: DeserializeOwned; } use ethers_core::types::*; @@ -319,7 +322,7 @@ pub trait Middleware: Sync + Send + Debug { async fn get_filter_changes(&self, id: T) -> Result, Self::Error> where T: Into + Send + Sync, - R: for<'a> Deserialize<'a> + Send + Sync, + R: DeserializeOwned + Send + Sync, { self.inner() .get_filter_changes(id) @@ -468,4 +471,58 @@ pub trait Middleware: Sync + Send + Debug { .await .map_err(FromErr::from) } + + async fn subscribe( + &self, + params: T, + ) -> Result, Self::Error> + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send + Sync, + ::Provider: PubsubClient, + { + self.inner().subscribe(params).await.map_err(FromErr::from) + } + + async fn unsubscribe(&self, id: T) -> Result + where + T: Into + Send + Sync, + ::Provider: PubsubClient, + { + self.inner().unsubscribe(id).await.map_err(FromErr::from) + } + + async fn subscribe_blocks( + &self, + ) -> Result>, Self::Error> + where + ::Provider: PubsubClient, + { + self.inner().subscribe_blocks().await.map_err(FromErr::from) + } + + async fn subscribe_pending_txs( + &self, + ) -> Result, Self::Error> + where + ::Provider: PubsubClient, + { + self.inner() + .subscribe_pending_txs() + .await + .map_err(FromErr::from) + } + + async fn subscribe_logs<'a>( + &'a self, + filter: &Filter, + ) -> Result, Self::Error> + where + ::Provider: PubsubClient, + { + self.inner() + .subscribe_logs(filter) + .await + .map_err(FromErr::from) + } } diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 78e8c5e6..b960e42e 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -1,5 +1,6 @@ use crate::{ ens, + pubsub::{PubsubClient, SubscriptionStream}, stream::{FilterWatcher, DEFAULT_POLL_INTERVAL}, FromErr, Http as HttpProvider, JsonRpcClient, MockProvider, PendingTransaction, }; @@ -16,7 +17,7 @@ use ethers_core::{ use crate::Middleware; use async_trait::async_trait; -use serde::Deserialize; +use serde::{de::DeserializeOwned, Serialize}; use thiserror::Error; use url::{ParseError, Url}; @@ -96,7 +97,7 @@ impl Provider

{ self } - async fn get_block_gen Deserialize<'a>>( + async fn get_block_gen( &self, id: BlockId, include_txs: bool, @@ -428,7 +429,7 @@ impl Middleware for Provider

{ async fn get_filter_changes(&self, id: T) -> Result, ProviderError> where T: Into + Send + Sync, - R: for<'a> Deserialize<'a> + Send + Sync, + R: DeserializeOwned + Send + Sync, { let id = utils::serialize(&id.into()); Ok(self @@ -659,6 +660,66 @@ impl Middleware for Provider

{ .await .map_err(Into::into) } + + async fn subscribe( + &self, + params: T, + ) -> Result, ProviderError> + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send + Sync, + P: PubsubClient, + { + let id: U256 = self + .0 + .request("eth_subscribe", params) + .await + .map_err(Into::into)?; + SubscriptionStream::new(id, self).map_err(Into::into) + } + + async fn unsubscribe(&self, id: T) -> Result + where + T: Into + Send + Sync, + P: PubsubClient, + { + let ok: bool = self + .0 + .request("eth_unsubscribe", [id.into()]) + .await + .map_err(Into::into)?; + Ok(ok) + } + + async fn subscribe_blocks( + &self, + ) -> Result>, ProviderError> + where + P: PubsubClient, + { + self.subscribe(["newHeads"]).await + } + + async fn subscribe_pending_txs( + &self, + ) -> Result, ProviderError> + where + P: PubsubClient, + { + self.subscribe(["newPendingTransactions"]).await + } + + async fn subscribe_logs<'a>( + &'a self, + filter: &Filter, + ) -> Result, ProviderError> + where + P: PubsubClient, + { + let logs = utils::serialize(&"logs"); // TODO: Make this a static + let filter = utils::serialize(filter); + self.subscribe([logs, filter]).await + } } impl Provider

{ @@ -722,6 +783,17 @@ impl Provider

{ } } +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +impl Provider { + /// Direct connection to a websocket endpoint + pub async fn connect( + url: impl async_tungstenite::tungstenite::client::IntoClientRequest + Unpin, + ) -> Result { + let ws = crate::Ws::connect(url).await?; + Ok(Self::new(ws)) + } +} + impl Provider { /// Returns a `Provider` instantiated with an internal "mock" transport. /// @@ -935,4 +1007,22 @@ mod tests { let receipts = provider.parity_block_receipts(10657200).await.unwrap(); assert!(!receipts.is_empty()); } + + #[tokio::test] + // Celo blocks can not get parsed when used with Ganache + #[cfg(not(feature = "celo"))] + async fn block_subscribe() { + use ethers_core::utils::Ganache; + use futures_util::StreamExt; + let ganache = Ganache::new().block_time(2u64).spawn(); + let provider = Provider::connect(ganache.ws_endpoint()).await.unwrap(); + + let stream = provider.subscribe_blocks().await.unwrap(); + let blocks = stream + .take(3) + .map(|x| x.number.unwrap().as_u64()) + .collect::>() + .await; + assert_eq!(blocks, vec![1, 2, 3]); + } } diff --git a/ethers-providers/src/pubsub.rs b/ethers-providers/src/pubsub.rs new file mode 100644 index 00000000..2b192aea --- /dev/null +++ b/ethers-providers/src/pubsub.rs @@ -0,0 +1,98 @@ +use crate::{JsonRpcClient, Middleware, Provider}; + +use ethers_core::types::U256; + +use futures_util::stream::Stream; +use pin_project::{pin_project, pinned_drop}; +use serde::de::DeserializeOwned; +use serde_json::Value; +use std::{ + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +/// A transport implementation supporting pub sub subscriptions. +pub trait PubsubClient: JsonRpcClient { + /// The type of stream this transport returns + type NotificationStream: futures_core::Stream; + + /// Add a subscription to this transport + fn subscribe>(&self, id: T) -> Result; + + /// Remove a subscription from this transport + fn unsubscribe>(&self, id: T) -> Result<(), Self::Error>; +} + +#[must_use = "subscriptions do nothing unless you stream them"] +#[pin_project(PinnedDrop)] +pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { + /// The subscription's installed id on the ethereum node + pub id: U256, + + provider: &'a Provider

, + + #[pin] + rx: P::NotificationStream, + + ret: PhantomData, +} + +impl<'a, P, R> SubscriptionStream<'a, P, R> +where + P: PubsubClient, + R: DeserializeOwned, +{ + /// Creates a new subscription stream for the provided subscription id + pub fn new(id: U256, provider: &'a Provider

) -> Result { + // Call the underlying PubsubClient's subscribe + let rx = provider.as_ref().subscribe(id)?; + Ok(Self { + id, + provider, + rx, + ret: PhantomData, + }) + } + + /// Unsubscribes from the subscription + pub async fn unsubscribe(&self) -> Result { + self.provider.unsubscribe(self.id).await + } +} + +// Each subscription item is a serde_json::Value which must be decoded to the +// subscription's return type. +// TODO: Can this be replaced with an `rx.map` in the constructor? +impl<'a, P, R> Stream for SubscriptionStream<'a, P, R> +where + P: PubsubClient, + R: DeserializeOwned, +{ + type Item = R; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + let this = self.project(); + match futures_util::ready!(this.rx.poll_next(ctx)) { + Some(item) => match serde_json::from_value(item) { + Ok(res) => Poll::Ready(Some(res)), + _ => Poll::Pending, + }, + None => Poll::Pending, + } + } +} + +#[pinned_drop] +impl<'a, P, R> PinnedDrop for SubscriptionStream<'a, P, R> +where + P: PubsubClient, + R: DeserializeOwned, +{ + fn drop(self: Pin<&mut Self>) { + // on drop it removes the handler from the websocket so that it stops + // getting populated. We need to call `unsubscribe` explicitly to cancel + // the subscription + let _ = (*self.provider).as_ref().unsubscribe(self.id); + } +} diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index c038a195..329f16a3 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -6,7 +6,7 @@ use futures_core::stream::Stream; use futures_timer::Delay; use futures_util::{stream, FutureExt, StreamExt}; use pin_project::pin_project; -use serde::Deserialize; +use serde::de::DeserializeOwned; use std::{ pin::Pin, task::{Context, Poll}, @@ -45,7 +45,7 @@ pub struct FilterWatcher<'a, P, R> { impl<'a, P, R> FilterWatcher<'a, P, R> where P: JsonRpcClient, - R: Send + Sync + for<'de> Deserialize<'de>, + R: Send + Sync + DeserializeOwned, { /// Creates a new watcher with the provided factory and filter id. pub fn new>(id: T, provider: &'a Provider

) -> Self { @@ -75,7 +75,7 @@ where impl<'a, P, R> Stream for FilterWatcher<'a, P, R> where P: JsonRpcClient, - R: Send + Sync + for<'de> Deserialize<'de> + 'a, + R: Send + Sync + DeserializeOwned + 'a, { type Item = R; diff --git a/ethers-providers/src/transports/common.rs b/ethers-providers/src/transports/common.rs index e99dbbbf..97071cb3 100644 --- a/ethers-providers/src/transports/common.rs +++ b/ethers-providers/src/transports/common.rs @@ -1,4 +1,5 @@ // Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc +use ethers_core::types::U256; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt; @@ -39,6 +40,20 @@ pub struct Request<'a, T> { params: T, } +#[derive(Serialize, Deserialize, Debug)] +/// A JSON-RPC Notifcation +pub struct Notification { + jsonrpc: String, + method: String, + pub params: Subscription, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Subscription { + pub subscription: U256, + pub result: R, +} + impl<'a, T> Request<'a, T> { /// Creates a new JSON RPC request pub fn new(id: u64, method: &'a str, params: T) -> Self { @@ -53,7 +68,7 @@ impl<'a, T> Request<'a, T> { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Response { - id: u64, + pub(crate) id: u64, jsonrpc: String, #[serde(flatten)] pub data: ResponseData, diff --git a/ethers-providers/src/transports/http.rs b/ethers-providers/src/transports/http.rs index 304a0c8f..665df65f 100644 --- a/ethers-providers/src/transports/http.rs +++ b/ethers-providers/src/transports/http.rs @@ -3,7 +3,7 @@ use crate::{provider::ProviderError, JsonRpcClient}; use async_trait::async_trait; use reqwest::{Client, Error as ReqwestError}; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use std::{ str::FromStr, sync::atomic::{AtomicU64, Ordering}, @@ -64,7 +64,7 @@ impl JsonRpcClient for Provider { /// Sends a POST request with the provided method and the params serialized as JSON /// over HTTP - async fn request Deserialize<'a>>( + async fn request( &self, method: &str, params: T, diff --git a/ethers-providers/src/transports/mock.rs b/ethers-providers/src/transports/mock.rs index b61ecd09..35df798b 100644 --- a/ethers-providers/src/transports/mock.rs +++ b/ethers-providers/src/transports/mock.rs @@ -1,7 +1,7 @@ use crate::{JsonRpcClient, ProviderError}; use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use std::{ borrow::Borrow, @@ -29,7 +29,7 @@ impl JsonRpcClient for MockProvider { /// Pushes the `(method, input)` to the back of the `requests` queue, /// pops the responses from the back of the `responses` queue - async fn request Deserialize<'a>>( + async fn request( &self, method: &str, input: T, diff --git a/ethers-providers/src/transports/mod.rs b/ethers-providers/src/transports/mod.rs index 3e46b58d..394b6fdc 100644 --- a/ethers-providers/src/transports/mod.rs +++ b/ethers-providers/src/transports/mod.rs @@ -6,7 +6,7 @@ pub use http::Provider as Http; #[cfg(feature = "ws")] mod ws; #[cfg(feature = "ws")] -pub use ws::Provider as Ws; +pub use ws::Ws; mod mock; pub use mock::{MockError, MockProvider}; diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index 83ffa8d0..f1ad37fd 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -1,55 +1,33 @@ -use crate::{provider::ProviderError, JsonRpcClient}; +use crate::{ + provider::ProviderError, + transports::common::{JsonRpcError, Notification, Request, Response}, + JsonRpcClient, PubsubClient, +}; +use ethers_core::types::U256; use async_trait::async_trait; use async_tungstenite::tungstenite::{self, protocol::Message}; +use futures_channel::{mpsc, oneshot}; use futures_util::{ - lock::Mutex, sink::{Sink, SinkExt}, - stream::{Stream, StreamExt}, + stream::{Fuse, Stream, StreamExt}, +}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + collections::BTreeMap, + fmt::{self, Debug}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; -use serde::{Deserialize, Serialize}; -use std::fmt::{self, Debug}; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; use thiserror::Error; -use super::common::{JsonRpcError, Request, ResponseData}; - -// Convenience methods for connecting with async-std/tokio: - -#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -use async_tungstenite::WebSocketStream; - -// connect_async +// `connect_async` adapter #[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))] use async_tungstenite::async_std::connect_async; #[cfg(feature = "tokio-runtime")] -use async_tungstenite::tokio::{connect_async, TokioAdapter}; - -#[cfg(feature = "tokio-runtime")] -type TcpStream = TokioAdapter; -#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))] -type TcpStream = async_std::net::TcpStream; - -// If there is no TLS, just use the TCP Stream -#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-tls")))] -pub type MaybeTlsStream = TcpStream; -#[cfg(all(feature = "async-std-runtime", not(feature = "async-std-tls")))] -pub type MaybeTlsStream = TcpStream; - -// Use either -#[cfg(feature = "tokio-tls")] -type TlsStream = real_tokio_native_tls::TlsStream; -#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))] -type TlsStream = async_tls::client::TlsStream; - -#[cfg(any(feature = "tokio-tls", feature = "async-std-tls"))] -pub use async_tungstenite::stream::Stream as StreamSwitcher; -#[cfg(feature = "tokio-tls")] -pub type MaybeTlsStream = - StreamSwitcher>>>; -#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))] -pub type MaybeTlsStream = StreamSwitcher>; +use async_tungstenite::tokio::connect_async; /// A JSON-RPC Client over Websockets. /// @@ -91,42 +69,131 @@ pub type MaybeTlsStream = StreamSwitcher>; /// consider importing `async-tungstenite` with the [corresponding feature /// flag](https://github.com/sdroege/async-tungstenite/blob/master/Cargo.toml#L15-L22) /// for your runtime. -pub struct Provider { - id: AtomicU64, - ws: Arc>, +#[derive(Clone)] +pub struct Ws { + id: Arc, + requests: mpsc::UnboundedSender, } -impl Clone for Provider { - fn clone(&self) -> Self { - Self { - id: AtomicU64::new(self.id.load(Ordering::SeqCst)), - ws: self.ws.clone(), - } - } +type Pending = oneshot::Sender; +type Subscription = mpsc::UnboundedSender; + +enum TransportMessage { + Request { + id: u64, + request: String, + sender: Pending, + }, + Subscribe { + id: U256, + sink: Subscription, + }, + Unsubscribe { + id: U256, + }, } -impl Debug for Provider { +impl Debug for Ws { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WebsocketProvider") .field("id", &self.id) - .field("ws", &stringify!(ws)) .finish() } } -#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -impl Provider> { - /// Initializes a new WebSocket Client. - /// separately. +impl Ws { + /// Initializes a new WebSocket Client, given a Stream/Sink Websocket implementer. + /// The websocket connection must be initiated separately. + pub fn new(ws: S) -> Self + where + S: Send + + Sync + + Stream> + + Sink + + Unpin, + { + let (sink, stream) = mpsc::unbounded(); + + // Spawn the server + WsServer::new(ws, stream).spawn(); + + Self { + id: Arc::new(AtomicU64::new(0)), + requests: sink, + } + } + + /// Initializes a new WebSocket Client, assuming usage of tokio or async-std + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] pub async fn connect( url: impl tungstenite::client::IntoClientRequest + Unpin, - ) -> Result { + ) -> Result { let (ws, _) = connect_async(url).await?; Ok(Self::new(ws)) } + + fn send(&self, msg: TransportMessage) -> Result<(), ClientError> { + self.requests.unbounded_send(msg).map_err(to_client_error) + } } -impl Provider +#[async_trait] +impl JsonRpcClient for Ws { + type Error = ClientError; + + async fn request( + &self, + method: &str, + params: T, + ) -> Result { + let next_id = self.id.load(Ordering::SeqCst) + 1; + self.id.store(next_id, Ordering::SeqCst); + + // send the message + let (sender, receiver) = oneshot::channel(); + let payload = TransportMessage::Request { + id: next_id, + request: serde_json::to_string(&Request::new(next_id, method, params))?, + sender, + }; + + // send the data + self.send(payload).map_err(to_client_error)?; + + // wait for the response + let res = receiver.await?; + + // parse it + Ok(serde_json::from_value(res)?) + } +} + +impl PubsubClient for Ws { + type NotificationStream = mpsc::UnboundedReceiver; + + fn subscribe>(&self, id: T) -> Result { + let (sink, stream) = mpsc::unbounded(); + self.send(TransportMessage::Subscribe { + id: id.into(), + sink, + })?; + Ok(stream) + } + + fn unsubscribe>(&self, id: T) -> Result<(), ClientError> { + self.send(TransportMessage::Unsubscribe { id: id.into() }) + } +} + +struct WsServer { + ws: Fuse, + requests: Fuse>, + + pending: BTreeMap, + subscriptions: BTreeMap, +} + +impl WsServer where S: Send + Sync @@ -134,14 +201,125 @@ where + Sink + Unpin, { - /// Initializes a new WebSocket Client. The websocket connection must be initiated - /// separately. - pub fn new(ws: S) -> Self { + /// Instantiates the Websocket Server + fn new(ws: S, requests: mpsc::UnboundedReceiver) -> Self { Self { - id: AtomicU64::new(0), - ws: Arc::new(Mutex::new(ws)), + // Fuse the 2 steams together, so that we can `select` them in the + // Stream implementation + ws: ws.fuse(), + requests: requests.fuse(), + pending: BTreeMap::default(), + subscriptions: BTreeMap::default(), } } + + /// Spawns the event loop + #[allow(unused)] + fn spawn(mut self) + where + S: 'static, + { + let f = async move { + loop { + self.process().await.expect("WS Server panic"); + } + }; + + #[cfg(all(not(feature = "async-std-runtime"), feature = "tokio-runtime"))] + tokio::spawn(f); + // TODO: Ensure that this works with both async-std and tokio. + // Remove allow(unused) when fixed. + #[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))] + async_std::task::spawn(f); + } + + /// Processes 1 item selected from the incoming `requests` or `ws` + #[allow(clippy::single_match)] + async fn process(&mut self) -> Result<(), ClientError> { + futures_util::select! { + // Handle requests + msg = self.requests.next() => match msg { + Some(msg) => self.handle_request(msg).await?, + None => {}, + }, + // Handle ws messages + msg = self.ws.next() => match msg { + Some(Ok(msg)) => self.handle_ws(msg).await?, + // TODO: Log the error? + Some(Err(_)) => {}, + None => {}, + }, + // finished + complete => {}, + }; + + Ok(()) + } + + async fn handle_request(&mut self, msg: TransportMessage) -> Result<(), ClientError> { + match msg { + TransportMessage::Request { + id, + request, + sender, + } => { + if self.pending.insert(id, sender).is_some() { + println!("Replacing a pending request with id {:?}", id); + } + + if let Err(e) = self.ws.send(Message::Text(request)).await { + println!("WS connection error: {:?}", e); + self.pending.remove(&id); + } + } + TransportMessage::Subscribe { id, sink } => { + if self.subscriptions.insert(id, sink).is_some() { + println!("Replacing already-registered subscription with id {:?}", id); + } + } + TransportMessage::Unsubscribe { id } => { + if self.subscriptions.remove(&id).is_none() { + println!( + "Unsubscribing from non-existent subscription with id {:?}", + id + ); + } + } + }; + + Ok(()) + } + + async fn handle_ws(&mut self, resp: Message) -> Result<(), ClientError> { + // Get the inner text received from the websocket + let inner = match resp { + Message::Text(inner) => inner, + _ => return Err(ClientError::NoResponse), + }; + + if let Ok(resp) = serde_json::from_str::>(&inner) { + if let Some(request) = self.pending.remove(&resp.id) { + request + .send(resp.data.into_result()?) + .map_err(to_client_error)?; + } + } else if let Ok(notification) = + serde_json::from_str::>(&inner) + { + let id = notification.params.subscription; + if let Some(stream) = self.subscriptions.get(&id) { + stream + .unbounded_send(notification.params.result) + .map_err(to_client_error)?; + } + } + Ok(()) + } +} + +// TrySendError is private :( +fn to_client_error(err: T) -> ClientError { + ClientError::ChannelError(err.to_string()) } #[derive(Error, Debug)] @@ -156,12 +334,18 @@ pub enum ClientError { JsonRpcError(#[from] JsonRpcError), /// Thrown if the websocket didn't respond to our message - #[error("Websocket connection did not respond with data")] + #[error("Websocket connection did not respond with text data")] NoResponse, /// Thrown if there's an error over the WS connection #[error(transparent)] TungsteniteError(#[from] tungstenite::Error), + + #[error("{0}")] + ChannelError(String), + + #[error(transparent)] + Canceled(#[from] oneshot::Canceled), } impl From for ProviderError { @@ -170,44 +354,42 @@ impl From for ProviderError { } } -#[async_trait] -impl JsonRpcClient for Provider -where - S: Send - + Sync - + Stream> - + Sink - + Unpin, -{ - type Error = ClientError; +#[cfg(test)] +#[cfg(not(feature = "celo"))] +mod tests { + use super::*; + use ethers_core::types::{Block, TxHash, U256}; + use ethers_core::utils::Ganache; - /// Sends a POST request with the provided method and the params serialized as JSON - /// over WebSockets - async fn request Deserialize<'a>>( - &self, - method: &str, - params: T, - ) -> Result { - // we get a lock on the websocket to avoid race conditions with multiple borrows - let mut lock = self.ws.lock().await; + #[tokio::test] + async fn request() { + let ganache = Ganache::new().block_time(1u64).spawn(); + let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap(); - let next_id = self.id.load(Ordering::SeqCst) + 1; - self.id.store(next_id, Ordering::SeqCst); + let block_num: U256 = ws.request("eth_blockNumber", ()).await.unwrap(); + std::thread::sleep(std::time::Duration::new(3, 0)); + let block_num2: U256 = ws.request("eth_blockNumber", ()).await.unwrap(); + assert!(block_num2 > block_num); + } - // send the message - let payload = serde_json::to_string(&Request::new(next_id, method, params))?; - lock.send(Message::text(payload)).await?; + #[tokio::test] + async fn subscription() { + let ganache = Ganache::new().block_time(1u64).spawn(); + let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap(); - // get the response bytes - let resp = lock.next().await.ok_or(ClientError::NoResponse)??; + // Subscribing requires sending the sub request and then subscribing to + // the returned sub_id + let sub_id: U256 = ws.request("eth_subscribe", ["newHeads"]).await.unwrap(); + let mut stream = ws.subscribe(sub_id).unwrap(); - let data: ResponseData = match resp { - Message::Text(inner) => serde_json::from_str(&inner)?, - Message::Binary(inner) => serde_json::from_slice(&inner)?, - // TODO: Should we do something if we receive a Ping, Pong or Close? - _ => return Err(ClientError::NoResponse), - }; + let mut blocks = Vec::new(); + for _ in 0..3 { + let item = stream.next().await.unwrap(); + let block = serde_json::from_value::>(item).unwrap(); + blocks.push(block.number.unwrap_or_default().as_u64()); + } - Ok(data.into_result()?) + assert_eq!(sub_id, 1.into()); + assert_eq!(blocks, vec![1, 2, 3]) } }