Websockets + TLS for Async-Std / Tokio (#30)

* fix: relax trait bounds on JsonRpcClient

* refactor(provider): move http client to separate dir

* feat(provider): add initial Websocket support over Stream/Sink + Tungstenite

* test(provider): add websocket test

* feat(provider): add convenience method using tokio/async-std behind a feature flag

* test(provider): add websocket ssl test

* feat(provider): add TLS websockets for tokio/async-std

* docs(provider): add websocket docs / examples

* fix(provider): make tokio an optional dep
This commit is contained in:
Georgios Konstantopoulos 2020-06-21 10:17:11 +03:00 committed by GitHub
parent ded8f50ef4
commit 0cfeadadf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 877 additions and 130 deletions

391
Cargo.lock generated
View File

@ -37,6 +37,58 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "async-attributes"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "async-std"
version = "1.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00d68a33ebc8b57800847d00787307f84a562224a14db069b0acefe4c2abbf5d"
dependencies = [
"async-attributes",
"async-task",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"kv-log-macro",
"log",
"memchr",
"num_cpus",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"smol",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3"
[[package]]
name = "async-tls"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95fd83426b89b034bf4e9ceb9c533c2f2386b813fd3dcae0a425ec6f1837d78a"
dependencies = [
"futures",
"rustls",
"webpki",
"webpki-roots",
]
[[package]]
name = "async-trait"
version = "0.1.31"
@ -48,6 +100,24 @@ dependencies = [
"syn",
]
[[package]]
name = "async-tungstenite"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ea20d9a2b068769745fbe27942952ea2b9a3967c52b5f0139637c7eca6f547"
dependencies = [
"async-std",
"async-tls",
"futures-io",
"futures-util",
"log",
"native-tls",
"pin-project",
"tokio",
"tokio-native-tls",
"tungstenite",
]
[[package]]
name = "autocfg"
version = "1.0.0"
@ -113,6 +183,19 @@ dependencies = [
"byte-tools",
]
[[package]]
name = "blocking"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d17efb70ce4421e351d61aafd90c16a20fb5bfe339fcdc32a86816280e62ce0"
dependencies = [
"futures-channel",
"futures-util",
"once_cell",
"parking",
"waker-fn",
]
[[package]]
name = "bumpalo"
version = "3.3.0"
@ -143,6 +226,12 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
[[package]]
name = "cache-padded"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24508e28c677875c380c20f4d28124fab6f8ed4ef929a1397d7b1a31e92f1005"
[[package]]
name = "cc"
version = "1.0.41"
@ -164,6 +253,42 @@ dependencies = [
"bitflags",
]
[[package]]
name = "concurrent-queue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080"
dependencies = [
"cache-padded",
]
[[package]]
name = "core-foundation"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
name = "crunchy"
version = "0.2.2"
@ -369,18 +494,24 @@ dependencies = [
name = "ethers-providers"
version = "0.1.3"
dependencies = [
"async-std",
"async-tls",
"async-trait",
"async-tungstenite",
"ethers",
"ethers-core",
"futures-core",
"futures-timer",
"futures-util",
"pin-project",
"reqwest",
"rustc-hex",
"serde",
"serde_json",
"serial_test",
"thiserror",
"tokio",
"tokio-native-tls",
"url",
]
@ -404,6 +535,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fastrand"
version = "1.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a64b0126b293b050395b37b10489951590ed024c03d7df4f249d219c8ded7cbf"
[[package]]
name = "fixed-hash"
version = "0.6.1"
@ -422,6 +559,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
@ -513,6 +665,12 @@ dependencies = [
"once_cell",
]
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.5"
@ -578,6 +736,15 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "hermit-abi"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909"
dependencies = [
"libc",
]
[[package]]
name = "hmac"
version = "0.7.1"
@ -713,6 +880,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "input_buffer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754"
dependencies = [
"bytes",
]
[[package]]
name = "iovec"
version = "0.1.4"
@ -747,6 +923,15 @@ dependencies = [
"winapi-build",
]
[[package]]
name = "kv-log-macro"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ff57d6d215f7ca7eb35a9a64d656ba4d9d2bef114d741dc08048e75e2f5d418"
dependencies = [
"log",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -864,6 +1049,24 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "native-tls"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b0d88c06fe90d5ee94048ba40409ef1d9315d86f6f38c2efdaad4fb50c58b2d"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "net2"
version = "0.2.34"
@ -875,6 +1078,16 @@ dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.4.0"
@ -887,6 +1100,20 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "openssl"
version = "0.10.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cee6d85f4cb4c4f59a6a85d5b68a233d280c82e29e822913b9c8b129fbf20bdd"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"lazy_static",
"libc",
"openssl-sys",
]
[[package]]
name = "openssl-probe"
version = "0.1.2"
@ -918,6 +1145,12 @@ dependencies = [
"serde",
]
[[package]]
name = "parking"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a7fad362df89617628a7508b3e9d588ade1b0ac31aa25de168193ad999c2dd4"
[[package]]
name = "parking_lot"
version = "0.10.2"
@ -980,6 +1213,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "piper"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01608bfa680dafb103f9207fa944facf572e4e3e708d10de19a0d0c3d36e5f18"
dependencies = [
"crossbeam-utils",
"futures-io",
"futures-sink",
"futures-util",
]
[[package]]
name = "pkg-config"
version = "0.3.17"
@ -1106,6 +1351,15 @@ version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "reqwest"
version = "0.10.6"
@ -1201,6 +1455,12 @@ dependencies = [
"winapi 0.3.8",
]
[[package]]
name = "scoped-tls"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -1217,6 +1477,29 @@ dependencies = [
"untrusted",
]
[[package]]
name = "security-framework"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.112"
@ -1282,6 +1565,18 @@ dependencies = [
"syn",
]
[[package]]
name = "sha-1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
dependencies = [
"block-buffer",
"digest",
"fake-simd",
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.8.2"
@ -1306,6 +1601,27 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4"
[[package]]
name = "smol"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c3546640e104d1ee544df9c08ac4896d44ef1de89a6d7cc4a3c22f677d5ef38"
dependencies = [
"async-task",
"blocking",
"concurrent-queue",
"fastrand",
"futures-io",
"futures-util",
"libc",
"once_cell",
"piper",
"scoped-tls",
"slab",
"socket2",
"wepoll-binding",
]
[[package]]
name = "socket2"
version = "0.3.12"
@ -1353,6 +1669,20 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tempfile"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
dependencies = [
"cfg-if",
"libc",
"rand",
"redox_syscall",
"remove_dir_all",
"winapi 0.3.8",
]
[[package]]
name = "thiserror"
version = "1.0.19"
@ -1439,6 +1769,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd608593a919a8e05a7d1fc6df885e40f6a88d3a70a3a7eff23ff27964eda069"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.13.1"
@ -1477,6 +1817,26 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382"
[[package]]
name = "tungstenite"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5c7d464221cb0b538a1cd12f6d9127ed1e6bb7f3ffca98fb3cd4c6e3af8175c"
dependencies = [
"base64 0.12.1",
"byteorder",
"bytes",
"http",
"httparse",
"input_buffer",
"log",
"native-tls",
"rand",
"sha-1",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.12.0"
@ -1545,6 +1905,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
[[package]]
name = "vcpkg"
version = "0.2.8"
@ -1557,6 +1923,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "waker-fn"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7"
[[package]]
name = "want"
version = "0.3.0"
@ -1670,6 +2042,25 @@ dependencies = [
"webpki",
]
[[package]]
name = "wepoll-binding"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374fff4ff9701ff8b6ad0d14bacd3156c44063632d8c136186ff5967d48999a7"
dependencies = [
"bitflags",
"wepoll-sys",
]
[[package]]
name = "wepoll-sys"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9082a777aed991f6769e2b654aa0cb29f1c3d615daf009829b07b66c7aff6a24"
dependencies = [
"cc",
]
[[package]]
name = "winapi"
version = "0.2.8"

View File

@ -260,7 +260,10 @@ where
/// Returns a new contract instance at `address`.
///
/// Clones `self` internally
pub fn at<T: Into<Address>>(&self, address: T) -> Self {
pub fn at<T: Into<Address>>(&self, address: T) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.address = address.into();
this
@ -269,7 +272,10 @@ where
/// Returns a new contract instance using the provided client
///
/// Clones `self` internally
pub fn connect(&self, client: &'a Client<P, S>) -> Self {
pub fn connect(&self, client: &'a Client<P, S>) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.client = client;
this

View File

@ -9,6 +9,10 @@ homepage = "https://docs.rs/ethers"
repository = "https://github.com/gakonst/ethers-rs"
keywords = ["ethereum", "web3", "celo", "ethers"]
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
ethers-core = { version = "0.1.3", path = "../ethers-core" }
@ -23,17 +27,45 @@ url = { version = "2.1.1", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false }
pin-project = { version = "0.4.20", default-features = false }
tokio = { version = "0.2.21", default-features = false, features = ["time"] }
async-tungstenite = { version = "0.6.0", default-features = false }
# ws support async-std and tokio runtimes for the convenience methods
async-std = { version = "1.6.2", default-features = false, optional = true }
tokio = { version = "0.2.21", default-features = false, optional = true }
# needed for tls
real-tokio-native-tls = { package = "tokio-native-tls", version = "0.1.0", optional = true }
async-tls = { version = "0.7.0", optional = true }
futures-timer = "3.0.2"
[dev-dependencies]
ethers = { version = "0.1.3", path = "../ethers" }
rustc-hex = "2.1.0"
tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] }
async-std = { version = "1.6.2", default-features = false, features = ["attributes"] }
async-tungstenite = { version = "0.6.0", features = ["tokio-runtime"] }
serial_test = "0.4.0"
[features]
celo = ["ethers-core/celo"]
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
tokio-runtime = [
"tokio",
"async-tungstenite/tokio-runtime"
]
tokio-tls = [
"tokio-runtime",
"async-tungstenite/tokio-native-tls",
"real-tokio-native-tls"
]
async-std-runtime = [
"async-std",
"async-tungstenite/async-std-runtime"
]
async-std-tls = [
"async-std-runtime",
"async-tungstenite/async-tls",
"async-tls"
]

View File

@ -28,6 +28,55 @@
//! # }
//! ```
//!
//! # Websockets
//!
//! The crate has support for WebSockets. If none of the provided async runtime
//! features are enabled, you must manually instantiate the WS connection and wrap
//! it with with a [`Ws::new`](method@crate::Ws::new) call.
//!
//! ```ignore
//! use ethers::providers::Ws;
//!
//! let ws = Ws::new(...);
//! ```
//!
//! If you have compiled the library with any of the following features, you may
//! instantiate the websocket instance with the `connect` call and your URL:
//! - `tokio-runtime`: Uses `tokio` as the runtime
//! - `async-std-runtime`: Uses `async-std-runtime`
//!
//! ```no_run
//! # #[cfg(any(
//! # feature = "tokio-runtime",
//! # feature = "tokio-tls",
//! # feature = "async-std-runtime",
//! # feature = "async-std-tls",
//! # ))]
//! # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
//! # use ethers::providers::Ws;
//! let ws = Ws::connect("ws://localhost:8545").await?;
//! # Ok(())
//! # }
//! ```
//!
//! TLS support is also provided via the following feature flags:
//! - `tokio-tls`
//! - `async-tls`
//!
//! ```no_run
//! # #[cfg(any(
//! # feature = "tokio-runtime",
//! # feature = "tokio-tls",
//! # feature = "async-std-runtime",
//! # feature = "async-std-tls",
//! # ))]
//! # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
//! # use ethers::providers::Ws;
//! let ws = Ws::connect("wss://localhost:8545").await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Ethereum Name Service
//!
//! The provider may also be used to resolve [Ethereum Name Service](https://ens.domains) (ENS) names
@ -50,8 +99,8 @@
//! # Ok(())
//! # }
//! ```
mod http;
pub use http::Provider as Http;
mod transports;
pub use transports::{Http, Ws};
mod provider;
@ -76,7 +125,7 @@ pub use provider::{Provider, ProviderError};
#[async_trait]
/// Trait which must be implemented by data transports to be used with the Ethereum
/// JSON-RPC provider.
pub trait JsonRpcClient: Debug + Clone + Send + Sync {
pub trait JsonRpcClient: Send + Sync {
/// A JSON-RPC Error
type Error: Error + Into<ProviderError>;

View File

@ -1,8 +1,7 @@
use crate::{
ens,
http::Provider as HttpProvider,
stream::{FilterStream, FilterWatcher},
JsonRpcClient, PendingTransaction,
Http as HttpProvider, JsonRpcClient, PendingTransaction,
};
use ethers_core::{

View File

@ -3,7 +3,8 @@ use crate::ProviderError;
use ethers_core::types::U256;
use futures_core::{stream::Stream, TryFuture};
use futures_util::StreamExt;
use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project;
use serde::Deserialize;
use std::{
@ -13,7 +14,11 @@ use std::{
time::Duration,
vec::IntoIter,
};
use tokio::time::{interval, Interval};
// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20
fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}
const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);
@ -54,7 +59,7 @@ pub(crate) struct FilterWatcher<F: FutureFactory, R> {
factory: F,
// The polling interval
interval: Interval,
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
state: FilterWatcherState<F::FutureItem, R>,
}
@ -68,7 +73,7 @@ where
pub fn new<T: Into<U256>>(id: T, factory: F) -> Self {
Self {
id: id.into(),
interval: interval(DEFAULT_POLL_DURATION),
interval: Box::new(interval(DEFAULT_POLL_DURATION)),
state: FilterWatcherState::WaitForInterval,
factory,
}
@ -86,7 +91,7 @@ where
}
fn interval<T: Into<u64>>(mut self, duration: T) -> Self {
self.interval = interval(Duration::from_millis(duration.into()));
self.interval = Box::new(interval(Duration::from_millis(duration.into())));
self
}
}
@ -107,21 +112,18 @@ where
*this.state = match this.state {
FilterWatcherState::WaitForInterval => {
// Wait the polling period
let mut interval = Box::pin(this.interval.tick());
let _ready = futures_util::ready!(interval.as_mut().poll(cx));
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
// create a new instance of the future
cx.waker().wake_by_ref();
FilterWatcherState::GetFilterChanges(this.factory.as_mut().new())
}
FilterWatcherState::GetFilterChanges(fut) => {
// wait for the future to be ready
let mut fut = Box::pin(fut);
// NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
let items: Vec<R> = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
@ -130,7 +132,10 @@ where
// for new logs
FilterWatcherState::NextItem(iter) => match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
None => {
cx.waker().wake_by_ref();
FilterWatcherState::WaitForInterval
}
},
};
@ -184,14 +189,16 @@ mod watch {
async fn stream() {
let factory = || Box::pin(async { Ok::<Vec<u64>, ProviderError>(vec![1, 2, 3]) });
let filter = FilterWatcher::<_, u64>::new(1, factory);
let mut stream = filter.interval(1u64).stream();
assert_eq!(stream.next().await.unwrap(), 1);
// stream combinator calls are still doable since FilterStream extends
// Stream and StreamExt
let mut stream = filter.interval(100u64).stream().map(|x| 2 * x);
assert_eq!(stream.next().await.unwrap(), 2);
assert_eq!(stream.next().await.unwrap(), 3);
assert_eq!(stream.next().await.unwrap(), 4);
assert_eq!(stream.next().await.unwrap(), 6);
// this will poll the factory function again since it consumed the entire
// vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds
// until new blocks are mined, until the call to the factory returns a non-empty
// vector of logs
assert_eq!(stream.next().await.unwrap(), 1);
assert_eq!(stream.next().await.unwrap(), 2);
}
}

View File

@ -0,0 +1,85 @@
// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt;
use thiserror::Error;
#[derive(Serialize, Deserialize, Debug, Clone, Error)]
/// A JSON-RPC 2.0 error
pub struct JsonRpcError {
/// The error code
pub code: i64,
/// The error message
pub message: String,
/// Additional data
pub data: Option<Value>,
}
impl fmt::Display for JsonRpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"(code: {}, message: {}, data: {:?})",
self.code, self.message, self.data
)
}
}
#[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC request
pub struct Request<'a, T> {
id: u64,
jsonrpc: &'a str,
method: &'a str,
params: T,
}
impl<'a, T> Request<'a, T> {
/// Creates a new JSON RPC request
pub fn new(id: u64, method: &'a str, params: T) -> Self {
Self {
id,
jsonrpc: "2.0",
method,
params,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Response<T> {
id: u64,
jsonrpc: String,
#[serde(flatten)]
pub data: ResponseData<T>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum ResponseData<R> {
Error { error: JsonRpcError },
Success { result: R },
}
impl<R> ResponseData<R> {
/// Consume response and return value
pub fn into_result(self) -> Result<R, JsonRpcError> {
match self {
ResponseData::Success { result } => Ok(result),
ResponseData::Error { error } => Err(error),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn response() {
let response: Response<u64> =
serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap();
assert_eq!(response.id, 1);
assert_eq!(response.data.into_result().unwrap(), 19);
}
}

View File

@ -4,15 +4,15 @@ use crate::{provider::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use reqwest::{Client, Error as ReqwestError};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
fmt,
str::FromStr,
sync::atomic::{AtomicU64, Ordering},
};
use thiserror::Error;
use url::Url;
use super::common::{JsonRpcError, Request, Response};
/// A low-level JSON-RPC Client over HTTP.
///
/// # Example
@ -42,7 +42,7 @@ pub enum ClientError {
ReqwestError(#[from] ReqwestError),
#[error(transparent)]
/// Thrown if the response could not be parsed
JsonRpcError(#[from] errors::JsonRpcError),
JsonRpcError(#[from] JsonRpcError),
}
impl From<ClientError> for ProviderError {
@ -118,88 +118,3 @@ impl Clone for Provider {
}
}
}
// leak private type w/o exposing it
mod errors {
use super::*;
#[derive(Serialize, Deserialize, Debug, Clone, Error)]
/// A JSON-RPC 2.0 error
pub struct JsonRpcError {
/// The error code
pub code: i64,
/// The error message
pub message: String,
/// Additional data
pub data: Option<Value>,
}
impl fmt::Display for JsonRpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"(code: {}, message: {}, data: {:?})",
self.code, self.message, self.data
)
}
}
}
#[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC request
struct Request<'a, T> {
id: u64,
jsonrpc: &'a str,
method: &'a str,
params: T,
}
impl<'a, T> Request<'a, T> {
/// Creates a new JSON RPC request
fn new(id: u64, method: &'a str, params: T) -> Self {
Self {
id,
jsonrpc: "2.0",
method,
params,
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Response<T> {
id: u64,
jsonrpc: String,
#[serde(flatten)]
data: ResponseData<T>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
enum ResponseData<R> {
Error { error: errors::JsonRpcError },
Success { result: R },
}
impl<R> ResponseData<R> {
/// Consume response and return value
fn into_result(self) -> Result<R, errors::JsonRpcError> {
match self {
ResponseData::Success { result } => Ok(result),
ResponseData::Error { error } => Err(error),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn response() {
let response: Response<u64> =
serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap();
assert_eq!(response.id, 1);
assert_eq!(response.data.into_result().unwrap(), 19);
}
}

View File

@ -0,0 +1,7 @@
mod common;
mod http;
pub use http::Provider as Http;
mod ws;
pub use ws::Provider as Ws;

View File

@ -0,0 +1,194 @@
use crate::{provider::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use async_tungstenite::tungstenite::{self, protocol::Message};
use futures_util::{
lock::Mutex,
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};
use thiserror::Error;
use super::common::{JsonRpcError, Request, ResponseData};
// Convenience methods for connecting with async-std/tokio:
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
use async_tungstenite::WebSocketStream;
// connect_async
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
use async_tungstenite::async_std::connect_async;
#[cfg(feature = "tokio-runtime")]
use async_tungstenite::tokio::{connect_async, TokioAdapter};
#[cfg(feature = "tokio-runtime")]
type TcpStream = TokioAdapter<tokio::net::TcpStream>;
#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))]
type TcpStream = async_std::net::TcpStream;
// If there is no TLS, just use the TCP Stream
#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-tls")))]
pub type MaybeTlsStream = TcpStream;
#[cfg(all(feature = "async-std-runtime", not(feature = "async-std-tls")))]
pub type MaybeTlsStream = TcpStream;
// Use either
#[cfg(feature = "tokio-tls")]
type TlsStream<S> = real_tokio_native_tls::TlsStream<S>;
#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))]
type TlsStream<S> = async_tls::client::TlsStream<S>;
#[cfg(any(feature = "tokio-tls", feature = "async-std-tls"))]
pub use async_tungstenite::stream::Stream as StreamSwitcher;
#[cfg(feature = "tokio-tls")]
pub type MaybeTlsStream =
StreamSwitcher<TcpStream, TokioAdapter<TlsStream<TokioAdapter<TcpStream>>>>;
#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))]
pub type MaybeTlsStream = StreamSwitcher<TcpStream, TlsStream<TcpStream>>;
/// A JSON-RPC Client over Websockets.
///
/// If the library is not compiled with any runtime support, then you will have
/// to manually instantiate a websocket connection and call `Provider::new` on it.
///
/// ```ignore
/// use ethers::providers::Ws;
///
/// let ws = Ws::new(...)
/// ```
///
/// If you have compiled the library with any of the following features, you may
/// instantiate the websocket instance with the `connect` call and your URL:
/// - `tokio-runtime`: Uses `tokio` as the runtime
/// - `tokio-tls`: Same as `tokio-runtime` but with TLS support
/// - `async-std-runtime`: Uses `async-std-runtime`
/// - `async-tls`: Same as `async-std-runtime` but with TLS support
///
/// ```no_run
/// # #[cfg(any(
/// # feature = "tokio-runtime",
/// # feature = "tokio-tls",
/// # feature = "async-std-runtime",
/// # feature = "async-std-tls",
/// # ))]
/// # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
/// use ethers::providers::Ws;
///
/// let ws = Ws::connect("ws://localhost:8545").await?;
///
/// // If built with TLS support (otherwise will get a "TLS Support not compiled in" error)
/// let ws = Ws::connect("wss://localhost:8545").await?;
/// # Ok(())
/// # }
/// ```
///
/// This feature is built using [`async-tungstenite`](https://docs.rs/async-tungstenite). If you need other runtimes,
/// consider importing `async-tungstenite` with the [corresponding feature
/// flag](https://github.com/sdroege/async-tungstenite/blob/master/Cargo.toml#L15-L22)
/// for your runtime.
pub struct Provider<S> {
id: AtomicU64,
ws: Mutex<S>,
}
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
impl Provider<WebSocketStream<MaybeTlsStream>> {
/// Initializes a new WebSocket Client.
/// separately.
pub async fn connect(
url: impl tungstenite::client::IntoClientRequest + Unpin,
) -> Result<Self, tungstenite::Error> {
let (ws, _) = connect_async(url).await?;
Ok(Self::new(ws))
}
}
impl<S> Provider<S>
where
S: Send
+ Sync
+ Stream<Item = Result<Message, tungstenite::Error>>
+ Sink<Message, Error = tungstenite::Error>
+ Unpin,
{
/// Initializes a new WebSocket Client. The websocket connection must be initiated
/// separately.
pub fn new(ws: S) -> Self {
Self {
id: AtomicU64::new(0),
ws: Mutex::new(ws),
}
}
}
#[derive(Error, Debug)]
/// Error thrown when sending a WS message
pub enum ClientError {
/// Thrown if deserialization failed
#[error(transparent)]
JsonError(#[from] serde_json::Error),
#[error(transparent)]
/// Thrown if the response could not be parsed
JsonRpcError(#[from] JsonRpcError),
/// Thrown if the websocket didn't respond to our message
#[error("Websocket connection did not respond with data")]
NoResponse,
/// Thrown if there's an error over the WS connection
#[error(transparent)]
TungsteniteError(#[from] tungstenite::Error),
}
impl From<ClientError> for ProviderError {
fn from(src: ClientError) -> Self {
ProviderError::JsonRpcClientError(Box::new(src))
}
}
#[async_trait]
impl<S> JsonRpcClient for Provider<S>
where
S: Send
+ Sync
+ Stream<Item = Result<Message, tungstenite::Error>>
+ Sink<Message, Error = tungstenite::Error>
+ Unpin,
{
type Error = ClientError;
/// Sends a POST request with the provided method and the params serialized as JSON
/// over WebSockets
async fn request<T: Serialize + Send + Sync, R: for<'a> Deserialize<'a>>(
&self,
method: &str,
params: T,
) -> Result<R, ClientError> {
// we get a lock on the websocket to avoid race conditions with multiple borrows
let mut lock = self.ws.lock().await;
let next_id = self.id.load(Ordering::SeqCst) + 1;
self.id.store(next_id, Ordering::SeqCst);
// send the message
let payload = serde_json::to_string(&Request::new(next_id, method, params))?;
lock.send(Message::text(payload)).await?;
// get the response bytes
let resp = lock.next().await.ok_or(ClientError::NoResponse)??;
let data: ResponseData<R> = match resp {
Message::Text(inner) => serde_json::from_str(&inner)?,
Message::Binary(inner) => serde_json::from_slice(&inner)?,
// TODO: Should we do something if we receive a Ping, Pong or Close?
_ => return Err(ClientError::NoResponse),
};
Ok(data.into_result()?)
}
}

View File

@ -1,14 +1,68 @@
#![allow(unused_braces)]
use ethers::providers::{Http, Provider};
use std::convert::TryFrom;
#[tokio::test]
#[cfg(not(feature = "celo"))]
async fn pending_txs_with_confirmations_ganache() {
mod eth_tests {
use super::*;
use ethers::{
types::TransactionRequest,
utils::{parse_ether, Ganache},
};
use serial_test::serial;
// Without TLS this would error with "TLS Support not compiled in"
#[test]
#[cfg(any(feature = "async-std-tls", feature = "tokio-tls"))]
fn ssl_websocket() {
// this is extremely ugly but I couldn't figure out a better way of having
// a shared async test for both runtimes
#[cfg(feature = "async-std-tls")]
let block_on = async_std::task::block_on;
#[cfg(feature = "tokio-tls")]
let mut runtime = tokio::runtime::Runtime::new().unwrap();
#[cfg(feature = "tokio-tls")]
let mut block_on = |x| runtime.block_on(x);
use ethers::providers::Ws;
block_on(async move {
let ws = Ws::connect("wss://rinkeby.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
.await
.unwrap();
let provider = Provider::new(ws);
let _number = provider.get_block_number().await.unwrap();
});
}
#[tokio::test]
#[serial]
#[cfg(feature = "tokio-runtime")]
async fn watch_blocks_websocket() {
use ethers::{
providers::{FilterStream, StreamExt, Ws},
types::H256,
};
let _ganache = Ganache::new().block_time(2u64).spawn();
let (ws, _) = async_tungstenite::tokio::connect_async("ws://localhost:8545")
.await
.unwrap();
let provider = Provider::new(Ws::new(ws));
let stream = provider
.watch_blocks()
.await
.unwrap()
.interval(2000u64)
.stream();
let _blocks = stream.take(3usize).collect::<Vec<H256>>().await;
let _number = provider.get_block_number().await.unwrap();
}
#[tokio::test]
#[serial]
async fn pending_txs_with_confirmations_ganache() {
let _ganache = Ganache::new().block_time(2u64).spawn();
let provider = Provider::<Http>::try_from("http://localhost:8545").unwrap();
let accounts = provider.get_accounts().await.unwrap();
@ -20,6 +74,7 @@ async fn pending_txs_with_confirmations_ganache() {
// got the correct receipt
assert_eq!(receipt.transaction_hash, hash);
}
}
#[cfg(feature = "celo")]

View File

@ -183,7 +183,10 @@ where
/// calls.
///
/// Clones internally.
pub fn with_signer(&self, signer: S) -> Self {
pub fn with_signer(&self, signer: S) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.signer = signer;
this
@ -193,7 +196,10 @@ where
/// calls.
///
/// Clones internally.
pub fn with_provider(&self, provider: Provider<P>) -> Self {
pub fn with_provider(&self, provider: Provider<P>) -> Self
where
P: Clone,
{
let mut this = self.clone();
this.provider = provider;
this

View File

@ -49,6 +49,7 @@ ethers-signers = { version = "0.1.3", path = "../ethers-signers", optional = tru
[dev-dependencies]
ethers-contract = { version = "0.1.3", path = "../ethers-contract", features = ["abigen"] }
ethers-providers = { version = "0.1.3", path = "../ethers-providers", features = ["tokio-runtime"] }
anyhow = "1.0.31"
rand = "0.7"

View File

@ -1,9 +1,9 @@
use ethers::prelude::*;
use std::convert::TryFrom;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let provider = Provider::<Http>::try_from("http://localhost:8545")?;
let ws = Ws::connect("ws://localhost:8546").await?;
let provider = Provider::new(ws);
let mut stream = provider.watch_blocks().await?.interval(2000u64).stream();
while let Some(block) = stream.next().await {
dbg!(block);