Contract & Provider eth_subscribe support (#100)

* fix(block): fix block decoding from ws

* feat(pubsub): add pubsub traits and sub stream

Also use DeserializeOwned alias

* feat(transports): add notification type

* feat(ws): rewrite Ws for subscription support

* feat(provider): add eth_subscribe

* fix(celo): disable some celo tests due to ganache incompatibilities

* test(rinkeby): fix flaky test

* feat(contract): WS subscription bindings (#101)

* feat(middleware): add subscriptions to middleware methods

* feat(contract): add subscribe method to contracts
This commit is contained in:
Georgios Konstantopoulos 2020-11-30 11:33:06 +02:00 committed by GitHub
parent 09413dca6f
commit 1ece5d2020
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 723 additions and 167 deletions

77
Cargo.lock generated
View File

@ -252,7 +252,7 @@ dependencies = [
"futures-util",
"log",
"native-tls",
"pin-project",
"pin-project 0.4.25",
"tokio",
"tokio-native-tls",
"tungstenite",
@ -775,7 +775,9 @@ dependencies = [
"ethers-providers",
"ethers-signers",
"futures",
"futures-util",
"once_cell",
"pin-project 1.0.2",
"rustc-hex",
"serde",
"serde_json",
@ -866,10 +868,11 @@ dependencies = [
"async-tungstenite",
"ethers",
"ethers-core",
"futures-channel",
"futures-core",
"futures-timer",
"futures-util",
"pin-project",
"pin-project 1.0.2",
"reqwest",
"rustc-hex",
"serde",
@ -994,9 +997,9 @@ checksum = "0ba62103ce691c2fd80fbae2213dfdda9ce60804973ac6b6e97de818ea7f52c8"
[[package]]
name = "futures"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0"
dependencies = [
"futures-channel",
"futures-core",
@ -1009,9 +1012,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
dependencies = [
"futures-core",
"futures-sink",
@ -1019,15 +1022,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
[[package]]
name = "futures-executor"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65"
dependencies = [
"futures-core",
"futures-task",
@ -1037,9 +1040,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb"
[[package]]
name = "futures-lite"
@ -1058,9 +1061,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556"
dependencies = [
"proc-macro-hack",
"proc-macro2",
@ -1070,15 +1073,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d"
[[package]]
name = "futures-task"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
dependencies = [
"once_cell",
]
@ -1091,9 +1094,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.5"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
dependencies = [
"futures-channel",
"futures-core",
@ -1102,7 +1105,7 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-project 1.0.2",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
@ -1276,7 +1279,7 @@ dependencies = [
"httparse",
"httpdate",
"itoa",
"pin-project",
"pin-project 0.4.25",
"socket2",
"tokio",
"tower-service",
@ -1751,7 +1754,16 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b9e280448854bd91559252582173b3bd1f8e094a0e644791c0628ca9b1f144f"
dependencies = [
"pin-project-internal",
"pin-project-internal 0.4.25",
]
[[package]]
name = "pin-project"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
dependencies = [
"pin-project-internal 1.0.2",
]
[[package]]
@ -1765,6 +1777,17 @@ dependencies = [
"syn",
]
[[package]]
name = "pin-project-internal"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.1.10"
@ -1817,9 +1840,9 @@ dependencies = [
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
@ -2249,9 +2272,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd"
[[package]]
name = "syn"
version = "1.0.42"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c51d92969d209b54a98397e1b91c8ae82d8c87a7bb87df0b29aa2ad81454228"
checksum = "3b4f34193997d92804d359ed09953e25d5138df6bcc055a71bf68ee89fdf9223"
dependencies = [
"proc-macro2",
"quote",

View File

@ -22,6 +22,8 @@ rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.15", default-features = false }
once_cell = { version = "1.3.1", default-features = false }
futures = "0.3.5"
futures-util = "0.3.8"
pin-project = "1.0.2"
[dev-dependencies]
ethers = { version = "0.1.3", path = "../ethers" }

View File

@ -1,13 +1,10 @@
use crate::{base::decode_event, ContractError};
use ethers_providers::Middleware;
use crate::{base::decode_event, ContractError, EventStream};
use ethers_core::{
abi::{Detokenize, Event as AbiEvent},
types::{BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U64},
};
use futures::stream::{Stream, StreamExt};
use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream};
use std::marker::PhantomData;
/// Helper for managing the event filter before querying or streaming its logs
@ -67,18 +64,52 @@ impl<'a, 'b, M, D> Event<'a, 'b, M, D>
where
M: Middleware,
D: 'b + Detokenize + Clone,
'a: 'b,
{
/// Returns a stream for the event
pub async fn stream(
self,
) -> Result<impl Stream<Item = Result<D, ContractError<M>>> + 'b, ContractError<M>> {
&'a self,
) -> Result<
// Wraps the FilterWatcher with a mapping to the event
EventStream<'a, FilterWatcher<'a, M::Provider, Log>, D, ContractError<M>>,
ContractError<M>,
> {
let filter = self
.provider
.watch(&self.filter)
.await
.map_err(ContractError::MiddlewareError)?;
Ok(filter.stream().map(move |log| self.parse_log(log)))
Ok(EventStream::new(
filter.id,
filter,
Box::new(move |log| self.parse_log(log)),
))
}
}
impl<'a, 'b, M, D> Event<'a, 'b, M, D>
where
M: Middleware,
<M as Middleware>::Provider: PubsubClient,
D: 'b + Detokenize + Clone,
{
/// Returns a subscription for the event
pub async fn subscribe(
&'a self,
) -> Result<
// Wraps the SubscriptionStream with a mapping to the event
EventStream<'a, SubscriptionStream<'a, M::Provider, Log>, D, ContractError<M>>,
ContractError<M>,
> {
let filter = self
.provider
.subscribe_logs(&self.filter)
.await
.map_err(ContractError::MiddlewareError)?;
Ok(EventStream::new(
filter.id,
filter,
Box::new(move |log| self.parse_log(log)),
))
}
}

View File

@ -27,6 +27,9 @@ pub use factory::ContractFactory;
mod event;
mod stream;
pub use stream::EventStream;
mod multicall;
pub use multicall::Multicall;

View File

@ -0,0 +1,41 @@
use ethers_core::types::{Log, U256};
use futures::stream::{Stream, StreamExt};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
type MapEvent<'a, R, E> = Box<dyn Fn(Log) -> Result<R, E> + 'a>;
#[pin_project]
/// Generic wrapper around Log streams, mapping their content to a specific
/// deserialized log struct.
///
/// We use this wrapper type instead of `StreamExt::map` in order to preserve
/// information about the filter/subscription's id.
pub struct EventStream<'a, T, R, E> {
pub id: U256,
#[pin]
stream: T,
parse: MapEvent<'a, R, E>,
}
impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self {
Self { id, stream, parse }
}
}
impl<'a, T, R, E> Stream for EventStream<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
{
type Item = Result<R, E>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match futures_util::ready!(this.stream.poll_next_unpin(ctx)) {
Some(item) => Poll::Ready(Some((this.parse)(item))),
None => Poll::Pending,
}
}
}

View File

@ -125,15 +125,19 @@ mod eth_tests {
let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol");
let ganache = Ganache::new().spawn();
let client = connect(&ganache, 0);
let contract = deploy(client, abi, bytecode).await;
let contract = deploy(client, abi.clone(), bytecode).await;
// We spawn the event listener:
let mut stream = contract
.event::<ValueChanged>("ValueChanged")
.unwrap()
.stream()
.await
.unwrap();
let event = contract.event::<ValueChanged>("ValueChanged").unwrap();
let mut stream = event.stream().await.unwrap();
assert_eq!(stream.id, 1.into());
// Also set up a subscription for the same thing
let ws = Provider::connect(ganache.ws_endpoint()).await.unwrap();
let contract2 = ethers_contract::Contract::new(contract.address(), abi, ws);
let event2 = contract2.event::<ValueChanged>("ValueChanged").unwrap();
let mut subscription = event2.subscribe().await.unwrap();
assert_eq!(subscription.id, 2.into());
let num_calls = 3u64;
@ -151,6 +155,8 @@ mod eth_tests {
for i in 0..num_calls {
// unwrap the option of the stream, then unwrap the decoding result
let log = stream.next().await.unwrap().unwrap();
let log2 = subscription.next().await.unwrap().unwrap();
assert_eq!(log.new_value, log2.new_value);
assert_eq!(log.new_value, i.to_string());
}
}

View File

@ -9,43 +9,45 @@ pub struct Block<TX> {
/// Hash of the block
pub hash: Option<H256>,
/// Hash of the parent
#[serde(rename = "parentHash")]
#[serde(default, rename = "parentHash")]
pub parent_hash: H256,
/// Hash of the uncles
#[cfg(not(feature = "celo"))]
#[serde(rename = "sha3Uncles")]
#[serde(default, rename = "sha3Uncles")]
pub uncles_hash: H256,
/// Miner/author's address.
#[serde(rename = "miner")]
#[serde(default, rename = "miner")]
pub author: Address,
/// State root hash
#[serde(rename = "stateRoot")]
#[serde(default, rename = "stateRoot")]
pub state_root: H256,
/// Transactions root hash
#[serde(rename = "transactionsRoot")]
#[serde(default, rename = "transactionsRoot")]
pub transactions_root: H256,
/// Transactions receipts root hash
#[serde(rename = "receiptsRoot")]
#[serde(default, rename = "receiptsRoot")]
pub receipts_root: H256,
/// Block number. None if pending.
pub number: Option<U64>,
/// Gas Used
#[serde(rename = "gasUsed")]
#[serde(default, rename = "gasUsed")]
pub gas_used: U256,
/// Gas Limit
#[cfg(not(feature = "celo"))]
#[serde(rename = "gasLimit")]
#[serde(default, rename = "gasLimit")]
pub gas_limit: U256,
/// Extra data
#[serde(rename = "extraData")]
#[serde(default, rename = "extraData")]
pub extra_data: Bytes,
/// Logs bloom
#[serde(rename = "logsBloom")]
pub logs_bloom: Option<Bloom>,
/// Timestamp
#[serde(default)]
pub timestamp: U256,
/// Difficulty
#[cfg(not(feature = "celo"))]
#[serde(default)]
pub difficulty: U256,
/// Total difficulty
#[serde(rename = "totalDifficulty")]
@ -55,8 +57,10 @@ pub struct Block<TX> {
pub seal_fields: Vec<Bytes>,
/// Uncles' hashes
#[cfg(not(feature = "celo"))]
#[serde(default)]
pub uncles: Vec<H256>,
/// Transactions
#[serde(bound = "TX: Serialize + serde::de::DeserializeOwned", default)]
pub transactions: Vec<TX>,
/// Size in bytes
pub size: Option<U256>,

View File

@ -241,7 +241,7 @@ fn rlp_opt<T: rlp::Encodable>(rlp: &mut RlpStream, opt: Option<T>) {
}
/// Details of a signed transaction
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct Transaction {
/// The transaction's hash
pub hash: H256,

View File

@ -39,6 +39,9 @@ async fn nonce_manager() {
tx_hashes.push(tx);
}
// sleep a bit to ensure there's no flakiness in the test
std::thread::sleep(std::time::Duration::new(3, 0));
let mut nonces = Vec::new();
for tx_hash in tx_hashes {
nonces.push(

View File

@ -27,7 +27,8 @@ url = { version = "2.1.1", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false }
futures-timer = { version = "3.0.2", default-features = false }
pin-project = { version = "0.4.20", default-features = false }
futures-channel = { version = "0.3.8", default-features = false }
pin-project = { version = "1.0.2", default-features = false }
# ws support async-std and tokio runtimes for the convenience methods
async-tungstenite = { version = "0.6.0", default-features = false, optional = true }

View File

@ -113,8 +113,11 @@ mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, DEFAULT_POLL_INTERVAL};
mod pubsub;
pub use pubsub::{PubsubClient, SubscriptionStream};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};
use std::{error::Error, fmt::Debug, future::Future, pin::Pin};
pub use provider::{FilterKind, Provider, ProviderError};
@ -134,7 +137,7 @@ pub trait JsonRpcClient: Debug + Send + Sync {
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: for<'a> Deserialize<'a>;
R: DeserializeOwned;
}
use ethers_core::types::*;
@ -319,7 +322,7 @@ pub trait Middleware: Sync + Send + Debug {
async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, Self::Error>
where
T: Into<U256> + Send + Sync,
R: for<'a> Deserialize<'a> + Send + Sync,
R: DeserializeOwned + Send + Sync,
{
self.inner()
.get_filter_changes(id)
@ -468,4 +471,58 @@ pub trait Middleware: Sync + Send + Debug {
.await
.map_err(FromErr::from)
}
async fn subscribe<T, R>(
&self,
params: T,
) -> Result<SubscriptionStream<'_, Self::Provider, R>, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send + Sync,
<Self as Middleware>::Provider: PubsubClient,
{
self.inner().subscribe(params).await.map_err(FromErr::from)
}
async fn unsubscribe<T>(&self, id: T) -> Result<bool, Self::Error>
where
T: Into<U256> + Send + Sync,
<Self as Middleware>::Provider: PubsubClient,
{
self.inner().unsubscribe(id).await.map_err(FromErr::from)
}
async fn subscribe_blocks(
&self,
) -> Result<SubscriptionStream<'_, Self::Provider, Block<TxHash>>, Self::Error>
where
<Self as Middleware>::Provider: PubsubClient,
{
self.inner().subscribe_blocks().await.map_err(FromErr::from)
}
async fn subscribe_pending_txs(
&self,
) -> Result<SubscriptionStream<'_, Self::Provider, TxHash>, Self::Error>
where
<Self as Middleware>::Provider: PubsubClient,
{
self.inner()
.subscribe_pending_txs()
.await
.map_err(FromErr::from)
}
async fn subscribe_logs<'a>(
&'a self,
filter: &Filter,
) -> Result<SubscriptionStream<'a, Self::Provider, Log>, Self::Error>
where
<Self as Middleware>::Provider: PubsubClient,
{
self.inner()
.subscribe_logs(filter)
.await
.map_err(FromErr::from)
}
}

View File

@ -1,5 +1,6 @@
use crate::{
ens,
pubsub::{PubsubClient, SubscriptionStream},
stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
FromErr, Http as HttpProvider, JsonRpcClient, MockProvider, PendingTransaction,
};
@ -16,7 +17,7 @@ use ethers_core::{
use crate::Middleware;
use async_trait::async_trait;
use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize};
use thiserror::Error;
use url::{ParseError, Url};
@ -96,7 +97,7 @@ impl<P: JsonRpcClient> Provider<P> {
self
}
async fn get_block_gen<Tx: for<'a> Deserialize<'a>>(
async fn get_block_gen<Tx: Default + Serialize + DeserializeOwned>(
&self,
id: BlockId,
include_txs: bool,
@ -428,7 +429,7 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
async fn get_filter_changes<T, R>(&self, id: T) -> Result<Vec<R>, ProviderError>
where
T: Into<U256> + Send + Sync,
R: for<'a> Deserialize<'a> + Send + Sync,
R: DeserializeOwned + Send + Sync,
{
let id = utils::serialize(&id.into());
Ok(self
@ -659,6 +660,66 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
.await
.map_err(Into::into)
}
async fn subscribe<T, R>(
&self,
params: T,
) -> Result<SubscriptionStream<'_, P, R>, ProviderError>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send + Sync,
P: PubsubClient,
{
let id: U256 = self
.0
.request("eth_subscribe", params)
.await
.map_err(Into::into)?;
SubscriptionStream::new(id, self).map_err(Into::into)
}
async fn unsubscribe<T>(&self, id: T) -> Result<bool, ProviderError>
where
T: Into<U256> + Send + Sync,
P: PubsubClient,
{
let ok: bool = self
.0
.request("eth_unsubscribe", [id.into()])
.await
.map_err(Into::into)?;
Ok(ok)
}
async fn subscribe_blocks(
&self,
) -> Result<SubscriptionStream<'_, P, Block<TxHash>>, ProviderError>
where
P: PubsubClient,
{
self.subscribe(["newHeads"]).await
}
async fn subscribe_pending_txs(
&self,
) -> Result<SubscriptionStream<'_, P, TxHash>, ProviderError>
where
P: PubsubClient,
{
self.subscribe(["newPendingTransactions"]).await
}
async fn subscribe_logs<'a>(
&'a self,
filter: &Filter,
) -> Result<SubscriptionStream<'a, P, Log>, ProviderError>
where
P: PubsubClient,
{
let logs = utils::serialize(&"logs"); // TODO: Make this a static
let filter = utils::serialize(filter);
self.subscribe([logs, filter]).await
}
}
impl<P: JsonRpcClient> Provider<P> {
@ -722,6 +783,17 @@ impl<P: JsonRpcClient> Provider<P> {
}
}
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
impl Provider<crate::Ws> {
/// Direct connection to a websocket endpoint
pub async fn connect(
url: impl async_tungstenite::tungstenite::client::IntoClientRequest + Unpin,
) -> Result<Self, ProviderError> {
let ws = crate::Ws::connect(url).await?;
Ok(Self::new(ws))
}
}
impl Provider<MockProvider> {
/// Returns a `Provider` instantiated with an internal "mock" transport.
///
@ -935,4 +1007,22 @@ mod tests {
let receipts = provider.parity_block_receipts(10657200).await.unwrap();
assert!(!receipts.is_empty());
}
#[tokio::test]
// Celo blocks can not get parsed when used with Ganache
#[cfg(not(feature = "celo"))]
async fn block_subscribe() {
use ethers_core::utils::Ganache;
use futures_util::StreamExt;
let ganache = Ganache::new().block_time(2u64).spawn();
let provider = Provider::connect(ganache.ws_endpoint()).await.unwrap();
let stream = provider.subscribe_blocks().await.unwrap();
let blocks = stream
.take(3)
.map(|x| x.number.unwrap().as_u64())
.collect::<Vec<_>>()
.await;
assert_eq!(blocks, vec![1, 2, 3]);
}
}

View File

@ -0,0 +1,98 @@
use crate::{JsonRpcClient, Middleware, Provider};
use ethers_core::types::U256;
use futures_util::stream::Stream;
use pin_project::{pin_project, pinned_drop};
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
/// A transport implementation supporting pub sub subscriptions.
pub trait PubsubClient: JsonRpcClient {
/// The type of stream this transport returns
type NotificationStream: futures_core::Stream<Item = Value>;
/// Add a subscription to this transport
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
/// Remove a subscription from this transport
fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), Self::Error>;
}
#[must_use = "subscriptions do nothing unless you stream them"]
#[pin_project(PinnedDrop)]
pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
pub id: U256,
provider: &'a Provider<P>,
#[pin]
rx: P::NotificationStream,
ret: PhantomData<R>,
}
impl<'a, P, R> SubscriptionStream<'a, P, R>
where
P: PubsubClient,
R: DeserializeOwned,
{
/// Creates a new subscription stream for the provided subscription id
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?;
Ok(Self {
id,
provider,
rx,
ret: PhantomData,
})
}
/// Unsubscribes from the subscription
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
}
}
// Each subscription item is a serde_json::Value which must be decoded to the
// subscription's return type.
// TODO: Can this be replaced with an `rx.map` in the constructor?
impl<'a, P, R> Stream for SubscriptionStream<'a, P, R>
where
P: PubsubClient,
R: DeserializeOwned,
{
type Item = R;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match futures_util::ready!(this.rx.poll_next(ctx)) {
Some(item) => match serde_json::from_value(item) {
Ok(res) => Poll::Ready(Some(res)),
_ => Poll::Pending,
},
None => Poll::Pending,
}
}
}
#[pinned_drop]
impl<'a, P, R> PinnedDrop for SubscriptionStream<'a, P, R>
where
P: PubsubClient,
R: DeserializeOwned,
{
fn drop(self: Pin<&mut Self>) {
// on drop it removes the handler from the websocket so that it stops
// getting populated. We need to call `unsubscribe` explicitly to cancel
// the subscription
let _ = (*self.provider).as_ref().unsubscribe(self.id);
}
}

View File

@ -6,7 +6,7 @@ use futures_core::stream::Stream;
use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project;
use serde::Deserialize;
use serde::de::DeserializeOwned;
use std::{
pin::Pin,
task::{Context, Poll},
@ -45,7 +45,7 @@ pub struct FilterWatcher<'a, P, R> {
impl<'a, P, R> FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
R: Send + Sync + for<'de> Deserialize<'de>,
R: Send + Sync + DeserializeOwned,
{
/// Creates a new watcher with the provided factory and filter id.
pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
@ -75,7 +75,7 @@ where
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
R: Send + Sync + for<'de> Deserialize<'de> + 'a,
R: Send + Sync + DeserializeOwned + 'a,
{
type Item = R;

View File

@ -1,4 +1,5 @@
// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc
use ethers_core::types::U256;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
@ -39,6 +40,20 @@ pub struct Request<'a, T> {
params: T,
}
#[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC Notifcation
pub struct Notification<R> {
jsonrpc: String,
method: String,
pub params: Subscription<R>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Subscription<R> {
pub subscription: U256,
pub result: R,
}
impl<'a, T> Request<'a, T> {
/// Creates a new JSON RPC request
pub fn new(id: u64, method: &'a str, params: T) -> Self {
@ -53,7 +68,7 @@ impl<'a, T> Request<'a, T> {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Response<T> {
id: u64,
pub(crate) id: u64,
jsonrpc: String,
#[serde(flatten)]
pub data: ResponseData<T>,

View File

@ -3,7 +3,7 @@ use crate::{provider::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use reqwest::{Client, Error as ReqwestError};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};
use std::{
str::FromStr,
sync::atomic::{AtomicU64, Ordering},
@ -64,7 +64,7 @@ impl JsonRpcClient for Provider {
/// Sends a POST request with the provided method and the params serialized as JSON
/// over HTTP
async fn request<T: Serialize + Send + Sync, R: for<'a> Deserialize<'a>>(
async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
&self,
method: &str,
params: T,

View File

@ -1,7 +1,7 @@
use crate::{JsonRpcClient, ProviderError};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use std::{
borrow::Borrow,
@ -29,7 +29,7 @@ impl JsonRpcClient for MockProvider {
/// Pushes the `(method, input)` to the back of the `requests` queue,
/// pops the responses from the back of the `responses` queue
async fn request<T: Serialize + Send + Sync, R: for<'a> Deserialize<'a>>(
async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
&self,
method: &str,
input: T,

View File

@ -6,7 +6,7 @@ pub use http::Provider as Http;
#[cfg(feature = "ws")]
mod ws;
#[cfg(feature = "ws")]
pub use ws::Provider as Ws;
pub use ws::Ws;
mod mock;
pub use mock::{MockError, MockProvider};

View File

@ -1,55 +1,33 @@
use crate::{provider::ProviderError, JsonRpcClient};
use crate::{
provider::ProviderError,
transports::common::{JsonRpcError, Notification, Request, Response},
JsonRpcClient, PubsubClient,
};
use ethers_core::types::U256;
use async_trait::async_trait;
use async_tungstenite::tungstenite::{self, protocol::Message};
use futures_channel::{mpsc, oneshot};
use futures_util::{
lock::Mutex,
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
stream::{Fuse, Stream, StreamExt},
};
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::BTreeMap,
fmt::{self, Debug},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use super::common::{JsonRpcError, Request, ResponseData};
// Convenience methods for connecting with async-std/tokio:
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
use async_tungstenite::WebSocketStream;
// connect_async
// `connect_async` adapter
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
use async_tungstenite::async_std::connect_async;
#[cfg(feature = "tokio-runtime")]
use async_tungstenite::tokio::{connect_async, TokioAdapter};
#[cfg(feature = "tokio-runtime")]
type TcpStream = TokioAdapter<tokio::net::TcpStream>;
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
type TcpStream = async_std::net::TcpStream;
// If there is no TLS, just use the TCP Stream
#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-tls")))]
pub type MaybeTlsStream = TcpStream;
#[cfg(all(feature = "async-std-runtime", not(feature = "async-std-tls")))]
pub type MaybeTlsStream = TcpStream;
// Use either
#[cfg(feature = "tokio-tls")]
type TlsStream<S> = real_tokio_native_tls::TlsStream<S>;
#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))]
type TlsStream<S> = async_tls::client::TlsStream<S>;
#[cfg(any(feature = "tokio-tls", feature = "async-std-tls"))]
pub use async_tungstenite::stream::Stream as StreamSwitcher;
#[cfg(feature = "tokio-tls")]
pub type MaybeTlsStream =
StreamSwitcher<TcpStream, TokioAdapter<TlsStream<TokioAdapter<TcpStream>>>>;
#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))]
pub type MaybeTlsStream = StreamSwitcher<TcpStream, TlsStream<TcpStream>>;
use async_tungstenite::tokio::connect_async;
/// A JSON-RPC Client over Websockets.
///
@ -91,42 +69,131 @@ pub type MaybeTlsStream = StreamSwitcher<TcpStream, TlsStream<TcpStream>>;
/// consider importing `async-tungstenite` with the [corresponding feature
/// flag](https://github.com/sdroege/async-tungstenite/blob/master/Cargo.toml#L15-L22)
/// for your runtime.
pub struct Provider<S> {
id: AtomicU64,
ws: Arc<Mutex<S>>,
#[derive(Clone)]
pub struct Ws {
id: Arc<AtomicU64>,
requests: mpsc::UnboundedSender<TransportMessage>,
}
impl<S> Clone for Provider<S> {
fn clone(&self) -> Self {
Self {
id: AtomicU64::new(self.id.load(Ordering::SeqCst)),
ws: self.ws.clone(),
}
}
type Pending = oneshot::Sender<serde_json::Value>;
type Subscription = mpsc::UnboundedSender<serde_json::Value>;
enum TransportMessage {
Request {
id: u64,
request: String,
sender: Pending,
},
Subscribe {
id: U256,
sink: Subscription,
},
Unsubscribe {
id: U256,
},
}
impl<S> Debug for Provider<S> {
impl Debug for Ws {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WebsocketProvider")
.field("id", &self.id)
.field("ws", &stringify!(ws))
.finish()
}
}
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
impl Provider<WebSocketStream<MaybeTlsStream>> {
/// Initializes a new WebSocket Client.
/// separately.
impl Ws {
/// Initializes a new WebSocket Client, given a Stream/Sink Websocket implementer.
/// The websocket connection must be initiated separately.
pub fn new<S: 'static>(ws: S) -> Self
where
S: Send
+ Sync
+ Stream<Item = Result<Message, tungstenite::Error>>
+ Sink<Message, Error = tungstenite::Error>
+ Unpin,
{
let (sink, stream) = mpsc::unbounded();
// Spawn the server
WsServer::new(ws, stream).spawn();
Self {
id: Arc::new(AtomicU64::new(0)),
requests: sink,
}
}
/// Initializes a new WebSocket Client, assuming usage of tokio or async-std
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
pub async fn connect(
url: impl tungstenite::client::IntoClientRequest + Unpin,
) -> Result<Self, tungstenite::Error> {
) -> Result<Self, ClientError> {
let (ws, _) = connect_async(url).await?;
Ok(Self::new(ws))
}
fn send(&self, msg: TransportMessage) -> Result<(), ClientError> {
self.requests.unbounded_send(msg).map_err(to_client_error)
}
}
impl<S> Provider<S>
#[async_trait]
impl JsonRpcClient for Ws {
type Error = ClientError;
async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
&self,
method: &str,
params: T,
) -> Result<R, ClientError> {
let next_id = self.id.load(Ordering::SeqCst) + 1;
self.id.store(next_id, Ordering::SeqCst);
// send the message
let (sender, receiver) = oneshot::channel();
let payload = TransportMessage::Request {
id: next_id,
request: serde_json::to_string(&Request::new(next_id, method, params))?,
sender,
};
// send the data
self.send(payload).map_err(to_client_error)?;
// wait for the response
let res = receiver.await?;
// parse it
Ok(serde_json::from_value(res)?)
}
}
impl PubsubClient for Ws {
type NotificationStream = mpsc::UnboundedReceiver<serde_json::Value>;
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, ClientError> {
let (sink, stream) = mpsc::unbounded();
self.send(TransportMessage::Subscribe {
id: id.into(),
sink,
})?;
Ok(stream)
}
fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), ClientError> {
self.send(TransportMessage::Unsubscribe { id: id.into() })
}
}
struct WsServer<S> {
ws: Fuse<S>,
requests: Fuse<mpsc::UnboundedReceiver<TransportMessage>>,
pending: BTreeMap<u64, Pending>,
subscriptions: BTreeMap<U256, Subscription>,
}
impl<S> WsServer<S>
where
S: Send
+ Sync
@ -134,14 +201,125 @@ where
+ Sink<Message, Error = tungstenite::Error>
+ Unpin,
{
/// Initializes a new WebSocket Client. The websocket connection must be initiated
/// separately.
pub fn new(ws: S) -> Self {
/// Instantiates the Websocket Server
fn new(ws: S, requests: mpsc::UnboundedReceiver<TransportMessage>) -> Self {
Self {
id: AtomicU64::new(0),
ws: Arc::new(Mutex::new(ws)),
// Fuse the 2 steams together, so that we can `select` them in the
// Stream implementation
ws: ws.fuse(),
requests: requests.fuse(),
pending: BTreeMap::default(),
subscriptions: BTreeMap::default(),
}
}
/// Spawns the event loop
#[allow(unused)]
fn spawn(mut self)
where
S: 'static,
{
let f = async move {
loop {
self.process().await.expect("WS Server panic");
}
};
#[cfg(all(not(feature = "async-std-runtime"), feature = "tokio-runtime"))]
tokio::spawn(f);
// TODO: Ensure that this works with both async-std and tokio.
// Remove allow(unused) when fixed.
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
async_std::task::spawn(f);
}
/// Processes 1 item selected from the incoming `requests` or `ws`
#[allow(clippy::single_match)]
async fn process(&mut self) -> Result<(), ClientError> {
futures_util::select! {
// Handle requests
msg = self.requests.next() => match msg {
Some(msg) => self.handle_request(msg).await?,
None => {},
},
// Handle ws messages
msg = self.ws.next() => match msg {
Some(Ok(msg)) => self.handle_ws(msg).await?,
// TODO: Log the error?
Some(Err(_)) => {},
None => {},
},
// finished
complete => {},
};
Ok(())
}
async fn handle_request(&mut self, msg: TransportMessage) -> Result<(), ClientError> {
match msg {
TransportMessage::Request {
id,
request,
sender,
} => {
if self.pending.insert(id, sender).is_some() {
println!("Replacing a pending request with id {:?}", id);
}
if let Err(e) = self.ws.send(Message::Text(request)).await {
println!("WS connection error: {:?}", e);
self.pending.remove(&id);
}
}
TransportMessage::Subscribe { id, sink } => {
if self.subscriptions.insert(id, sink).is_some() {
println!("Replacing already-registered subscription with id {:?}", id);
}
}
TransportMessage::Unsubscribe { id } => {
if self.subscriptions.remove(&id).is_none() {
println!(
"Unsubscribing from non-existent subscription with id {:?}",
id
);
}
}
};
Ok(())
}
async fn handle_ws(&mut self, resp: Message) -> Result<(), ClientError> {
// Get the inner text received from the websocket
let inner = match resp {
Message::Text(inner) => inner,
_ => return Err(ClientError::NoResponse),
};
if let Ok(resp) = serde_json::from_str::<Response<serde_json::Value>>(&inner) {
if let Some(request) = self.pending.remove(&resp.id) {
request
.send(resp.data.into_result()?)
.map_err(to_client_error)?;
}
} else if let Ok(notification) =
serde_json::from_str::<Notification<serde_json::Value>>(&inner)
{
let id = notification.params.subscription;
if let Some(stream) = self.subscriptions.get(&id) {
stream
.unbounded_send(notification.params.result)
.map_err(to_client_error)?;
}
}
Ok(())
}
}
// TrySendError is private :(
fn to_client_error<T: ToString>(err: T) -> ClientError {
ClientError::ChannelError(err.to_string())
}
#[derive(Error, Debug)]
@ -156,12 +334,18 @@ pub enum ClientError {
JsonRpcError(#[from] JsonRpcError),
/// Thrown if the websocket didn't respond to our message
#[error("Websocket connection did not respond with data")]
#[error("Websocket connection did not respond with text data")]
NoResponse,
/// Thrown if there's an error over the WS connection
#[error(transparent)]
TungsteniteError(#[from] tungstenite::Error),
#[error("{0}")]
ChannelError(String),
#[error(transparent)]
Canceled(#[from] oneshot::Canceled),
}
impl From<ClientError> for ProviderError {
@ -170,44 +354,42 @@ impl From<ClientError> for ProviderError {
}
}
#[async_trait]
impl<S> JsonRpcClient for Provider<S>
where
S: Send
+ Sync
+ Stream<Item = Result<Message, tungstenite::Error>>
+ Sink<Message, Error = tungstenite::Error>
+ Unpin,
{
type Error = ClientError;
#[cfg(test)]
#[cfg(not(feature = "celo"))]
mod tests {
use super::*;
use ethers_core::types::{Block, TxHash, U256};
use ethers_core::utils::Ganache;
/// Sends a POST request with the provided method and the params serialized as JSON
/// over WebSockets
async fn request<T: Serialize + Send + Sync, R: for<'a> Deserialize<'a>>(
&self,
method: &str,
params: T,
) -> Result<R, ClientError> {
// we get a lock on the websocket to avoid race conditions with multiple borrows
let mut lock = self.ws.lock().await;
#[tokio::test]
async fn request() {
let ganache = Ganache::new().block_time(1u64).spawn();
let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap();
let next_id = self.id.load(Ordering::SeqCst) + 1;
self.id.store(next_id, Ordering::SeqCst);
let block_num: U256 = ws.request("eth_blockNumber", ()).await.unwrap();
std::thread::sleep(std::time::Duration::new(3, 0));
let block_num2: U256 = ws.request("eth_blockNumber", ()).await.unwrap();
assert!(block_num2 > block_num);
}
// send the message
let payload = serde_json::to_string(&Request::new(next_id, method, params))?;
lock.send(Message::text(payload)).await?;
#[tokio::test]
async fn subscription() {
let ganache = Ganache::new().block_time(1u64).spawn();
let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap();
// get the response bytes
let resp = lock.next().await.ok_or(ClientError::NoResponse)??;
// Subscribing requires sending the sub request and then subscribing to
// the returned sub_id
let sub_id: U256 = ws.request("eth_subscribe", ["newHeads"]).await.unwrap();
let mut stream = ws.subscribe(sub_id).unwrap();
let data: ResponseData<R> = match resp {
Message::Text(inner) => serde_json::from_str(&inner)?,
Message::Binary(inner) => serde_json::from_slice(&inner)?,
// TODO: Should we do something if we receive a Ping, Pong or Close?
_ => return Err(ClientError::NoResponse),
};
let mut blocks = Vec::new();
for _ in 0..3 {
let item = stream.next().await.unwrap();
let block = serde_json::from_value::<Block<TxHash>>(item).unwrap();
blocks.push(block.number.unwrap_or_default().as_u64());
}
Ok(data.into_result()?)
assert_eq!(sub_id, 1.into());
assert_eq!(blocks, vec![1, 2, 3])
}
}